AWS Step Functions
Service d'orchestration visuel permettant de construire des workflows sans serveur, de coordonner plusieurs services AWS et de simplifier la création d'applications distribuées.
Qu'est-ce que AWS Step Functions ?
Imaginez que vous dirigez une entreprise qui traite des commandes en ligne. Pour chaque commande, plusieurs étapes doivent se dérouler dans un ordre précis : validation des informations, traitement du paiement, vérification du stock, expédition du produit, et enfin envoi d'une confirmation au client.
AWS Step Functions est comme un chef d'orchestre qui coordonne toute cette séquence. Il s'assure que chaque étape est effectuée au bon moment, dans le bon ordre, et qu'en cas de problème à une étape (par exemple, le paiement échoue), des actions appropriées sont prises (comme informer le client).
Pourquoi utiliser Step Functions ?
Simplicité visuelle
Step Functions propose une interface visuelle qui permet de voir exactement comment les différentes étapes s'enchaînent, facilitant la compréhension et la gestion des processus complexes.
Automatisation
Une fois configuré, le workflow s'exécute automatiquement sans intervention humaine, sauf si vous l'avez spécifiquement prévu à certaines étapes.
En résumé, Step Functions est un service qui permet d'automatiser et de visualiser des séquences d'opérations complexes dans le cloud, en s'assurant que chaque action est exécutée au bon moment et dans le bon ordre, avec la possibilité de gérer les erreurs et les exceptions de manière élégante.
Fonctionnement technique
AWS Step Functions est un service d'orchestration sans serveur qui permet de coordonner plusieurs services AWS à l'aide de workflows visuels appelés "machines d'état". Basé sur la logique des machines à états finis, il offre un moyen puissant et flexible de construire des applications distribuées.
Concepts fondamentaux
Machines d'état
Une machine d'état est la définition d'un workflow, décrite en JSON utilisant Amazon States Language (ASL). Elle définit une séquence d'étapes, la logique de branchement, les conditions, les gestions d'erreur et les transitions entre états.
- Types de machines d'état : Standard (exécution pouvant durer jusqu'à 1 an) et Express (exécution rapide, jusqu'à 5 minutes)
- Immutabilité : Une fois créées, les définitions des machines d'état ne changent pas pendant l'exécution
- Visualisation graphique : Interface visuelle pour concevoir et suivre les workflows
- Historique d'exécution : Suivi détaillé de chaque étape pour les machines Standard
États (States)
Les états sont les éléments fondamentaux d'une machine d'état, chacun représentant une étape du workflow. Step Functions propose plusieurs types d'états pour différentes opérations.
- Task : Exécute une tâche (Lambda, une action API AWS, etc.)
- Choice : Implémente une logique de branchement conditionnel
- Parallel : Exécute des branches en parallèle
- Map : Exécute le même ensemble d'étapes pour chaque élément d'un tableau
- Wait : Pause l'exécution pendant une durée spécifiée
- Succeed/Fail : Termine l'exécution avec succès ou échec
- Pass : Transmet son entrée à sa sortie, avec transformation optionnelle
Intégrations natives
Step Functions s'intègre nativement à de nombreux services AWS, permettant de les appeler directement sans code intermédiaire.
- Compute :
AWS Lambda,
Amazon ECS, AWS Batch, AWS Fargate
- Application Integration :
Amazon SQS,
Amazon SNS,
Amazon EventBridge
- Database :
Amazon DynamoDB
- Analytics : Amazon EMR,
Amazon Athena,
AWS Glue
- Machine Learning : Amazon SageMaker
- Autres services :
Amazon API Gateway,
AWS Step Functions (appels imbriqués)
Patterns d'intégration
Step Functions propose plusieurs patterns pour s'intégrer avec d'autres services AWS.
- Request Response : Appel synchrone attendant une réponse immédiate
- Run a Job : Démarrage d'une tâche et attente de sa complétion
- Wait for Callback with Task Token : Pattern asynchrone où Step Functions attend un callback
Gestion des erreurs et nouvelles tentatives
Step Functions offre des mécanismes robustes pour gérer les erreurs et les exceptions dans les workflows.
- Retry : Configuration de stratégies de nouvelle tentative avec intervalle exponentiel
- Catch : Définition d'états alternatifs à exécuter en cas d'erreur
- Types d'erreurs prédéfinis : States.ALL, States.Timeout, States.TaskFailed, etc.
- Erreurs personnalisées : Définition et gestion d'erreurs spécifiques à l'application
Implémentation technique
Voyons maintenant comment implémenter et utiliser AWS Step Functions dans différents scénarios.
1. Création d'une machine d'état basique
Voici comment définir et créer une machine d'état pour le traitement de commandes :
// Définition d'une machine d'état Step Functions simple avec SDK AWS v3
const { SFNClient, CreateStateMachineCommand } = require("@aws-sdk/client-sfn");
const client = new SFNClient({ region: "eu-west-1" });
async function createBasicStateMachine(stateMachineName, lambdaFunctionArn, roleArn) {
// Définition de la machine d'état en utilisant Amazon States Language (ASL)
const definition = {
Comment: "Un workflow simple de traitement de commande",
StartAt: "ProcessOrder",
States: {
"ProcessOrder": {
Type: "Task",
Resource: lambdaFunctionArn,
Retry: [
{
ErrorEquals: ["States.ALL"],
IntervalSeconds: 1,
MaxAttempts: 3,
BackoffRate: 2.0
}
],
Catch: [
{
ErrorEquals: ["States.ALL"],
ResultPath: "$.error",
Next: "HandleError"
}
],
Next: "CheckInventory"
},
"CheckInventory": {
Type: "Choice",
Choices: [
{
Variable: "$.availableInStock",
BooleanEquals: true,
Next: "ShipOrder"
}
],
Default: "NotifyOutOfStock"
},
"ShipOrder": {
Type: "Task",
Resource: lambdaFunctionArn,
Parameters: {
action: "ship",
"orderId.$": "$.orderId"
},
End: true
},
"NotifyOutOfStock": {
Type: "Task",
Resource: lambdaFunctionArn,
Parameters: {
action: "notify",
"orderId.$": "$.orderId",
message: "Item is out of stock"
},
End: true
},
"HandleError": {
Type: "Task",
Resource: lambdaFunctionArn,
Parameters: {
action: "logError",
"error.$": "$.error"
},
End: true
}
}
};
try {
const command = new CreateStateMachineCommand({
name: stateMachineName,
definition: JSON.stringify(definition),
roleArn: roleArn,
type: "STANDARD", // STANDARD ou EXPRESS
loggingConfiguration: {
level: "ALL", // ALL, ERROR, FATAL, OFF
includeExecutionData: true
},
tags: [
{
key: "Environment",
value: "Production"
},
{
key: "Application",
value: "OrderProcessing"
}
]
});
const response = await client.send(command);
console.log("Machine d'état créée avec succès:", response);
return response;
} catch (error) {
console.error("Erreur lors de la création de la machine d'état:", error);
throw error;
}
}
2. Démarrage d'une exécution
Comment démarrer une exécution de machine d'état avec des données d'entrée :
// Démarrage d'une exécution de machine d'état avec SDK AWS v3
const { SFNClient, StartExecutionCommand } = require("@aws-sdk/client-sfn");
const client = new SFNClient({ region: "eu-west-1" });
async function startStateMachineExecution(stateMachineArn, orderId) {
// Données d'entrée pour l'exécution
const input = {
orderId: orderId,
orderDate: new Date().toISOString(),
customer: {
id: "CUST-12345",
name: "Jean Dupont",
email: "jean.dupont@example.com"
},
items: [
{
id: "PROD-001",
name: "Écran 27 pouces",
quantity: 2,
price: 249.99
},
{
id: "PROD-002",
name: "Clavier mécanique",
quantity: 1,
price: 129.95
}
],
totalAmount: 629.93,
availableInStock: true
};
try {
const command = new StartExecutionCommand({
stateMachineArn: stateMachineArn,
name: `Order-${orderId}-${Date.now()}`, // Nom unique pour l'exécution
input: JSON.stringify(input)
});
const response = await client.send(command);
console.log("Exécution démarrée avec succès:", response);
return response;
} catch (error) {
console.error("Erreur lors du démarrage de l'exécution:", error);
throw error;
}
}
3. Traitement parallèle avec machine d'état Express
Configuration d'une machine d'état Express pour le traitement d'images en parallèle :
// Exemple de machine d'état express asynchrone pour traitement parallèle
const { SFNClient, CreateStateMachineCommand } = require("@aws-sdk/client-sfn");
const client = new SFNClient({ region: "eu-west-1" });
async function createImageProcessingWorkflow(stateMachineName, roleArn) {
// Services fictifs - à remplacer par des ARN réels
const lambdaMetadataExtractorArn = "arn:aws:lambda:eu-west-1:123456789012:function:extractImageMetadata";
const lambdaResizeArn = "arn:aws:lambda:eu-west-1:123456789012:function:resizeImage";
const lambdaApplyFiltersArn = "arn:aws:lambda:eu-west-1:123456789012:function:applyImageFilters";
const lambdaFacialRecognitionArn = "arn:aws:lambda:eu-west-1:123456789012:function:detectFaces";
const snsThumbnailsCreatedArn = "arn:aws:sns:eu-west-1:123456789012:thumbnail-creation-complete";
// Définition de la machine d'état avec traitement parallèle
const definition = {
Comment: "Workflow de traitement d'images parallèle",
StartAt: "ExtractMetadata",
States: {
"ExtractMetadata": {
Type: "Task",
Resource: lambdaMetadataExtractorArn,
Next: "ParallelProcessing"
},
"ParallelProcessing": {
Type: "Parallel",
Branches: [
// Branche 1: Redimensionnement pour différentes tailles
{
StartAt: "CreateThumbnails",
States: {
"CreateThumbnails": {
Type: "Map",
ItemsPath: "$.sizes",
Parameters: {
"imageId.$": "$.imageId",
"bucketName.$": "$.bucketName",
"size.$": "$$.Map.Item.Value"
},
Iterator: {
StartAt: "ResizeImage",
States: {
"ResizeImage": {
Type: "Task",
Resource: lambdaResizeArn,
End: true
}
}
},
ResultPath: "$.thumbnails",
Next: "NotifyThumbnailsCreated"
},
"NotifyThumbnailsCreated": {
Type: "Task",
Resource: "arn:aws:states:::sns:publish",
Parameters: {
"TopicArn": snsThumbnailsCreatedArn,
"Message": {
"imageId.$": "$.imageId",
"thumbnailCount.$": "$.thumbnails..size | length"
}
},
End: true
}
}
},
// Branche 2: Appliquer des filtres
{
StartAt: "ApplyFilters",
States: {
"ApplyFilters": {
Type: "Task",
Resource: lambdaApplyFiltersArn,
End: true
}
}
},
// Branche 3: Reconnaissance faciale
{
StartAt: "DetectFaces",
States: {
"DetectFaces": {
Type: "Task",
Resource: lambdaFacialRecognitionArn,
End: true
}
}
}
],
OutputPath: "$[0]", // Utilisez uniquement la sortie de la première branche
Next: "FinalizeProcessing"
},
"FinalizeProcessing": {
Type: "Task",
Resource: "arn:aws:lambda:eu-west-1:123456789012:function:finalizeImageProcessing",
End: true
}
}
};
try {
const command = new CreateStateMachineCommand({
name: stateMachineName,
definition: JSON.stringify(definition),
roleArn: roleArn,
type: "EXPRESS", // Type EXPRESS pour des exécutions rapides à volume élevé
loggingConfiguration: {
level: "ALL",
includeExecutionData: true
}
});
const response = await client.send(command);
console.log("Machine d'état express créée avec succès:", response);
return response;
} catch (error) {
console.error("Erreur lors de la création de la machine d'état express:", error);
throw error;
}
}
4. Infrastructure complète avec AWS CDK
Définition d'une machine d'état et de son infrastructure avec AWS CDK :
// Définition d'une machine d'état avec AWS CDK (TypeScript)
import * as cdk from 'aws-cdk-lib';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as tasks from 'aws-cdk-lib/aws-stepfunctions-tasks';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as s3 from 'aws-cdk-lib/aws-s3';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as sns from 'aws-cdk-lib/aws-sns';
import * as subs from 'aws-cdk-lib/aws-sns-subscriptions';
import { Construct } from 'constructs';
export class OrderProcessingWorkflowStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Table DynamoDB pour stocker les commandes
const ordersTable = new dynamodb.Table(this, 'OrdersTable', {
partitionKey: { name: 'orderId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
removalPolicy: cdk.RemovalPolicy.DESTROY // Uniquement pour les environnements de développement
});
// Bucket S3 pour les reçus
const receiptsBucket = new s3.Bucket(this, 'ReceiptsBucket', {
removalPolicy: cdk.RemovalPolicy.DESTROY, // Uniquement pour les environnements de développement
autoDeleteObjects: true // Uniquement pour les environnements de développement
});
// SNS Topic pour les notifications
const orderNotificationTopic = new sns.Topic(this, 'OrderNotifications');
// Ajouter un abonnement email au topic SNS
orderNotificationTopic.addSubscription(
new subs.EmailSubscription('orders-notification@example.com')
);
// Fonctions Lambda pour les différentes étapes
const validateOrderFn = new lambda.Function(this, 'ValidateOrderFunction', {
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/validate-order'),
environment: {
ORDERS_TABLE: ordersTable.tableName
}
});
const processPaymentFn = new lambda.Function(this, 'ProcessPaymentFunction', {
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/process-payment')
});
const updateInventoryFn = new lambda.Function(this, 'UpdateInventoryFunction', {
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/update-inventory')
});
const generateReceiptFn = new lambda.Function(this, 'GenerateReceiptFunction', {
runtime: lambda.Runtime.NODEJS_16_X,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/generate-receipt'),
environment: {
RECEIPTS_BUCKET: receiptsBucket.bucketName
}
});
// Accorder les permissions nécessaires
ordersTable.grantReadWriteData(validateOrderFn);
receiptsBucket.grantWrite(generateReceiptFn);
orderNotificationTopic.grantPublish(generateReceiptFn);
// Définition des tâches Step Functions
const validateOrder = new tasks.LambdaInvoke(this, 'Validate Order', {
lambdaFunction: validateOrderFn,
outputPath: '$.Payload'
});
const processPayment = new tasks.LambdaInvoke(this, 'Process Payment', {
lambdaFunction: processPaymentFn,
outputPath: '$.Payload'
});
const updateInventory = new tasks.LambdaInvoke(this, 'Update Inventory', {
lambdaFunction: updateInventoryFn,
outputPath: '$.Payload'
});
const generateReceipt = new tasks.LambdaInvoke(this, 'Generate Receipt', {
lambdaFunction: generateReceiptFn,
outputPath: '$.Payload'
});
const sendNotification = new tasks.SnsPublish(this, 'Send Notification', {
topic: orderNotificationTopic,
message: sfn.TaskInput.fromJsonPathAt('$.notificationMessage'),
resultPath: '$.notificationResult'
});
// États pour la gestion des erreurs
const handleError = new sfn.Pass(this, 'Handle Error', {
parameters: {
'status': 'FAILED',
'errorMessage.$': '$.errorInfo'
}
});
// État de réussite
const orderSucceeded = new sfn.Pass(this, 'Order Succeeded', {
parameters: {
'status': 'SUCCEEDED',
'orderId.$': '$.orderId',
'message': 'Order has been successfully processed'
}
});
// Définition du workflow
const definition = validateOrder
.next(new sfn.Choice(this, 'Is Order Valid?')
.when(sfn.Condition.booleanEquals('$.valid', true), processPayment
.next(updateInventory)
.next(generateReceipt)
.next(sendNotification)
.next(orderSucceeded))
.otherwise(new sfn.Pass(this, 'Invalid Order', {
parameters: {
'status': 'REJECTED',
'reason.$': '$.reason'
}
}))
);
// Créer la machine d'état
const stateMachine = new sfn.StateMachine(this, 'OrderProcessingStateMachine', {
definition,
timeout: cdk.Duration.minutes(5),
tracingEnabled: true,
stateMachineType: sfn.StateMachineType.STANDARD
});
// Ajouter une configuration de nouvelle tentative pour l'état de traitement du paiement
(processPayment.next as tasks.LambdaInvoke).addRetry({
maxAttempts: 3,
interval: cdk.Duration.seconds(2),
backoffRate: 2,
errors: ['ServiceUnavailable', 'ThrottlingException']
});
// Ajouter un gestionnaire d'erreur global pour la machine d'état
stateMachine.addCatch(handleError, {
errors: ['States.ALL'],
resultPath: '$.errorInfo'
});
// Sorties
new cdk.CfnOutput(this, 'StateMachineArn', {
value: stateMachine.stateMachineArn
});
new cdk.CfnOutput(this, 'OrdersTableName', {
value: ordersTable.tableName
});
new cdk.CfnOutput(this, 'ReceiptsBucketName', {
value: receiptsBucket.bucketName
});
}
}
5. Workflow avec approbation humaine
Implémentation d'un workflow d'approbation de document avec intervention humaine :
// Script pour créer une machine d'état Step Functions via AWS SDK
// Exemple d'un processus d'approbation avec tâches humaines
const { SFNClient, CreateStateMachineCommand } = require("@aws-sdk/client-sfn");
const client = new SFNClient({ region: "eu-west-1" });
async function createApprovalWorkflow(stateMachineName, roleArn) {
// Définition de la machine d'état avec une tâche d'approbation humaine
const definition = {
Comment: "Workflow d'approbation de document avec intervention humaine",
StartAt: "ValidateDocument",
States: {
"ValidateDocument": {
Type: "Task",
Resource: "arn:aws:lambda:eu-west-1:123456789012:function:validateDocument",
Next: "IsDocumentValid"
},
"IsDocumentValid": {
Type: "Choice",
Choices: [
{
Variable: "$.isValid",
BooleanEquals: true,
Next: "RequestManagerApproval"
}
],
Default: "RejectDocument"
},
"RequestManagerApproval": {
Type: "Task",
Resource: "arn:aws:states:::lambda:invoke.waitForTaskToken",
Parameters: {
"FunctionName": "arn:aws:lambda:eu-west-1:123456789012:function:sendApprovalRequest",
"Payload": {
"documentId.$": "$.documentId",
"documentType.$": "$.documentType",
"documentUrl.$": "$.documentUrl",
"approverEmail.$": "$.approverEmail",
"taskToken.$": "$$.Task.Token"
}
},
Next: "ProcessApprovalResult"
},
"ProcessApprovalResult": {
Type: "Choice",
Choices: [
{
Variable: "$.approved",
BooleanEquals: true,
Next: "PublishDocument"
}
],
Default: "SendRejectionNotification"
},
"PublishDocument": {
Type: "Parallel",
Branches": [
{
"StartAt": "UpdateDatabase",
"States": {
"UpdateDatabase": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456789012:function:updateDocumentDatabase",
"End": true
}
}
},
{
"StartAt": "GenerateAuditRecord",
"States": {
"GenerateAuditRecord": {
"Type": "Task",
"Resource": "arn:aws:lambda:eu-west-1:123456789012:function:generateAuditRecord",
"End": true
}
}
}
],
Next: "SendApprovalNotification"
},
"SendApprovalNotification": {
Type: "Task",
Resource: "arn:aws:states:::sns:publish",
Parameters: {
"TopicArn": "arn:aws:sns:eu-west-1:123456789012:document-approvals",
"Message": {
"documentId.$": "$.documentId",
"status": "APPROVED",
"message": "Your document has been approved and published"
}
},
End: true
},
"SendRejectionNotification": {
Type: "Task",
Resource: "arn:aws:states:::sns:publish",
Parameters: {
"TopicArn": "arn:aws:sns:eu-west-1:123456789012:document-approvals",
"Message": {
"documentId.$": "$.documentId",
"status": "REJECTED",
"message": "Your document has been rejected",
"reason.$": "$.rejectionReason"
}
},
End: true
},
"RejectDocument": {
Type: "Task",
Resource": "arn:aws:lambda:eu-west-1:123456789012:function:handleInvalidDocument",
Next: "SendRejectionNotification"
}
}
};
try {
const command = new CreateStateMachineCommand({
name: stateMachineName,
definition: JSON.stringify(definition),
roleArn: roleArn,
type: "STANDARD"
});
const response = await client.send(command);
console.log("Machine d'état d'approbation créée avec succès:", response);
return response;
} catch (error) {
console.error("Erreur lors de la création de la machine d'état d'approbation:", error);
throw error;
}
}
Bonnes pratiques
- Idempotence : Concevez vos états pour être idempotents afin de gérer les exécutions répétées
- Gestionnaires d'erreurs : Ajoutez toujours des stratégies Retry et Catch pour une résilience robuste
- Limiter les données : Minimisez la taille des données entre états pour éviter les limitations
- Granularité : Préférez plusieurs petites machines d'état plutôt qu'une seule très complexe
- InputPath et OutputPath : Utilisez ces filtres pour contrôler le flux de données entre états
- Express vs Standard : Choisissez le type adapté selon la durée et le volume d'exécutions
- Monitoring : Configurez des alarmes CloudWatch pour surveiller les exécutions
- Suivre l'historique d'exécution : Activez la journalisation pour les diagnostics et l'audit
Cas d'usage
Traitement des commandes e-commerce
Orchestration du processus complet de commande, depuis la validation initiale jusqu'à l'expédition, en passant par le traitement des paiements et la gestion des stocks. Step Functions permet de gérer de manière robuste les cas d'échec de paiement, de rupture de stock et d'autres exceptions.
Pipelines de traitement de médias
Création de workflows d'ingestion, de transformation et de distribution de contenus médias. Par exemple, un pipeline qui reçoit une vidéo, la transcrit, la traduit en plusieurs langues, génère des sous-titres, redimensionne pour différentes résolutions, et distribue les résultats sur plusieurs plateformes.
Processus d'approbation avec intervention humaine
Workflows qui incluent des étapes nécessitant une intervention humaine, comme l'approbation de prêts, la validation de contenu, ou les examens de conformité. Step Functions suspend l'exécution jusqu'à réception d'un callback, permettant à un humain d'examiner les données et de prendre une décision.
ETL et pipelines de données
Orchestration de workflows ETL complexes qui extraient, transforment et chargent des données depuis diverses sources vers des entrepôts de données ou des lacs de données. Step Functions coordonne les différentes étapes du traitement, gère les dépendances et fournit une visibilité sur l'avancement global.
Applications par industrie
Services financiers
Les institutions financières utilisent Step Functions pour automatiser des processus comme le traitement des demandes de prêt, les vérifications KYC (Know Your Customer), la détection de fraude, et les rapports réglementaires. Ces workflows nécessitent souvent l'orchestration de plusieurs microservices et des étapes d'approbation humaine.
Soins de santé et sciences de la vie
Dans le secteur de la santé, Step Functions orchestre des workflows de traitement de données médicales, d'analyse génomique et de gestion de dossiers patients. Il permet d'intégrer des analyses algorithmiques avec des révisions médicales humaines lorsque nécessaire.
Intelligence Artificielle et Machine Learning
Step Functions est largement utilisé pour orchestrer des pipelines de ML complexes, en coordonnant des étapes comme la préparation des données, l'entraînement des modèles, l'évaluation, et le déploiement. Il intègre naturellement les tâches SageMaker et gère les flux conditionnels basés sur les métriques du modèle.
Ressources complémentaires
Toutes les compétencesDocumentation officielle
Guide du développeur complet pour AWS Step Functions avec tutoriels détaillés
Spécification ASL
Spécification complète d'Amazon States Language pour définir les machines d'état
Blog AWS Step Functions
Articles techniques et études de cas sur l'utilisation avancée de Step Functions