Select your cookie preferences

We use cookies and similar tools to enhance your experience, provide our services, deliver relevant advertising, and make improvements. Approved third parties also use these tools to help us deliver advertising and provide certain site features.

TypeScript: Amazon Kinesis Data Streams, AWS Lambda, Amazon DynamoDB Example

This project contains examples of TypeScript tests written for AWS Lambda interacting with Amazon Kinesis Data Streams and Amazon DynamoDB.


Files:

Application Code

//Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
//SPDX-License-Identifier: MIT-0

/**
 * Lambda Handler for the typescript kinesis-lambda-dynamodb
 * This handler accepts a Kinesis event whose records each carry a base64-encoded JSON object in their data property
 * The DynamoDB Table used is passed as an environment variable "PROCESSED_RECORDS_TABLE_NAME"
*/

import { KinesisStreamEvent, KinesisStreamRecord } from 'aws-lambda';
import { DynamoDBClient } from "@aws-sdk/client-dynamodb";
import {
    BatchWriteCommand,
    DynamoDBDocumentClient,
} from "@aws-sdk/lib-dynamodb";

// DynamoDB clients are created once at module scope, so every handler call
// served by this module instance shares the same client objects.
// Low-level client, constructed with an empty configuration object.
const ddbClient = new DynamoDBClient({});
// Document client wrapping the low-level client; it is the one used to send
// BatchWriteCommand requests with plain JavaScript objects as items.
const ddbDocumentClient = DynamoDBDocumentClient.from(ddbClient);

// Processed record: the item shape stored in the DDB table.
// PK is filled from the incoming record's `batch` field and SK from its `id` field.
export type ProcessedRecord = {
    PK: string,
    SK: string,
}

// Record type expected in the Kinesis Data Stream data payload
// (the JSON object obtained after base64-decoding and parsing `record.kinesis.data`).
export type UnprocessedRecord = {
    batch: string,
    id: string,
}

/**
 * Lambda entry point: converts every Kinesis record of the event into a
 * ProcessedRecord and stores the results in the DynamoDB table whose name is
 * read from the PROCESSED_RECORDS_TABLE_NAME environment variable.
 *
 * All records are parsed before any write is started, so a malformed payload
 * fails the invocation without leaving un-awaited writes in flight. The items
 * are then written in chunks of at most 25; the chunk writes are started
 * together and awaited as a group.
 *
 * @param event - Kinesis stream event delivered to the function.
 * @returns A promise that resolves once every chunk write has resolved and
 *          rejects if any of them rejects.
 * @throws Error when PROCESSED_RECORDS_TABLE_NAME is missing or empty.
 */
export const lambdaHandler = async (event: KinesisStreamEvent): Promise<void> => {
    // Getting the dynamoDB table name from environment variable
    const dynamoDBTableName = process.env.PROCESSED_RECORDS_TABLE_NAME;
    if (!dynamoDBTableName) {
        // Fail fast: without a table name no write can possibly succeed.
        throw new Error('Environment variable PROCESSED_RECORDS_TABLE_NAME is not set');
    }

    // DDB BatchWriteItem is limited to 25 items per request
    const maxBatchSize = 25;

    const items: ProcessedRecord[] = event.Records.map((record) => createRecordItem(record));

    const batchPromises: Promise<void>[] = [];
    for (let start = 0; start < items.length; start += maxBatchSize) {
        const chunk = items.slice(start, start + maxBatchSize);
        batchPromises.push(storeBatchInTable(chunk, dynamoDBTableName));
    }

    await Promise.all(batchPromises);
};

/**
 * Builds the DynamoDB item for a single Kinesis record.
 *
 * The record's `kinesis.data` string is base64-decoded, interpreted as UTF-8
 * text and parsed as JSON. Decoding as UTF-8 (rather than 'ascii') keeps
 * non-ASCII characters in `batch` / `id` intact.
 *
 * @param record - One record taken from a Kinesis stream event.
 * @returns The item to store: PK is the payload's `batch`, SK is its `id`.
 * @throws SyntaxError (from JSON.parse) when the decoded payload is not valid JSON.
 */
const createRecordItem = (record: KinesisStreamRecord): ProcessedRecord => {
    const payload = Buffer.from(record.kinesis.data, 'base64').toString('utf8');
    // NOTE(review): the parsed shape is asserted, not validated — a payload
    // without `batch` or `id` yields undefined keys.
    const data = JSON.parse(payload) as UnprocessedRecord;

    return {
        PK: data.batch,
        SK: data.id,
    };
}

/**
 * Writes one batch of items to the given DynamoDB table with BatchWriteCommand.
 *
 * A batch write can succeed as a request while still returning some items in
 * `UnprocessedItems`; those are re-sent, with an exponentially growing pause,
 * up to a fixed number of attempts. Failures are logged with context and then
 * rethrown so the caller sees the write did not complete instead of silently
 * losing the records.
 *
 * @param records - Items to store; the caller is expected to pass at most 25.
 *                  An empty array is a no-op.
 * @param tableName - Name of the destination table.
 * @returns A promise that resolves once every item has been accepted.
 * @throws Whatever the client throws, or an Error when items are still
 *         unprocessed after the last attempt.
 */
const storeBatchInTable = async (records: ProcessedRecord[], tableName: string): Promise<void> => {
    // An empty request would only produce a validation error from the service.
    if (records.length === 0) {
        return;
    }

    const maxAttempts = 5;
    const baseDelayMs = 50;

    let writeCommand = new BatchWriteCommand({
        RequestItems: {
            [tableName]: records.map((r) => ({
                PutRequest: { Item: r }
            })),
        },
    });

    try {
        for (let attempt = 1; ; attempt++) {
            const response = await ddbDocumentClient.send(writeCommand);
            console.log(response);

            const unprocessed = response.UnprocessedItems;
            if (!unprocessed || Object.keys(unprocessed).length === 0) {
                return;
            }
            if (attempt >= maxAttempts) {
                throw new Error(
                    `Items still unprocessed for table ${tableName} after ${maxAttempts} attempts`
                );
            }

            // Back off before re-sending only the items that were not written.
            const delayMs = baseDelayMs * 2 ** (attempt - 1);
            await new Promise<void>((resolve) => setTimeout(resolve, delayMs));
            writeCommand = new BatchWriteCommand({ RequestItems: unprocessed });
        }
    } catch (e: unknown) {
        console.error(`Failed to write batch of ${records.length} record(s) to table ${tableName}`, e);
        throw e;
    }
};
                                                  


GitHub icon Visit the GitHub repo for this pattern.

Clone locally

git clone https://github.com/aws-samples/serverless-patterns/
cd typescript-test-samples/kinesis-lambda-dynamodb

Authors

Richard Vidis Cloud Application Architect at AWS
Richard Vidis

Presented by Richard Vidis

Cloud Application Architect at AWS

Follow on LinkedIn