I'm working with a fleet of 300+ IoT devices sending telemetry data to a Kinesis Data Stream at a rate of 25 million messages per day. The data is stored in a MongoDB database using a schema with three collections:
- Devices
- Variables
- Values (timeseries)
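For context, the collections are shaped roughly like this; the field names, units, and time series options below are illustrative rather than my exact schema, with Values as a MongoDB time series collection keyed by device and variable:

```python
# Rough sketch of the collection shapes (field names and URI are placeholders).
from datetime import datetime, timezone
from pymongo import MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
db = client["telemetry"]

# Values is a time series collection: one document per measurement.
db.create_collection(
    "values",
    timeseries={"timeField": "timestamp", "metaField": "metadata", "granularity": "seconds"},
)

db.devices.insert_one({"_id": "machine-2", "name": "machine-2", "site": "plant-1"})
db.variables.insert_one({"deviceId": "machine-2", "name": "currentPowerConsumption", "unit": "kW"})
db["values"].insert_one(
    {
        "timestamp": datetime.now(timezone.utc),
        "metadata": {"deviceId": "machine-2", "variable": "currentPowerConsumption"},
        "value": 6.7,
    }
)
```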
The data ingestion pipeline is as follows:
AWS IoT Core -> IoT Core rules engine -> Kinesis Data Streams -> Lambda function -> MongoDB
A Kinesis Data Firehose delivery stream is also attached to the data stream to archive the raw data in S3.
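For reference, the IoT Core rule is roughly equivalent to the following sketch (the rule name, topic filter, stream name, and role ARN are placeholders; I also use the rule to inject the device ID into the payload):

```python
# Hypothetical sketch of the IoT Core topic rule that forwards telemetry to Kinesis.
import boto3

iot = boto3.client("iot")
iot.create_topic_rule(
    ruleName="telemetry_to_kinesis",  # placeholder name
    topicRulePayload={
        # topic(2) extracts the device ID from a topic like telemetry/machine-2
        "sql": "SELECT *, topic(2) AS deviceId FROM 'telemetry/+'",
        "actions": [
            {
                "kinesis": {
                    "streamName": "telemetry-stream",  # placeholder
                    "roleArn": "arn:aws:iam::123456789012:role/iot-kinesis",  # placeholder
                    "partitionKey": "${topic(2)}",  # partition by device ID for even sharding
                }
            }
        ],
        "ruleDisabled": False,
    },
)
```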
Problem:
The payload a device sends can change dynamically during operation, with variables added or removed. For example, a device named "machine-2" initially sends the following payload:
{
  "currentPowerConsumption": 6.7,
  "upTime": 456789,
  "motorHealth": 100
}
However, after a reconfiguration, it now sends:
{
  "currentPowerConsumption": 6.7,
  "upTime": 456789,
  "motorHealth": 100,
  "motorTemperature": 45
}
Current approach:
I'm currently using Amazon ElastiCache for Redis to store the number of variables each device sends, as a key-value pair of the form device-id: number-of-variables. For example:
"machine-2": 3
When a payload arrives, in the Lambda function I do the following (a simplified sketch of the handler is included after this list):
- Check the Redis store for the corresponding device ID.
- If the number of variables hasn't changed, I directly insert the data into the values collection.
- If the number of variables has changed, I update the variables collection and then insert the data into the values collection.
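Here is a simplified sketch of that handler. The environment variable names, collection names, and the assumption that the IoT rule injects deviceId into the payload are illustrative, not my exact code:

```python
# Simplified sketch of the Lambda consumer (names and shapes are illustrative).
import base64
import json
import os
from datetime import datetime, timezone

import redis
from pymongo import MongoClient

# Clients are created outside the handler so warm invocations reuse connections.
r = redis.Redis(host=os.environ["REDIS_HOST"], port=6379, decode_responses=True)
mongo = MongoClient(os.environ["MONGODB_URI"])
db = mongo["telemetry"]

def handler(event, context):
    for record in event["Records"]:
        payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
        device_id = payload.pop("deviceId")  # injected by the IoT rule
        variables = payload                  # remaining keys are the variables

        cached_count = r.get(device_id)
        if cached_count is None or int(cached_count) != len(variables):
            # Payload shape changed (or device is new): sync the Variables
            # collection, then refresh the cached count. Removed variables
            # would also need to be reconciled here; omitted for brevity.
            for name in variables:
                db.variables.update_one(
                    {"deviceId": device_id, "name": name},
                    {"$setOnInsert": {"deviceId": device_id, "name": name}},
                    upsert=True,
                )
            r.set(device_id, len(variables))

        # Always insert the measurements into the Values time series collection
        # (using arrival time as the timestamp for simplicity).
        now = datetime.now(timezone.utc)
        db["values"].insert_many(
            [
                {
                    "timestamp": now,
                    "metadata": {"deviceId": device_id, "variable": name},
                    "value": value,
                }
                for name, value in variables.items()
            ]
        )
```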
Questions:
- Is this approach efficient and scalable for handling such a large volume of data?
- Are there any potential improvements or alternative solutions I should consider for dynamically handling changes in the data payload?
- Is storing the number of variables in Redis the best way to track schema changes, or are there other options?