Greengrass流管理器 | 重试Kinesis导出时丢失消息
【以下的问题经过翻译处理】 介绍
我正在验证 aws.greengrass.StreamManager == 2.1.1
的重试能力,发现重试期间丢失了一些消息。
设置
下图描述了我正在尝试做的事情:
我在可通过以太网访问互联网的设备上运行 greengrass。我的自定义组件每 15 秒向 StreamManager 的消息流附加约 400 字节的数据。消息流配置了 kinesis 导出,如下所示:
kinesis_export = ExportDefinition(
kinesis=[KinesisConfig(identifier="KinesisExport" + self.modbus_data_stream_name, kinesis_stream_name=kinesis_stream_name, batch_size=5)]
)
client.create_message_stream(
消息流定义(
name=self.modbus_data_stream_name,strategy_on_full=StrategyOnFull.OverwriteOldestData,export_definition=kinesis_export
)
)
aws.greengrass.StreamManager
正在运行版本 2.1.1
和所有默认配置。
在我的 PC 上,我正在运行一个使用 boto3
的脚本来持续从运动中读取数据。在编写程序时,每当它从 Kinesis 读取记录时,它都会打印两件事:a) 当前时间戳和 b) 从设备生成数据时的时间戳。脚本附在帖子的末尾。
实验
标称运行
我使用与 PC 上的脚本并行连接的以太网运行我的设备,日志显示如下所示。
一些符合我预期的观察结果:
- 每个记录有 ~ 5 条消息(因为批量大小 = 5)
- 每条消息之间的时间间隔约为 15 秒(因为消息每 15 秒附加到流中)
- 记录每隔约 75 秒就会触发一次 Kinesis(我相信这来自 批量大小 [5 条消息] x 追加率 [15 秒/条消息])
日志的格式是这样的:
在 [PC 在 YY-MM-DD_hh:mm:ss 中收到记录的时间] 在 Kinesis 中找到 MyDevice 的记录
[在 YY-MM-DD_hh:mm:ss 中在设备上创建消息的时间]
故障注入
然后我从设备上拔下以太网,有效地断开了设备与互联网的连接。
正如我所预料的那样,这段时间PC不再收到来自云端的任何记录。
恢复
然后我将以太网重新连接到设备。大约 4 分钟后,我观察到大量记录击中了 PC 程序,该程序的消息时间与互联网断开连接时连续,这意味着之前失败的 kinesis 导出正在重试(好!)。
但是,我还观察到重试消息之间存在很大的时间间隔(相对于 15 秒的预期时间间隔),表明某些消息丢失了。
在我下面捕获的日志中,互联网在 ~22-12-14_09:33:xx 断开并在 ~22-12-14_10:28:xx 恢复
我用 !!! 手动标记了时间间隔以获得更好的可见性
我注意到设备时间戳中存在以下差距:
A。 22-12-14_09:34:54 到 22-12-14_09:37:39 - 2 分 45 秒间隔 = 11 条消息丢失
b. 22-12-14_09:39:55 到 22-12-14_09:41:25 - 1 分 30 秒间隔 = 6 条消息丢失
C。 22-12-14_09:44:56 到 22-12-14_09:51:27 - 6 分 31 秒间隔 = 26 条消息丢失
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:23
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:38
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:53
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:08
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:09
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:09
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:34:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:37:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:37:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:40
为 MyDe 找到记录在 22-12-14_10:32:54 副在运动
22-12-14_09:38:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:40
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:39:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:41:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:41:40
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:41:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:25
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:40
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:55
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:10
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:25
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:40
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:56
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:11
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:26
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:41
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:44:56
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:51:27
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:51:42
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:51:57
继续
我让 PC 程序再运行 5 个小时,想着 StreamManager 是否会再次重试丢失的消息,但我没有看到它们的任何踪迹。
问题
根据我的观察,aws.greengrass.StreamManager
无法可靠地将所有先前导出失败的数据重新传输到 Kinesis。我是否误解了 StreamManager 的功能?或者这是一个错误?
Kinesis 读取脚本
将日志记录导入为记录器 从 botocore.exceptions 导入 ClientError 导入boto3 从日期时间导入日期时间
类 KinesisStream: """封装一个 Kinesis 流。""" def init(自我,kinesis_client): """ :param kinesis_client:Boto3 Kinesis 客户端。 """ self.kinesis_client = kinesis_client self.name = 无 self.details = 无 self.stream_exists_waiter = kinesis_client.get_waiter('stream_exists')
def get_records(自我,max_records):
"""
从流中获取记录。这个函数是一个生成器,首先得到
流的分片迭代器,然后使用分片迭代器获取记录
从流中分批处理。每批记录都返回给
调用者,直到检索到指定的最大记录数。
:param max_records:要检索的最大记录数。
:return: 产生当前批次的检索记录。
"""
尝试:
响应 = self.kinesis_client.get_shard_iterator(
StreamName=self.name, ShardId=self.details['Shards'][0]['ShardId'],
ShardIteratorType='最新')
shard_iter = response['ShardIterator']
记录数 = 0
而 record_count < max_records:
响应 = self.kinesis_client.get_records(
ShardIterator=shard_iter, Limit=10)
shard_iter = response['NextShardIterator']
记录 = 响应 ['记录']
logger.info("得到 %s 条记录。", len(records))
record_count += len(记录)
产量记录
除了客户端错误:
logger.exception("无法从流 %s 中获取记录。", self.name)
增加
def 描述(自我,姓名):
"""
获取有关流的元数据。
:param name: 流的名称。
:return: 关于流的元数据。
"""
尝试:
response = self.kinesis_client.describe_stream(StreamName=名称)
self.name = 名字
self.details = response['StreamDescription']
logger.info("得到流 %s.", name)
除了客户端错误:
logger.exception("无法获取 %s.", name)
增加
别的:
返回 self.details
def loads_compressed_data(compressed_data: bytes) -> 字典: 导入 zlib 导入 json 返回 json.loads(zlib.decompress(compressed_data).decode())
def read_kinesis(DeviceThingName): 而真实的: 对于 kinesis.get_records(10) 中的记录: 如果记录: 对于记录中的 r: data = loads_compressed_data(r['数据']) 如果数据[“device_id"] == 设备名称: print(f"在 {datetime.now().strftime('%y-%m-%d_%H:%M:%S')}") 中找到 {DeviceThingName} 在 Kinesis 中的记录 ts = 数据['measurement_ts'] measurement_ts = datetime.fromtimestamp(ts).strftime("%y-%m-%d_%H:%M:%S") 打印(f“{measurement_ts}”)
记得使用 aws-cli configure 更新凭证
session = boto3.Session(profile_name="ecotec-dev", region_name='us-west-2') client = session.client('运动') kinesis = KinesisStream(客户端)
在获取记录之前需要描述运动
细节= kinesis.describe(“原始数据”) 打印(f“细节{细节}”) read_kinesis("Moxa-UC-8112A-TBBCB1098353")
编辑关于 ochoanel
于 2022 年 12 月 15 日发表的评论
感谢您的关注和建议。
我为更好的调试所做的更改
我将 read_message
API 添加到我的设备代码以确认我的消息已成功写入 StreamManger 持久文件。
这是我添加的代码片段:
measurement_ts
是消息创建时的时间戳。
我还将 append_message
的速率从 15 秒改为每 5 秒,只是为了加快调试速度。
日志收集
我在 ~12:30 启动了我的自定义组件和 PC 程序,然后我在 ~12:33:20 拔下以太网插头约 10 分钟。 以下是设备日志的一部分(由于文字限制,我无法全部粘贴在这里)
根据日志,我的消息确实到达了 StreamManager。
但是,我仍然在我的 PC 程序上看到丢失的消息。这可能是由以下任何原因引起的:
- StreamManager没有将数据导出到Kinesis
- 我的 PC 程序没有读取所有到达 Kinesis 的数据
老实说,我怀疑 No.2 是原因...我正在开发一种新工具来替换我的 PC 程序。
- 最新
- 投票最多
- 评论最多
【以下的回答经过翻译处理】 嘿,ictwist,
感谢详细的解释。我们需要进一步调查这个问题,确定它是否是StreamManager的一个bug。为了调试这个问题,您能否提供以下内容:
- 在出现间隙的时间段内和周围日志中看到的任何内容
- (如果可能,且您没有公开任何私人/专有信息)提供您组件中的代码(您发布了几行,但拥有完整代码可以帮助我们更轻松地在我们的端口上重新生成它)
看起来您的组件正在使用modbus从控制器读取(这只是基于“modbus_data_stream_name”的猜测),有没有可能该组件未能将新消息发布到StreamManager中?(看看自定义组件的操作将有所帮助)。在此期间,您可以在自定义组件上添加一个对“client.read_messages”的调用并记录其返回值。当Stream消息被添加到StreamManager流中时,它被存储到磁盘中,然后再导出到Kinesis中,这样我们就可以确定是否存在以下问题:
- 自定义组件无法将消息发布到设备上的本地StreamManager流中的问题
- StreamManager未能将消息保存到磁盘上
- 导出消息到Kinesis时出现故障。
相关内容
- AWS 官方已更新 1 年前