KafkaApis defines the handlers for Kafka's request APIs; its handle() method matches on the request's ApiKeys and dispatches each incoming request to the corresponding handler:
request.header.apiKey match {
  case ApiKeys.PRODUCE => handleProduceRequest(request)
  case ApiKeys.FETCH => handleFetchRequest(request)
  case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
  case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  // ... the remaining ApiKeys cases are elided ...
}
handleProduceRequest handles produce requests; inside it, appendRecords() is the call that appends the messages carried by the request to the log subsystem:
// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
  timeout = produceRequest.timeout.toLong,
  requiredAcks = produceRequest.acks,
  internalTopicsAllowed = internalTopicsAllowed,
  origin = AppendOrigin.Client,
  entriesPerPartition = authorizedRequestInfo,
  responseCallback = sendResponseCallback,
  recordConversionStatsCallback = processingStatsCallback)
appendRecords() first appends the data to the local log, i.e. writes it to the leader partition on this broker, and then, depending on the acks setting, invokes the callback to return the response to the client.
/**
* Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
* the callback function will be triggered either when timeout or the required acks are satisfied;
* if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
*
* Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
* are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
* locks.
*/
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  // first check that the acks value sent by the client is one of the valid settings: -1, 0 or 1
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // append the data to the local log; localProduceResults holds the per-partition result of that write
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    // from the local append results (the data has now been written to the leader partition),
    // build the per-partition response status that will be returned to the client
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition -> ProducePartitionStatus(
        result.info.lastOffset + 1, // required offset
        new PartitionResponse(
          result.error,
          result.info.firstOffset.map(_.messageOffset).getOrElse(-1),
          result.info.logAppendTime,
          result.info.logStartOffset,
          result.info.recordErrors.asJava,
          result.info.errorMessage
        )
      ) // response status
    }
    actionQueue.add {
      () =>
        localProduceResults.foreach {
          case (topicPartition, result) =>
            val requestKey = TopicPartitionOperationKey(topicPartition)
            result.info.leaderHwChange match {
              case LeaderHwChange.Increased =>
                // some delayed operations may be unblocked after HW changed
                delayedProducePurgatory.checkAndComplete(requestKey)
                delayedFetchPurgatory.checkAndComplete(requestKey)
                delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
              case LeaderHwChange.Same =>
                // probably unblock some follower fetch requests since log end offset has been updated
                delayedFetchPurgatory.checkAndComplete(requestKey)
              case LeaderHwChange.None =>
                // nothing
            }
        }
    }
    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(
        Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.map(_.messageOffset).getOrElse(-1),
        RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset
      )
    }
    // the response is ultimately returned to the client through this callback
    responseCallback(responseStatus)
  }
}
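For reference, delayedProduceRequestRequired() is the check that decides between the DelayedProduce path and the immediate response. As a sketch reconstructed from the same class (the exact code may differ slightly between Kafka versions), it amounts to the following:
// A delayed produce is only needed when acks = -1 (wait for ISR replication),
// there is data to append, and at least one partition was appended successfully.
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 &&
  entriesPerPartition.nonEmpty &&
  localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}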
Appending the messages to the local log:
/**
* Append the messages to the local replica logs
*/
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             origin: AppendOrigin,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  val traceEnabled = isTraceEnabled

  def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
    val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
    brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
    error(s"Error processing append operation on partition $topicPartition", t)
    logStartOffset
  }

  if (traceEnabled)
    trace(s"Append [$entriesPerPartition] to local log")

  // iterate over every partition carried by the request
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        // locate the partition object for this topic-partition
        val partition = getPartitionOrException(topicPartition)
        // append the records to the leader partition
        val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
        val numAppendedMessages = info.numMessages
        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        if (traceEnabled)
          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")

        (topicPartition, LogAppendResult(info))
      } catch {
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderOrFollowerException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: KafkaStorageException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        case rve: RecordValidationException =>
          val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
          val recordErrors = rve.recordErrors
          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
            logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
        case t: Throwable =>
          val logStartOffset = processFailedRecord(topicPartition, t)
          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
      }
    }
  }
}
Writing the data to the leader partition:
def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        // check the current in-sync replica set against the topic's min.insync.replicas setting
        val minIsr = leaderLog.config.minInSyncReplicas
        val inSyncSize = isrState.isr.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException(s"The size of the current ISR ${isrState.isr} " +
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }

        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
          interBrokerProtocolVersion)

        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderLog))

      case None =>
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
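The NotEnoughReplicasException guard above is what ties min.insync.replicas to the producer's acks=-1 setting. As an illustrative sketch (topic name and broker address are assumptions, not from the source above), a topic created as follows will reject acks=all produce requests once its ISR shrinks below 2:
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import scala.jdk.CollectionConverters._

val props = new Properties()
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
val admin = AdminClient.create(props)

// 3 partitions, replication factor 3, min.insync.replicas = 2:
// with acks = -1 the leader throws NotEnoughReplicasException once the ISR size drops below 2
val topic = new NewTopic("demo-topic", 3, 3.toShort)
  .configs(Map("min.insync.replicas" -> "2").asJava)
admin.createTopics(Collections.singleton(topic)).all().get()
admin.close()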
Finally, the callback responseCallback() is invoked to send the response back to the client.
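For completeness, here is a minimal producer sketch (topic name and broker address are assumptions) showing how the client-side acks setting becomes the requiredAcks value handled above: acks=0 waits for no broker response, acks=1 returns after the leader append, and acks=all (-1) goes through the DelayedProduce path until the ISR has replicated the data:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
props.put(ProducerConfig.ACKS_CONFIG, "all") // sent to the broker as requiredAcks = -1
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)
// the returned future completes once the broker's ProduceResponse
// (built on the server side via responseCallback) arrives at the client
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value")).get()
producer.close()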