AWS Greengrass does not send data to AWS Kinesis
The main purpose of my program is to connect to an incoming MQTT channel and send the data it receives to my AWS Kinesis stream, named "MyKinesisStream". Here is my code:

    import argparse
    import logging
    import random

    from paho.mqtt import client as mqtt_client
    from stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        ResourceNotFoundException,
        StrategyOnFull,
        StreamManagerClient,
        ReadMessagesOptions,
    )

    broker = 'localhost'
    port = 1883
    topic = "clients/test/hello/world"
    client_id = f'python-mqtt-{random.randint(0, 100)}'
    username = '...'
    password = '...'

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger()

    args = ""


    def connect_mqtt() -> mqtt_client.Client:
        def on_connect(client, userdata, flags, rc):
            if rc == 0:
                print("Connected to MQTT Broker!")
            else:
                print("Failed to connect, return code %d\n" % rc)

        client = mqtt_client.Client(client_id)
        client.username_pw_set(username, password)
        client.on_connect = on_connect
        client.connect(broker, port)
        return client


    def sendDataToKinesis(
        stream_name: str,
        kinesis_stream_name: str,
        payload,
        batch_size: int = None,
    ):
        try:
            print("Debug: sendDataToKinesis with params:", stream_name + " | ", kinesis_stream_name, " | ", batch_size)
            print("payload:", payload)
            print("type payload:", type(payload))
        except Exception as e:
            print("Error while printing out the parameters", str(e))
            logger.exception(e)

        # Defined up front so the finally block can test it safely
        kinesis_client = None
        try:
            # Create a client for the StreamManager
            kinesis_client = StreamManagerClient()

            # Try deleting the stream (if it exists) so that we have a fresh start
            try:
                kinesis_client.delete_message_stream(stream_name=stream_name)
            except ResourceNotFoundException:
                pass

            exports = ExportDefinition(
                kinesis=[KinesisConfig(
                    identifier="KinesisExport" + stream_name,
                    kinesis_stream_name=kinesis_stream_name,
                    batch_size=batch_size,
                )]
            )
            kinesis_client.create_message_stream(
                MessageStreamDefinition(
                    name=stream_name,
                    strategy_on_full=StrategyOnFull.OverwriteOldestData,
                    export_definition=exports
                )
            )

            sequence_no = kinesis_client.append_message(stream_name=stream_name, data=payload)
            print(
                "Successfully appended message to stream with sequence number ", sequence_no
            )

            readValue = kinesis_client.read_messages(stream_name, ReadMessagesOptions(min_message_count=1, read_timeout_millis=1000))
            print("DEBUG read test: ", readValue)
        except Exception as e:
            print("Exception while running: " + str(e))
            logger.exception(e)
        finally:
            # Always close the client to avoid resource leaks
            print("closing connection")
            if kinesis_client:
                kinesis_client.close()


    def subscribe(client: mqtt_client.Client, args):
        def on_message(client, userdata, msg):
            print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
            sendDataToKinesis(args.greengrass_stream, args.kinesis_stream, msg.payload, args.batch_size)

        client.subscribe(topic)
        client.on_message = on_message


    def run(args):
        mqtt_client_instance = connect_mqtt()
        subscribe(mqtt_client_instance, args)
        mqtt_client_instance.loop_forever()


    def parse_args() -> argparse.Namespace:
        parser = argparse.ArgumentParser()
        parser.add_argument('--greengrass-stream', required=False, default='...')
        parser.add_argument('--kinesis-stream', required=False, default='MyKinesisStream')
        parser.add_argument('--batch-size', required=False, type=int, default=500)
        return parser.parse_args()


    if __name__ == '__main__':
        args = parse_args()
        run(args)

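(Sensitive values have been replaced with ellipses; the real values are correct.)
The problem is that no data is sent to our Kinesis stream. The runtime gives me the following STDOUT output: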
2022-11-25T12:13:47.640Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Connected to MQTT Broker!. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Received `{"machineId":2, .... "timestamp":"2022-10-24T12:21:34.8777249Z","value":true}` from `clients/test/hello/world` topic. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Debug: sendDataToKinesis with params: test | MyKinesisStream | 100. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. payload: b'{"machineId":2,... ,"timestamp":"2022-10-24T12:21:34.8777249Z","value":true}'. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. type payload: <class 'bytes'>. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. Successfully appended message to stream with sequence number 0. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. DEBUG read test: [<Class Message. stream_name: 'test', sequence_number: 0, ingest_time: 1669376980985, payload: b'{"machineId":2,"mach'>]. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
2022-11-25T12:13:47.641Z [INFO] (Copier) jp.co.xyz.StreamManagerKinesis: stdout. closing connection. {scriptName=services.jp.co.xyz.StreamManagerKinesis.lifecycle.Run, serviceName=jp.co.xyz.StreamManagerKinesis, currentState=RUNNING}
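So we can see that the data arrives from MQTT and that the Python code appends the message. It also seems that my Kinesis stream has the information, since the message can be read back in the next step, and the connection is then closed without any error.
But the problem is that, on the AWS side, we cannot see any data arriving in the stream: screenshot of the AWS console https://i.stack.imgur.com/wN5I4.png
What could be going wrong here? Our Greengrass core is configured correctly and reachable from AWS, and the component is running and healthy: screenshot of the IoT Core status https://i.stack.imgur.com/JMJmn.png screenshot of the StreamManager component status https://i.stack.imgur.com/2qnfn.png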
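Hi,
You delete the stream every time you append a message. Because the stream then never contains more than one message, you probably never reach the minimum batch size that StreamManager needs before it uploads.
You should create the StreamManager client and initialize the stream only once, and reuse them whenever you append data. You could also consider reducing the batch size.
Good luck, - Joe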
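
For anyone who wants to see what that advice might look like in code, here is a minimal sketch; it is not taken from the answer itself. The `StreamForwarder` class and the `build_forwarder` helper are hypothetical names introduced for illustration, and `batch_size=1` is only an assumed value for a low-volume test. The `stream_manager` classes are the same ones the question already imports.

```python
class StreamForwarder:
    """Hypothetical helper: one long-lived client, one stream, created once."""

    def __init__(self, client, stream_name, definition):
        self._client = client
        self._stream_name = stream_name
        # Create the stream a single time at startup, not once per message.
        client.create_message_stream(definition)

    def forward(self, payload: bytes) -> int:
        # Only append here; the stream and its export config are left alone.
        return self._client.append_message(
            stream_name=self._stream_name, data=payload
        )

    def close(self) -> None:
        self._client.close()


def build_forwarder(stream_name, kinesis_stream_name, batch_size=1):
    """Wire the forwarder to the real SDK (batch_size=1 is an assumption)."""
    # Imported lazily so StreamForwarder can be tried without the SDK installed.
    from stream_manager import (
        ExportDefinition,
        KinesisConfig,
        MessageStreamDefinition,
        ResourceNotFoundException,
        StrategyOnFull,
        StreamManagerClient,
    )

    client = StreamManagerClient()
    # Optional one-time reset at startup, as in the question's code,
    # but no longer repeated for every incoming MQTT message.
    try:
        client.delete_message_stream(stream_name=stream_name)
    except ResourceNotFoundException:
        pass

    definition = MessageStreamDefinition(
        name=stream_name,
        strategy_on_full=StrategyOnFull.OverwriteOldestData,
        export_definition=ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        ),
    )
    return StreamForwarder(client, stream_name, definition)
```

With something like this, `run(args)` would call `build_forwarder(args.greengrass_stream, args.kinesis_stream, args.batch_size)` once before `loop_forever()`, and `on_message` would only call `forwarder.forward(msg.payload)`. The client is then closed when the program shuts down rather than after every message.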