Invoke a Step Functions workflow from a DynamoDB stream
# Template header: CloudFormation format version, the SAM transform that
# expands the AWS::Serverless::* resources used below, and the stack description.
AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
# Folded scalar: the two lines are joined with a single space, producing the
# same one-line description string as before.
Description: >-
  Template to launch StepFunctions with DynamoDB Stream (uksb-1tthgi812)
  (tag:eventbridge-pipes-ddbstream-sfn)
# Stack inputs.
Parameters:
  # Prefix matched against the "Nationality" attribute of new stream records.
  # The value is substituted verbatim into the pipe's JSON filter pattern, so
  # characters that are special in JSON (e.g. a double quote) are not escaped.
  NationalTeam:
    Type: String
    Default: 'Argentina'
    Description: 'National Team Name'
# Resources: a DynamoDB table with a stream, an EXPRESS Step Functions state
# machine (with its log group and execution role), and an EventBridge pipe
# (with its role) that forwards filtered stream records to the state machine.
Resources:
  # DynamoDB table with a stream enabled; the stream is the pipe's source.
  DynamoDBWCTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: WorldCupTable
      AttributeDefinitions:
        - AttributeName: PlayerName
          AttributeType: S
      KeySchema:
        - AttributeName: PlayerName
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      StreamSpecification:
        # Stream records carry both the old and the new item image; the pipe
        # filter below inspects NewImage.
        StreamViewType: NEW_AND_OLD_IMAGES

  # Log group that receives the state machine's execution logs.
  TargetStateMachineLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7
      LogGroupName: ddb-stream-pipes-sf/StateMachine

  # Execution role for the state machine. It only grants the CloudWatch Logs
  # delivery actions needed for the Logging configuration on the state machine.
  TargetStateMachineRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so YAML loaders never interpret the policy version as a date.
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogDelivery'
                  - 'logs:GetLogDelivery'
                  - 'logs:UpdateLogDelivery'
                  - 'logs:DeleteLogDelivery'
                  - 'logs:ListLogDeliveries'
                  - 'logs:PutResourcePolicy'
                  - 'logs:DescribeResourcePolicies'
                  - 'logs:DescribeLogGroups'
                Resource: '*'

  # Target state machine. The workflow definition is loaded from the local
  # ASL file referenced by DefinitionUri.
  TargetStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Type: EXPRESS
      DefinitionUri: workflow/ddb-pipes-sfn.asl.json
      Logging:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt TargetStateMachineLogGroup.Arn
        Level: ALL
        # Execution input/output is written to the log group as well.
        IncludeExecutionData: true
      Role: !GetAtt TargetStateMachineRole.Arn

  # Role assumed by EventBridge Pipes to read the DynamoDB stream and start
  # executions of the target state machine.
  EventBridgePipesRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - sts:AssumeRole
            # Confused-deputy guard: the Pipes service may assume this role
            # only on behalf of resources in this stack's own account.
            Condition:
              StringEquals:
                'aws:SourceAccount': !Ref AWS::AccountId
      Policies:
        # NOTE(review): the pipe below defines no LogConfiguration, so this
        # policy looks unused - confirm before narrowing or removing it.
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: '*'
        # Read access to the table's stream only.
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'dynamodb:DescribeStream'
                  - 'dynamodb:GetRecords'
                  - 'dynamodb:GetShardIterator'
                  - 'dynamodb:ListStreams'
                Resource: !GetAtt DynamoDBWCTable.StreamArn
        # Permission to start executions of the target state machine only.
        - PolicyName: ExecuteSFN
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'states:StartExecution'
                Resource: !Ref TargetStateMachine

  # Pipe: DynamoDB stream -> filter -> Step Functions state machine.
  DDBStreamToSFN:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: 'DDbStreamToSfnPipe'
      RoleArn: !GetAtt EventBridgePipesRole.Arn
      Source: !GetAtt DynamoDBWCTable.StreamArn
      SourceParameters:
        FilterCriteria:
          Filters:
            # Forward only records whose NewImage has a string attribute
            # "Nationality" starting with the NationalTeam parameter value.
            - Pattern: !Sub '{"dynamodb": {"NewImage": {"Nationality": {"S": [{"prefix": "${NationalTeam}"}]}}}}'
        DynamoDBStreamParameters:
          StartingPosition: LATEST
          BatchSize: 1
      Target: !Ref TargetStateMachine
      TargetParameters:
        StepFunctionStateMachineParameters:
          InvocationType: FIRE_AND_FORGET
# Stack outputs: references to the source table and to the state machine's
# log group, labelled by their descriptions.
Outputs:
  DynamoDBSourceTableName:
    Value: !Ref DynamoDBWCTable
    Description: 'DynamoDB Table Name'
  SFNLog:
    Value: !Ref TargetStateMachineLogGroup
    Description: 'StepFunctions LogGroup Name'
Visit the GitHub repo for this pattern.
git clone https://github.com/aws-samples/serverless-patterns/
cd serverless-patterns/eventbridge-pipes-ddbstream-sfn
sam deploy --guided