Kafka Message Storage: Source Code Analysis

2023-08-22 06:14:13

KafkaApis defines the broker's API handlers; its handle() method dispatches each incoming request on its API key:

request.header.apiKey match {
  case ApiKeys.PRODUCE => handleProduceRequest(request)
  case ApiKeys.FETCH => handleFetchRequest(request)
  case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
  case ApiKeys.METADATA => handleTopicMetadataRequest(request)
  case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
  // ... one case per ApiKeys value, omitted here
}

 

handleProduceRequest handles produce (message-write) requests. Inside it, replicaManager.appendRecords() appends the messages carried by the produce request to the log layer:

// call the replica manager to append messages to the replicas
replicaManager.appendRecords(
  timeout = produceRequest.timeout.toLong,
  requiredAcks = produceRequest.acks,
  internalTopicsAllowed = internalTopicsAllowed,
  origin = AppendOrigin.Client,
  entriesPerPartition = authorizedRequestInfo,
  responseCallback = sendResponseCallback,
  recordConversionStatsCallback = processingStatsCallback)
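
Before looking at appendRecords() itself, note how the acks value threads through: with acks = 0 the producer expects no response at all, so the broker either sends nothing or, if some partition failed, closes the connection so the client refreshes its metadata. The following is a self-contained, illustrative model of that decision, not the actual KafkaApis code:

// Illustrative sketch only (not the real KafkaApis response callback):
// how the broker's response behaviour depends on the producer's acks setting.
sealed trait ProduceResponseAction
case object SendNoResponse   extends ProduceResponseAction // acks = 0, all partitions ok
case object CloseConnection  extends ProduceResponseAction // acks = 0, some partition failed
case object SendFullResponse extends ProduceResponseAction // acks = 1 or -1: send a ProduceResponse

def responseActionFor(acks: Short, anyPartitionFailed: Boolean): ProduceResponseAction =
  if (acks == 0) {
    // the producer is not waiting for an answer; closing the connection on error
    // is the only way to signal that something went wrong
    if (anyPartitionFailed) CloseConnection else SendNoResponse
  } else SendFullResponse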

 

appendRecords() first appends the data to the local log, i.e. writes a copy to the leader partition on this broker, and then, depending on the acks setting, invokes the callback that returns the response to the client, either immediately or once replication is satisfied.

/**
 * Append messages to leader replicas of the partition, and wait for them to be replicated to other replicas;
 * the callback function will be triggered either when timeout or the required acks are satisfied;
 * if the callback function itself is already synchronized on some object then pass this object to avoid deadlock.
 *
 * Noted that all pending delayed check operations are stored in a queue. All callers to ReplicaManager.appendRecords()
 * are expected to call ActionQueue.tryCompleteActions for all affected partitions, without holding any conflicting
 * locks.
 */
def appendRecords(timeout: Long,
                  requiredAcks: Short,
                  internalTopicsAllowed: Boolean,
                  origin: AppendOrigin,
                  entriesPerPartition: Map[TopicPartition, MemoryRecords],
                  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
                  delayedProduceLock: Option[Lock] = None,
                  recordConversionStatsCallback: Map[TopicPartition, RecordConversionStats] => Unit = _ => ()): Unit = {
  // requiredAcks is validated first: only -1, 0 and 1 are acceptable values
  if (isValidRequiredAcks(requiredAcks)) {
    val sTime = time.milliseconds
    // append the data to the local log; localProduceResults holds the per-partition
    // results of the write on this broker
    val localProduceResults = appendToLocalLog(internalTopicsAllowed = internalTopicsAllowed,
      origin, entriesPerPartition, requiredAcks)
    debug("Produce to local log in %d ms".format(time.milliseconds - sTime))

    // use the local append results to build the response returned to the client
    // (the data has already been written to the leader partition)
    val produceStatus = localProduceResults.map { case (topicPartition, result) =>
      topicPartition -> ProducePartitionStatus(
        result.info.lastOffset + 1, // required offset
        new PartitionResponse(
          result.error,
          result.info.firstOffset.map(_.messageOffset).getOrElse(-1),
          result.info.logAppendTime,
          result.info.logStartOffset,
          result.info.recordErrors.asJava,
          result.info.errorMessage
        )
      ) // response status
    }

    actionQueue.add {
      () =>
        localProduceResults.foreach {
          case (topicPartition, result) =>
            val requestKey = TopicPartitionOperationKey(topicPartition)
            result.info.leaderHwChange match {
              case LeaderHwChange.Increased =>
                // some delayed operations may be unblocked after HW changed
                delayedProducePurgatory.checkAndComplete(requestKey)
                delayedFetchPurgatory.checkAndComplete(requestKey)
                delayedDeleteRecordsPurgatory.checkAndComplete(requestKey)
              case LeaderHwChange.Same =>
                // probably unblock some follower fetch requests since log end offset has been updated
                delayedFetchPurgatory.checkAndComplete(requestKey)
              case LeaderHwChange.None =>
                // nothing
            }
        }
    }

    recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })

    if (delayedProduceRequestRequired(requiredAcks, entriesPerPartition, localProduceResults)) {
      // create delayed produce operation
      val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
      val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

      // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
      val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

      // try to complete the request immediately, otherwise put it into the purgatory
      // this is because while the delayed produce operation is being created, new
      // requests may arrive and hence make this operation completable.
      delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)

    } else {
      // we can respond immediately
      val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
      responseCallback(produceResponseStatus)
    }
  } else {
    // If required.acks is outside accepted range, something is wrong with the client
    // Just return an error and don't handle the request at all
    val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
      topicPartition -> new PartitionResponse(
        Errors.INVALID_REQUIRED_ACKS,
        LogAppendInfo.UnknownLogAppendInfo.firstOffset.map(_.messageOffset).getOrElse(-1),
        RecordBatch.NO_TIMESTAMP,
        LogAppendInfo.UnknownLogAppendInfo.logStartOffset
      )
    }
    // finally invoke the callback to return the response to the client
    responseCallback(responseStatus)
  }
}
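
The two helper checks referenced above are small. Paraphrased from the same ReplicaManager (exact wording may differ across Kafka versions):

// only -1, 0 and 1 are accepted acks values
private def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0

// a DelayedProduce is only needed when acks = -1 (wait for the ISR), there is data to
// append, and at least one partition was appended locally without error
private def delayedProduceRequestRequired(requiredAcks: Short,
                                          entriesPerPartition: Map[TopicPartition, MemoryRecords],
                                          localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean =
  requiredAcks == -1 &&
    entriesPerPartition.nonEmpty &&
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size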

 

appendToLocalLog() appends the messages to the local log:

/**
 * Append the messages to the local replica logs
 */
private def appendToLocalLog(internalTopicsAllowed: Boolean,
                             origin: AppendOrigin,
                             entriesPerPartition: Map[TopicPartition, MemoryRecords],
                             requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  val traceEnabled = isTraceEnabled
  def processFailedRecord(topicPartition: TopicPartition, t: Throwable) = {
    val logStartOffset = onlinePartition(topicPartition).map(_.logStartOffset).getOrElse(-1L)
    brokerTopicStats.topicStats(topicPartition.topic).failedProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.failedProduceRequestRate.mark()
    error(s"Error processing append operation on partition $topicPartition", t)

    logStartOffset
  }

  if (traceEnabled)
    trace(s"Append [$entriesPerPartition] to local log")

  // iterate over each partition in the request
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()

    // reject appending to internal topics if it is not allowed
    if (Topic.isInternal(topicPartition.topic) && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        // find the partition to write to by topic and partition,
        // then append the records to its leader replica
        val partition = getPartitionOrException(topicPartition)
        val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
        val numAppendedMessages = info.numMessages

        // update stats for successfully appended bytes and messages as bytesInRate and messageInRate
        brokerTopicStats.topicStats(topicPartition.topic).bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.allTopicsStats.bytesInRate.mark(records.sizeInBytes)
        brokerTopicStats.topicStats(topicPartition.topic).messagesInRate.mark(numAppendedMessages)
        brokerTopicStats.allTopicsStats.messagesInRate.mark(numAppendedMessages)

        if (traceEnabled)
          trace(s"${records.sizeInBytes} written to log $topicPartition beginning at offset " +
            s"${info.firstOffset.getOrElse(-1)} and ending at offset ${info.lastOffset}")

        (topicPartition, LogAppendResult(info))
      } catch {
        // NOTE: Failed produce requests metric is not incremented for known exceptions
        // it is supposed to indicate un-expected failures of a broker in handling a produce request
        case e@ (_: UnknownTopicOrPartitionException |
                 _: NotLeaderOrFollowerException |
                 _: RecordTooLargeException |
                 _: RecordBatchTooLargeException |
                 _: CorruptRecordException |
                 _: KafkaStorageException) =>
          (topicPartition, LogAppendResult(LogAppendInfo.UnknownLogAppendInfo, Some(e)))
        case rve: RecordValidationException =>
          val logStartOffset = processFailedRecord(topicPartition, rve.invalidException)
          val recordErrors = rve.recordErrors
          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithAdditionalInfo(
            logStartOffset, recordErrors, rve.invalidException.getMessage), Some(rve.invalidException)))
        case t: Throwable =>
          val logStartOffset = processFailedRecord(topicPartition, t)
          (topicPartition, LogAppendResult(LogAppendInfo.unknownLogAppendInfoWithLogStartOffset(logStartOffset), Some(t)))
      }
    }
  }
}
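
Each value in the returned map is a LogAppendResult, which pairs the append info with an optional exception. Its shape is roughly the following (defined in ReplicaManager's companion object; version-dependent):

// roughly the per-partition result type returned by appendToLocalLog
case class LogAppendResult(info: LogAppendInfo, exception: Option[Throwable] = None) {
  def error: Errors = exception match {
    case None    => Errors.NONE
    case Some(e) => Errors.forException(e)
  }
}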

 

appendRecordsToLeader() writes the data to the leader partition:

def appendRecordsToLeader(records: MemoryRecords, origin: AppendOrigin, requiredAcks: Int): LogAppendInfo = {
  val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      case Some(leaderLog) =>
        // check the size of the current in-sync replica set against min.insync.replicas
        val minIsr = leaderLog.config.minInSyncReplicas
        val inSyncSize = isrState.isr.size

        // Avoid writing to leader if there are not enough insync replicas to make it safe
        if (inSyncSize < minIsr && requiredAcks == -1) {
          throw new NotEnoughReplicasException(s"The size of the current ISR ${isrState.isr} " +
            s"is insufficient to satisfy the min.isr requirement of $minIsr for partition $topicPartition")
        }

        val info = leaderLog.appendAsLeader(records, leaderEpoch = this.leaderEpoch, origin,
          interBrokerProtocolVersion)

        // we may need to increment high watermark since ISR could be down to 1
        (info, maybeIncrementLeaderHW(leaderLog))

      case None =>
        throw new NotLeaderOrFollowerException("Leader not local for partition %s on broker %d"
          .format(topicPartition, localBrokerId))
    }
  }

  info.copy(leaderHwChange = if (leaderHWIncremented) LeaderHwChange.Increased else LeaderHwChange.Same)
}
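
maybeIncrementLeaderHW() is where the leader's high watermark may advance after the append. As a simplified, self-contained sketch of the rule (illustrative only, not the real Partition code): the HW can only move up to the smallest log end offset among the replicas the leader considers caught up, and it never moves backwards.

// Simplified model of the high-watermark rule (not the actual Kafka implementation)
def maybeIncrementLeaderHwSketch(currentHw: Long, caughtUpReplicaLeos: Seq[Long]): Option[Long] = {
  if (caughtUpReplicaLeos.isEmpty) None
  else {
    // the HW is bounded by the slowest caught-up replica's log end offset
    val candidate = caughtUpReplicaLeos.min
    if (candidate > currentHw) Some(candidate) else None
  }
}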

 

Finally, the responseCallback() callback is invoked to return the response to the client.
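
Seen from the client side, that response is what the producer's Callback eventually receives once the configured acks are satisfied. A minimal, hypothetical producer sketch (the broker address and topic name are assumptions, not taken from the article):

import java.util.Properties
import org.apache.kafka.clients.producer.{Callback, KafkaProducer, ProducerConfig, ProducerRecord, RecordMetadata}
import org.apache.kafka.common.serialization.StringSerializer

val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed broker address
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
props.put(ProducerConfig.ACKS_CONFIG, "all") // -1: wait for the ISR, i.e. the DelayedProduce path above

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("demo-topic", "key", "value"), new Callback {
  override def onCompletion(metadata: RecordMetadata, exception: Exception): Unit = {
    if (exception == null)
      println(s"acked at ${metadata.topic}-${metadata.partition}@${metadata.offset}")
    else
      exception.printStackTrace() // e.g. NotEnoughReplicasException when the ISR is below min.insync.replicas
  }
})
producer.flush()
producer.close()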

 
