Subscribing to when a shadow changes and retrieving that change

I am having trouble using this mostly default code to detect a change in a shadow. The code is very slightly adapted from the AWS tutorial code: I removed the argument-parsing section and replaced it with hard-coded variables, and I added a print statement in the callback.

import sys
import time
import traceback

from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2
from awsiot.greengrasscoreipc.model import (
    SubscriptionResponseMessage,
    UnauthorizedError
)


def main():

    try:
        message = 'script initialized'
        topic = 'dsvvefvfq'
        ipc_client = GreengrassCoreIPCClientV2()
        # Subscription operations return a tuple with the response and the operation.
        _, operation = ipc_client.subscribe_to_topic(topic=topic, on_stream_event=on_stream_event,
                                                     on_stream_error=on_stream_error, on_stream_closed=on_stream_closed)
        print('Successfully subscribed to topic: ' + topic)

        # Keep the main thread alive, or the process will exit.
        try:
            while True:
                time.sleep(10)
        except InterruptedError:
            print('Subscribe interrupted.')

        # To stop subscribing, close the stream.
        operation.close()
    except UnauthorizedError:
        print('Unauthorized error while subscribing to topic')
        traceback.print_exc()
        exit(1)
    except Exception:
        print('Exception occurred', file=sys.stderr)
        traceback.print_exc()
        exit(1)


def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        print("Callback was involked")
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()


def on_stream_error(error: Exception) -> bool:
    print('Received a stream error.', file=sys.stderr)
    traceback.print_exc()
    return False  # Return True to close stream, False to keep stream open.


def on_stream_closed() -> None:
    print('Subscribe to topic stream closed.')


if __name__ == '__main__':
    main()

My shadowmanager configuration looks like this:

{
  "reset": [],
  "merge": {
    "reset": [],
    "merge": {
      "strategy": {
        "type": "realTime"
      },
      "synchronize": {
        "coreThing": {
          "classic": true
        },
        "namedShadows": [
          "test",
          "dsvvefvfq"
        ],
        "direction": "betweenDeviceAndCloud"
      }
    }
  }
}

My component configuration looks like this:

{
  "accessControl": {
    "aws.greengrass.ShadowManager": {
      "com.xxxxxxxx.productivity.cycle_count:shadow:1": {
        "policyDescription": "allow access to shadows",
        "operations": [
          "aws.greengrass#GetThingShadow",
          "aws.greengrass#UpdateThingShadow",
          "aws.greengrass#ListNamedShadowsForThing"
        ],
        "resources": [
          "*"
        ]
      }
    },
    "aws.greengrass.ipc.pubsub": {
      "com.xxxxxxxx.productivity.cycle_count:pubsub:1": {
        "policyDescription": "Allows access to publish/subscribe to all topics.",
        "operations": [
          "aws.greengrass#PublishToTopic",
          "aws.greengrass#SubscribeToTopic"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}

I expect the Python code to "see" that the shadow has changed and execute the callback function on_stream_event. It does not.

Why?

flycast
asked 2 years ago, 481 views
2 Answers

Hi again @flycast,

A couple things I notice:

  1. Your shadow manager configuration is not in the correct format:
{
  "reset": [],
  "merge": {
    "reset": [],
    "merge": {

You have merge inside of another merge. Please make sure that the configuration is correctly set.

  2. You are subscribing to the topic named dsvvefvfq. Who or what do you expect to be publishing to dsvvefvfq?

Shadow manager publishes to local pubsub topics using the device shadow topics (https://docs.aws.amazon.com/iot/latest/developerguide/device-shadow-mqtt.html) as explained here: https://docs.aws.amazon.com/greengrass/v2/developerguide/interact-with-shadows-in-components.html#react-shadow-events.

Therefore, you should be subscribing to $aws/things/<your GG thing name here>/shadow/name/dsvvefvfq/#

You can also subscribe to the wildcard filter $aws/things/#. You will then see all traffic on topics beginning with $aws/things/ and can update your code accordingly.
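
As a minimal sketch of the subscription described above (not a definitive implementation), the topic filter can be built from the thing name and the shadow name. The helper names shadow_topic_filter and subscribe_to_shadow are hypothetical. The sketch assumes that the component process can read the core device's thing name from the AWS_IOT_THING_NAME environment variable and that the installed Greengrass nucleus version supports MQTT wildcards in local publish/subscribe.

```python
import os


def shadow_topic_filter(thing_name: str, shadow_name: str) -> str:
    """Build a filter that matches every topic of one named shadow."""
    return f"$aws/things/{thing_name}/shadow/name/{shadow_name}/#"


def subscribe_to_shadow(shadow_name: str, on_stream_event):
    """Subscribe to all local pubsub topics of a named shadow (hypothetical helper)."""
    # Imported here so shadow_topic_filter can be used without the SDK installed.
    from awsiot.greengrasscoreipc.clientv2 import GreengrassCoreIPCClientV2

    # Assumption: Greengrass exposes the thing name to the component process.
    thing_name = os.environ["AWS_IOT_THING_NAME"]

    ipc_client = GreengrassCoreIPCClientV2()
    # Subscription operations return a tuple with the response and the operation.
    _, operation = ipc_client.subscribe_to_topic(
        topic=shadow_topic_filter(thing_name, shadow_name),
        on_stream_event=on_stream_event,
    )
    # Return both so the caller can keep them alive and close the stream later.
    return ipc_client, operation
```

In the question's script, this would replace the call that subscribes to the bare topic 'dsvvefvfq'; the existing on_stream_event callback can be passed in unchanged.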

Cheers,

Michael

AWS
EXPERT
answered 2 years ago
  • Whoa... I see your #1 above. I'll fix that. On #2 I'm confused. I am subscribing to * in the component as an allow-everything setting, and I will tighten that up once I get things working. That way the component can see everything, which should eliminate misconfiguration as a problem, correct?

  • Are you sure the wildcard wouldn't be $aws/things/* with an asterisk instead of hash?

  • Yes, I am quite sure that using * is not correct. Please see the documentation here: https://docs.aws.amazon.com/greengrass/v2/developerguide/ipc-publish-subscribe.html#ipc-operation-subscribetotopic which includes a note about using + or #.

    I think you're confusing the access control policy and the actual subscription. The access control policy that you set in the component configuration simply means that the component is allowed to perform an action. It does not mean that the component will do that action.
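
    To illustrate why * does not behave as a wildcard in a subscription, here is a simplified sketch of MQTT-style topic filter matching. The function topic_matches is hypothetical and written only for illustration; it is not the matcher Greengrass uses, and the thing name MyCore is a placeholder. It treats + as "exactly one level", # as "this level and everything below", and * as an ordinary literal character.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Simplified MQTT-style matching: '+' is one level, '#' is the remainder."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    for i, level in enumerate(filter_levels):
        if level == "#":
            # '#' is only valid as the last level and matches everything after it.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            # Any other text, including '*', must match literally.
            return False

    return len(filter_levels) == len(topic_levels)


delta_topic = "$aws/things/MyCore/shadow/name/dsvvefvfq/update/delta"

print(topic_matches("$aws/things/#", delta_topic))   # '#' covers all remaining levels
print(topic_matches("$aws/things/*", delta_topic))   # '*' is just a literal character
print(topic_matches("dsvvefvfq", delta_topic))       # the bare shadow name is a different topic
```

    The "*" in the accessControl resources list is a separate matter: it only states which topics the component is permitted to subscribe to.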

Too long for a comment. To answer your question #2: I expect to publish data to a shadow, which the core device will then use when it publishes.

When I publish an update to a shadow, I see the updates logged in greengrass.log, so I know shadowmanager is receiving them.

My current component config looks like:

{
  "accessControl": {
    "aws.greengrass.ShadowManager": {
      "com.xxxxxxxx.productivity.cycle_count:shadow:1": {
        "policyDescription": "",
        "operations": [
          "aws.greengrass#GetThingShadow",
          "aws.greengrass#UpdateThingShadow",
          "aws.greengrass#DeleteThingShadow",
          "aws.greengrass#ListNamedShadowsForThing"
        ],
        "resources": [
          "*"
        ]
      }
    },
    "aws.greengrass.ipc.pubsub": {
      "com.xxxxxxxx.productivity.cycle_count:pubsub:1": {
        "policyDescription": "",
        "operations": [
          "aws.greengrass#SubscribeToTopic"
        ],
        "resources": [
          "*"
        ]
      }
    }
  }
}

As I understand it, with this configuration the component will receive all shadow traffic from shadowManager and any MQTT packets from pubsub.

When I update the shadow dsvvefvfq, I see the changes logged by shadowManager, but nothing appears in my custom component log. I expect to see the callback execute. At a bare minimum, I expect to see the print("Callback was involked") message in the component log.

def on_stream_event(event: SubscriptionResponseMessage) -> None:
    try:
        print("Callback was involked")
        message = str(event.binary_message.message, 'utf-8')
        topic = event.binary_message.context.topic
        print('Received new message on topic %s: %s' % (topic, message))
    except:
        traceback.print_exc()

flycast
answered 2 years ago
  • In your comment below my answer you said that you are subscribing to *. This is not correct based on the code you provided. You are subscribing to a single topic in the call to subscribe_to_topic. The topic you are subscribing to is dsvvefvfq.

    Your component is allowed to subscribe to any topic, but at least in your code, you aren't subscribing to all topics, only a very specific topic. Please use my recommendation of subscribing to $aws/things/# instead.
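
    Once the subscription uses a shadow topic filter, the callback receives messages from several shadow topics, so it helps to split the topic and decode the JSON payload. The following is a sketch under assumptions: parse_shadow_event is a hypothetical helper, the topic layout follows the named-shadow pattern $aws/things/<thing>/shadow/name/<shadow>/<suffix>, and the payload is assumed to be a JSON document that may contain a "state" key.

```python
import json


def parse_shadow_event(topic: str, payload: bytes):
    """Split a named-shadow topic and decode its JSON payload (hypothetical helper)."""
    parts = topic.split("/")
    # Expected layout: $aws/things/<thing>/shadow/name/<shadow>/<suffix...>
    if len(parts) < 6 or parts[:2] != ["$aws", "things"] or parts[3:5] != ["shadow", "name"]:
        return None  # Not a named-shadow topic.

    body = json.loads(payload.decode("utf-8")) if payload else {}
    return {
        "thing": parts[2],
        "shadow": parts[5],
        "event": "/".join(parts[6:]),  # for example "update/delta"
        "state": body.get("state"),
    }


# Example with a made-up thing name and payload.
info = parse_shadow_event(
    "$aws/things/MyCore/shadow/name/dsvvefvfq/update/delta",
    b'{"state": {"interval": 5}, "version": 3}',
)
print(info)
```

    Inside on_stream_event, the two arguments would come from event.binary_message.context.topic and event.binary_message.message, as in the code from the question.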
