【以下的问题经过翻译处理】 我正在尝试使用 Basic Ingest 创建一个 IoT 核心规则,该规则触发 Kafka 操作以将我们的设备数据直接发送到我们的 Kafka 集群。我已成功设置此操作所需的 VPC 目标,指定包含 Kafka 代理的子网;但是,当 KafkaAction 失败时,我在 CloudWatch 中收到以下日志:
{
"ruleName": "testKafkaAction",
"topic": "",
"cloudwatchTraceId": "<id>",
"clientId": "test",
"base64OriginalPayload": "<payload>",
"failures": [
{
"failedAction": "KafkaAction",
"failedResource": "iot-core-sensordata-stream",
"errorMessage": "KafkaAction failed to send a message to the specified bootstrap servers. SSL handshake failed. Message arrived on: , Action: kafka, topic: test-kafka-action, bootstrap.servers: <bootstrap_servers>"
}
]
}
我们的 Kafka 集群是自托管的,并使用自签名 CA。我在 Secrets Manager 中创建了一个二进制机密,其中包含 pkcs12 格式的信任库。信任库包含我们的自签名 CA。我使用 SASL_SSL 作为规则的安全协议,并使用 SCRAM-SHA-512 作为机制。用户名和密码也作为单独的密钥存储在 Secrets Manager 中。我的规则的 IAM 策略已正确设置以访问这些机密。另一件需要注意的事情:我的规则引导服务器列表是我们 VPC 中运行 Kafka 代理的节点的私有 ip:port,我确保这些 IP 在证书的 SAN 列表中。
这是我的规则的 json 模板作为参考:
{
"sql": "SELECT *",
"ruleDisabled": false,
"awsIotSqlVersion": "2016-03-23",
"actions": [
{
"kafka": {
"destinationArn": "<VPC_DESTINATION_ARN>",
"topic": "test-kafka-action",
"clientProperties": {
"bootstrap.servers": "<KAFKA_BOOTSTRAP_SERVERS>",
"key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
"value.serializer": "org.apache.kafka.common.serialization.ByteBufferSerializer",
"security.protocol": "SASL_SSL",
"ssl.truststore": "${get_secret('<SECRET_NAME>', 'SecretBinary', '<KAFKA_RULE_ROLE_ARN>')}",
"ssl.truststore.password": "{{SSL_TRUSTSTORE_PASSWORD}}",
"sasl.mechanism": "SCRAM-SHA-512",
"sasl.scram.username": "${get_secret('<SECRET_NAME>', 'SecretString', 'kafkaUser', '<KAFKA_RULE_ROLE_ARN>')}",
"sasl.scram.password": "${get_secret('<SECRET_NAME>', 'SecretString', 'kafkaPassword', '<KAFKA_RULE_ROLE_ARN>')}"
}
}
}
],
"errorAction": {
"cloudwatchLogs": {
"logGroupName": "AWSIotLogsV2",
"<KAFKA_RULE_LOGS_ROLE_ARN>"
}
}
}
此错误消息未提供有关握手失败原因的任何原因,因此我唯一的猜测是 IoT Core 不允许规则中的 KafkaAction 使用自签名 CA。
这是真的,还是我可能遗漏了其他地方的东西?
在必须将我们服务器的 IP 地址添加到证书上的 SAN 列表后,
我能够使用 python 客户端毫无问题地连接并向我们的 Kafka 集群发布消息,
为其提供相同的凭据和自签名 CA。