Select your cookie preferences

We use cookies and similar tools to enhance your experience, provide our services, deliver relevant advertising, and make improvements. Approved third parties also use these tools to help us deliver advertising and provide certain site features.

DynamoDB Stream to Step Functions

Created with SnapAmazon DynamoDBAmazon EventBridge PipeAWS Step FunctionsStreamTrigger workflow

Invoke a Step Functions workflow from a DynamoDB stream

This sample project demonstrates how to invoke an AWS Step Functions state machine from DynamoDB Stream without using Lambda.
This pattern deploys one Step Functions workflow, one DynamoDB table with a stream enabled and one EventBridge Pipe connection.

AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Template to launch StepFunctions with DynamoDB Stream (uksb-1tthgi812) (tag:eventbridge-pipes-ddbstream-sfn)

Parameters:
  NationalTeam:
    Description: National Team Name
    Type: String
    Default: Argentina

Resources:
  # DynamoDB Table Creation with Stream Enabled
  DynamoDBWCTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: WorldCupTable
      AttributeDefinitions:
        - AttributeName: PlayerName
          AttributeType: S
      KeySchema:
        - AttributeName: PlayerName
          KeyType: HASH
      ProvisionedThroughput:
        ReadCapacityUnits: 1
        WriteCapacityUnits: 1
      StreamSpecification:
        StreamViewType: NEW_AND_OLD_IMAGES
  #Logs for StepFunctions
  TargetStateMachineLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      RetentionInDays: 7
      LogGroupName: ddb-stream-pipes-sf/StateMachine
  #Execution Role for StepFunctions
  TargetStateMachineRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogDelivery'
                  - 'logs:GetLogDelivery'
                  - 'logs:UpdateLogDelivery'
                  - 'logs:DeleteLogDelivery'
                  - 'logs:ListLogDeliveries'
                  - 'logs:PutResourcePolicy'
                  - 'logs:DescribeResourcePolicies'
                  - 'logs:DescribeLogGroups'
                Resource: '*'

  #Target StepFunction state machine
  TargetStateMachine:
    Type: AWS::Serverless::StateMachine
    Properties:
      Type: EXPRESS
      DefinitionUri: workflow/ddb-pipes-sfn.asl.json
      Logging:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt TargetStateMachineLogGroup.Arn
        Level: ALL
        IncludeExecutionData: true
      Role: !GetAtt TargetStateMachineRole.Arn

  # Role for EventBridge Pipes to read from DynamoDB Stream and launch SFN
  EventBridgePipesRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - pipes.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: CloudWatchLogs
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: '*'
        - PolicyName: SourcePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "dynamodb:DescribeStream"
                  - "dynamodb:GetRecords"
                  - "dynamodb:GetShardIterator"
                  - "dynamodb:ListStreams"
                Resource: !GetAtt DynamoDBWCTable.StreamArn
        - PolicyName: ExecuteSFN
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'states:StartExecution'
                Resource: !Ref TargetStateMachine
  # Pipes definition
  DDBStreamToSFN:
    Type: AWS::Pipes::Pipe
    Properties:
      Name: 'DDbStreamToSfnPipe'
      RoleArn: !GetAtt EventBridgePipesRole.Arn
      Source: !GetAtt DynamoDBWCTable.StreamArn
      SourceParameters:
        FilterCriteria:
          Filters:
            - Pattern: !Sub '{"dynamodb": {"NewImage": {"Nationality": {"S": [{"prefix": "${NationalTeam}"}]}}}}'
        DynamoDBStreamParameters:
          StartingPosition: LATEST
          BatchSize: 1
      Target: !Ref TargetStateMachine
      TargetParameters:
        StepFunctionStateMachineParameters:
          InvocationType: FIRE_AND_FORGET

Outputs:
  DynamoDBSourceTableName:
    Description: 'DynamoDB Table Name'
    Value:
      Ref: DynamoDBWCTable
  SFNLog:
    Description: 'StepFunctions LogGroup Name'
    Value: !Ref TargetStateMachineLogGroup

< Back to all patterns


GitHub icon Visit the GitHub repo for this pattern.

Download

git clone https://github.com/aws-samples/serverless-patterns/ cd serverless-patterns/eventbridge-pipes-ddbstream-sfn

Deploy

sam deploy --guided


Testing

See the GitHub repo for detailed testing instructions.

Cleanup

Delete the stack: aws cloudformation delete-stack --stack-name STACK_NAME .

Presented by Sai Katakam

AWS Cloud Consultant

Follow on LinkedIn