Amazon API Gateway to Amazon Kinesis Data Stream to AWS Lambda to Amazon DynamoDB

[Architecture diagram: Amazon API Gateway → Kinesis Data Streams → AWS Lambda → Amazon DynamoDB]

This pattern deploys an AWS SAM application with Amazon API Gateway, Amazon Kinesis Data Streams, AWS Lambda, and Amazon DynamoDB.

This pattern deploys an Amazon API Gateway REST API with the route /prod/submit/{streamName}/{eventId}, protected by basic authentication.
On receiving a request, API Gateway invokes a Lambda authorizer, which validates the request by checking the basic auth credentials against AWS Secrets Manager and returns a policy telling API Gateway whether to allow or deny the request.
When the request is allowed, API Gateway sends the message payload to the Kinesis data stream. This pattern is useful for large payloads, since Kinesis Data Streams supports records of up to 1 MB.
Lambda processes messages from the Kinesis data stream. It uses an SQS queue as a dead-letter queue for messages that it repeatedly fails to process.
Lambda saves the received messages into a DynamoDB table.
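The last two steps can be sketched in Python. This is only an illustration, not the handler shipped in the repo: it assumes each record's payload is JSON and that the Kinesis partition key carries the eventId, neither of which is confirmed by the page (the real mapping lives in api.yaml). The names `records_to_items` and the optional `table` argument are hypothetical.

```python
import base64
import json


def records_to_items(event):
    """Convert a Kinesis batch event into items keyed by eventId."""
    items = []
    for record in event.get("Records", []):
        kinesis = record["kinesis"]
        # Lambda delivers each Kinesis payload base64-encoded in kinesis.data.
        raw = base64.b64decode(kinesis["data"])
        payload = json.loads(raw)  # assumption: the payload is JSON
        items.append({
            # Assumption: the partition key doubles as the eventId.
            "eventId": kinesis["partitionKey"],
            "payload": json.dumps(payload),
        })
    return items


def lambda_handler(event, context, table=None):
    """Persist every record in the batch; `table` is injectable for local tests."""
    if table is None:
        import boto3  # imported lazily so the sketch runs without AWS access
        table = boto3.resource("dynamodb").Table("EventTable")
    items = records_to_items(event)
    for item in items:
        # One PutItem per record, matching the only DynamoDB action the template grants.
        table.put_item(Item=item)
    return {"saved": len(items)}
```

Writing items one at a time keeps the sketch within the `dynamodb:PutItem` permission in the template; a batch write would need an additional action.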

AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: A REST endpoint hosted by API Gateway that writes to a Kinesis data stream, which is consumed by a Lambda function and persisted in a DynamoDB table. (uksb-1tthgi812) (tag:apigw-kinesis-lambda-ddb)

Resources:
  ##########################################################################
  #   REST API - Entrypoint                                                #
  ##########################################################################
  RestApi:
    Type: AWS::Serverless::Api
    Properties:
      AccessLogSetting:
        DestinationArn: !GetAtt RestApiGatewayAccessLogs.Arn
        Format: '{ "requestId":"$context.requestId", "ip": "$context.identity.sourceIp", "requestTime":"$context.requestTime", "httpMethod":"$context.httpMethod","resourcePath":"$context.resourcePath", "status":"$context.status","protocol":"$context.protocol", "responseLength":"$context.responseLength" }'
      DefinitionBody:
        "Fn::Transform":
          Name: "AWS::Include"
          Parameters:
            Location: "api.yaml"
      StageName: prod
      Auth:
        DefaultAuthorizer: GatewayAuthorizerFunction
        Authorizers:
          GatewayAuthorizerFunction:
            FunctionArn: !GetAtt GatewayAuthorizerFunction.Arn
            FunctionPayloadType: REQUEST
            Identity:
              Headers:
                - Authorization
              ReauthorizeEvery: 0

  ##########################################################################
  #   Kinesis Data Streams                                                 #
  ##########################################################################
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      Name: GatewayEventsStream
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis

  ##########################################################################
  #   SQS Queues                                                           #
  ##########################################################################
  # SQS DLQ
  DeadLetterQueue:
    Type: AWS::SQS::Queue
    Properties:
      QueueName: LambdaDeadLetterQueue
      SqsManagedSseEnabled: true

  ##########################################################################
  #   Lambda Functions                                                     #
  ##########################################################################
  # API Gateway Authorizer
  GatewayAuthorizerFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: GatewayAuthorizerFunction
      Description: Lambda to be invoked by API Gateway for authorization
      CodeUri: lambda/authorizer/
      Handler: index.lambda_handler
      Runtime: python3.10
      Timeout: 3
      MemorySize: 128
      Role: !GetAtt AuthorizerLambdaExecutionRole.Arn
      Layers:
        [
          !Sub "arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:32",
        ]

  # Kinesis Handler
  KinesisHandlerFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: KinesisHandlerFunction
      Description: Lambda to consume from the Kinesis data stream
      CodeUri: lambda/kinesis-handler/
      Runtime: python3.10
      Handler: index.lambda_handler
      MemorySize: 128
      Layers:
        [
          !Sub "arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:32",
        ]
      DeadLetterQueue:
        TargetArn: !GetAtt DeadLetterQueue.Arn
        Type: SQS
      Policies:
        - Statement:
            - Sid: AllowDbConnect
              Effect: Allow
              Action:
                - "dynamodb:PutItem"
              Resource:
                - !GetAtt EventTable.Arn
      Events:
        Stream:
          Type: Kinesis
          Properties:
            Stream: !GetAtt KinesisStream.Arn
            StartingPosition: LATEST
            BatchSize: 10

  ##########################################################################
  #   Roles                                                                #
  ##########################################################################
  ApiGatewayRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: CustomPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "kinesis:PutRecord"
                  - "kinesis:PutRecords"
                  - "kinesis:GetShardIterator"
                  - "kinesis:GetRecords"
                Resource: !Sub
                  - "${varStreamArn}*"
                  - varStreamArn: !GetAtt KinesisStream.Arn
              - Effect: Allow
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:DescribeLogGroups"
                  - "logs:DescribeLogStreams"
                  - "logs:PutLogEvents"
                  - "logs:GetLogEvents"
                  - "logs:FilterLogEvents"
                Resource: !GetAtt RestApiGatewayAccessLogs.Arn

  AuthorizerLambdaExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Policies:
        - PolicyName: CustomPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: "*"
              - Effect: Allow
                Action:
                  - "secretsmanager:GetSecretValue"
                Resource: [!Ref BasicAuthUsername, !Ref BasicAuthPassword]

  ##########################################################################
  #   DynamoDB Table                                                       #
  ##########################################################################
  EventTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: eventId
          AttributeType: S
      BillingMode: PROVISIONED
      ProvisionedThroughput:
        ReadCapacityUnits: 5
        WriteCapacityUnits: 10
      KeySchema:
        - AttributeName: eventId
          KeyType: HASH
      TableName: EventTable
      SSESpecification:
        SSEEnabled: true

  ##########################################################################
  #   CloudWatch Logs                                                      #
  ##########################################################################

  RestApiGatewayAccessLogs:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: Rest-ApiGw-Kinesis-Access-Logs
      RetentionInDays: 1

  ##########################################################################
  #   Secrets - Generated for Endpoint Auth                                #
  ##########################################################################

  BasicAuthUsername:
    Type: AWS::SecretsManager::Secret
    Properties:
      Description: Username to be used in basic auth
      Name: gateway/username
      GenerateSecretString:
        ExcludeCharacters: ""
        ExcludeLowercase: false
        ExcludeNumbers: false
        ExcludePunctuation: false
        ExcludeUppercase: false
        GenerateStringKey: UserName
        IncludeSpace: false
        PasswordLength: 32
        RequireEachIncludedType: true
        SecretStringTemplate: !Sub '{"UserName": "REPLACED"}'

  BasicAuthPassword:
    Type: AWS::SecretsManager::Secret
    Properties:
      Description: Password to be used in basic auth
      Name: gateway/password
      GenerateSecretString:
        ExcludeCharacters: ""
        ExcludeLowercase: false
        ExcludeNumbers: false
        ExcludePunctuation: false
        ExcludeUppercase: false
        GenerateStringKey: Password
        IncludeSpace: false
        PasswordLength: 32
        RequireEachIncludedType: true
        SecretStringTemplate: !Sub '{"Password": "REPLACED"}'

##########################################################################
#   Outputs                                                              #
##########################################################################
Outputs:
  RestApiEndpoint:
    Description: "Rest API endpoint"
    Value: !Sub "https://${RestApi}.execute-api.${AWS::Region}.amazonaws.com"
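The template wires GatewayAuthorizerFunction as a REQUEST authorizer keyed on the Authorization header. A minimal sketch of the validation logic such an authorizer might contain is shown below. It is not the code in lambda/authorizer/: the function names are hypothetical, and the expected credentials are passed in as arguments, whereas the deployed function would read them from the two Secrets Manager secrets.

```python
import base64
import hmac


def parse_basic_auth(header):
    """Return (username, password) from a 'Basic <base64>' header, or None."""
    if not header or not header.startswith("Basic "):
        return None
    try:
        decoded = base64.b64decode(header[len("Basic "):], validate=True).decode("utf-8")
    except (ValueError, UnicodeDecodeError):
        return None  # not valid base64 or not valid UTF-8
    username, sep, password = decoded.partition(":")
    return (username, password) if sep else None


def build_policy(effect, method_arn, principal_id="user"):
    """Build the IAM policy document that API Gateway expects from an authorizer."""
    return {
        "principalId": principal_id,
        "policyDocument": {
            "Version": "2012-10-17",
            "Statement": [
                {"Action": "execute-api:Invoke", "Effect": effect, "Resource": method_arn}
            ],
        },
    }


def authorize(event, expected_user, expected_password):
    """Allow the request only when the header matches the expected credentials."""
    # Header names may arrive in any case, so normalise before the lookup.
    headers = {k.lower(): v for k, v in (event.get("headers") or {}).items()}
    creds = parse_basic_auth(headers.get("authorization"))
    ok = (
        creds is not None
        # compare_digest avoids leaking how many leading characters matched.
        and hmac.compare_digest(creds[0].encode(), expected_user.encode())
        and hmac.compare_digest(creds[1].encode(), expected_password.encode())
    )
    return build_policy("Allow" if ok else "Deny", event["methodArn"])
```

Returning an explicit Deny policy for bad credentials, rather than raising, keeps the outcome visible in the authorizer's response.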

Visit the GitHub repo for this pattern.

Download

git clone https://github.com/aws-samples/serverless-patterns/
cd serverless-patterns/apigw-kinesis-lambda-ddb

Deploy

sam deploy


Testing

See the GitHub repo for detailed testing instructions.
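As a rough idea of what a test request could look like, the sketch below builds a call to the /prod/submit/{streamName}/{eventId} route with a basic auth header. The HTTP method (POST), the JSON body, and the helper name `build_submit_request` are assumptions, since api.yaml is not shown here; the username and password would be the values generated in Secrets Manager.

```python
import base64
import json
import urllib.parse
import urllib.request


def build_submit_request(base_url, stream_name, event_id, username, password, payload):
    """Build (but do not send) a request to the submit route with basic auth."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    # Quote the path parameters so unusual characters cannot break the URL.
    path = "/prod/submit/{}/{}".format(
        urllib.parse.quote(stream_name, safe=""),
        urllib.parse.quote(event_id, safe=""),
    )
    return urllib.request.Request(
        base_url.rstrip("/") + path,
        data=json.dumps(payload).encode("utf-8"),
        headers={"Authorization": f"Basic {token}", "Content-Type": "application/json"},
        method="POST",  # assumption: the route accepts POST
    )
```

Passing the returned object to `urllib.request.urlopen` would send it; `base_url` is the RestApiEndpoint value from the stack outputs.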

Cleanup

Delete the stack: sam delete.

Presented by Ravi Kiran Ganji

I am a Senior Cloud Application Architect at AWS Professional Services and a serverless enthusiast.
