Greengrass流管理器 | 重试Kinesis导出时丢失消息

0

【以下的问题经过翻译处理】 介绍

我正在验证 aws.greengrass.StreamManager == 2.1.1 的重试能力,发现重试期间丢失了一些消息。

设置

下图描述了我正在尝试做的事情:

我在可通过以太网访问互联网的设备上运行 greengrass。我的自定义组件每 15 秒向 StreamManager 的消息流附加约 400 字节的数据。消息流配置了 kinesis 导出,如下所示:

kinesis_export = ExportDefinition(
            kinesis=[KinesisConfig(identifier="KinesisExport" + self.modbus_data_stream_name, kinesis_stream_name=kinesis_stream_name, batch_size=5)]
        )
client.create_message_stream(
            消息流定义(
                name=self.modbus_data_stream_name,strategy_on_full=StrategyOnFull.OverwriteOldestData,export_definition=kinesis_export
            )
        )

aws.greengrass.StreamManager 正在运行版本 2.1.1 和所有默认配置。

在我的 PC 上,我正在运行一个使用 boto3 的脚本来持续从运动中读取数据。在编写程序时,每当它从 Kinesis 读取记录时,它都会打印两件事:a) 当前时间戳和 b) 从设备生成数据时的时间戳。脚本附在帖子的末尾。 在此处输入图片描述

实验

标称运行

我使用与 PC 上的脚本并行连接的以太网运行我的设备,日志显示如下所示。

