Working Scenario
I am working on a scenario using AWS IoT Analytics. I have a Raspberry Pi acting as an AWS IoT Core device; it simulates temperature, humidity and wind speed data and publishes it to an MQTT topic.
Example JSON payload published to the topic:
{
  "temperature": 20.68,
  "humidity": 56.58,
  "wind": 7
}
In the AWS IoT Analytics console, I have created a Channel, a Pipeline (with no activities for now), a Datastore and a Dataset.
I have also created a rule in AWS IoT Core that sends all data to the AWS IoT Analytics Channel. The rule query is as follows:
SELECT * FROM 'topic/name'
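For reference, the same rule can be expressed as a `topicRulePayload` for the AWS IoT `CreateTopicRule` API, which makes it easy to compare against what the console created. This is a minimal sketch assuming boto3 would be used to create the rule; the channel name, role ARN and rule name below are hypothetical placeholders, and the function only builds the payload without calling AWS.

```python
import json


def build_topic_rule_payload(topic: str, channel_name: str, role_arn: str) -> dict:
    """Build a topicRulePayload that forwards every message on `topic`
    to an AWS IoT Analytics channel (same SQL as the rule above)."""
    return {
        "sql": f"SELECT * FROM '{topic}'",
        "awsIotSqlVersion": "2016-03-23",
        "ruleDisabled": False,
        "actions": [
            {
                "iotAnalytics": {
                    "channelName": channel_name,
                    # Role that lets AWS IoT put messages into the channel
                    "roleArn": role_arn,
                }
            }
        ],
    }


if __name__ == "__main__":
    payload = build_topic_rule_payload(
        "topic/name",
        "my_channel",  # hypothetical channel name
        "arn:aws:iam::123456789012:role/iot-analytics-rule-role",  # hypothetical role ARN
    )
    print(json.dumps(payload, indent=2))
    # Creating the rule would need boto3 and credentials (not run here):
    # boto3.client("iot").create_topic_rule(
    #     ruleName="to_iot_analytics", topicRulePayload=payload)
```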
Now, when I start simulating data from the Raspberry Pi, it is stored in the Datastore without any issues, and after running the dataset query I can see the data in the dataset content.
Dataset query:
SELECT * FROM simple_temp_108_datastore
Location of the results: AWS IoT Analytics > Datasets > 'sensor_dataset' > Content.
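The dataset content can also be checked outside the console. A hedged sketch: boto3's `get_dataset_content(datasetName=..., versionId="$LATEST")` returns `entries` whose `dataURI` points to a CSV file; the helper below (a hypothetical name) only parses that CSV text, converting numeric columns. The exact columns depend on your data store, so the sample input is illustrative.

```python
import csv
import io


def parse_dataset_csv(text: str) -> list:
    """Parse dataset content (CSV) into dicts, converting numeric values to float."""
    rows = []
    for row in csv.DictReader(io.StringIO(text)):
        parsed = {}
        for key, value in row.items():
            try:
                parsed[key] = float(value)
            except (TypeError, ValueError):
                parsed[key] = value  # keep non-numeric values (e.g. dates) as strings
        rows.append(parsed)
    return rows


# Fetching the latest content (needs boto3 and credentials; not run here):
#   import boto3, urllib.request
#   content = boto3.client("iotanalytics").get_dataset_content(
#       datasetName="sensor_dataset", versionId="$LATEST")
#   uri = content["entries"][0]["dataURI"]
#   text = urllib.request.urlopen(uri).read().decode("utf-8")
#   rows = parse_dataset_csv(text)
```

Comparing the row count before and after adding a pipeline activity is a quick way to confirm whether new messages are reaching the data store.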
Not Working Scenario
When I update the pipeline created above and add a "Calculate a message attribute" activity, it stops sending data (with or without the calculated field) to the Datastore. I also tried a Lambda function activity, and the same thing happens.
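One thing worth checking is how the activities are chained. In the `CreatePipeline`/`UpdatePipeline` API, each activity names the following one through `next`, ending at a `datastore` activity, and the console's "Calculate a message attribute" step corresponds (I believe) to the `math` activity. The sketch below builds a hypothetical channel → math → datastore definition and walks the `next` links; the activity names and the Fahrenheit expression are illustrative assumptions, not taken from the original pipeline.

```python
def build_pipeline_activities(channel: str, datastore: str) -> list:
    """Channel -> math -> datastore; each step names the next one via 'next'."""
    return [
        {"channel": {"name": "from_channel", "channelName": channel, "next": "temp_f"}},
        {"math": {
            "name": "temp_f",
            "attribute": "temperature_f",        # new attribute to add (hypothetical)
            "math": "temperature * 9 / 5 + 32",  # expression over existing attributes
            "next": "to_datastore",
        }},
        {"datastore": {"name": "to_datastore", "datastoreName": datastore}},
    ]


def check_activity_chain(activities: list) -> list:
    """Follow 'next' links from the channel activity and return the visited
    names, or raise ValueError if the chain breaks before a datastore."""
    by_name = {}
    for activity in activities:
        [(kind, body)] = activity.items()  # each activity has exactly one key
        by_name[body["name"]] = (kind, body)
    current = next((n for n, (k, _) in by_name.items() if k == "channel"), None)
    if current is None:
        raise ValueError("no channel activity found")
    visited = []
    while True:
        if current not in by_name:
            raise ValueError(f"'next' points to unknown activity {current!r}")
        if current in visited:
            raise ValueError(f"cycle at activity {current!r}")
        visited.append(current)
        kind, body = by_name[current]
        if kind == "datastore":
            return visited
        if "next" not in body:
            raise ValueError(f"activity {current!r} has no 'next' and is not a datastore")
        current = body["next"]


# Applying it (needs boto3 and credentials; not run here):
#   boto3.client("iotanalytics").update_pipeline(
#       pipelineName="<pipeline-name>", pipelineActivities=acts)
```

Separately, boto3's `run_pipeline_activity(pipelineActivity=..., payloads=[...])` can simulate a single activity against sample payloads, which may help tell whether the math expression itself is the problem.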
The simulation code running on the Raspberry Pi:
import paho.mqtt.client as mqtt
import ssl
import json
import time
import random
# AWS IoT Core endpoint
host = "<aws-endpoint>"
port = 8883
# Paths to the certificate and key files
rootCAPath = "/path/to/AmazonRootCA1.pem"
certificatePath = "/path/to/certificate.pem.crt"
privateKeyPath = "/path/to/private.pem.key"
# MQTT client configuration
clientId = "iot_analytics"
topic = "sensor/1"
# Callback function on connect
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to AWS IoT Core")
    else:
        print(f"Failed to connect, return code {rc}")
# Initialize the MQTT client
mqttClient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, clientId)
# Configure the client with SSL/TLS settings
mqttClient.tls_set(ca_certs=rootCAPath, certfile=certificatePath, keyfile=privateKeyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttClient.tls_insecure_set(False)
# Assign the on_connect callback function
mqttClient.on_connect = on_connect
# Connect to AWS IoT Core
mqttClient.connect(host, port, keepalive=60)
# Start the loop
mqttClient.loop_start()
# Publish data to the topic
while True:
    temperature = round(random.uniform(20.0, 30.0), 2)
    humidity = round(random.uniform(30.0, 70.0), 2)
    wind = random.randint(5, 50)
    msg = {
        "temperature": temperature,
        "humidity": humidity,
        "wind": wind
    }
    messageJson = json.dumps(msg)
    mqttClient.publish(topic, messageJson, qos=1)
    print(f"Published: {messageJson}")
    print("--------------------------")
    time.sleep(5)
# Stop the loop and disconnect (this part will never be reached in this example)
mqttClient.loop_stop()
mqttClient.disconnect()
In short:
The data is stored in the Datastore when there are no activities in the pipeline, and it is not stored when there is any kind of activity in the pipeline.
When I try to reprocess the messages in the pipeline (last 30 days or a custom date range), I get the following error: Failed to submit reprocessing request for pipeline, <pipeline-name> Invalid range: you can only reprocess data in hours
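The error text suggests (this is an assumption, not confirmed by the documentation here) that the reprocessing range has to fall on whole hours. A minimal sketch of a hypothetical helper that rounds a timezone-aware range outward to UTC hour boundaries before passing it to boto3's `start_pipeline_reprocessing(pipelineName=..., startTime=..., endTime=...)`:

```python
from datetime import datetime, timedelta, timezone


def hour_aligned_range(start: datetime, end: datetime) -> tuple:
    """Round start down and end up to whole UTC hours (expects aware datetimes)."""
    def floor_hour(dt: datetime) -> datetime:
        return dt.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)

    start_h = floor_hour(start)
    end_h = floor_hour(end)
    if end_h < end.astimezone(timezone.utc):
        end_h += timedelta(hours=1)  # round the end up so no data is cut off
    if end_h <= start_h:
        end_h = start_h + timedelta(hours=1)  # keep at least a one-hour window
    return start_h, end_h


# Submitting the request (needs boto3 and credentials; not run here):
#   s, e = hour_aligned_range(start, end)
#   boto3.client("iotanalytics").start_pipeline_reprocessing(
#       pipelineName="<pipeline-name>", startTime=s, endTime=e)
```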
Update: the “Calculate a message attribute” activity started working without me changing anything, and I have no idea why 🤷🏼♂️. I followed the same steps as before, and the data is now being stored in the Datastore. This applies only to “Calculate a message attribute”; the Lambda activity still does not work.
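For the Lambda activity, two things worth checking: the function receives a batch of messages as a list and must return a list of messages for the next activity, and its resource-based policy must allow the `iotanalytics.amazonaws.com` principal to invoke it (`lambda:InvokeFunction`). A minimal sketch of such a handler; the Fahrenheit attribute is a hypothetical transformation for illustration.

```python
import logging

logger = logging.getLogger()
logger.setLevel(logging.INFO)


def lambda_handler(event, context):
    """Pipeline Lambda activity: `event` is a list of messages; return a list."""
    logger.info("received %d message(s)", len(event))
    out = []
    for message in event:
        enriched = dict(message)  # copy so the input batch is not mutated
        # Hypothetical transformation: add a Fahrenheit reading.
        if "temperature" in enriched:
            enriched["temperature_f"] = round(enriched["temperature"] * 9 / 5 + 32, 2)
        out.append(enriched)
    return out
```

Returning the batch unchanged (`return event`) is a useful first test: if data still does not reach the Datastore, the problem is more likely permissions or the activity chain than the transformation.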