Datastore in AWS IoT Analytics not receiving any new messages when any activity is added in pipeline

0

Working Scenario

I am working on creating a scenario using AWS IoT Analytics, I have a Raspberry Pi acting as a IoT Core device which simulates temperature, humidity and wind speed data and sends it to a MQTT topic.

JSON array example which is being published to the topic:

{
  "temperature": 20.68,
  "humidity": 56.58,
  "wind": 7
}

In the AWS IoT Analytics console, I have created a Channel, Pipeline (without any activity as of now), Datastore and Dataset.

I have created a rule in AWS IoT Core which sends all data to an AWS IoT Analytics Channel. Rule query is as following: SELECT * FROM 'topic/name'

Now, when I start simulating the data from the Raspberry Pi it stores the data into the datastore without any issues and I can access data from the dataset and after running a query and can be visible at below destination.

Dataset Query: SELECT * FROM simple_temp_108_datastore

AWS IoT Analytics > Datasets > 'sensor_dataset' > content.

Not Working Scenario

When I update the pipeline created in above step and add a "Calculate a message attribute" activity then it stops sending the data (with calculated field or without calculated field) to the Datastore. I have also tried using a lambda function activity and it does the same.

Pipeline Activity Image

The simulation code running in the Raspberry PI:

import paho.mqtt.client as mqtt
import ssl
import json
import time
import random

# AWS IoT Core endpoint
host = "<aws-endpoint>"
port = 8883

# Paths to the certificate and key files
rootCAPath = "/path/to/AmazonRootCA1.pem"
certificatePath = "/path/to/certificate.pem.crt"
privateKeyPath = "/path/to/private.pem.key"

# MQTT client configuration
clientId = "iot_analytics"
topic = "sensor/1"

# Callback function on connect
def on_connect(client, userdata, flags, rc):
    if rc == 0:
        print("Connected to AWS IoT Core")
    else:
        print(f"Failed to connect, return code {rc}")

# Initialize the MQTT client
mqttClient = mqtt.Client(mqtt.CallbackAPIVersion.VERSION1, clientId)

# Configure the client with SSL/TLS settings
mqttClient.tls_set(ca_certs=rootCAPath, certfile=certificatePath, keyfile=privateKeyPath, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_TLSv1_2, ciphers=None)
mqttClient.tls_insecure_set(False)

# Assign the on_connect callback function
mqttClient.on_connect = on_connect

# Connect to AWS IoT Core
mqttClient.connect(host, port, keepalive=60)

# Start the loop
mqttClient.loop_start()

# Publish data to the topic
while True:
    temperature = round(random.uniform(20.0, 30.0), 2) 
    humidity = round(random.uniform(30.0, 70.0), 2)     
    wind = random.randint(5, 50)
    
    msg = {
        "temperature": temperature,
        "humidity": humidity,
        "wind": wind
    }
    
    messageJson = json.dumps(msg)
    mqttClient.publish(topic, messageJson, qos=1)
    print(f"Published: {messageJson}")
    print("--------------------------")
    time.sleep(5)

# Stop the loop and disconnect (this part will never be reached in this example)
mqttClient.loop_stop()
mqttClient.disconnect()

In short description:

The data is being stored into the Datastore when there are not activities present the pipeline and not getting stored when there is any kind of activity present in the pipleline.

asked 3 months ago172 views
1 Answer
0

Please ensure that the “Calculate a message attribute” activity or the Lambda function activity is correctly set up and the data format remains consistent after the activity is performed. Are permissions to and from IoT spot on ?

Remember to re-run (reprocess) the pipeline whenever you make changes to it for the changes to take effect on the data already in the channel.

profile picture
EXPERT
answered 3 months ago
  • When I try to re-process the messages (Last 30 days or a custom date range) in the pipeline it gave below error: Failed to submit reprocessing request for pipeline, <pipeline-name> Invalid range: you can only reprocess data in hours

  • For the “Calculate a message attribute” activity it worked without changing anything. I have no idea how 🤷🏼‍♂️. I had followed the same steps as before and now the data is being stored into the data store. For “Calculate a message attribute” only and not for Lambda activity.

You are not logged in. Log in to post an answer.

A good answer clearly answers the question and provides constructive feedback and encourages professional growth in the question asker.

Guidelines for Answering Questions