I am able to send data using the Python code below, but the message is not received at the consumer end. I have enabled all the authentication settings. I am able to send and receive with the Kafka command-line tools, and I need to do the same thing from Python. I have tried both unauthenticated mode and the SASL_SSL protocol; in both cases the send succeeds but nothing is received. Please find the code below and suggest how to resolve the issue.
Producer code:
from confluent_kafka import Producer
from datetime import datetime
from time import strftime
import json

# Comma-separated broker list handed to the client as 'bootstrap.servers'.
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'

# Upper bound, in seconds, on how long flush() waits for outstanding deliveries.
FLUSH_TIMEOUT_SECONDS = 30


def delivery_report(err, msg):
    """Print the final outcome of one produced message.

    produce() only queues the message locally; the client invokes this
    callback (from poll() or flush()) once the broker has acknowledged the
    message or delivery has permanently failed.

    Args:
        err: A KafkaError describing the failure, or None on success.
        msg: The Message that was produced.
    """
    if err is not None:
        print('delivery failed: {}'.format(err))
        return
    print('delivered to topic {} partition {} at offset {}'.format(
        msg.topic(), msg.partition(), msg.offset()))


producer = Producer({
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.username': 'cccccc',
    'sasl.password': 'ccccccccccccccc',
    'sasl.mechanism': 'SCRAM-SHA-512',
})

data = {
    'message': 'hello world',
    'timestamp': datetime.now().strftime("%m/%d/%Y %H:%M:%S"),
}

# Asynchronous: this enqueues the message; the result arrives in delivery_report.
producer.produce(
    'testTopic1',
    json.dumps(data).encode('utf-8'),
    on_delivery=delivery_report,
)

# flush() serves the delivery callbacks and returns how many messages are
# still undelivered, so only claim success when nothing is left in the queue.
remaining = producer.flush(FLUSH_TIMEOUT_SECONDS)
if remaining == 0:
    print('message sent')
else:
    print('{} message(s) still undelivered after {}s'.format(
        remaining, FLUSH_TIMEOUT_SECONDS))
Consumer code:
from confluent_kafka import Consumer, KafkaError, KafkaException
from datetime import datetime
from time import strftime
import json

# Comma-separated broker list handed to the client as 'bootstrap.servers'.
bootstrap_servers = 'b-1.xxxxxxxxxxxxxxxxxx.amazonaws.com:9096,b-2.xxxxxxxxxxxxxxxxxxxx.amazonaws.com:9096'

consumer = Consumer({
    'bootstrap.servers': bootstrap_servers,
    'security.protocol': 'SASL_SSL',
    'sasl.username': 'cccccccccc',
    'sasl.password': 'cccccccccccccc',
    'sasl.mechanism': 'SCRAM-SHA-512',
    # subscribe() needs a consumer group.
    # NOTE(review): if ACLs are enabled on the cluster, the SASL user
    # presumably also needs READ permission on this group - confirm.
    'group.id': 'testTopic1-python-consumer',
    # A group with no committed offsets otherwise starts at the END of the
    # log and never sees messages produced before the consumer started.
    'auto.offset.reset': 'earliest',
})

print('start reading')
consumer.subscribe(['testTopic1'])

try:
    while True:
        msg = consumer.poll(timeout=1.0)
        if msg is None:
            # Nothing arrived within the poll timeout; keep waiting.
            continue

        err = msg.error()
        if err is None:
            print("message.offset={}, message.key={}, message.value={}".format(msg.offset(), msg.key(), msg.value()))
        elif err.code() == KafkaError._PARTITION_EOF:
            # End of partition event: informational, not a failure.
            print('topic {} in partition {} reached end at offset {}'.format(msg.topic(), msg.partition(), msg.offset()))
        else:
            raise KafkaException(err)
finally:
    # Leave the group cleanly on any exit path (including Ctrl+C).
    consumer.close()