
Using Lambda, Kinesis Data Firehose, and OpenSearch: when KDF sends demo data to OpenSearch the index is populated, but not when my Lambda sends the data


Pipeline: to index data into OpenSearch, Lambda calls Kinesis Data Firehose and the data is forwarded to OpenSearch.

When sending demo data from KDF to OpenSearch, OpenSearch indexes the data. But when my Lambda code sends the data, OpenSearch does not index it.

JSON data (record) in Lambda: {'recordId': '1', 'result': 'Ok', 'data': '{"data_seq": 1, "data_type": "test2", "level_type": "test", "title": "Airplane", "month": "Month 1", "week": "Week 1", "reg_ts": "20201110", "mod_ts": "20230515"}'}

1 Answer

Hello, I see that you are facing the problem described in the following use case:

Problem: When using a subscription filter to send logs from CloudWatch to Kinesis Data Firehose, the CloudWatch logs are gzip-compressed before being sent. As a result, several CloudWatch logs end up in the same Firehose record, in compressed form, when they are received by Firehose. This Firehose record also contains metadata passed on by CloudWatch, such as "messageType", "owner", "logGroup", "logStream", and "subscriptionFilters", while the actual logs are stored in "logEvents".

If you try to send CloudWatch logs to Kinesis Data Firehose and then on to OpenSearch without any data transformation, you will receive a record-count exception error, because the logs are still compressed and have not been extracted from the "logEvents" field.
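To make the shape of the data concrete, here is a minimal sketch (not part of the blueprint) of how one such Firehose record can be decoded inside the transformation Lambda; the field names come from the CloudWatch Logs subscription payload described above:

import base64
import gzip
import json

def decode_cloudwatch_record(firehose_record):
    # The 'data' field on each Firehose transformation record is base64-encoded
    # and gzip-compressed; after decoding it contains the CloudWatch metadata
    # ("messageType", "owner", "logGroup", ...) plus the logs under "logEvents".
    compressed = base64.b64decode(firehose_record['data'])
    return json.loads(gzip.decompress(compressed))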


Solution:

The proposed solution uses the data transformation function to construct its own bulk request from the decompressed logs, leaving space for the line { "index" : { "_index" : "test", "_id" : "1" } } that Firehose will add upon sending the bulk request to OpenSearch.

For the code, we can use the AWS-provided blueprint kinesis-firehose-cloudwatch-logs-processor-python as a starting point and add the required logic to it, particularly to the processRecords and transformLogEvent functions. You can find this blueprint at reference [2]. Please see the changes I made to this code below:

Firstly, in the transformLogEvent function, set the index name:

indexName = "my-index"

In the transformLogEvent function, add the line '{ "index" : { "_index" : "<indexName>" } }' before each CloudWatch log, with a newline between the two and a newline after the log message:

return '{ "index" : { "_index" : "'+ indexName +'" } }' + '\n' + log_event['message'] + '\n'

This then returns a result similar to the following once all CloudWatch logs have been processed:

{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }

In the processRecords function, after receiving the result from transformLogEvent, you need to remove the first index specification line, because Firehose will add it when sending the bulk request to OpenSearch. You can do this by first splitting the data into a list on newlines:

joinedDataList = joinedData.split('\n')

Then delete the first item from this list, which would be the first entry of { "index" : { "_index" : "<indexName>" } } in the list:

del(joinedDataList[0])

Finally, create an empty string, and add each remaining item from the list:

joinedData = ""
for line in joinedDataList:
joinedData += line + '\n'
print("joinedDataListLine " + line)

This would leave you with the following:

{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }
{ "index" : { "_index" : "<indexName>" } }
{ "field1" : "value1" }

Optionally, you can also add functionality to the code that rotates the index you specify. You can apply either a daily or a monthly rotation by adding the code below to the transformLogEvent function. Please note that you should keep only one option, daily or monthly, in your code; if you want one long-lived index you can exclude all of the lines below. Additionally, please ensure that the index rotation configured on the Firehose stream matches what you have set in the code.

The below applies a daily index rotation:

indexName = indexName + "-" + str(datetime.now().year) + "-" + ("0" + str(datetime.now().month))[-2:] + "-" + ("0" + str(datetime.now().day))[-2:]

The below applies a monthly index rotation:

indexName = indexName + "-" + str(datetime.now().year) + "-" + ("0" + str(datetime.now().month))[-2:]
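Both lines assume that datetime has been imported at the top of the function file; the same rotation can also be written more compactly with strftime, for example:

from datetime import datetime

# Daily rotation, equivalent to the concatenation above:
indexName = indexName + datetime.now().strftime("-%Y-%m-%d")

# Monthly rotation:
indexName = indexName + datetime.now().strftime("-%Y-%m")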

Kindly note, the above details are **not intended to be used as production code without proper testing** in your environment.

Alternative solutions that avoid the Firehose data transformation

The best practice for sending CloudWatch logs to OpenSearch is to use the direct CloudWatch-to-OpenSearch subscription filter [3]. That integration is designed specifically for streaming CloudWatch logs to OpenSearch, and is therefore the recommended approach when it fits the use case.
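If you set this subscription up programmatically rather than through the console wizard in [3], it comes down to a put_subscription_filter call; the log group, filter name, and destination ARN below are placeholders, and the console wizard provisions the actual destination function for you:

import boto3

logs = boto3.client('logs')

logs.put_subscription_filter(
    logGroupName='/my/application/logs',   # placeholder log group
    filterName='stream-to-opensearch',     # placeholder filter name
    filterPattern='',                      # empty pattern matches all log events
    # Placeholder ARN of the delivery function; when the destination is a Lambda
    # function, it also needs a resource-based permission for logs.amazonaws.com.
    destinationArn='arn:aws:lambda:eu-west-1:111122223333:function:LogsToOpenSearch',
)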

An alternative to this is to use a Lambda subscription filter [4] to decompress the CloudWatch logs, put each log event into its own record, and then send these records from Lambda to a Firehose stream that has an OpenSearch destination. This setup does not have the same limitations as the Firehose data transformation function regarding the Firehose record count, or the requirement that the "recordId" of each returned Firehose record match what Kinesis Data Firehose sent.
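A minimal sketch of such a subscription-filter Lambda is shown below; the Firehose stream name is a placeholder, and the payload layout is the standard awslogs subscription format:

import base64
import gzip
import json
import boto3

firehose = boto3.client('firehose')
STREAM_NAME = 'my-opensearch-stream'  # placeholder Firehose stream name

def handler(event, context):
    # CloudWatch Logs delivers the subscription payload base64-encoded and
    # gzip-compressed under event['awslogs']['data'].
    payload = json.loads(gzip.decompress(base64.b64decode(event['awslogs']['data'])))

    # One Firehose record per log event, so nothing downstream has to
    # decompress or split anything.
    records = [{'Data': (e['message'] + '\n').encode('utf-8')}
               for e in payload.get('logEvents', [])]

    # put_record_batch accepts at most 500 records per call; chunk if needed.
    for i in range(0, len(records), 500):
        firehose.put_record_batch(DeliveryStreamName=STREAM_NAME,
                                  Records=records[i:i + 500])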


If you need further details or additional help with your use case, reach out to the AWS Support team through a support case, and an engineer will be able to help you with the details.


References:

[1] Kinesis Data Firehose - Data Transformation and Status Model - https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html#data-transformation-status-model

[2] AWS-provided blueprint kinesis-firehose-cloudwatch-logs-processor-python - https://eu-west-1.console.aws.amazon.com/lambda/home?region=eu-west-1#/create/function/configure/blueprint?blueprint=kinesis-firehose-cloudwatch-logs-processor-python

[3] Streaming CloudWatch Logs data to Amazon OpenSearch Service - https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_OpenSearch_Stream.html

[4] Subscription filters with AWS Lambda - https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#LambdaFunctionExamp

AWS SUPPORT ENGINEER · answered a year ago
  • Will this still work if logEvents contains multiple records that have to be submitted separately under one Firehose recordId? I tried that and got the error: "The response from the Bulk API contained more entries than the number of records sent. Ensure that each record contains only one JSON object and that there are no newlines." The error arises because our log records consist of multiple log events per Firehose record, so Firehose treats each one as a single record when submitting to OpenSearch. The discrepancy between the single record submitted and the multiple records identified in OpenSearch's response triggers the error. Although the data is submitted to OpenSearch successfully, the error also causes all of the data to be delivered to S3. This issue disrupts some Firehose monitoring metrics, such as DeliveryToAmazonOpenSearchServerless.Success, which stop reporting.

  • This is working for me to submit multiple OpenSearch records from a single Firehose recordId. I'm not seeing any errors about the bulk Api containing more entries than the number of records sent.

    Note that the Firehose "Records delivered to OpenSearch" count will be equal to the number of records sent by Firehose, not the number of records inserted in the constructed bulk request.
