kafka-python 的安装、基本使用与 ZooKeeper 的启动等,请参考《使用python连接kafka》一文。
自定义consumer读取的offset写法
注意:在 kafka-python 中,若要让消费者按自定义的 offset 读取数据,消费者需要这样写:
from kafka import KafkaConsumer
from kafka.structs import TopicPartition

# No topic is passed to the constructor: the partition is assigned manually below.
consumer = KafkaConsumer(
    bootstrap_servers='localhost:9092',
    api_version=(0, 10, 2),
)
# A TopicPartition is the pair (topic name, partition number).
tp = TopicPartition(topic="python_test", partition=0)
# Declare that this consumer manually manages exactly this one partition.
consumer.assign([tp])
读取数据的时候使用:
# seek() only sets the position the next fetch starts from; calling it twice in
# a row just overrides the first call, so use a single seek. Offsets are 0-based:
# seek(tp, 3) would start at the 4th record of the partition, while the call
# below starts at offset 50.
consumer.seek(tp, 50)
# Returns the record at the current position and advances the position by one.
# With the default consumer_timeout_ms, next() blocks until such a record exists.
record = next(consumer)
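ATTENTION:初始化 consumer 时,不能把 topic 等参数一次性传给构造函数。传入 topic 会让 consumer 以订阅(subscribe)方式、由消费组自动分配分区;而 subscribe 与手动 assign 互斥,seek() 也只能作用于已分配给该 consumer 的分区。也就是说,不能这样写: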
# Do NOT initialise the consumer like this when you want to seek() manually:
# passing a topic to the constructor subscribes the consumer (group-managed
# partition assignment). Per the kafka-python docs, assign() raises
# IllegalStateError on a consumer that has already subscribed, and seek()
# requires the partition to be currently assigned to this consumer.
consumer = KafkaConsumer(
    "python_test",
    bootstrap_servers='localhost:9092',
    auto_offset_reset='latest',
    api_version=(0, 10, 2),
)
示例代码
1. 启动生产者,创建数据
首先启动生产者:
import datetime
import json

from kafka import KafkaProducer

# Start the producer.
producer = KafkaProducer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
my_topic = "python_test"
try:
    for i in range(100):
        data = {'num': i, 'data': datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        # .get() waits (up to 30 s) for the broker's acknowledgement, making each
        # send synchronous so that a failed send raises here instead of being lost.
        producer.send(my_topic, json.dumps(data).encode('utf-8')).get(timeout=30)
finally:
    # Always release the producer's connections and resources, even on error.
    producer.close()
这里使用名为 python_test 的 topic,并向其中写入 100 条数据。
2. 消费者自定义offset读取
import random

from kafka import KafkaConsumer
from kafka.structs import TopicPartition

consumer = KafkaConsumer(bootstrap_servers='localhost:9092', api_version=(0, 10, 2))
tp = TopicPartition("python_test", 0)  # (topic name, partition number)
# Manually assign this partition to the consumer (no consumer-group management).
consumer.assign([tp])

try:
    # Only seek to offsets that actually hold records. random.randint() is
    # inclusive at both ends, so a hard-coded randint(0, 100) can pick an offset
    # past the last record, and next() would then block forever waiting for new
    # data. Ask the broker for the valid range instead.
    first_offset = consumer.beginning_offsets([tp])[tp]
    end_offset = consumer.end_offsets([tp])[tp]  # offset of the last record + 1
    if end_offset > first_offset:
        for _ in range(10):
            random_seek = random.randint(first_offset, end_offset - 1)  # random valid offset
            consumer.seek(tp, random_seek)  # set the read position
            # next() returns the record at the current position and advances it by one.
            consumer_data = next(consumer)
            print(consumer_data)
    else:
        print("partition is empty, nothing to read")
finally:
    consumer.close()
打印结果(每条记录的 offset 都比其中的 num 大 5,说明运行本例之前该 topic 中已有 5 条消息;由于是随机 seek,同一条记录可能被重复读到,例如 offset=71 出现了两次):
ConsumerRecord(topic='python_test', partition=0, offset=66, timestamp=1646473931338, timestamp_type=0, key=None, value=b'{"num": 61, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1900770981, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=87, timestamp=1646473931355, timestamp_type=0, key=None, value=b'{"num": 82, "data": "2022-03-05 17:52:11"}', headers=[], checksum=3522714781, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=50, timestamp=1646473931318, timestamp_type=0, key=None, value=b'{"num": 45, "data": "2022-03-05 17:52:11"}', headers=[], checksum=4078097399, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=11, timestamp=1646473931271, timestamp_type=0, key=None, value=b'{"num": 6, "data": "2022-03-05 17:52:11"}', headers=[], checksum=2130074065, serialized_key_size=-1, serialized_value_size=41, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=71, timestamp=1646473931342, timestamp_type=0, key=None, value=b'{"num": 66, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1871660722, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=83, timestamp=1646473931352, timestamp_type=0, key=None, value=b'{"num": 78, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1428608549, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=25, timestamp=1646473931285, timestamp_type=0, key=None, value=b'{"num": 20, "data": "2022-03-05 17:52:11"}', headers=[], checksum=1811806078, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=36, timestamp=1646473931296, timestamp_type=0, key=None, value=b'{"num": 31, "data": "2022-03-05 17:52:11"}', headers=[], checksum=330696171, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)
ConsumerRecord(topic='python_test', partition=0, offset=18, timestamp=1646473931278, timestamp_type=0, key=None, value=b'{"num": 13, "data": "2022-03-05 17:52:11"}', headers=[], checksum=719857123, serialized_key_size=-1, serialized_value_size=42, serialized_header_size=-1)