一些符合我预期的观察结果:

  1. 每个记录有 ~ 5 条消息(因为批量大小 = 5)
  2. 每条消息之间的时间间隔约为 15 秒(因为消息每 15 秒附加到流中)
  3. 记录每隔约 75 秒就会触发一次 Kinesis(我相信这来自 批量大小 [5 条消息] x 追加率 [15 秒/条消息]

日志的格式是这样的:

在 [PC 在 YY-MM-DD_hh:mm:ss 中收到记录的时间] 在 Kinesis 中找到 MyDevice 的记录

[在 YY-MM-DD_hh:mm:ss 中在设备上创建消息的时间]

在此处输入图片描述

故障注入

然后我从设备上拔下以太网,有效地断开了设备与互联网的连接。

正如我所预料的那样,这段时间PC不再收到来自云端的任何记录。

恢复

然后我将以太网重新连接到设备。大约 4 分钟后,我观察到大量记录击中了 PC 程序,该程序的消息时间与互联网断开连接时连续,这意味着之前失败的 kinesis 导出正在重试(好!)。

但是,我还观察到重试消息之间存在很大的时间间隔(相对于 15 秒的预期时间间隔),表明某些消息丢失了。

在我下面捕获的日志中,互联网在 ~22-12-14_09:33:xx 断开并在 ~22-12-14_10:28:xx 恢复

我用 !!! 手动标记了时间间隔以获得更好的可见性

我注意到设备时间戳中存在以下差距:

A。 22-12-14_09:34:54 到 22-12-14_09:37:39 - 2 分 45 秒间隔 = 11 条消息丢失

b. 22-12-14_09:39:55 到 22-12-14_09:41:25 - 1 分 30 秒间隔 = 6 条消息丢失

C。 22-12-14_09:44:56 到 22-12-14_09:51:27 - 6 分 31 秒间隔 = 26 条消息丢失

在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:23
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:38
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:31:53
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:08
在 22-12-14_09:32:27 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:32:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:09
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:33:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:09
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:24
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:34:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:34:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:37:39
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:37:54
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:38:40
为 MyDe 找到记录在 22-12-14_10:32:54 副在运动
22-12-14_09:38:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:39:40
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:39:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:41:25
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:41:40
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:41:55
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:10
在 22-12-14_10:32:54 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:25
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:40
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:42:55
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:10
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:25
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:40
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:43:56
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:11
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:26
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:44:41
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:44:56
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
!!! 22-12-14_09:51:27
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:51:42
在 22-12-14_10:32:55 找到 Kinesis 中 MyDevice 的记录
22-12-14_09:51:57

继续

我让 PC 程序再运行 5 个小时,想着 StreamManager 是否会再次重试丢失的消息,但我没有看到它们的任何踪迹。

问题

根据我的观察,aws.greengrass.StreamManager 无法可靠地将所有先前导出失败的数据重新传输到 Kinesis。我是否误解了 StreamManager 的功能?或者这是一个错误?


Kinesis 读取脚本

将日志记录导入为记录器 从 botocore.exceptions 导入 ClientError 导入boto3 从日期时间导入日期时间

类 KinesisStream: """封装一个 Kinesis 流。""" def init(自我,kinesis_client): """ :param kinesis_client:Boto3 Kinesis 客户端。 """ self.kinesis_client = kinesis_client self.name = 无 self.details = 无 self.stream_exists_waiter = kinesis_client.get_waiter('stream_exists')

def get_records(自我,max_records):
    """
    从流中获取记录。这个函数是一个生成器,首先得到
    流的分片迭代器,然后使用分片迭代器获取记录
    从流中分批处理。每批记录都返回给
    调用者,直到检索到指定的最大记录数。

    :param max_records:要检索的最大记录数。
    :return: 产生当前批次的检索记录。
    """
    尝试:
        响应 = self.kinesis_client.get_shard_iterator(
            StreamName=self.name, ShardId=self.details['Shards'][0]['ShardId'],
            ShardIteratorType='最新')

        shard_iter = response['ShardIterator']
        记录数 = 0
        而 record_count < max_records:
            响应 = self.kinesis_client.get_records(
                ShardIterator=shard_iter, Limit=10)
            shard_iter = response['NextShardIterator']
            记录 = 响应 ['记录']
            logger.info("得到 %s 条记录。", len(records))
            record_count += len(记录)
            产量记录
    除了客户端错误:
        logger.exception("无法从流 %s 中获取记录。", self.name)
        增加

def 描述(自我,姓名):
    """
    获取有关流的元数据。
    :param name: 流的名称。
    :return: 关于流的元数据。
    """
    尝试:
        response = self.kinesis_client.describe_stream(StreamName=名称)
        self.name = 名字
        self.details = response['StreamDescription']
        logger.info("得到流 %s.", name)
    除了客户端错误:
        logger.exception("无法获取 %s.", name)
        增加
    别的:
        返回 self.details

def loads_compressed_data(compressed_data: bytes) -> 字典: 导入 zlib 导入 json 返回 json.loads(zlib.decompress(compressed_data).decode())

def read_kinesis(DeviceThingName): 而真实的: 对于 kinesis.get_records(10) 中的记录: 如果记录: 对于记录中的 r: data = loads_compressed_data(r['数据']) 如果数据[“device_id"] == 设备名称: print(f"在 {datetime.now().strftime('%y-%m-%d_%H:%M:%S')}") 中找到 {DeviceThingName} 在 Kinesis 中的记录 ts = 数据['measurement_ts'] measurement_ts = datetime.fromtimestamp(ts).strftime("%y-%m-%d_%H:%M:%S") 打印(f“{measurement_ts}”)

记得使用 aws-cli configure 更新凭证

session = boto3.Session(profile_name="ecotec-dev", region_name='us-west-2') client = session.client('运动') kinesis = KinesisStream(客户端)

在获取记录之前需要描述运动

细节= kinesis.describe(“原始数据”) 打印(f“细节{细节}”) read_kinesis("Moxa-UC-8112A-TBBCB1098353")

编辑关于 ochoanel 于 2022 年 12 月 15 日发表的评论

感谢您的关注和建议。

我为更好的调试所做的更改

我将 read_message API 添加到我的设备代码以确认我的消息已成功写入 StreamManger 持久文件。

这是我添加的代码片段:

在此处输入图片描述

measurement_ts 是消息创建时的时间戳。

我还将 append_message 的速率从 15 秒改为每 5 秒,只是为了加快调试速度。

日志收集

我在 ~12:30 启动了我的自定义组件和 PC 程序,然后我在 ~12:33:20 拔下以太网插头约 10 分钟。 以下是设备日志的一部分(由于文字限制,我无法全部粘贴在这里) 在此处输入图片描述 在此处输入图片描述 在此处输入图片描述

根据日志,我的消息确实到达了 StreamManager。

但是,我仍然在我的 PC 程序上看到丢失的消息。这可能是由以下任何原因引起的:

  1. StreamManager没有将数据导出到Kinesis
  2. 我的 PC 程序没有读取所有到达 Kinesis 的数据

老实说,我怀疑 No.2 是原因...我正在开发一种新工具来替换我的 PC 程序。

profile picture
专家
已提问 6 个月前13 查看次数
1 回答
0

【以下的回答经过翻译处理】 嘿,ictwist,

感谢详细的解释。我们需要进一步调查这个问题,确定它是否是StreamManager的一个bug。为了调试这个问题,您能否提供以下内容:

  1. 在出现间隙的时间段内和周围日志中看到的任何内容
  2. (如果可能,且您没有公开任何私人/专有信息)提供您组件中的代码(您发布了几行,但拥有完整代码可以帮助我们更轻松地在我们的端口上重新生成它)

看起来您的组件正在使用modbus从控制器读取(这只是基于“modbus_data_stream_name”的猜测),有没有可能该组件未能将新消息发布到StreamManager中?(看看自定义组件的操作将有所帮助)。在此期间,您可以在自定义组件上添加一个对“client.read_messages”的调用并记录其返回值。当Stream消息被添加到StreamManager流中时,它被存储到磁盘中,然后再导出到Kinesis中,这样我们就可以确定是否存在以下问题:

  1. 自定义组件无法将消息发布到设备上的本地StreamManager流中的问题
  2. StreamManager未能将消息保存到磁盘上
  3. 导出消息到Kinesis时出现故障。
profile picture
专家
已回答 6 个月前

您未登录。 登录 发布回答。

一个好的回答可以清楚地解答问题和提供建设性反馈,并能促进提问者的职业发展。

回答问题的准则