Greengrass IoT Core Topic Subscription Failure

0

Introduction

I'm running greengrass with aws.greengrass.Nucleus=2.9.3 and ran into a problem with IoT Core topic subscription.

My code is pasted below and here's what the code is doing in English:

  1. publishe to the reserved topic $aws/things/{thing_name}/jobs/get.
  2. subscribe to both $aws/things/{thing_name}/jobs/get/accepted and $aws/things/{thing_name}/jobs/get/rejected

The goal is to get the response from any of the two subscriptions.

try:

     # publish to jobs/get
      topic = f"$aws/things/{self.device_name}/jobs/get"
      self.logger.debug(f"pub to {topic}")
      self.ipc_client_v2.publish_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, payload="")

    # subscribe to jobs/get/accepted
    topic = f"$aws/things/{self.device_name}/jobs/get/accepted"
    self.logger.info(f"sub to {topic}")
    _, subscribe_accepted_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, on_stream_event=self.on_iot_core_event, on_stream_error=self.on_iot_core_error, on_stream_closed=self.on_iot_core_closed)

    # subscribe to jobs/get/rejected
    topic = f"$aws/things/{self.device_name}/jobs/get/rejected"
    self.logger.info(f"sub to {topic}")
    _, subscribe_rejected_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE, on_stream_event=self.on_iot_core_event, on_stream_error=self.on_iot_core_error, on_stream_closed=self.on_iot_core_closed)

    while True:
        # loop forever
        time.sleep(1)

except Exception:
    self.logger.exception("Exception while running")
    exit_code = 1

Here are the event handlers

def on_iot_core_event(self, event: IoTCoreMessage) -> None:
    try:
        # use monotonic timestamp for same reason as in check_timeout
        self.logger.info("got response!")
    except:
        self.logger.exception("Error handling get response")

def on_iot_core_error(self, error: Exception) -> bool:
    # Handle error.
    self.logger.error(f"IoT Core stream error = {error}")
    return False  # Return True to close stream, False to keep stream open.

def on_iot_core_closed(self) -> None:
    self.logger.info("IoT Core stream closed")
    # Handle close.
    pass

Problem

When I deploy this component using the CLI, I see the following in the log:

2023-02-10 15:07:28,801 - sc - DEBUG - pub to $aws/things/**ubuntu/jobs/get
2023-02-10 15:07:28,801 - sc - INFO - sub to $aws/things/**-ubuntu/jobs/get/accepted
2023-02-10 15:07:29,177 - sc - ERROR - IoT Core stream error = ('Unexpected response stream event type: aws.greengrass#SubscribeToIoTCoreResponse, expected: aws.greengrass#IoTCoreMessage', b'{}')
2023-02-10 15:07:29,177 - sc - INFO - IoT Core stream closed
2023-02-10 15:07:29,177 - sc - ERROR - Exception while running
Traceback (most recent call last):
  File "/home/me/greengrass/v2/packages/artifacts/error/1.0.0/main.py", line 61, in main
    _, subscribe_accepted_op = self.ipc_client_v2.subscribe_to_iot_core(topic_name=topic, qos=QOS.AT_LEAST_ONCE,on_stream_event=self.on_iot_core_event,
  File "/usr/local/lib/python3.8/dist-packages/awsiot/greengrasscoreipc/clientv2.py", line 893, in subscribe_to_iot_core
    return fut.result(), op
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 444, in result
    return self.__get_result()
  File "/usr/lib/python3.8/concurrent/futures/_base.py", line 389, in __get_result
    raise self._exception
  File "/usr/local/lib/python3.8/dist-packages/awsiot/eventstreamrpc.py", line 711, in _on_continuation_message
    self._handle_data(model_name, payload)
  File "/usr/local/lib/python3.8/dist-packages/awsiot/eventstreamrpc.py", line 739, in _handle_data
    raise UnmappedDataError(msg, payload)
awsiot.eventstreamrpc.UnmappedDataError: ('Unexpected response type: aws.greengrass#IoTCoreMessage, expected: aws.greengrass#SubscribeToIoTCoreResponse', b'{"message":{"topicName":"$aws/things/**-ubuntu/jobs/get/accepted","payload":"eyJ0aW1lc3RhbX*****"}}')

Analysis & Question

The log is suggesting that
my event handler got the aws.greengrass#SubscribeToIoTCoreResponse message while it's expecting aws.greengrass#IoTCoreMessage
and my subscribe operation got the aws.greengrass#IoTCoreMessage while it's expecting aws.greengrass#SubscribeToIoTCoreResponse

If I change the order of subscription and publish (subscribe first, then publish) then the code works as expected. But this feels like a bug to me that subscription fails when a message is being published to the topic I'm trying to subscribe to. Is this a bug?

ictwist
asked a year ago337 views
1 Answer
1
Accepted Answer

You are correct that this is a race due to the asynchronous nature of Greengrass IPC. The subscription has received a message and forwarded it to your listener before the initial subscription response has been sent to tell you that the subscription was successfully completed. We will look at addressing this in a future version.

You should always be subscribing before publishing, not to avoid a race, but because your publish has a high likelihood of being missed if you haven't subscribed yet.

Cheers,

Michael

AWS
EXPERT
answered a year ago
profile picture
EXPERT
reviewed a month ago

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions