AWS IoT core MQTT "TypeError: string indices must be integers" with JSON


Hello all, I recently asked how to save messages from MQTT topics inside the pubsub.py program itself, and I figured that out thanks to a very helpful user on here. However, I now get the error "TypeError: string indices must be integers" when I parse, as JSON, the data that arrives on my topic via MQTT in the Python file. I realized this happens because every message I publish from the program to the topic is sent back down to me, since I am subscribed to that same topic. Line 14 of the sample I'm basing my project on (pubsub.py) states that "The device should receive those same messages back from the message broker, since it is subscribed to that same topic". I am not sure how to differentiate between messages that bounce back after I publish them to the topic and messages I deliberately send down.

My project code

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.

from awscrt import mqtt
import sys
import threading
import time
from uuid import uuid4
import json

# This sample uses the Message Broker for AWS IoT to send and receive messages
# through an MQTT connection. On startup, the device connects to the server,
# subscribes to a topic, and begins publishing messages to that topic.
# The device should receive those same messages back from the message broker,
# since it is subscribed to that same topic.

# Parse arguments
import command_line_utils
cmdUtils = command_line_utils.CommandLineUtils("PubSub - Send and receive messages through an MQTT connection.")
cmdUtils.add_common_mqtt_commands()
cmdUtils.add_common_topic_message_commands()
cmdUtils.add_common_proxy_commands()
cmdUtils.add_common_logging_commands()
cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str)
cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str)
cmdUtils.register_command("port", "<int>", "Connection port. AWS IoT supports 443 and 8883 (optional, default=auto).", type=int)
cmdUtils.register_command("client_id", "<str>", "Client ID to use for MQTT connection (optional, default='test-*').", default="test-" + str(uui$
cmdUtils.register_command("count", "<int>", "The number of messages to send (optional, default='10').", default=10, type=int)
cmdUtils.register_command("is_ci", "<str>", "If present the sample will run in CI mode (optional, default='None')")
# Needs to be called so the command utils parse the commands
cmdUtils.get_args()

received_count = 0
received_all_event = threading.Event()
is_ci = cmdUtils.get_command("is_ci", None) != None

# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()

        # Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
        # evaluate result with a callback instead.
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
        resubscribe_results = resubscribe_future.result()
        print("Resubscribe results: {}".format(resubscribe_results))

        for topic, qos in resubscribe_results['topics']:
            if qos is None:
                sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    global received_count
    print("Received message from topic '{}': {}".format(topic, payload))
    if topic.endswith('/pi'):
        rawData = (payload.decode())
        data = json.loads(rawData)
        for person in data['message']:
            print(person)
    received_count += 1
    if received_count == cmdUtils.get_command("count"):
        received_all_event.set()

if __name__ == '__main__':
    mqtt_connection = cmdUtils.build_mqtt_connection(on_connection_interrupted, on_connection_resumed)

    if is_ci == False:
        print("Connecting to {} with client ID '{}'...".format(
            cmdUtils.get_command(cmdUtils.m_cmd_endpoint), cmdUtils.get_command("client_id")))
    else:
        print("Connecting to endpoint with client ID")
    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    message_count = cmdUtils.get_command("count")
    message_topic = cmdUtils.get_command(cmdUtils.m_cmd_topic)
    message_string = cmdUtils.get_command(cmdUtils.m_cmd_message)

    # Subscribe
    print("Subscribing to topic '{}'...".format(message_topic))
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=message_topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message_received)

    subscribe_result = subscribe_future.result()
    print("Subscribed with {}".format(str(subscribe_result['qos'])))


    # Publish message to server desired number of times.
    # This step is skipped if message is blank.
    # This step loops forever if count was set to 0.
    if message_string:
        if message_count == 0:
            print ("Sending messages until program killed")
        else:
            print ("Sending {} message(s)".format(message_count))

        publish_count = 1
        while (publish_count <= message_count) or (message_count == 0):
            message = "{} [{}]".format(message_string, publish_count)
            print("Publishing message to topic '{}': {}".format(message_topic, message))
            message_json = json.dumps(message)
            mqtt_connection.publish(
                topic=message_topic,
                payload=message_json,
                qos=mqtt.QoS.AT_LEAST_ONCE)
            time.sleep(300)
            publish_count += 1

    # Wait for all messages to be received.
    # This waits forever if count was set to 0.
    if message_count != 0 and not received_all_event.is_set():
        print("Waiting for all messages to be received...")

    received_all_event.wait()
    print("{} message(s) received.".format(received_count))

    # Disconnect
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print("Disconnected!")

What I'm sending down

{
  "message": [
    {
      "name": "Jake",
      "color": "blue"
    }
  ]
}

My output

Connected!
Subscribing to topic 'topic/pi'...
Subscribed with QoS.AT_LEAST_ONCE
Sending 10 message(s)
Publishing message to topic 'topic/pi': Test Message [1]
Received message from topic 'topic/pi': b'"Test Message [1]"'
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/awscrt/mqtt.py", line 541, in callback_wrapper
    callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
  File "Editpubsub.py", line 71, in on_message_received
    for person in data['message']:
TypeError: string indices must be integers
Received message from topic 'topic/pi': b'{\n  "message": [\n    {\n      "name": "Jake",\n      "color": "blue"\n    }\n  ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Received message from topic 'topic/pi': b'{\n  "message": [\n    {\n      "name": "Jake",\n      "color": "blue"\n    }\n  ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Received message from topic 'topic/pi': b'{\n  "message": [\n    {\n      "name": "Jake",\n      "color": "blue"\n    }\n  ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Publishing message to topic 'topic/pi': Test Message [2]
Received message from topic 'topic/pi': b'"Test Message [2]"'
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
  File "/home/pi/.local/lib/python3.7/site-packages/awscrt/mqtt.py", line 541, in callback_wrapper
    callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
  File "Editpubsub.py", line 71, in on_message_received
    for person in data['message']:
TypeError: string indices must be integers

Is there anything I can do to differentiate the two? And if not, will this error mess with the functionality of my program?

asked a year ago, 321 views
2 Answers
Accepted Answer

Hi AWS-currydem,

A best practice in Python is to have error handling in place. You probably get the error because you are sending a message that is not a JSON object (Test Message) but your code treats it as one. To distinguish between devices, you could subscribe to different topics, with each device publishing on its own topic. We do have a whitepaper on designing MQTT topics.
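As a rough illustration of the error handling mentioned above, here is a minimal sketch of a parsing helper that the callback could call instead of indexing the decoded data directly. The helper name `extract_people` is hypothetical, and the sketch assumes the "message" list layout shown in the question.

```python
import json


def extract_people(payload: bytes) -> list:
    """Return the dicts listed under "message", or [] if the payload has another shape."""
    try:
        data = json.loads(payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        # Payload is not valid UTF-8 or not JSON at all.
        return []

    # json.loads may return a str, int, list, etc., so check the type before indexing.
    if not isinstance(data, dict):
        return []

    people = data.get("message")
    if not isinstance(people, list):
        return []

    # Keep only entries that are JSON objects (dicts).
    return [person for person in people if isinstance(person, dict)]


if __name__ == "__main__":
    print(extract_people(b'"Test Message [1]"'))
    print(extract_people(b'{"message": [{"name": "Jake", "color": "blue"}]}'))
```

Inside `on_message_received`, looping over `extract_people(payload)` would then skip the echoed test messages instead of raising.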

Cheers,
Philipp

AWS
EXPERT
answered a year ago
  • First, I want to thank you so much. You have helped me out so much while I was struggling with this issue. I found a quick solution to keep using one topic. I do plan to try to use two different topics to not have to use this method. Here is what I did

    # Callback when the subscribed topic receives a message
    def on_message_received(topic, payload, dup, qos, retain, **kwargs):
        global received_count
        print("Received message from topic '{}': {}".format(topic, payload))
        if topic.endswith('/pi'):
            rawData = (payload.decode())
            data = json.loads(rawData)
            Validate = rawData.split()
            check1 = (Validate[1])
            if check1 == check1:
                print(rawData)
                for person in data['message']:
                    name = (person['name'])
                    color = (person['color'])
                    print(name)
                    print(color)
        received_count += 1
        if received_count == cmdUtils.get_command("count"):
            received_all_event.set()


Hi!

If I understand correctly, you are subscribed to the same topic you publish to. A couple of things here. If the intent is to publish on one topic and get a response back (request/response pattern), you can use separate topics for that. See this pattern for more details. That will keep your published and subscribed messages separate.
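A minimal sketch of what separate topics could look like, assuming a per-device naming scheme. The `devices/<id>/up` and `devices/<id>/down` layout and both helper names are made up for illustration and are not an AWS convention.

```python
def uplink_topic(device_id: str) -> str:
    """Topic the device publishes to (hypothetical naming scheme)."""
    return "devices/{}/up".format(device_id)


def downlink_topic(device_id: str) -> str:
    """Topic the device subscribes to for messages sent down to it (hypothetical)."""
    return "devices/{}/down".format(device_id)


if __name__ == "__main__":
    device_id = "pi"  # assumed identifier for this device
    print("publish to:  ", uplink_topic(device_id))
    print("subscribe to:", downlink_topic(device_id))
    # In the script from the question, the idea would be roughly:
    #   mqtt_connection.subscribe(topic=downlink_topic(device_id), ...)
    #   mqtt_connection.publish(topic=uplink_topic(device_id), ...)
    # so the device no longer receives its own published messages.
```

Because the device only subscribes to the downlink topic, the callback would only see messages that were deliberately sent down.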

The TypeError is being raised because of the data being sent (Test Message [1]):

Publishing message to topic 'topic/pi': Test Message [1]
Received message from topic 'topic/pi': b'"Test Message [1]"'
Exception ignored in: <class 'TypeError'>

I don't see it in your code, but somewhere the message Test Message is being introduced.
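For what it's worth, the publish loop in the question builds its message with `"{} [{}]".format(message_string, publish_count)` and passes it through `json.dumps`, which matches the payload in the output above. The short sketch below reproduces that round trip on its own, assuming `message_string` is "Test Message" as the output suggests: `json.dumps` of a plain string yields a JSON string, `json.loads` turns it back into a Python `str`, and indexing a `str` with `'message'` raises the TypeError.

```python
import json

# Same formatting as the publish loop in the question.
message = "{} [{}]".format("Test Message", 1)

# json.dumps on a str produces a JSON string literal, including the quotes.
wire = json.dumps(message)
print(wire)

# json.loads on a JSON string literal returns a plain Python str, not a dict.
data = json.loads(wire)
print(type(data).__name__)

error = None
try:
    data["message"]  # indexing a str with a str key
except TypeError as exc:
    error = exc
    print("TypeError:", exc)
```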

AWS
Gavin_A
answered a year ago
  • Thank you for your help, you guys have helped me a lot. I am going to try this method of using two different topics, one that I publish to and one that I subscribe to. In the meantime, I posted what I did to fix my issue in a comment above.

  • Always glad to help. Best of luck with the project, please post any other questions that might come up!
