Serverless Stream Processing with Lambda: Amazon Kinesis Data Streams - Integration with AWS Lambda

Learning Goals

  • Amazon Kinesis Data Streams (KDS) integration with AWS Lambda for serverless stream processing

Amazon KDS as Lambda ESM

AWS Lambda can act as a consumer that reads events from Amazon KDS and processes them in a serverless fashion. You configure a Lambda event source mapping (ESM) to read events from the stream and invoke your function with them.
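As an illustration, the following is a minimal Python sketch of a handler that such an ESM could invoke. The record contents and field usage are illustrative, but the event shape (a Records list with base64-encoded Kinesis data) is what Lambda delivers.

import base64

def lambda_handler(event, context):
    # The ESM delivers a batch of Kinesis records in event["Records"].
    for record in event["Records"]:
        # Kinesis record payloads are base64-encoded; decode before use.
        payload = base64.b64decode(record["kinesis"]["data"])
        partition_key = record["kinesis"]["partitionKey"]
        print(f"partition key={partition_key} payload={payload.decode('utf-8')}")
    # Returning without raising an exception tells Lambda the batch succeeded.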

KDS poller design

A consumer reads events from a shard in Amazon KDS. By default, the read throughput of a shard (2 MB/sec) is shared across all consumers reading from that shard; even with multiple consumers on the same shard, the combined throughput stays at 2 MB/sec. This is called a shared-throughput consumer (standard iterator). When Lambda is configured as a shared-throughput consumer, it polls each shard over HTTP.
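A shared-throughput mapping only needs the stream ARN. The following boto3 sketch shows one way to create it; the stream ARN, function name, and region are placeholders.

import boto3

lambda_client = boto3.client("lambda")

# Create a shared-throughput (standard iterator) event source mapping.
# The stream ARN and function name are placeholders.
response = lambda_client.create_event_source_mapping(
    EventSourceArn="arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
    FunctionName="my-stream-processor",
    StartingPosition="LATEST",
)
print(response["UUID"])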

For high read throughput and low latency use cases, a consumer can use enhanced fan-out, which gives each registered consumer its own dedicated read throughput of up to 2 MB/sec per shard, so multiple consumers can read the same stream in parallel without sharing throughput. The limit for the number of consumers using enhanced fan-out is twenty per stream.

KDS EFO
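The sketch below shows one way to wire this up with boto3: register a stream consumer, then point the event source mapping at the consumer ARN rather than the stream ARN. Names and ARNs are placeholders, and in practice you would wait for the consumer to become ACTIVE before creating the mapping.

import boto3

kinesis = boto3.client("kinesis")
lambda_client = boto3.client("lambda")

# Register a dedicated-throughput (enhanced fan-out) consumer on the stream.
consumer = kinesis.register_stream_consumer(
    StreamARN="arn:aws:kinesis:us-east-1:123456789012:stream/my-stream",
    ConsumerName="my-efo-consumer",
)

# Use the consumer ARN (not the stream ARN) as the event source so Lambda
# reads with the consumer's dedicated 2 MB/sec per shard.
lambda_client.create_event_source_mapping(
    EventSourceArn=consumer["Consumer"]["ConsumerARN"],
    FunctionName="my-stream-processor",
    StartingPosition="LATEST",
)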

Batching behavior

When integrated with Amazon KDS using an event source mapping (ESM), AWS Lambda by default batches records together into a single payload that it sends to your function. Lambda polls each shard in the Kinesis Data Stream and invokes your function synchronously with a payload that contains a batch of records, processing one batch per invocation. Each batch contains records from a single shard.

Batching Window

As soon as records are available, the Lambda function is invoked, irrespective of the number of records available. To avoid overly frequent invocations with small batches of records, you can configure a batching window (MaximumBatchingWindowInSeconds), which is the maximum amount of time (up to 5 minutes) to gather records into a single payload. You can also configure the size of the batch using the BatchSize parameter; both settings are shown in the sketch after the list below.

KDS ESM Batching

Lambda invokes your function when any one of the following conditions is met:

  • The batching window expires
  • The batch size is reached
  • The Lambda payload size limit (6 MB) is reached
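As a rough sketch, both batching settings can be applied to an existing mapping with boto3; the UUID below is a placeholder for the mapping created earlier, and the values are illustrative.

import boto3

lambda_client = boto3.client("lambda")

# Gather up to 200 records, or wait at most 30 seconds, whichever comes
# first, before invoking the function. The UUID is a placeholder.
lambda_client.update_event_source_mapping(
    UUID="12345678-1234-1234-1234-123456789012",
    BatchSize=200,
    MaximumBatchingWindowInSeconds=30,
)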

Scaling and Concurrency

By default, Lambda processes one batch at a time from each shard, so there is one concurrent synchronous invocation per shard. You can increase concurrency with ParallelizationFactor, which lets Lambda process up to 10 batches from each shard in parallel. Lambda still ensures in-order processing of events at the partition-key level.

KDS ESM Concurrency
KDS ESM Concurrency-2
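A minimal sketch of raising the per-shard concurrency on an existing mapping with boto3; the UUID and factor are placeholders.

import boto3

lambda_client = boto3.client("lambda")

# Process up to 5 batches from each shard concurrently. Ordering is still
# preserved for records that share a partition key.
lambda_client.update_event_source_mapping(
    UUID="12345678-1234-1234-1234-123456789012",
    ParallelizationFactor=5,
)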

Please refer to this blog for a deep dive into the scaling controls for handling both high-traffic and low-traffic scenarios.

Event filtering

When you configure Amazon KDS as a source using an AWS Lambda ESM, you can use Lambda event filtering to control which records from the stream are sent to your function. For example, you can filter on certain parameters contained in the messages and process only those messages that satisfy the criteria. Up to five event filters can be applied, and they are evaluated as a logical OR. Messages that satisfy at least one filter are sent to the Lambda function in the next payload; messages that satisfy none are discarded.

A filter is defined using a FilterCriteria object, which contains a list of Filters. Each Filter defines a Pattern that is matched against the events. The structure of FilterCriteria is shown below.

{
    "Filters": [
        {
            "Pattern": "{ \"Metadata1\": [ rule1 ], \"data\": { \"Data1\": [ rule2 ] }}"
        }
    ]
}

Expanded as JSON, the Pattern above looks like this.

{
    "Metadata1": [ rule1 ],
    "data": {
        "Data1": [ rule2 ]
    }
}

Events can be filtered on metadata parameters and data parameters, individually or together. Note that the format of these parameters varies by the AWS service being used.
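As a concrete sketch, the following boto3 call attaches a filter to an existing Kinesis mapping. For Kinesis, the "data" key is matched against the base64-decoded record payload (treated as JSON); the field name, threshold, and UUID here are illustrative.

import boto3

lambda_client = boto3.client("lambda")

# Only send records whose decoded JSON payload has "temperature" > 30 to the
# function; non-matching records are never delivered. Values are illustrative.
lambda_client.update_event_source_mapping(
    UUID="12345678-1234-1234-1234-123456789012",
    FilterCriteria={
        "Filters": [
            {"Pattern": '{ "data": { "temperature": [ { "numeric": [ ">", 30 ] } ] } }'}
        ]
    },
)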

Handling of records that don't meet the filter criteria

When using Lambda filters, the events that don't satisfy the filter criteria are handled differently depending on the event source.

  • In Amazon SQS, Lambda removes messages from the queue if they do not match any of the defined criteria.
  • In Kinesis and DynamoDB, records that do not satisfy any criteria are not deleted from the source. They remain in the stream and are removed only when the configured retention period expires.
  • In Amazon MSK, self-managed Apache Kafka, and Amazon MQ, Lambda drops messages that don't match all fields included in the filter. For self-managed Apache Kafka, Lambda commits offsets for matched and unmatched messages after successfully invoking the function. For Amazon MQ, Lambda acknowledges matched messages after successfully invoking the function and acknowledges unmatched messages when filtering them.

For more details, please refer to the Lambda Event Filtering documentation.

Watch the following video for a better understanding of Kinesis as a Lambda event source mapping.