参考文档:Usage — kafka-python 2.0.2-dev documentation
同步方式
示例代码:
from kafka import KafkaProducer
from kafka.errors import kafka_errors
if __name__ == '__main__':
print('演示python的kafka API:生产者的同步发送模式')
# 创建kafkaProducer对象
producer = KafkaProducer(
bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
acks=-1
)
# 执行生产者同步方案
future = producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步
try:
record_metadata = future.get() # 同步发送模式,当发送失败(已经重试完成)后
# 如果发送成功
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
except kafka_errors:
print(kafka_errors)
finally:
producer.close()
运行结果:
异步方式
示例代码:
from kafka import KafkaProducer
if __name__ == '__main__':
print('演示python的kafka API:生产者的异步发送模式')
# 1.创建kafkaProducer对象
producer = KafkaProducer(
bootstrap_servers=['node1:9092', 'node2:9093', 'node3:9094'],
acks=-1,
linger_ms=5000
)
# 2.执行生产者异步方案
# 2.1 异步无返回值方案
# producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步
# producer.flush()
# 2.2 异步有返回值方案
future = producer.send('test_kafka', '我爱python'.encode('utf-8')) # 默认是异步
# 表示当底层每一批数据发送成功后,都会调用此函数:add_callback
def on_send_success(record_metadata):
print(record_metadata.topic)
print(record_metadata.partition)
print(record_metadata.offset)
# 表示当底层发送一批数据失败时,就会调用此函数:add_errback
def on_send_error(excp):
print(excp)
future.add_callback(on_send_success).add_errback(on_send_error)
# 为了演示效果,不使用flush,而是采用休眠的方式
# 注意:2.2时,如果此时程序运行结束,数据是没法发送到Kafka的
import time
time.sleep(100)
运行结果: