Kafka Replica Synchronization Source Code Analysis

2023-08-22 06:14:11

After the Kafka broker writes message data to the leader partition, the data still has to be replicated to the follower replicas. Replication is pull-based: each follower sends Fetch requests to the leader for the partitions it hosts, and the leader serves them through the same Fetch code path that consumers use.
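
Before looking at the real code, here is a minimal, self-contained sketch of that pull model. It is not Kafka's actual ReplicaFetcherThread; Record, FetchResp and the in-memory logs are simplified stand-ins, meant only to show a follower repeatedly fetching from its own log end offset (LEO) and appending whatever the leader returns.

// Toy stand-ins for a partition's records and the leader's fetch response.
final case class Record(offset: Long, payload: String)
final case class FetchResp(records: Seq[Record], highWatermark: Long)

object FollowerFetchSketch {
  // The "leader" log: five records at offsets 0..4, all committed.
  val leaderLog: Vector[Record] = Vector.tabulate(5)(i => Record(i.toLong, s"m$i"))
  val leaderHighWatermark: Long = leaderLog.size.toLong

  // What the leader does for a fetch: return records from fetchOffset onward, capped in count.
  def leaderHandleFetch(fetchOffset: Long, maxRecords: Int): FetchResp =
    FetchResp(leaderLog.filter(_.offset >= fetchOffset).take(maxRecords), leaderHighWatermark)

  def main(args: Array[String]): Unit = {
    var followerLog = Vector.empty[Record]
    var followerLeo = 0L                        // the follower's log end offset
    while (followerLeo < leaderHighWatermark) { // keep pulling until caught up to the leader
      val resp = leaderHandleFetch(followerLeo, maxRecords = 2)
      followerLog ++= resp.records
      followerLeo = followerLog.lastOption.map(_.offset + 1).getOrElse(0L)
      println(s"follower fetched ${resp.records.size} records, LEO is now $followerLeo")
    }
  }
}

On the broker, the real handling starts where KafkaApis dispatches ApiKeys.FETCH to handleFetchRequest: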

case ApiKeys.FETCH => handleFetchRequest(request)
/**
 * Handle a fetch request
 */
def handleFetchRequest(request: RequestChannel.Request): Unit = {
  val versionId = request.header.apiVersion
  val clientId = request.header.clientId
  val fetchRequest = request.body[FetchRequest]
  val fetchContext = fetchManager.newContext(
    fetchRequest.metadata,
    fetchRequest.fetchData,
    fetchRequest.toForget,
    fetchRequest.isFromFollower)

  val clientMetadata: Option[ClientMetadata] = if (versionId >= 11) {
    // Fetch API version 11 added preferred replica logic
    Some(new DefaultClientMetadata(
      fetchRequest.rackId,
      clientId,
      request.context.clientAddress,
      request.context.principal,
      request.context.listenerName.value))
  } else {
    None
  }

  def errorResponse[T >: MemoryRecords <: BaseRecords](error: Errors): FetchResponse.PartitionData[T] = {
    new FetchResponse.PartitionData[T](error, FetchResponse.INVALID_HIGHWATERMARK, FetchResponse.INVALID_LAST_STABLE_OFFSET,
      FetchResponse.INVALID_LOG_START_OFFSET, null, MemoryRecords.EMPTY)
  }

  val erroneous = mutable.ArrayBuffer[(TopicPartition, FetchResponse.PartitionData[Records])]()
  val interesting = mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]()
  if (fetchRequest.isFromFollower) {
    // The follower must have ClusterAction on ClusterResource in order to fetch partition data.
    if (authHelper.authorize(request.context, CLUSTER_ACTION, CLUSTER, CLUSTER_NAME)) {
      fetchContext.foreachPartition { (topicPartition, data) =>
        if (!metadataCache.contains(topicPartition))
          erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
        else
          interesting += (topicPartition -> data)
      }
    } else {
      fetchContext.foreachPartition { (part, _) =>
        erroneous += part -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      }
    }
  } else {
    // Regular Kafka consumers need READ permission on each partition they are fetching.
    val partitionDatas = new mutable.ArrayBuffer[(TopicPartition, FetchRequest.PartitionData)]
    fetchContext.foreachPartition { (topicPartition, partitionData) =>
      partitionDatas += topicPartition -> partitionData
    }
    val authorizedTopics = authHelper.filterByAuthorized(request.context, READ, TOPIC, partitionDatas)(_._1.topic)
    partitionDatas.foreach { case (topicPartition, data) =>
      if (!authorizedTopics.contains(topicPartition.topic))
        erroneous += topicPartition -> errorResponse(Errors.TOPIC_AUTHORIZATION_FAILED)
      else if (!metadataCache.contains(topicPartition))
        erroneous += topicPartition -> errorResponse(Errors.UNKNOWN_TOPIC_OR_PARTITION)
      else
        interesting += (topicPartition -> data)
    }
  }

  // the callback for processing a fetch response, invoked before throttling
  def processResponseCallback(responsePartitionData: Seq[(TopicPartition, FetchPartitionData)]): Unit = {
    val partitions = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
    val reassigningPartitions = mutable.Set[TopicPartition]()
    responsePartitionData.foreach { case (tp, data) =>
      val abortedTransactions = data.abortedTransactions.map(_.asJava).orNull
      val lastStableOffset = data.lastStableOffset.getOrElse(FetchResponse.INVALID_LAST_STABLE_OFFSET)
      if (data.isReassignmentFetch)
        reassigningPartitions.add(tp)
      val error = maybeDownConvertStorageError(data.error, versionId)
      partitions.put(tp, new FetchResponse.PartitionData(
        error,
        data.highWatermark,
        lastStableOffset,
        data.logStartOffset,
        data.preferredReadReplica.map(int2Integer).asJava,
        abortedTransactions,
        data.divergingEpoch.asJava,
        data.records))
    }
    erroneous.foreach { case (tp, data) => partitions.put(tp, data) }

    var unconvertedFetchResponse: FetchResponse[Records] = null

    def createResponse(throttleTimeMs: Int): FetchResponse[BaseRecords] = {
      // Down-convert messages for each partition if required
      val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[BaseRecords]]
      unconvertedFetchResponse.responseData.forEach { (tp, unconvertedPartitionData) =>
        if (unconvertedPartitionData.error != Errors.NONE)
          debug(s"Fetch request with correlation id ${request.header.correlationId} from client $clientId " +
            s"on partition $tp failed due to ${unconvertedPartitionData.error.exceptionName}")
        convertedData.put(tp, maybeConvertFetchedData(tp, unconvertedPartitionData))
      }

      // Prepare fetch response from converted data
      val response = new FetchResponse(unconvertedFetchResponse.error, convertedData, throttleTimeMs,
        unconvertedFetchResponse.sessionId)
      // record the bytes out metrics only when the response is being sent
      response.responseData.forEach { (tp, data) =>
        brokerTopicStats.updateBytesOut(tp.topic, fetchRequest.isFromFollower, reassigningPartitions.contains(tp), data.records.sizeInBytes)
      }
      response
    }

    def updateConversionStats(send: Send): Unit = {
      send match {
        case send: MultiRecordsSend if send.recordConversionStats != null =>
          send.recordConversionStats.asScala.toMap.foreach {
            case (tp, stats) => updateRecordConversionStats(request, tp, stats)
          }
        case _ =>
      }
    }

    if (fetchRequest.isFromFollower) {
      // We've already evaluated against the quota and are good to go. Just need to record it now.
      unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
      val responseSize = KafkaApis.sizeOfThrottledPartitions(versionId, unconvertedFetchResponse, quotas.leader)
      quotas.leader.record(responseSize)
      trace(s"Sending Fetch response with partitions.size=${unconvertedFetchResponse.responseData.size}, " +
        s"metadata=${unconvertedFetchResponse.sessionId}")
      requestHelper.sendResponseExemptThrottle(request, createResponse(0), Some(updateConversionStats))
    } else {
      // Fetch size used to determine throttle time is calculated before any down conversions.
      // This may be slightly different from the actual response size. But since down conversions
      // result in data being loaded into memory, we should do this only when we are not going to throttle.
      //
      // Record both bandwidth and request quota-specific values and throttle by muting the channel if any of the
      // quotas have been violated. If both quotas have been violated, use the max throttle time between the two
      // quotas. When throttled, we unrecord the recorded bandwidth quota value
      val responseSize = fetchContext.getResponseSize(partitions, versionId)
      val timeMs = time.milliseconds()
      val requestThrottleTimeMs = quotas.request.maybeRecordAndGetThrottleTimeMs(request, timeMs)
      val bandwidthThrottleTimeMs = quotas.fetch.maybeRecordAndGetThrottleTimeMs(request, responseSize, timeMs)

      val maxThrottleTimeMs = math.max(bandwidthThrottleTimeMs, requestThrottleTimeMs)
      if (maxThrottleTimeMs > 0) {
        request.apiThrottleTimeMs = maxThrottleTimeMs
        // Even if we need to throttle for request quota violation, we should "unrecord" the already recorded value
        // from the fetch quota because we are going to return an empty response.
        quotas.fetch.unrecordQuotaSensor(request, responseSize, timeMs)
        if (bandwidthThrottleTimeMs > requestThrottleTimeMs) {
          quotas.fetch.throttle(request, bandwidthThrottleTimeMs, requestChannel.sendResponse)
        } else {
          quotas.request.throttle(request, requestThrottleTimeMs, requestChannel.sendResponse)
        }
        // If throttling is required, return an empty response.
        unconvertedFetchResponse = fetchContext.getThrottledResponse(maxThrottleTimeMs)
      } else {
        // Get the actual response. This will update the fetch context.
        unconvertedFetchResponse = fetchContext.updateAndGenerateResponseData(partitions)
        trace(s"Sending Fetch response with partitions.size=$responseSize, metadata=${unconvertedFetchResponse.sessionId}")
      }

      // Send the response immediately.
      requestHelper.sendResponse(request, Some(createResponse(maxThrottleTimeMs)), Some(updateConversionStats))
    }
  }

  // for fetch from consumer, cap fetchMaxBytes to the maximum bytes that could be fetched without being throttled given
  // no bytes were recorded in the recent quota window
  // trying to fetch more bytes would result in a guaranteed throttling potentially blocking consumer progress
  val maxQuotaWindowBytes = if (fetchRequest.isFromFollower)
    Int.MaxValue
  else
    quotas.fetch.getMaxValueInQuotaWindow(request.session, clientId).toInt

  val fetchMaxBytes = Math.min(Math.min(fetchRequest.maxBytes, config.fetchMaxBytes), maxQuotaWindowBytes)
  val fetchMinBytes = Math.min(fetchRequest.minBytes, fetchMaxBytes)
  if (interesting.isEmpty)
    processResponseCallback(Seq.empty)
  else {
    // call the replica manager to fetch messages from the local replica
    replicaManager.fetchMessages(
      fetchRequest.maxWait.toLong,
      fetchRequest.replicaId,
      fetchMinBytes,
      fetchMaxBytes,
      versionId <= 2,
      interesting,
      replicationQuota(fetchRequest),
      processResponseCallback,
      fetchRequest.isolationLevel,
      clientMetadata)
  }
}

For an ApiKeys.FETCH request, the actual data read takes place in the replicaManager.fetchMessages() method:

/**
 * Fetch messages from a replica, and wait until enough data can be fetched and return;
 * the callback function will be triggered either when timeout or required fetch info is satisfied.
 * Consumers may fetch from any replica, but followers can only fetch from the leader.
 */
def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel,
                  clientMetadata: Option[ClientMetadata]): Unit = {
  val isFromFollower = Request.isValidBrokerId(replicaId)
  val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
  val fetchIsolation = if (!isFromConsumer)
    FetchLogEnd
  else if (isolationLevel == IsolationLevel.READ_COMMITTED)
    FetchTxnCommitted
  else
    FetchHighWatermark

  // Restrict fetching to leader if request is from follower or from a client with older version (no ClientMetadata)
  val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
  def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
    val result = readFromLocalLog(
      replicaId = replicaId,
      fetchOnlyFromLeader = fetchOnlyFromLeader,
      fetchIsolation = fetchIsolation,
      fetchMaxBytes = fetchMaxBytes,
      hardMaxBytesLimit = hardMaxBytesLimit,
      readPartitionInfo = fetchInfos,
      quota = quota,
      clientMetadata = clientMetadata)
    if (isFromFollower) updateFollowerFetchState(replicaId, result)
    else result
  }

  val logReadResults = readFromLog()

  // check if this fetch request can be satisfied right away
  var bytesReadable: Long = 0
  var errorReadingData = false
  var hasDivergingEpoch = false
  val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
  logReadResults.foreach { case (topicPartition, logReadResult) =>
    brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()

    if (logReadResult.error != Errors.NONE)
      errorReadingData = true
    if (logReadResult.divergingEpoch.nonEmpty)
      hasDivergingEpoch = true
    bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
    logReadResultMap.put(topicPartition, logReadResult)
  }

  // respond immediately if 1) fetch request does not want to wait
  //                        2) fetch request does not require any data
  //                        3) has enough data to respond
  //                        4) some error happens while reading data
  //                        5) we found a diverging epoch
  if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData || hasDivergingEpoch) {
    val fetchPartitionData = logReadResults.map { case (tp, result) =>
      val isReassignmentFetch = isFromFollower && isAddingReplica(tp, replicaId)
      tp -> result.toFetchPartitionData(isReassignmentFetch)
    }
    responseCallback(fetchPartitionData)
  } else {
    // construct the fetch results from the read results
    val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
    fetchInfos.foreach { case (topicPartition, partitionData) =>
      logReadResultMap.get(topicPartition).foreach(logReadResult => {
        val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
        fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
      })
    }
    val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
      fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
    val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
      responseCallback)

    // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
    val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }

    // try to complete the request immediately, otherwise put it into the purgatory;
    // this is because while the delayed fetch operation is being created, new requests
    // may arrive and hence make this operation completable.
    delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
  }
}
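
The fetchIsolation chosen at the top of fetchMessages() decides how far into the log a read may go. A minimal sketch of what each isolation level means for the read upper bound (leo/hw/lso are hypothetical offsets; the real bounding happens inside the partition read itself):

object FetchIsolationSketch {
  sealed trait FetchIsolation
  case object FetchLogEnd        extends FetchIsolation // follower (or future replica) fetch
  case object FetchHighWatermark extends FetchIsolation // consumer, read_uncommitted
  case object FetchTxnCommitted  extends FetchIsolation // consumer, read_committed

  // Upper bound of the read for each isolation level.
  def upperBound(isolation: FetchIsolation, leo: Long, hw: Long, lso: Long): Long =
    isolation match {
      case FetchLogEnd        => leo // followers may read past the high watermark
      case FetchHighWatermark => hw  // consumers never see records above the high watermark
      case FetchTxnCommitted  => lso // read_committed additionally hides open transactions
    }

  def main(args: Array[String]): Unit = {
    val (leo, hw, lso) = (120L, 115L, 112L)
    println(upperBound(FetchLogEnd, leo, hw, lso))        // 120
    println(upperBound(FetchHighWatermark, leo, hw, lso)) // 115
    println(upperBound(FetchTxnCommitted, leo, hw, lso))  // 112
  }
}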

The first part of fetchMessages() above works out who is fetching (a follower or a consumer), which isolation level applies, and whether the read must be served by the leader; it then reads the log data from the local disk via readFromLocalLog():

/**
 * Read from multiple topic partitions at the given offset up to maxSize bytes
 */
def readFromLocalLog(replicaId: Int,
                     fetchOnlyFromLeader: Boolean,
                     fetchIsolation: FetchIsolation,
                     fetchMaxBytes: Int,
                     hardMaxBytesLimit: Boolean,
                     readPartitionInfo: Seq[(TopicPartition, PartitionData)],
                     quota: ReplicaQuota,
                     clientMetadata: Option[ClientMetadata]): Seq[(TopicPartition, LogReadResult)] = {
  val traceEnabled = isTraceEnabled

  def read(tp: TopicPartition, fetchInfo: PartitionData, limitBytes: Int, minOneMessage: Boolean): LogReadResult = {
    val offset = fetchInfo.fetchOffset
    val partitionFetchSize = fetchInfo.maxBytes
    val followerLogStartOffset = fetchInfo.logStartOffset

    val adjustedMaxBytes = math.min(fetchInfo.maxBytes, limitBytes)
    try {
      if (traceEnabled)
        trace(s"Fetching log segment for partition $tp, offset $offset, partition fetch size $partitionFetchSize, " +
          s"remaining response limit $limitBytes" +
          (if (minOneMessage) s", ignoring response/partition size limits" else ""))

      val partition = getPartitionOrException(tp)
      val fetchTimeMs = time.milliseconds

      // If we are the leader, determine the preferred read-replica
      val preferredReadReplica = clientMetadata.flatMap(
        metadata => findPreferredReadReplica(partition, metadata, replicaId, fetchInfo.fetchOffset, fetchTimeMs))

      if (preferredReadReplica.isDefined) {
        replicaSelectorOpt.foreach { selector =>
          debug(s"Replica selector ${selector.getClass.getSimpleName} returned preferred replica " +
            s"${preferredReadReplica.get} for $clientMetadata")
        }
        // If a preferred read-replica is set, skip the read
        val offsetSnapshot = partition.fetchOffsetSnapshot(fetchInfo.currentLeaderEpoch, fetchOnlyFromLeader = false)
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = offsetSnapshot.highWatermark.messageOffset,
          leaderLogStartOffset = offsetSnapshot.logStartOffset,
          leaderLogEndOffset = offsetSnapshot.logEndOffset.messageOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = -1L,
          lastStableOffset = Some(offsetSnapshot.lastStableOffset.messageOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      } else {
        // Try the read first, this tells us whether we need all of adjustedFetchSize for this partition
        val readInfo: LogReadInfo = partition.readRecords(
          lastFetchedEpoch = fetchInfo.lastFetchedEpoch,
          fetchOffset = fetchInfo.fetchOffset,
          currentLeaderEpoch = fetchInfo.currentLeaderEpoch,
          maxBytes = adjustedMaxBytes,
          fetchIsolation = fetchIsolation,
          fetchOnlyFromLeader = fetchOnlyFromLeader,
          minOneMessage = minOneMessage)

        val fetchDataInfo = if (shouldLeaderThrottle(quota, partition, replicaId)) {
          // If the partition is being throttled, simply return an empty set.
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else if (!hardMaxBytesLimit && readInfo.fetchedData.firstEntryIncomplete) {
          // For FetchRequest version 3, we replace incomplete message sets with an empty one as consumers can make
          // progress in such cases and don't need to report a `RecordTooLargeException`
          FetchDataInfo(readInfo.fetchedData.fetchOffsetMetadata, MemoryRecords.EMPTY)
        } else {
          readInfo.fetchedData
        }

        LogReadResult(info = fetchDataInfo,
          divergingEpoch = readInfo.divergingEpoch,
          highWatermark = readInfo.highWatermark,
          leaderLogStartOffset = readInfo.logStartOffset,
          leaderLogEndOffset = readInfo.logEndOffset,
          followerLogStartOffset = followerLogStartOffset,
          fetchTimeMs = fetchTimeMs,
          lastStableOffset = Some(readInfo.lastStableOffset),
          preferredReadReplica = preferredReadReplica,
          exception = None)
      }
    } catch {
      // NOTE: Failed fetch requests metric is not incremented for known exceptions since it
      // is supposed to indicate un-expected failure of a broker in handling a fetch request
      case e@ (_: UnknownTopicOrPartitionException |
               _: NotLeaderOrFollowerException |
               _: UnknownLeaderEpochException |
               _: FencedLeaderEpochException |
               _: ReplicaNotAvailableException |
               _: KafkaStorageException |
               _: OffsetOutOfRangeException) =>
        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
      case e: Throwable =>
        brokerTopicStats.topicStats(tp.topic).failedFetchRequestRate.mark()
        brokerTopicStats.allTopicsStats.failedFetchRequestRate.mark()

        val fetchSource = Request.describeReplicaId(replicaId)
        error(s"Error processing fetch with max size $adjustedMaxBytes from $fetchSource " +
          s"on partition $tp: $fetchInfo", e)

        LogReadResult(info = FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MemoryRecords.EMPTY),
          divergingEpoch = None,
          highWatermark = Log.UnknownOffset,
          leaderLogStartOffset = Log.UnknownOffset,
          leaderLogEndOffset = Log.UnknownOffset,
          followerLogStartOffset = Log.UnknownOffset,
          fetchTimeMs = -1L,
          lastStableOffset = None,
          exception = Some(e))
    }
  }

  var limitBytes = fetchMaxBytes
  val result = new mutable.ArrayBuffer[(TopicPartition, LogReadResult)]
  var minOneMessage = !hardMaxBytesLimit
  readPartitionInfo.foreach { case (tp, fetchInfo) =>
    val readResult = read(tp, fetchInfo, limitBytes, minOneMessage)
    val recordBatchSize = readResult.info.records.sizeInBytes
    // Once we read from a non-empty partition, we stop ignoring request and partition level size limits
    if (recordBatchSize > 0)
      minOneMessage = false
    limitBytes = math.max(0, limitBytes - recordBatchSize)
    result += (tp -> readResult)
  }
  result
}
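
The loop at the bottom of readFromLocalLog() does the response-size accounting: each partition is read with a budget of at most min(fetchInfo.maxBytes, limitBytes), whatever it returns is subtracted from the remaining budget, and minOneMessage is switched off as soon as any partition returns data. A minimal sketch of that bookkeeping with hypothetical sizes:

object FetchBudgetSketch {
  def main(args: Array[String]): Unit = {
    val fetchMaxBytes = 1000
    // (partition, bytes the local read actually returned for it) -- hypothetical numbers
    val reads = Seq("tp-0" -> 600, "tp-1" -> 0, "tp-2" -> 400)

    var limitBytes    = fetchMaxBytes
    var minOneMessage = true // i.e. !hardMaxBytesLimit
    reads.foreach { case (tp, recordBatchSize) =>
      println(s"$tp: limitBytes=$limitBytes, minOneMessage=$minOneMessage, returned $recordBatchSize bytes")
      // mirrors the accounting at the end of readFromLocalLog
      if (recordBatchSize > 0) minOneMessage = false
      limitBytes = math.max(0, limitBytes - recordBatchSize)
    }
    // tp-0 consumes 600 of the 1000-byte budget, tp-1 returns nothing,
    // and tp-2's read is issued with only the remaining 400 bytes.
  }
}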

The readFromLocalLog() method uses the fetchOnlyFromLeader and fetchIsolation parameters to decide which replica may serve the read and up to which offset records may be returned, then reads each partition's data through partition.readRecords(); the result is wrapped in a LogReadResult and handed back. Control then returns to the caller, fetchMessages(), and if the fetch came from a follower replica, updateFollowerFetchState() is invoked, which for each fetched partition ends up in the partition-level updateFollowerFetchState() shown below:

/**
 * Update the follower's state in the leader based on the last fetch request. See
 * [[Replica.updateFetchState()]] for details.
 *
 * @return true if the follower's fetch state was updated, false if the followerId is not recognized
 */
def updateFollowerFetchState(followerId: Int,
                             followerFetchOffsetMetadata: LogOffsetMetadata,
                             followerStartOffset: Long,
                             followerFetchTimeMs: Long,
                             leaderEndOffset: Long): Boolean = {
  getReplica(followerId) match {
    case Some(followerReplica) =>
      // No need to calculate low watermark if there is no delayed DeleteRecordsRequest
      val oldLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      val prevFollowerEndOffset = followerReplica.logEndOffset
      followerReplica.updateFetchState(
        followerFetchOffsetMetadata,
        followerStartOffset,
        followerFetchTimeMs,
        leaderEndOffset)

      val newLeaderLW = if (delayedOperations.numDelayedDelete > 0) lowWatermarkIfLeader else -1L
      // check if the LW of the partition has incremented
      // since the replica's logStartOffset may have incremented
      val leaderLWIncremented = newLeaderLW > oldLeaderLW

      // Check if this in-sync replica needs to be added to the ISR.
      maybeExpandIsr(followerReplica, followerFetchTimeMs)

      // check if the HW of the partition can now be incremented
      // since the replica may already be in the ISR and its LEO has just incremented
      val leaderHWIncremented = if (prevFollowerEndOffset != followerReplica.logEndOffset) {
        // the leader log may be updated by ReplicaAlterLogDirsThread so the following method must be in lock of
        // leaderIsrUpdateLock to prevent adding new hw to invalid log.
        inReadLock(leaderIsrUpdateLock) {
          leaderLogIfLocal.exists(leaderLog => maybeIncrementLeaderHW(leaderLog, followerFetchTimeMs))
        }
      } else {
        false
      }

      // some delayed operations may be unblocked after HW or LW changed
      if (leaderLWIncremented || leaderHWIncremented)
        tryCompleteDelayedRequests()

      debug(s"Recorded replica $followerId log end offset (LEO) position " +
        s"${followerFetchOffsetMetadata.messageOffset} and log start offset $followerStartOffset.")
      true

    case None =>
      false
  }
}
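
maybeIncrementLeaderHW(), which updateFollowerFetchState() calls above but whose body is not shown here, only ever advances the high watermark to an offset that every in-sync replica has already replicated. A minimal, conceptual sketch (the offsets are hypothetical, and the real code also considers caught-up replicas that are not yet in the ISR):

object HighWatermarkSketch {
  def main(args: Array[String]): Unit = {
    // brokerId -> log end offset (LEO); broker 1 is the leader.
    val isrLeos          = Map(1 -> 120L, 2 -> 118L, 3 -> 115L)
    val oldHighWatermark = 110L
    // The HW may only move up to an offset every in-sync replica already has.
    val newHighWatermark = math.max(oldHighWatermark, isrLeos.values.min)
    println(s"high watermark: $oldHighWatermark -> $newHighWatermark") // 110 -> 115
  }
}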

After the follower replica's LEO has been updated, the leader decides, based on the replica's state, whether it should be part of the ISR. A follower always lags the leader to some degree; one that lags beyond the configured threshold is kept out of the ISR, and one that has caught back up is re-added. The expansion side of that decision is handled by maybeExpandIsr():

/**
 * Check and maybe expand the ISR of the partition.
 * A replica will be added to ISR if its LEO >= current hw of the partition and it is caught up to
 * an offset within the current leader epoch. A replica must be caught up to the current leader
 * epoch before it can join ISR, because otherwise, if there is committed data between current
 * leader's HW and LEO, the replica may become the leader before it fetches the committed data
 * and the data will be lost.
 *
 * Technically, a replica shouldn't be in ISR if it hasn't caught up for longer than replicaLagTimeMaxMs,
 * even if its log end offset is >= HW. However, to be consistent with how the follower determines
 * whether a replica is in-sync, we only check HW.
 *
 * This function can be triggered when a replica's LEO has incremented.
 */
private def maybeExpandIsr(followerReplica: Replica, followerFetchTimeMs: Long): Unit = {
  val needsIsrUpdate = canAddReplicaToIsr(followerReplica.brokerId) && inReadLock(leaderIsrUpdateLock) {
    needsExpandIsr(followerReplica)
  }
  if (needsIsrUpdate) {
    inWriteLock(leaderIsrUpdateLock) {
      // check if this replica needs to be added to the ISR
      if (needsExpandIsr(followerReplica)) {
        expandIsr(followerReplica.brokerId)
      }
    }
  }
}
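
needsExpandIsr() and expandIsr() are not shown here, but the condition described in the comment above is essentially: a follower becomes eligible to rejoin the ISR once its LEO has reached the leader's current high watermark (the real check also requires it to have caught up to the start of the current leader epoch). A minimal sketch with hypothetical offsets:

object IsrExpansionSketch {
  final case class FollowerState(brokerId: Int, logEndOffset: Long)

  // Eligible to (re)join the ISR once the follower's LEO has reached the leader's HW.
  def caughtUpToHw(follower: FollowerState, leaderHighWatermark: Long): Boolean =
    follower.logEndOffset >= leaderHighWatermark

  def main(args: Array[String]): Unit = {
    val leaderHw  = 115L
    val followers = Seq(FollowerState(2, 118L), FollowerState(3, 90L))
    followers.foreach { f =>
      println(s"broker ${f.brokerId} eligible for ISR: ${caughtUpToHw(f, leaderHw)}") // true / false
    }
  }
}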