This pattern deploy an AWS SAM application with Amazon API Gateway, Amazon Kinesis Data Stream, AWS Lambda, and Amazon DynamoDB.
AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: A rest endpoint hosted by an API Gateway which invokes Kinesis data stream, consumed by lambda function and persisted in a dynamoDB table. (uksb-1tthgi812) (tag:apigw-kinesis-lambda-ddb)
Resources:
##########################################################################
# REST API - Entrypoint #
##########################################################################
RestApi:
Type: AWS::Serverless::Api
Properties:
AccessLogSetting:
DestinationArn: !GetAtt RestApiGatewayAccessLogs.Arn
Format: '{ "requestId":"$context.requestId", "ip": "$context.identity.sourceIp", "requestTime":"$context.requestTime", "httpMethod":"$context.httpMethod","routeKey":"$context.routeKey", "status":"$context.status","protocol":"$context.protocol", "responseLength":"$context.responseLength" }'
DefinitionBody:
"Fn::Transform":
Name: "AWS::Include"
Parameters:
Location: "api.yaml"
StageName: prod
Auth:
DefaultAuthorizer: GatewayAuthorizerFunction
Authorizers:
GatewayAuthorizerFunction:
FunctionArn: !GetAtt GatewayAuthorizerFunction.Arn
FunctionPayloadType: REQUEST
Identity:
Headers:
- Authorization
ReauthorizeEvery: 0
##########################################################################
# Kinesis Data Streams #
##########################################################################
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: GatewayEventsStream
ShardCount: 1
StreamEncryption:
EncryptionType: KMS
KeyId: alias/aws/kinesis
##########################################################################
# SQS Queues #
##########################################################################
# SQS DLQ
DeadLetterQueue:
Type: AWS::SQS::Queue
Properties:
QueueName: LambdaDeadLetterQueue
SqsManagedSseEnabled: true
##########################################################################
# Lambda Functions #
##########################################################################
# Api Gateway Authorizer
GatewayAuthorizerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: GatewayAuthorizerFunction
Description: Lambda to be invoked by API Gateway for authorization
CodeUri: lambda/authorizer/
Handler: index.lambda_handler
Runtime: python3.10
Timeout: 3
MemorySize: 128
Role: !GetAtt AuthorizerLambdaExecutionRole.Arn
Layers:
[
!Sub "arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:32",
]
# Kinesis Handler
KinesisHandlerFunction:
Type: AWS::Serverless::Function
Properties:
FunctionName: KinesisHandlerFunction
Description: Lambda to consumer from kinesis data stream
CodeUri: lambda/kinesis-handler/
Runtime: python3.10
Handler: index.lambda_handler
MemorySize: 128
Layers:
[
!Sub "arn:aws:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV2:32",
]
DeadLetterQueue:
TargetArn: !GetAtt DeadLetterQueue.Arn
Type: SQS
Policies:
- Statement:
- Sid: AllowDbConnect
Effect: Allow
Action:
- "dynamodb:PutItem"
Resource:
- !GetAtt EventTable.Arn
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream.Arn
StartingPosition: LATEST
BatchSize: 10
##########################################################################
# Roles #
##########################################################################
ApiGatewayRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- apigateway.amazonaws.com
Action:
- "sts:AssumeRole"
Policies:
- PolicyName: CustomPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "kinesis:PutRecord"
- "kinesis:PutRecords"
- "kinesis:GetShardIterator"
- "kinesis:GetRecords"
Resource: !Sub
- "${varStreamArn}*"
- varStreamArn: !GetAtt KinesisStream.Arn
- Effect: Allow
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:DescribeLogGroups"
- "logs:DescribeLogStreams"
- "logs:PutLogEvents"
- "logs:GetLogEvents"
- "logs:FilterLogEvents"
Resource: !GetAtt RestApiGatewayAccessLogs.Arn
AuthorizerLambdaExecutionRole:
Type: "AWS::IAM::Role"
Properties:
AssumeRolePolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Principal:
Service:
- lambda.amazonaws.com
Action:
- "sts:AssumeRole"
Policies:
- PolicyName: CustomPolicy
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- "logs:CreateLogGroup"
- "logs:CreateLogStream"
- "logs:PutLogEvents"
Resource: "*"
- Effect: Allow
Action:
- "secretsmanager:GetSecretValue"
Resource: [!Ref BasicAuthUsername, !Ref BasicAuthPassword]
##########################################################################
# DynamoDB Table #
##########################################################################
EventTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: eventId
AttributeType: S
BillingMode: PROVISIONED
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 10
KeySchema:
- AttributeName: eventId
KeyType: HASH
TableName: EventTable
SSESpecification:
SSEEnabled: true
##########################################################################
# Cloudwatch Logs #
##########################################################################
RestApiGatewayAccessLogs:
Type: AWS::Logs::LogGroup
Properties:
LogGroupName: Rest-ApiGw-Kinesis-Access-Logs
RetentionInDays: 1
##########################################################################
# Secrets - Generated for Endpoint Auth #
##########################################################################
BasicAuthUsername:
Type: AWS::SecretsManager::Secret
Properties:
Description: Username to be used in basic auth
Name: gateway/username
GenerateSecretString:
ExcludeCharacters: ""
ExcludeLowercase: false
ExcludeNumbers: false
ExcludePunctuation: false
ExcludeUppercase: false
GenerateStringKey: UserName
IncludeSpace: false
PasswordLength: 32
RequireEachIncludedType: true
SecretStringTemplate: !Sub '{"UserName": "REPLACED"}'
BasicAuthPassword:
Type: AWS::SecretsManager::Secret
Properties:
Description: Password to be used in basic auth
Name: gateway/password
GenerateSecretString:
ExcludeCharacters: ""
ExcludeLowercase: false
ExcludeNumbers: false
ExcludePunctuation: false
ExcludeUppercase: false
GenerateStringKey: Password
IncludeSpace: false
PasswordLength: 32
RequireEachIncludedType: true
SecretStringTemplate: !Sub '{"Password": "REPLACED"}'
##########################################################################
# Outputs #
##########################################################################
Outputs:
RestApiEndpoint:
Description: "Rest API endpoint"
Value: !Sub "https://${RestApi}.execute-api.${AWS::Region}.amazonaws.com"
Visit the GitHub repo for this pattern.
git clone https://github.com/aws-samples/serverless-patterns/ cd serverless-patterns/apigw-kinesis-lambda-ddb
sam deploy
sam delete
.