Hello all, I recently asked a question on how to save messages from topics in MQTT in the pubsub.py program itself, I figured out how to do this thanks to the help of a very helpful user on here. However, I receive the error message ""TypeError: string indices must be integers" when I parse the data I send from my topic via MQTT using JSON in the python file. I realized this is due to the fact that all messages I send from the program to the topic are then sent back down due to being subscribed to that same topic. Line 14 in the sample I'm basing my project off of (pubsub.py) it states that "The device should receive those same messages back from the message broker, since it is subscribed to that same topic". I am not sure how to differentiate between messages that are bounce backs from messages I send up to the topic, and messages I deliberately send down.
My project code
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0.
from awscrt import mqtt
import sys
import threading
import time
from uuid import uuid4
import json
# This sample uses the Message Broker for AWS IoT to send and receive messages
# through an MQTT connection. On startup, the device connects to the server,
# subscribes to a topic, and begins publishing messages to that topic.
# The device should receive those same messages back from the message broker,
# since it is subscribed to that same topic.
# Parse arguments
import command_line_utils;
cmdUtils = command_line_utils.CommandLineUtils("PubSub - Send and recieve messages through an MQTT connection.")
cmdUtils.add_common_mqtt_commands()
cmdUtils.add_common_topic_message_commands()
cmdUtils.add_common_proxy_commands()
cmdUtils.add_common_logging_commands()
cmdUtils.register_command("key", "<path>", "Path to your key in PEM format.", True, str)
cmdUtils.register_command("cert", "<path>", "Path to your client certificate in PEM format.", True, str)
cmdUtils.register_command("port", "<int>", "Connection port. AWS IoT supports 443 and 8883 (optional, default=auto).", type=int)
cmdUtils.register_command("client_id", "<str>", "Client ID to use for MQTT connection (optional, default='test-*').", default="test-" + str(uui$
cmdUtils.register_command("count", "<int>", "The number of messages to send (optional, default='10').", default=10, type=int)
cmdUtils.register_command("is_ci", "<str>", "If present the sample will run in CI mode (optional, default='None')")
# Needs to be called so the command utils parse the commands
cmdUtils.get_args()
received_count = 0
received_all_event = threading.Event()
is_ci = cmdUtils.get_command("is_ci", None) != None
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
print("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()
# Cannot synchronously wait for resubscribe result because we're on the connection's event-loop thread,
# evaluate result with a callback instead.
resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print("Resubscribe results: {}".format(resubscribe_results))
for topic, qos in resubscribe_results['topics']:
if qos is None:
sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
global received_count
print("Received message from topic '{}': {}".format(topic, payload))
if topic.endswith('/pi'):
rawData = (payload.decode())
data = json.loads(rawData)
for person in data['message']:
print(person)
received_count += 1
if received_count == cmdUtils.get_command("count"):
received_all_event.set()
if __name__ == '__main__':
mqtt_connection = cmdUtils.build_mqtt_connection(on_connection_interrupted, on_connection_resumed)
if is_ci == False:
print("Connecting to {} with client ID '{}'...".format(
cmdUtils.get_command(cmdUtils.m_cmd_endpoint), cmdUtils.get_command("client_id")))
else:
print("Connecting to endpoint with client ID")
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
message_count = cmdUtils.get_command("count")
message_topic = cmdUtils.get_command(cmdUtils.m_cmd_topic)
message_string = cmdUtils.get_command(cmdUtils.m_cmd_message)
# Subscribe
print("Subscribing to topic '{}'...".format(message_topic))
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=message_topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message_received)
subscribe_result = subscribe_future.result()
print("Subscribed with {}".format(str(subscribe_result['qos'])))
# Publish message to server desired number of times.
# This step is skipped if message is blank.
# This step loops forever if count was set to 0.
if message_string:
if message_count == 0:
print ("Sending messages until program killed")
else:
print ("Sending {} message(s)".format(message_count))
publish_count = 1
while (publish_count <= message_count) or (message_count == 0):
message = "{} [{}]".format(message_string, publish_count)
print("Publishing message to topic '{}': {}".format(message_topic, message))
message_json = json.dumps(message)
mqtt_connection.publish(
topic=message_topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE)
time.sleep(300)
publish_count += 1
# Wait for all messages to be received.
# This waits forever if count was set to 0.
if message_count != 0 and not received_all_event.is_set():
print("Waiting for all messages to be received...")
received_all_event.wait()
print("{} message(s) received.".format(received_count))
# Disconnect
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
What I'm sending down
{
"message": [
{
"name": "Jake",
"color": "blue"
}
]
}
my output
Connected!
Subscribing to topic 'topic/pi'...
Subscribed with QoS.AT_LEAST_ONCE
Sending 10 message(s)
Publishing message to topic 'topic/pi': Test Message [1]
Received message from topic 'topic/pi': b'"Test Message [1]"'
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/home/pi/.local/lib/python3.7/site-packages/awscrt/mqtt.py", line 541, in callback_wrapper
callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
File "Editpubsub.py", line 71, in on_message_received
for person in data['message']:
TypeError: string indices must be integers
Received message from topic 'topic/pi': b'{\n "message": [\n {\n "name": "Jake",\n "color": "blue"\n }\n ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Received message from topic 'topic/pi': b'{\n "message": [\n {\n "name": "Jake",\n "color": "blue"\n }\n ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Received message from topic 'topic/pi': b'{\n "message": [\n {\n "name": "Jake",\n "color": "blue"\n }\n ]\n}\n'
{'name': 'Jake', 'color': 'blue'}
Publishing message to topic 'topic/pi': Test Message [2]
Received message from topic 'topic/pi': b'"Test Message [2]"'
Exception ignored in: <class 'TypeError'>
Traceback (most recent call last):
File "/home/pi/.local/lib/python3.7/site-packages/awscrt/mqtt.py", line 541, in callback_wrapper
callback(topic=topic, payload=payload, dup=dup, qos=QoS(qos), retain=retain)
File "Editpubsub.py", line 71, in on_message_received
for person in data['message']:
TypeError: string indices must be integers
Is there anything I can do to differentiate the two? And if not, will this error mess with the functionality of my program?
First, I want to thank you so much. You have helped me out so much while I was struggling with this issue. I found a quick solution to keep using one topic. I do plan to try to use two different topics to not have to use this method. Here is what I did
Callback when the subscribed topic receives a message
def on_message_received(topic, payload, dup, qos, retain, **kwargs): global received_count print("Received message from topic '{}': {}".format(topic, payload)) if topic.endswith('/pi'): rawData = (payload.decode()) data = json.loads(rawData) Validate = rawData.split() check1 = (Validate[1]) if check1 == check1: print(rawData) for person in data['message']: name = (person['name']) color = (person['color']) print(name) print(color) received_count += 1 if received_count == cmdUtils.get_command("count"): received_all_event.set()