5.设计与实现(IMPLEMENTATION)
5.1 API 设计
生产者 APIS
生产者API包含2个producers-kafka.producer.SyncProducer
和
kafka.producer.async.AsyncProducer。
示例代码如下:
class Producer {
/* Sends the data, partitioned by key to the topic using either the */
/* synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送单条消息至由key所对应的topic的分区*/
public void send(kafka.javaapi.producer.ProducerData<K,V> producerData);
/* Sends a list of data, partitioned by key to the topic using either */
/* the synchronous or the asynchronous producer */
/*使用同步或者异步的生产者,发送一系列数据至由key所对应的topic的分区*/
public void send(java.util.List<kafka.javaapi.producer.ProducerData<K,V>> producerData);
/* Closes the producer and cleans up */
/*关闭生产者并做相应清理*/
public void close();
}
其目的就是通过一个单一的API向客户端暴露所有的生产者功能。kafka生产者
-
可以处理多个生产者的排队以及缓冲请求以及异步地分发批量的数据:
kafka.producer.Producer
对于多个生产者的请求数据(producer.type=async
),在序列化和分发它们至相应的kafka节点分区之前,其有能力对它们进行批量处理。而批量处理的大小可由少量的配置参数完成。当数据进入至队列,它们将被缓冲在队列里面,直到queue.time
超时或者达到了配置(batch.size
)的批量处理的最大值.后台的异步线程(kafka.producer.async.ProducerSendThread
)负责将队列里的数据批量取出并让kafka.producer.EventHandler
进行序列化工作,且将数据发送至kafka相应的节点分区。通过设置event.handler
配置参数,即可实现一个自定义的事件处理器( event handler )。不论对于植入自定义日志/跟踪代码,还是自定义监控逻辑,能在生产者队列管道的不同阶段注入回调函数是极其有帮助的。一种可能的方案是通过实现kafka.producer.async.CallbackHandler
接口并且对该类设置callback.handler
配置参数。 -
通过用户自定义的
Encoder
实现对数据的序列化操作:
interface Encoder<T> {
public Message toMessage(T data);
}
默认的 Encoder``是
kafka.serializer.DefaultEncoder“`
-
通过用户设置(可选)的
Partitioner
提供基于软件层面的负载均衡(slb):kafka.producer.Partitioner
会影响到数据传输时的路由策略。
interface Partitioner<T> {
int partition(T key, int numPartitions);
}
分区API使用key以及可用节点分区来返回一个分区id。这个id通常用作有序 broker_ids
的索引,同时节点分区( partitions )将会用这个id挑选出一个分区去处理生产者的请求。默认的分区策略是是对key进行hash,并对分区数目取余,即 hash(key)%numPartitions
。如果key为null,那么将会挑选出一个随机的节点。如果想要实现自定义的分区策略,也可以通过设置 partitioner.class
配置参数实现。
消费者 APIS
kafka提供两种级别的消费者APIS。对于普通、简单的消费者API,其仅包含对单个节点的连接,且可关闭发送给server网络请求。这个API是完全无状态的,每个网络请求将携带偏移量,用户可以根据自己的选择是否保留这些元数据。
高级的消费者API不仅隐藏了kafka集群的细节,而且可以消费集群中的任意一台机器而不用关心其背后的网络拓扑。同时,它也保留了消息是否被消费的状态。另外,高级别的消费者API还支持对依据过滤表达式来对订阅的topic进行过滤(譬如白名单或者黑名单等类似的正则表达式)
普通的 API
class SimpleConsumer {
/*向节点发出拉取消息的请求,并返回消息的数据集*/
public ByteBufferMessageSet fetch(FetchRequest request);
/*发送批量拉取消息的请求,返回响应数据集*/
public MultiFetchResponse multifetch(List<FetchRequest> fetches);
/**
*在给定时间前返回有效的偏移量(分区容量最大值)数据集,且为降序排列
*
* @param time: 毫秒,
* 如果设置了 OffsetRequest$.MODULE$.LATEST_TIME(),则可以从最新的偏移量获取消息
* 如果设置了 OffsetRequest$.MODULE$.EARLIEST_TIME(), 则可以从最早的偏移获取消息.
*/
public long[] getOffsetsBefore(String topic, int partition, long time, int maxNumOffsets);
}
普通消费者API通常用于实现高级API,以及用于一些离线消费者,这些消费者对于保持状态有特殊的要求
高级 API
/*创建一个对kafka集群的连接*/
ConsumerConnector connector = Consumer.create(consumerConfig);
interface ConsumerConnector {
/**
* 此方法用于获取KafkaStreams的集合,这个集合是
* MessageAndMetadata对象的迭代器,通过这个对象你可以获取到与元数据(目前仅指topic)相关联的消息
* Input: a map of <topic, #streams>
* Output: a map of <topic, list of message streams>
*/
public Map<String,List<KafkaStream>> createMessageStreams(Map<String,Int> topicCountMap);
/**
* 你可以获取KafkaStreams的集合, 对符合TopicFilter过滤后topic消息进行迭代(TopicFilter是用标准Java正则表达式封装的topic白名单或黑名单)
*/
public List<KafkaStream> createMessageStreamsByFilter(
TopicFilter topicFilter, int numStreams);
/* 提交截止至目前所有的已消费的消息的偏移量 */
public commitOffsets()
/* 关闭连接 */
public shutdown()
}
这个API围绕迭代器并由KafkaStream类实现。每个kafkastream表示从一个或多个服务器的一个或多个分区的信息流。每个流用于单线程处理,所以客户端可以在创建调用中提供所需的流数。因此,流可能代表多个服务器分区的合并(对应于处理线程的数量),但每个分区只会流向一个流。
createMessageStreams方法调用已在某个topic注册的consumer,这将导致消费者/kafka节点分配的再平衡。API鼓励在一个调用中创建多个主题流,以最小化这种重新平衡。createMessageStreamsByFilter方法调用(额外的)注册的watcher去发现新的符合被过滤的topic。注意通过createMessageStreamsByFilter方法返回的每个流可能会迭代多个topic的消息(譬如,过滤器中允许多个topic)
5.2 网络层
Kafka网络层是一个相当简单的NIO服务器,这个将不会进行详细的阐述。sendfile的实现是由 MessageSet
接口和 writeTo
方法完成。这使得备份文件的信息集合,使用更有效的 transferTo
实现而不是中间缓冲写。线程模型是一个单线程和用来处理每个固定连接数的N个处理器线程组成。这种设计已经在其他地方进行了充分的测试,并且被公认为是简单和快速的实现。该协议保持相当简洁的形式,以便将来更多其他类型语言的客户端实现。
5.3 消息
消息由固定大小的head、可变长度的不透明密钥键字节数组和可变长度的不透明值字节数组组成.消息头包含如下的一些字段:
– CRC32 用以检测消息的截取和损坏
– 格式版本
– 鉴别器的一个属性
– 时间戳
使键和值保持不透明是一个正确的确定:现在序列化包有很大的进展,任何特定的选择都不适合所有的使用.更不用说,一个特定的应用程序使用卡夫卡可能会指定一个特定的序列化类型作为其使用的一部分。MessageSet
接口仅仅只是一个迭代器,用于迭代方法产生的消息,这个方法对NiO通道进行批量读取和写入。
5.4 消息格式
/**
* 1. 消息的4字节CRC32
* 2. 一个字节的 identifier ,用以格式的变化,变化的值为0 或者1
* 3. 一个字节的 identifier属性,允许消息的注释与版本无关
* 位 0 ~ 2 : 压缩编解码
* 0 : 无压缩
* 1 : gzip
* 2 : snappy
* 3 : lz4
* bit 3 : 时间戳类型
* 0 : 创建时间
* 1 : 日志追加时间
* bit 4 ~ 7 : 保留位
* 4. (可选的) 8字节时间戳只有当“magic”标识符大于0
* 5. 4字节密钥长度,包含长度k
* 6. K 字节的 key
* 7. 4字节有效负载长度,含长度v
* 8. V字节的有效负载
*/
5.5 日志
topic名字为”my_topic”的日志有两个分区,并且包含两个目录(也就是 my_topic_0
和 my_topic_1
),目录里面包含的是该topic的消息数据文件。日志文件的格式是“日志条目”序列;每个日志条目是一个4字节整数n存储的消息长度,其次是N消息字节.每个消息唯一标识是一个64位整数的偏移量,该偏移量由发送到该主题的该分区的所有消息流中的消息的开始的字节位置给出。每个消息的磁盘格式如下。每个日志文件以其包含的第一条消息的偏移量命名。所以第一个创建的文件将是00000000000.kafka,每个附加文件将有一个大致S字节整数的名字,S是指之前在配置中设置的日志文件最大值
确切的二进制消息格式是版本化的并且是以标准的接口进行维护,因此,当进行容错时,消息集可以在生产者,kafka节点,以及客户端之间进行相互的任意转换,而不用复制和调节。改格式如下:
磁盘上消息的格式
offset : 8 bytes
message length : 4 bytes (value: 4 + 1 + 1 + 8(if magic value > 0) + 4 + K + 4 + V)
crc : 4 bytes
magic value : 1 byte
attributes : 1 byte
timestamp : 8 bytes (Only exists when magic value is greater than zero)
key length : 4 bytes
key : K bytes
value length : 4 bytes
value : V bytes
使用消息偏移作为消息ID是极其少见的。我们最初的想法是使用一个由生产者自带的GUID自增长实现,并且保存从GUID到每一个节点上偏移量的映射。但由于每个消费者必须持有每个server的ID,因此GUID的全局唯一性将无法提供。更重要的是持有随机id至分区偏移量
的复杂度需要很重的索引结构,而且必须要与磁盘同步,这在本质上就需要一种完全的持久化的随机存取数据结构。因此,为了简化查找结构,我们决定使用一个简单的针对每个分区的原子计数器,它可以用分区ID和节点ID唯一标识消息;这使得查找结构更简单,尽管多每个消费者的多个请求仍然是可能的。然而,一旦我们解决了计数器的问题,跳转至直接使用偏移量似乎变得顺理成章了,毕竟对于任意一个分区而言,计数器是单调唯一增长的。由于偏移量的实现对于消费者是不可见的,因此最终可以采取一种更为高效的方式具体实现( 此处句子读得不太明白 )。
写(Write)
日志允许串行追加,通常是追加到最后一个文件,当达到配置的最大值时,该文件会在另一个新的文件上继续追加。日志文件有两个配置参数:m是操作系统将文件刷新到磁盘之前,可写入的消息的数目,s是指强制多少秒执行一次刷新操作。这给出了一个持久性保障,在系统崩溃时,至多丢失M条消息或S秒的数据。
读(Read)
消息的读取是依据消息在分区中的64位逻辑偏移量以及S字节的最大读取值完成。这个将返回在S字节缓冲区中的消息迭代器。S值的配置应比任何消息的长度要大,但当消息的长度异常地大时,消息的读取将被重试多次,每一次重试都会扩大缓冲区一倍,直到消息被成功读取。消息的最大值缓冲区的最大值均可设置,如此服务器便可以拒绝处理比它们要大的消息,并提供客户端上要读取完整消息所需的最大值的绑定。已分区消息为结尾去读取缓冲区是有可能的,大小的分割将会被很容易地检测到。
从偏移量读取消息的实际过程需要首先定位存储数据的日志段文件,从全局偏移量计算文件特定的偏移量,然后从该文件偏移量读取。这是一个简单的对每个文件保持在内存范围内变化的二分查找
该日志提供了获取最近写入消息的能力,允许客户端“实时”订阅。这在消费者未能消费指定天数消息的场景下尤为有用。在这种情况下,消费者试图去消费一个由OutOfRangeException引起的不存在的偏移量,要么采取重置自身的方式,要么消费失败并作为消费失败的案例(有些许问题)
如下是发送给消费者的结果的格式
MessageSetSend (fetch result)
total length : 4 bytes
error code : 2 bytes
message 1 : x bytes
...
message n : x bytes
MultiMessageSetSend (multiFetch result)
total length : 4 bytes
error code : 2 bytes
messageSetSend 1
...
messageSetSend n
删除(Delete)
数据删除是一次删除一个日志段。日志管理器允许可插入的删除策略来选择哪些文件适合删除。当前的策略是删除任何超过 N天前修改时间的日志,虽然保留最后N GB的政策也可以是有用的。为了避免读取时锁定,而仍然允许修改段列表的删除,我们使用一个 copy-on-write样式分段列表实现,它提供一致的视图,允许以二分查找的方式读取日志段的不变的静态快照视图,且删除操作也同步进行。
保证(Guarantees)
kafka日志提供了一个可配置的参数 M ,该参数可以控制在消息被强制写入到磁盘前消息的最大数量。在kafka服务启动时,一个日志恢复线程会伴随着启动,它迭代所有最新分段日志的消息,并且校验所有消息项的有效性。如果消息的大小和偏移量的总和不超过该文件的长度并且消息有效负载的CRC32匹配消息存储有CRC信息,那么该消息项就是有效的。在检测到崩溃事件时,日志将被截断为最后有效的偏移量。
请注意,两种类型的崩溃必须处理:由于崩溃导致的写块丢失损坏,以及无意义区块被追加到文件的损坏。这样做的原因是,在一般的操作系统不保证在文件的节点之间的和实际的数据块的写入顺序,因此除了失去已写入的数据,假如节点正在更新值,但在数据写入之前发生崩溃了,那么该文件获取到的都是无意义的数据。CRC就是检测这样的边缘案例,并防止它损坏日志(当然,未写入的消息会丢失)