Greengrass StreamManager export error

I have a custom Greengrass component that collects multiple data streams and publishes them to a single Kinesis stream. Stream manager ingests the data correctly (I can see it in the work folder), but I also see many log errors that prevent the data from reaching Kinesis.

  • All data streams are newly created and were not used before
  • Three data streams collect different data in the same Greengrass component
  • A single StreamManagerClient instance is shared among them

This error appears repeatedly for different streams (including newly created ones):

2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. 2023 Oct 06 14:04:39,680 [ERROR] (pool-7-thread-2) com.amazonaws.iot.greengrass.streammanager.export.upload.MessageUploaderTask: Encountered Throwable when exporting messages. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. com.amazonaws.iot.greengrass.streammanager.store.exceptions.InvalidStreamPositionException: Unable to find sequence number 2888279 in log file for stream iot. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.FileLogSegment.read(FileLogSegment.java:274) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.MessageStreamLog.read(MessageStreamLog.java:379) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.store.log.MessageInputStreamHandleLogImpl.read(MessageInputStreamHandleLogImpl.java:32) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at com.amazonaws.iot.greengrass.streammanager.export.upload.MessageUploaderTask.upload(MessageUploaderTask.java:66) ~[AWSGreengrassGreenlake-1.0-super.jar:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}
2023-10-06T14:04:39.681Z [INFO] (Copier) aws.greengrass.StreamManager: stdout. at java.lang.Thread.run(Thread.java:829) [?:?]. {scriptName=services.aws.greengrass.StreamManager.lifecycle.startup.script, serviceName=aws.greengrass.StreamManager, currentState=RUNNING}

Component logic


import sys
import traceback

from stream_manager import (
    ExportDefinition,
    InvalidRequestException,
    KinesisConfig,
    MessageStreamDefinition,
    StrategyOnFull,
    StreamManagerClient,
)

STREAM_NAME = "..."  # target Kinesis data stream (value elided)

STREAM_MANAGER_CLIENT = StreamManagerClient()

def on_a():
    ...
    STREAM_MANAGER_CLIENT.append_message("Av1", data=encoded_msg)

def on_b():
    ...
    STREAM_MANAGER_CLIENT.append_message("Bv1", data=encoded_msg)

def on_c():
    ...
    STREAM_MANAGER_CLIENT.append_message("Cv1", data=encoded_msg)


def create_stream_definition(identifier, version):
    kinesisConfig = KinesisConfig(identifier="KinesisExport" + identifier + version, kinesis_stream_name=STREAM_NAME)
    exportDefinition = ExportDefinition(kinesis=[kinesisConfig])

    streamDefinition = MessageStreamDefinition(
        name=identifier + version,  # Required.
        stream_segment_size=1024,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,  # Required.
        export_definition=exportDefinition
    )

    return streamDefinition


def init_stream_client():
    try:
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("A", "v1"))
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("B", "v1"))
        STREAM_MANAGER_CLIENT.create_message_stream(create_stream_definition("C", "v1"))
    except InvalidRequestException as e:
        print("Exception occurred", file=sys.stderr)
        traceback.print_exc()

        if "the message stream already exists" in str(e):
            print("Message stream exists. Ignoring exception. ")
        else:
            raise

if __name__ == '__main__':
    init_stream_client()
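
An alternative to matching on the exception message is to ask stream manager which streams already exist via list_streams() and create only the missing ones. A sketch, assuming the STREAM_MANAGER_CLIENT, create_stream_definition, and stream names from the snippet above; missing_streams is an illustrative helper:

```python
def missing_streams(existing, desired):
    """Return the desired stream names that do not exist yet,
    preserving the order in which they were requested."""
    existing = set(existing)
    return [name for name in desired if name not in existing]


def init_stream_client_idempotent():
    # list_streams() returns the names of all streams stream manager
    # currently knows about, so no exception-string matching is needed.
    desired = ["Av1", "Bv1", "Cv1"]
    for name in missing_streams(STREAM_MANAGER_CLIENT.list_streams(), desired):
        # Assumes the single-letter-identifier + "v1" naming used above.
        identifier, version = name[:-2], name[-2:]
        STREAM_MANAGER_CLIENT.create_message_stream(
            create_stream_definition(identifier, version)
        )
```

This avoids depending on the exact wording of the InvalidRequestException message, which is not part of the SDK's contract.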
  • Hello, did you manually delete anything from the disk? Did the device lose power or shut down unsafely? This error indicates that data was lost due to corruption, but ultimately it should not really be a problem: stream manager will retry forever and will eventually recover as old data is overwritten. You may want to delete the stream using the delete-stream API, or reset the Kinesis export for the affected stream to start from sequence number 0.
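
Resetting an affected stream as suggested amounts to deleting and recreating it, which restarts the Kinesis export from sequence number 0. A sketch, assuming a client and stream definition like those in the question; extract_stream_name is an illustrative helper that parses the stream name out of the InvalidStreamPositionException message shown above, and delete_message_stream() / create_message_stream() are stream manager SDK calls:

```python
import re


def extract_stream_name(error_message):
    """Pull the stream name out of a message such as:
    'Unable to find sequence number 2888279 in log file for stream iot.'"""
    match = re.search(r"for stream (\S+?)\.?$", error_message)
    return match.group(1) if match else None


def reset_stream(client, definition):
    # Deleting the stream discards the corrupted on-disk segment; recreating
    # it restarts the Kinesis export from sequence number 0. Data still
    # buffered in the stream is lost, so use this only as a recovery step.
    client.delete_message_stream(stream_name=definition.name)
    client.create_message_stream(definition)
```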

  • Hi,

    An external battery powers my device, and it may lose power without notification. When this happens, stream manager will not upload the existing data until the "corrupted" block is overwritten, which may never happen because new "corrupted" blocks keep appearing.

    Is there any way to configure stream manager to flush data to disk after each insert (to prevent a mismatch between the data and the log), or to ignore corrupted blocks?

  • Not at the moment, no. We are working on resiliency for stream manager, but you may want to make some changes on your end as well. Some things to consider: change the filesystem options to use data=journal for ext filesystems. You may also consider whether you need stream manager at all, or whether you can publish directly over MQTT using the Greengrass IPC APIs, which give you access to MQTT.
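
On the flush question above: MessageStreamDefinition in the stream manager SDK does expose persistence and flush_on_write fields. Whether flush_on_write is enough to survive an unclean power loss is exactly what this thread leaves open, so treat this as a sketch rather than a confirmed fix; it is a hypothetical variant of the definition built in create_stream_definition above, reusing its identifier, version, and exportDefinition:

```python
# flush_on_write asks stream manager to flush the on-disk log after every
# append, trading write throughput for durability. Whether a flush reaches
# the physical disk on sudden power loss still depends on the filesystem
# and mount options (e.g. data=journal on ext4, as suggested above).
streamDefinition = MessageStreamDefinition(
    name=identifier + version,
    strategy_on_full=StrategyOnFull.OverwriteOldestData,
    persistence=Persistence.File,  # on-disk log (the default)
    flush_on_write=True,           # flush after each append
    export_definition=exportDefinition,
)
```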

Ruslan · asked 7 months ago · 105 views
No Answers
