环境安装
-
安装Python。(Python版本为2.7或3.X。)
-
安装依赖库。(使用公网连接需要安装confluent-kafka 1.9.2及以下版本的依赖库)
pip install confluent-kafka==1.9.2
-
下载Demo包kafka-confluent-python-demo.zip。
配置修改
-
如果是ssl连接,需要在控制台下载证书。并且解压压缩包得到ssl.client.truststore.jks,执行以下命令生成caRoot.pem文件。
keytool -importkeystore -srckeystore ssl.client.truststore.jks -destkeystore caRoot.p12 -deststoretype pkcs12 openssl pkcs12 -in caRoot.p12 -out caRoot.pem
-
修改setting.py文件。(ca_location仅在ssl连接时需要配置)
kafka_setting = { 'bootstrap_servers': 'XXX', 'topic_name': 'XXX', 'group_name': 'XXX' }
生产消息
发送以下命令发送消息。
python kafka_producer.py
生产消息示例代码如下:
from confluent_kafka import Producer
import setting
conf = setting.kafka_setting
"""初始化一个 Producer 对象"""
p = Producer({'bootstrap.servers': conf['bootstrap_servers']})
def delivery_report(err, msg):
""" Called once for each message produced to indicate delivery result.
Triggered by poll() or flush(). """
if err is not None:
print('Message delivery failed: {}'.format(err))
else:
print('Message delivered to {} [{}]'.format(msg.topic(), msg.partition()))
"""异步发送消息"""
p.produce(conf['topic_name'], "Hello".encode('utf-8'), callback=delivery_report)
p.poll(0)
"""在程序结束时,调用flush"""
p.flush()
消费消息
发送以下命令消费消息。
python kafka_consumer.py
消费消息示例代码如下:
from confluent_kafka import Consumer, KafkaError
import setting
conf = setting.kafka_setting
c = Consumer({
'bootstrap.servers': conf['bootstrap_servers'],
'group.id': conf['group_name'],
'auto.offset.reset': 'latest'
})
c.subscribe([conf['topic_name']])
while True:
msg = c.poll(1.0)
if msg is None:
continue
if msg.error():
if msg.error().code() == KafkaError._PARTITION_EOF:
continue
else:
print("Consumer error: {}".format(msg.error()))
continue
print('Received message: {}'.format(msg.value().decode('utf-8')))
c.close()