如何使用 Python 从我的设备向 AWS IoT Core 发布 MQTT 消息?
我无法在 AWS IoT Core 和我的设备或 MQTT 客户端之间发送或接收 MQTT(消息队列遥测传输)消息。
简短描述
验证您的 AWS IoT 事物是否已配置正确且其证书是否已正确附加。要测试您的设置,可使用 AWS IoT MQTT 客户端和本文中提供的 Python 代码示例。
解决方法
设置一个目录来测试 MQTT 发布
- 在开发环境中创建工作目录。例如,创建一个名为 iot-test-publish 的目录。
- 在新工作目录中为证书创建子目录。例如,创建一个名为 certificates 的子目录。
- 在命令行中,将该目录更改为新工作目录。
安装 pip 和适用于 Python 的 AWS IoT SDK
注意: 在开始之前,请先安装用于 Python 3 打包的 pip。有关更多信息,请参阅 Python Packaging Authority (PyPA) 网站上的 Installation(安装)。
-
要安装适用于 Python v2 的 AWS IoT SDK,请使用以下命令:
pip install awsiotsdk-or-
要安装适用于 Python 的 AWS IoT Device SDK(之前的 SDK 版本,v1),请使用以下命令:
pip install AWSIoTPythonSDK
**注意:**最佳做法是使用适用于 Python 的 AWS IoT SDK 的最新更新版本。有关更多信息,请参阅 GitHub 上的 AWS IoT SDK for Python v2(适用于 Python v2 的 AWS IoT SDK)或 AWS IoT Device SDK for Python(适用于 Python 的 AWS IoT Device SDK)。使用这些 SDK 连接到 AWS IoT Core 同样也是最佳做法,但并非必须使用这些 SDK—您可以使用任何其他合规的第三方 MQTT 客户端连接到 AWS IoT Core。
创建 AWS IoT Core 策略
- 打开 AWS IoT Core 控制台。
- 在左侧导航窗格中,选择 Secure(安全)。
- 在 Secure(安全)下,选择 Policies(策略)。
- 如果您已有 AWS IoT Core 策略,请选择 Create(创建)以创建新策略。
-or-
如果您没有任何现有策略,则在 You don't have any policies yet page(您还没有任何策略)页面上,选择 Create a policy(创建策略)。 - 在 Create a policy(创建策略)页面上,输入您的策略名称。例如,输入名称 admin(管理)。
- 在 Add statements(添加语句)下,完成以下操作:
对于 Action(操作),输入 iot:*。
**重要事项:**出于测试目的,您可以允许所有 AWS IoT 操作 (iot:*)。但在生产设置中,最佳做法是提高安全性。有关更多安全策略示例,请参阅 AWS IoT Core 策略示例。
对于 Resource ARN(资源 ARN),输入 *。
对于 Effect(效果),选中 Allow(允许)复选框。 - 选择 Create(创建)。
有关更多信息,请参阅创建 AWS IoT 策略和 AWS IoT Core 策略。
创建 AWS IoT 事物
**注意:**您无需创建任何事物即可连接到 AWS IoT。但是,事物要允许您使用额外的安全控件和其他 AWS IoT 功能,例如实例集索引、作业或设备影子。
- 在 AWS IoT Core 控制台左侧的导航窗格中,选择 Manage(管理)。
- 如果您已有事物,则选择 Create(创建)以创建新事物。
-or-
如果您没有任何现有事物,则在 You don't have any things yet(您还没有任何事物)页面上,选择 Register a thing(注册事物)。 - 在 Creating AWS IoT things(创建 AWS IoT 事物)页面上,选择 Create a single thing(创建单个事物)。
- 在Add your device to the thing registry(将设备添加到事物注册表)页面上,完成以下操作:
为您的事物输入名称。例如,输入 Test-Thing。
(可选)在 Add a type to this thing(为此事物添加类型)下,选择或创建 thing type(事物类型)。
(可选)在 Add this thing to a group(将此事物添加到组中)下,选择或创建组。有关组的更多信息,请参阅静态事物组和动态事物组。
(可选)在 Set searchable thing attributes (optional)(设置可搜索的事物属性(可选))下,将属性添加为键值对。
选择 Next(下一步)。
**注意:**最佳做法是将事物名称与 clientId 相匹配。如果名称匹配,您可以使用实例集索引和事物策略变量等 AWS IoT Core 功能。 - 在 Add a certificate for your thing)为事物添加证书)页面上,选择 Create certificate(创建证书)。您会看到确认您的事物和已为您的事物创建证书的通知。
- 在 Certificate created(已创建证书)页面上,完成以下操作:
在 In order to connect a device, you need to download the following(要连接设备,您需要下载以下内容)下,为证书、公钥和私钥选择 Download(下载)。
将每个下载的文件保存到您之前创建的证书子目录中。
在 You also need to download a root CA for AWS IoT(您还需要为 AWS IoT 下载根 CA)下,选择 Download(下载)。Server authentication(服务器身份验证)页面会打开用于服务器身份验证的 CA 证书。 - 在 Amazon Trust Services Endpoints(Amazon 信任服务端点(首选))下,选择 Amazon Root CA 1(Amazon 根 CA 1)。证书将在浏览器中打开。
- 复制证书(从 -----BEGIN CERTIFICATE----- 到 -----END CERTIFICATE----- 的所有内容)并将其粘贴到文本编辑器中。
- 将证书以名为 root.pem 的 .pem 文件保存到证书子目录中。
- 在 AWS IoT Core 控制台的 Certificate created(已创建证书)页面上,选择 Activate(启用)。该按钮将变为 Deactivate(停用)。
- 选择 Attach a policy(附加策略)。
- 在Add a policy for your thing(为事物添加策略)页面上,完成以下操作:
选择您之前创建的 AWS IoT Core 策略。例如,选择 admin(管理)。
选择 Register Thing(注册事物)。
有关更多信息,请参阅创建事物对象。
复制 AWS IoT Core 端点 URL
- 在 AWS IoT Core 控制台的导航窗格中,选择 Settings(设置)。
- 在 Settings(设置)页面的 Custom endpoint(自定义端点)下,复制端点。此 AWS IoT Core 自定义端点 URL 对于您的 AWS 账户和区域是唯一的。
创建 Python 程序文件
将以下 Python 代码示例之一保存为名为 publish.py 的 Python 程序文件。
对于适用于 Python v2 的 AWS IoT SDK,请使用以下示例代码:
**重要事项:**请将 customEndpointUrl 替换为您的 AWS IoT Core 自定义端点 URL。将 certificates 替换为您证书子目录的名称。将 a1b23cd45e-certificate.pem.crt 替换为您的客户端 .crt 的名称。将 a1b23cd45e-private.pem.key 替换为您的私钥名称。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.# SPDX-License-Identifier: MIT-0 from awscrt import io, mqtt, auth, http from awsiot import mqtt_connection_builder import time as t import json # Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE ENDPOINT = "customEndpointUrl" CLIENT_ID = "testDevice" PATH_TO_CERTIFICATE = "certificates/a1b23cd45e-certificate.pem.crt" PATH_TO_PRIVATE_KEY = "certificates/a1b23cd45e-private.pem.key" PATH_TO_AMAZON_ROOT_CA_1 = "certificates/root.pem" MESSAGE = "Hello World" TOPIC = "test/testing" RANGE = 20 # Spin up resources event_loop_group = io.EventLoopGroup(1) host_resolver = io.DefaultHostResolver(event_loop_group) client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver) mqtt_connection = mqtt_connection_builder.mtls_from_path( endpoint=ENDPOINT, cert_filepath=PATH_TO_CERTIFICATE, pri_key_filepath=PATH_TO_PRIVATE_KEY, client_bootstrap=client_bootstrap, ca_filepath=PATH_TO_AMAZON_ROOT_CA_1, client_id=CLIENT_ID, clean_session=False, keep_alive_secs=6 ) print("Connecting to {} with client ID '{}'...".format( ENDPOINT, CLIENT_ID)) # Make the connect() call connect_future = mqtt_connection.connect() # Future.result() waits until a result is available connect_future.result() print("Connected!") # Publish message to server desired number of times. print('Begin Publish') for i in range (RANGE): data = "{} [{}]".format(MESSAGE, i+1) message = {"message" : data} mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE) print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'") t.sleep(0.1) print('Publish End') disconnect_future = mqtt_connection.disconnect() disconnect_future.result()
-or-
对于适用于 Python v1 的 AWS IoT Device SDK,请使用以下示例代码:
**重要事项:**请将 customEndpointUrl 替换为您的 AWS IoT Core 自定义端点 URL。将 certificates 替换为您证书子目录的名称。将 a1b23cd45e-certificate.pem.crt 替换为您的客户端 .crt 的名称。将 a1b23cd45e-private.pem.key 替换为您的私钥名称。
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.# SPDX-License-Identifier: MIT-0 import time as t import json import AWSIoTPythonSDK.MQTTLib as AWSIoTPyMQTT # Define ENDPOINT, CLIENT_ID, PATH_TO_CERTIFICATE, PATH_TO_PRIVATE_KEY, PATH_TO_AMAZON_ROOT_CA_1, MESSAGE, TOPIC, and RANGE ENDPOINT = "customEndpointUrl" CLIENT_ID = "testDevice" PATH_TO_CERTIFICATE = "certificates/a1b23cd45e-certificate.pem.crt" PATH_TO_PRIVATE_KEY = "certificates/a1b23cd45e-private.pem.key" PATH_TO_AMAZON_ROOT_CA_1 = "certificates/root.pem" MESSAGE = "Hello World" TOPIC = "test/testing" RANGE = 20 myAWSIoTMQTTClient = AWSIoTPyMQTT.AWSIoTMQTTClient(CLIENT_ID) myAWSIoTMQTTClient.configureEndpoint(ENDPOINT, 8883) myAWSIoTMQTTClient.configureCredentials(PATH_TO_AMAZON_ROOT_CA_1, PATH_TO_PRIVATE_KEY, PATH_TO_CERTIFICATE) myAWSIoTMQTTClient.connect() print('Begin Publish') for i in range (RANGE): data = "{} [{}]".format(MESSAGE, i+1) message = {"message" : data} myAWSIoTMQTTClient.publish(TOPIC, json.dumps(message), 1) print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'") t.sleep(0.1) print('Publish End') myAWSIoTMQTTClient.disconnect()
**注意:**适用于 Python 的 AWS IoT Device SDK 的较旧版本不支持 Python 3.12 或更高版本。确保使用适用于 Python 的 AWS IoT Device SDK 的最新版本。
测试设置
- 在 AWS IoT Core 控制台左侧的导航窗格中,选择 Test(测试)。
- 在 MQTT client(MQTT 客户端)页面的 Subscription topic(订阅主题)处,输入 test/testing。
- 选择 Subscribe to topic(订阅主题)。名为 test/testing 的测试主题可用于测试消息发布。有关更多信息,请参阅使用 AWS IoT MQTT 客户端查看 MQTT 消息。
- 从命令行运行以下命令:
python3 publish.py
Python 程序在 AWS IoT Core 控制台中向您创建的主题 test/testing 发布了 20 条测试消息。在控制台中查看该主题即可查看发布的消息。
**提示:**您还可以使用随附的 pubsub 示例测试 SDK 的其他功能,例如 WebSockets 订阅和连接。有关更多信息,请参阅 GitHub 上的 pubsub(适用于 Python v2 的 AWS IoT SDK)或 BasicPubSub(适用于 Python 的 AWS IoT Device SDK)。
(可选)启用对 Amazon CloudWatch 的 AWS IoT 日志记录
您可以监控发布到 AWS IoT Core 的 MQTT 消息的事件日志。有关设置说明,请参阅配置 AWS IoT 日志记录和使用 CloudWatch Logs 监控 AWS IoT。
相关信息
常见问题解答(MQTT 消息协议网站)
- 语言
- 中文 (简体)

相关内容
AWS 官方已更新 2 年前