Kinesis Analytics for SQL Application Issue

Hello, I am having trouble getting a tumbling-window query to behave correctly. My application sends 15 sensor data messages per second to a Kinesis Data Stream, which is the input stream for a Kinesis Analytics application. I am trying to run an aggregation query that uses a GROUP BY clause to process rows in a 60-second tumbling window. The output stream then sends the data to a Lambda function. I expect messages to arrive at the Lambda function every 60 seconds, but instead they arrive much faster, almost every second, and the aggregations don't work as expected. Here is the CloudFormation template that I am using:

ApplicationCode:
    CREATE OR REPLACE STREAM "SENSORCALC_STREAM" (
        "name" VARCHAR(16),
        "facilityId" INTEGER,
        "processId" BIGINT,
        "sensorId" INTEGER NOT NULL,
        "min_value" REAL,
        "max_value" REAL,
        "stddev_value" REAL);

    CREATE OR REPLACE PUMP "SENSORCALC_STREAM_PUMP" AS 
    INSERT INTO "SENSORCALC_STREAM" 
    SELECT STREAM      
        "name", 
        "facilityId",             
        "processId",             
        "sensorId",        
        MIN("sensorData") AS "min_value",
        MAX("sensorData") AS "max_value",
        STDDEV_SAMP("sensorData") AS "stddev_value"
        
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "facilityId","processId", "sensorId", "name",
         STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);

KinesisAnalyticsSensorApplicationOutput:
  Type: "AWS::KinesisAnalytics::ApplicationOutput"
  DependsOn: KinesisAnalyticsSensorApplication
  Properties:
    ApplicationName: !Ref KinesisAnalyticsSensorApplication
    Output:
      Name: "SENSORCALC_STREAM"
      LambdaOutput:
        ResourceARN: !GetAtt SensorStatsFunction.Arn
        RoleARN: !GetAtt KinesisAnalyticsSensorRole.Arn
      DestinationSchema:
        RecordFormatType: "JSON"
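To make the expected behavior concrete, here is a minimal Python sketch (not Kinesis code) of what a 60-second tumbling window over the same GROUP BY keys would produce. The sample data is an assumption for illustration only: one facility, one process, three sensors, 15 messages per second for two minutes. The function name `tumbling_window_stats` and the record layout are hypothetical. It is a model of the intended aggregation, not a diagnosis of the problem.

```python
from collections import defaultdict
from statistics import stdev

WINDOW_SECONDS = 60  # same interval as STEP(... BY INTERVAL '60' SECOND)


def tumbling_window_stats(records, window_seconds=WINDOW_SECONDS):
    """Aggregate records into fixed, non-overlapping time windows.

    Each record is a tuple:
    (timestamp_seconds, name, facility_id, process_id, sensor_id, value)
    """
    buckets = defaultdict(list)
    for ts, name, facility_id, process_id, sensor_id, value in records:
        # Round the timestamp down to the start of its window.
        window_start = int(ts // window_seconds) * window_seconds
        key = (window_start, facility_id, process_id, sensor_id, name)
        buckets[key].append(value)

    rows = []
    for key in sorted(buckets):
        values = buckets[key]
        rows.append({
            "window_start": key[0],
            "facilityId": key[1],
            "processId": key[2],
            "sensorId": key[3],
            "name": key[4],
            "min_value": min(values),
            "max_value": max(values),
            # Sample standard deviation needs at least two values.
            "stddev_value": stdev(values) if len(values) > 1 else None,
        })
    return rows


# Hypothetical input: 15 messages per second for 120 seconds, 3 sensors.
records = []
for second in range(120):
    for i in range(15):
        ts = second + i / 15
        records.append((ts, "sensor", 1, 100, i % 3, float(i)))

rows = tumbling_window_stats(records)
print(len(rows))  # 2 windows x 3 sensors = 6 rows
```

In this model each window emits one row per distinct combination of the grouping keys, so the number of rows per 60-second window equals the number of distinct ("facilityId", "processId", "sensorId", "name") combinations seen in that window, not one row in total.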

I would really appreciate your help in pointing out what I am missing. Thank you, Serge

Serge
Asked 2 years ago · 104 views
No answers
