searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

alluxio中的Inode锁、Block锁介绍

2024-06-04 09:07:04
33
0
本篇文章重点分析alluxio中两种主要的锁实现:

1)InodeLockManager:用于创建Inode文件锁,并针对不用的锁进行垃圾回收。

2)BlockLockManager:创建worker访问Block锁,并提供针对锁的回收。

1、InodeLockManager

InodeLockManagher主要用于管理 Alluxio 文件系统的 inode 锁定。它维护了两个不同类型的锁池(LockPool):mInodeLocks 用于 inode 锁,mEdgeLocks 用于边锁。这些锁用于确保在多线程环境下对 inode 和边的并发访问操作是安全的。

该类还包括以下功能:

  1. 对 inode 执行读锁或写锁,可以使用 lockInode 方法,根据需要选择使用 LockModeuseTryLock 参数。这些锁用于管理 inode 的并发访问。
  2. 对边执行读锁或写锁,可以使用 lockEdge 方法,根据需要选择使用 LockModeuseTryLock 参数。这些锁用于管理文件系统中 inode 之间的关系。
  3. 获取持久化锁以确保对 inode 的持久化操作是线程安全的,可以使用 tryAcquirePersistingLock 方法。这个锁确保只有一个线程可以同时持久化一个 inode。
  4. 锁定父 inode 的最后修改时间和大小,以确保在重命名、创建或删除操作期间不会发生并发问题。
  5. 包含一些用于测试和调试的方法,例如 inodeReadLockedByCurrentThreadassertAllLocksReleased

这个类是 Alluxio 文件系统中用于维护并发访问的关键组件之一,确保了多线程环境下的线程安全性。

属性:

1private final LockPool<Long> mInodeLocks(节点锁)

Inode锁池,其底层实现基于LockPool类。Inode锁定之前必须在该池中先尝试获取该Inode的读锁,可以回收不用的Inode锁。池中保存的值为Inode ID,类型为long。

(2)private final LockPool<Edge> mEdgeLocks(边锁)

Inode边(A -> B)之间的锁池。其类似于Inode锁池(底层实现均基于LockPool类)。池中保存的值为Edge类型,Edge类包含两个属性:mId、mName。

其中,mId属性为父Inode(A)的Inode ID值,mName属性为子Inode(B)的节点名称。

(3)private final Striped<Lock> mParentUpdateLocks(更新锁)

更新锁池,用于保护已被读取锁定的父Inode中的last modified time和size的更改的锁。其使用了guava中的Striped类用于保存这些锁池信息。

(4)private final LoadingCache<Long, AtomicBoolean> mPersistingLocks

持久化锁池,用于保存持久化到UFS中的Inode的锁。底层采用guava中cache实现,用于保存锁信息。其中键为Long类型的Inode ID,值为AtomicBoolean类型的参数,用于描述该Inode是否正在被持久化。

方法:

方法引用

说明

boolean inodeReadLockedByCurrentThread(long inodeId)

检查当前线程是否持有Inode的读锁

boolean inodeWriteLockedByCurrentThread(long inodeId)

检查当前线程是否持有Inode的写锁

boolean edgeReadLockedByCurrentThread(Edge edge)

检查当前线程是否持有Edge的读锁

boolean edgeWriteLockedByCurrentThread(Edge edge)

检查当前线程是否持有Edge的写锁

public void assertAllLocksReleased()

确认当前所有锁是否被释放

public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock)

获取Inode锁

public RWLockResource lockInode(Long inodeId, LockMode mode)

基于Lock#lock方法获取锁(强制获取锁)

public Optional<RWLockResource> tryLockInode(Long inodeId, LockMode mode)

尝试去获取Inode锁,存在无法获取Inode锁的可能

public RWLockResource lockEdge(Edge edge, LockMode mode, boolean useTryLock)

获取Edge锁

public Optional<RWLockResource> tryLockEdge(Edge edge, LockMode mode)

尝试去获取Edge锁,存在无法获取Edge锁的可能

public Optional<Scoped> tryAcquirePersistingLock(long inodeId)

尝试获取锁,用于持久化指定的Inode

public LockResource lockUpdate(long inodeId)

获取锁,用于修改Inode的最后修改时间、大小等信息时。调用该方法的前提条件:需要先获取该Inode的读锁。

public void close()

关闭锁池

下面重点分析Inode锁的获取方法:

方法入口: public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock)。用于获取Inode的读/写锁,底层调用alluxio.collections.LockPool#get(K, alluxio.concurrent.LockMode, boolean)

public RWLockResource get(K key, LockMode mode, boolean useTryLock) {
    //step 1: 调用本地alluxio.collections.LockPool#getResource
    Resource resource = getResource(key);  
    //step 2: 调用远程alluxio.resource.RefCountLockResource#RefCountLockResource的构造器方法,用于初始化该类
    return new RefCountLockResource(resource.mLock, mode, true, resource.mRefCount, useTryLock);  
}

其中第一步实现的思路比较直接,直接查询并更新ConcurrentHashMap中的对应key的resource 值,用于直接复用池中的对象。若没有该对象,则新建一个。

第二步的最终执行在RefCountLockResource的父类LockResource中(继承关系为:RefCountLockResource >> RWLockResource >> LockResource),初始化方法为lock,最终执行ReentrantReadWriteLock.lock( )方法

其中在LockPool层面实现了引用计数器,当超过pool的高水位时,唤醒回收线程(Evictor)释放引用计数为0的读/写锁

2、BlockLockManager

Alluxio 中的 BlockLockManager 类,用于处理worker读取底层数据块锁(block locks)。它提供了获取、释放、清理块锁的功能,并确保在并发环境下不会出现竞争条件。

属性:

  1. mLockPool - ResourcePool<ClientRWLock>:
    • 用于管理块锁的资源池。
    • 说明:该资源池用于维护可重用的块锁,以限制锁的数量。块锁在获取和释放时会从该池中进行管理。
  1. mLocks - ConcurrentHashMap<Long, ClientRWLock>:
    • 从块ID到块锁的映射。
    • 说明:这个并发哈希映射用于将块ID与相应的块锁关联起来,以便管理和共享块锁。
  1. mSessionCleaning - Semaphore:
    • 用于控制会话清理操作的信号量。
    • 说明:这个信号量用于确保在进行会话清理操作时,不会并发添加新的锁记录。会话清理是为了移除特定会话持有的锁记录,以确保数据的一致性。
  1. mLockRecords - IndexedSet<LockRecord>:
    • 记录当前由客户端持有的锁。
    • 说明:这是一个用于记录当前由客户端持有的锁的数据结构。新的锁记录将被添加到此集合,并在会话清理操作期间受到互斥保护。它使用索引以支持快速查询和检索记录。
  1. LOCK_ID_GEN - AtomicLong:
    • 用于生成唯一的锁ID。
    • 说明:该原子长整型用于生成唯一的锁ID,以确保锁的唯一性。
  1. MAX_READERS - int:
    • 最大允许的读者数。
    • 说明:指定在锁池中的每个块锁上允许的最大读者数。这有助于控制并发访问。

以上属性是BlockLockManager类中的主要成员,它们在管理块锁和维护锁记录时起着关键作用。这些属性用于确保在多线程环境中对块的访问是安全和有序的。

方法:

  1. acquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType): BlockLock
    • 获取指定会话对指定块的锁。返回一个BlockLock对象,允许稍后释放锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
  1. tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit): Optional<BlockLock>
    • 尝试在给定的时间内获取指定会话对指定块的锁。如果在给定时间内无法获取锁,返回一个空的Optional
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
      • time:最大等待时间
      • unit:时间单位
  1. lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, Long time, TimeUnit unit): OptionalLong
    • 内部方法,用于实现块锁的获取逻辑。支持阻塞和非阻塞锁获取,检查是否会话已经持有了块的锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
      • blocking:是否阻塞等待获取锁
      • time:最大等待时间(仅在非阻塞情况下使用)
      • unit:时间单位(仅在非阻塞情况下使用)
  1. sessionHoldsLock(long sessionId, long blockId): boolean
    • 检查特定会话是否持有指定块的锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
    • 返回值:如果会话持有指定块的锁,则返回true;否则返回false
  1. cleanupSession(long sessionId): void
    • 清理特定会话持有的锁。它会删除与会话关联的锁记录并释放相应的块锁。
    • 参数:
      • sessionId:会话ID
  1. getLockedBlocks(): Set<Long>
    • 获取当前已锁定的块的快照,用于监控已锁定块的状态。
    • 返回值:包含已锁定块ID的集合
  1. unlockBlock(long lockId): void
    • 释放具有指定锁ID的锁。它查找锁记录,删除它,并解锁关联的块,如果锁不再使用。
    • 参数:
      • lockId:锁ID
  1. getBlockLock(long blockId): ClientRWLock
    • 获取指定块ID的块锁,如果不存在则获取一个。如果所有块锁已分配完毕,此方法将等待直到可以从锁池中获取一个。
    • 参数:
      • blockId:块ID
    • 返回值:块锁对象
  1. releaseBlockLockIfUnused(long blockId): void
    • 检查块锁是否不再使用,如果不再使用则释放它,将其返回到锁池。
    • 参数:
      • blockId:块ID
  1. validate(): void
    • 用于验证BlockLockManager内部状态的方法,通常用于测试目的。它检查块锁的引用计数是否与锁记录计数相匹配。
    • 说明:此方法在验证类的内部状态时使用,确保所有不变量都得到满足。

这些方法共同用于管理块的锁、确保数据的一致性、控制并发访问,并支持块锁的获取和释放。

下面重点分析其中的加锁方法

alluxio.worker.block.BlockLockManager#tryAcquireBlockLock:

/**
sessionId:会话ID
blockId:块ID
blockLockType:块锁类型(读锁 or 写锁)
time:时间
unit:时间单位。与time一起,用于表示在指定时间获取锁,超时返回null
*/
  public Optional<BlockLock> tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit) {
    //获取锁ID,其中false为是否阻塞获取锁,time&unit表示在指定的时间内获取锁
    OptionalLong lockId = lockBlockInternal(sessionId, blockId, blockLockType, false, time, unit);
    //返回Block锁对象
    return lockId.isPresent() ? Optional.of(new BlockLock(lockId.getAsLong(), this::unlockBlock)) : Optional.empty();
  }

其中,获取锁ID为重点步骤,调用的方法位于

alluxio.worker.block.BlockLockManager#lockBlockInternal:

  private OptionalLong lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, @Nullable Long time, @Nullable TimeUnit unit) {
    //获取客户端锁对象,重点!!!
    ClientRWLock blockLock = getBlockLock(blockId);
    //获取锁类型
    Lock lock = blockLockType == BlockLockType.READ ? blockLock.readLock() : blockLock.writeLock();   
    //当需要获取写锁且当前会话已获取锁时(即写锁需要与其他读写锁互斥),抛出异常
    if (blockLockType == BlockLockType.WRITE && sessionHoldsLock(sessionId, blockId)) {
      ...
      //抛出异常
    }
    if (blocking) {  //是否使用lock函数,阻塞获取锁
      lock.lock();
    } else {
      ...
      try {
        if (!lock.tryLock(time, unit)) {
          ...
          return OptionalLong.empty();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return OptionalLong.empty();
      }
    }
    long lockId = LOCK_ID_GEN.getAndIncrement();
    LockRecord record = new LockRecord(sessionId, blockId, lockId, lock);

    //记录(锁、会话、block)的关联信息到mLockRecords记录中,此处通过设置信号量互斥执行
    try {
      try {
        mSessionCleaning.acquire();  //获取互斥锁
      } catch (InterruptedException e) {
        ...
      }
      mLockRecords.add(record);
      mSessionCleaning.release();
      return OptionalLong.of(lockId);
    } catch (Throwable e) {
      ...
    }
  }

下面重点看下获取Block锁的方法

alluxio.worker.block.BlockLockManager#getBlockLock:

  private ClientRWLock getBlockLock(long blockId) {
    //循环获取block锁
    while (true) {
      //在mLocks缓存中检查是否存在该block锁。如果有的话,可以直接复用
      ClientRWLock reuseExistingLock = mLocks.computeIfPresent(
          blockId, (blkid, lock) -> {
            lock.addReference();
            return lock;
          }
      );
      if (reuseExistingLock != null) {
        return reuseExistingLock;
      }
      //缓存没有的话,新建一个客户端锁对象ClientRWLock
      ClientRWLock newlyAcquiredLock = mLockPool.acquire(1, TimeUnit.SECONDS);
      //如果新建的锁对象不为null的话,继续执行如下代码;如果锁对象为null,则执行循环体
      if (newlyAcquiredLock != null) {
        int referenceCount = newlyAcquiredLock.getReferenceCount();
        if (referenceCount != 0) {
          ... //打印异常日志
        }
        ClientRWLock computed = mLocks.compute(blockId, (id, lock) -> {
          if (lock != null) {  //该block锁不为null时
            lock.addReference();
            return lock;
          } else {  //该block锁为null时,递增引用计数,返回新创建的block锁对象
            newlyAcquiredLock.addReference();
            return newlyAcquiredLock;
          }
        });
        if (computed != newlyAcquiredLock) { //当mLocks存在该锁时,将新锁释放到mLockPool中
          mLockPool.release(newlyAcquiredLock);
        }
        return computed;
      }
    }
  }
0条评论
作者已关闭评论
KnightChen
3文章数
0粉丝数
KnightChen
3 文章 | 0 粉丝
KnightChen
3文章数
0粉丝数
KnightChen
3 文章 | 0 粉丝
原创

alluxio中的Inode锁、Block锁介绍

2024-06-04 09:07:04
33
0
本篇文章重点分析alluxio中两种主要的锁实现:

1)InodeLockManager:用于创建Inode文件锁,并针对不用的锁进行垃圾回收。

2)BlockLockManager:创建worker访问Block锁,并提供针对锁的回收。

1、InodeLockManager

InodeLockManagher主要用于管理 Alluxio 文件系统的 inode 锁定。它维护了两个不同类型的锁池(LockPool):mInodeLocks 用于 inode 锁,mEdgeLocks 用于边锁。这些锁用于确保在多线程环境下对 inode 和边的并发访问操作是安全的。

该类还包括以下功能:

  1. 对 inode 执行读锁或写锁,可以使用 lockInode 方法,根据需要选择使用 LockModeuseTryLock 参数。这些锁用于管理 inode 的并发访问。
  2. 对边执行读锁或写锁,可以使用 lockEdge 方法,根据需要选择使用 LockModeuseTryLock 参数。这些锁用于管理文件系统中 inode 之间的关系。
  3. 获取持久化锁以确保对 inode 的持久化操作是线程安全的,可以使用 tryAcquirePersistingLock 方法。这个锁确保只有一个线程可以同时持久化一个 inode。
  4. 锁定父 inode 的最后修改时间和大小,以确保在重命名、创建或删除操作期间不会发生并发问题。
  5. 包含一些用于测试和调试的方法,例如 inodeReadLockedByCurrentThreadassertAllLocksReleased

这个类是 Alluxio 文件系统中用于维护并发访问的关键组件之一,确保了多线程环境下的线程安全性。

属性:

1private final LockPool<Long> mInodeLocks(节点锁)

Inode锁池,其底层实现基于LockPool类。Inode锁定之前必须在该池中先尝试获取该Inode的读锁,可以回收不用的Inode锁。池中保存的值为Inode ID,类型为long。

(2)private final LockPool<Edge> mEdgeLocks(边锁)

Inode边(A -> B)之间的锁池。其类似于Inode锁池(底层实现均基于LockPool类)。池中保存的值为Edge类型,Edge类包含两个属性:mId、mName。

其中,mId属性为父Inode(A)的Inode ID值,mName属性为子Inode(B)的节点名称。

(3)private final Striped<Lock> mParentUpdateLocks(更新锁)

更新锁池,用于保护已被读取锁定的父Inode中的last modified time和size的更改的锁。其使用了guava中的Striped类用于保存这些锁池信息。

(4)private final LoadingCache<Long, AtomicBoolean> mPersistingLocks

持久化锁池,用于保存持久化到UFS中的Inode的锁。底层采用guava中cache实现,用于保存锁信息。其中键为Long类型的Inode ID,值为AtomicBoolean类型的参数,用于描述该Inode是否正在被持久化。

方法:

方法引用

说明

boolean inodeReadLockedByCurrentThread(long inodeId)

检查当前线程是否持有Inode的读锁

boolean inodeWriteLockedByCurrentThread(long inodeId)

检查当前线程是否持有Inode的写锁

boolean edgeReadLockedByCurrentThread(Edge edge)

检查当前线程是否持有Edge的读锁

boolean edgeWriteLockedByCurrentThread(Edge edge)

检查当前线程是否持有Edge的写锁

public void assertAllLocksReleased()

确认当前所有锁是否被释放

public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock)

获取Inode锁

public RWLockResource lockInode(Long inodeId, LockMode mode)

基于Lock#lock方法获取锁(强制获取锁)

public Optional<RWLockResource> tryLockInode(Long inodeId, LockMode mode)

尝试去获取Inode锁,存在无法获取Inode锁的可能

public RWLockResource lockEdge(Edge edge, LockMode mode, boolean useTryLock)

获取Edge锁

public Optional<RWLockResource> tryLockEdge(Edge edge, LockMode mode)

尝试去获取Edge锁,存在无法获取Edge锁的可能

public Optional<Scoped> tryAcquirePersistingLock(long inodeId)

尝试获取锁,用于持久化指定的Inode

public LockResource lockUpdate(long inodeId)

获取锁,用于修改Inode的最后修改时间、大小等信息时。调用该方法的前提条件:需要先获取该Inode的读锁。

public void close()

关闭锁池

下面重点分析Inode锁的获取方法:

方法入口: public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock)。用于获取Inode的读/写锁,底层调用alluxio.collections.LockPool#get(K, alluxio.concurrent.LockMode, boolean)

public RWLockResource get(K key, LockMode mode, boolean useTryLock) {
    //step 1: 调用本地alluxio.collections.LockPool#getResource
    Resource resource = getResource(key);  
    //step 2: 调用远程alluxio.resource.RefCountLockResource#RefCountLockResource的构造器方法,用于初始化该类
    return new RefCountLockResource(resource.mLock, mode, true, resource.mRefCount, useTryLock);  
}

其中第一步实现的思路比较直接,直接查询并更新ConcurrentHashMap中的对应key的resource 值,用于直接复用池中的对象。若没有该对象,则新建一个。

第二步的最终执行在RefCountLockResource的父类LockResource中(继承关系为:RefCountLockResource >> RWLockResource >> LockResource),初始化方法为lock,最终执行ReentrantReadWriteLock.lock( )方法

其中在LockPool层面实现了引用计数器,当超过pool的高水位时,唤醒回收线程(Evictor)释放引用计数为0的读/写锁

2、BlockLockManager

Alluxio 中的 BlockLockManager 类,用于处理worker读取底层数据块锁(block locks)。它提供了获取、释放、清理块锁的功能,并确保在并发环境下不会出现竞争条件。

属性:

  1. mLockPool - ResourcePool<ClientRWLock>:
    • 用于管理块锁的资源池。
    • 说明:该资源池用于维护可重用的块锁,以限制锁的数量。块锁在获取和释放时会从该池中进行管理。
  1. mLocks - ConcurrentHashMap<Long, ClientRWLock>:
    • 从块ID到块锁的映射。
    • 说明:这个并发哈希映射用于将块ID与相应的块锁关联起来,以便管理和共享块锁。
  1. mSessionCleaning - Semaphore:
    • 用于控制会话清理操作的信号量。
    • 说明:这个信号量用于确保在进行会话清理操作时,不会并发添加新的锁记录。会话清理是为了移除特定会话持有的锁记录,以确保数据的一致性。
  1. mLockRecords - IndexedSet<LockRecord>:
    • 记录当前由客户端持有的锁。
    • 说明:这是一个用于记录当前由客户端持有的锁的数据结构。新的锁记录将被添加到此集合,并在会话清理操作期间受到互斥保护。它使用索引以支持快速查询和检索记录。
  1. LOCK_ID_GEN - AtomicLong:
    • 用于生成唯一的锁ID。
    • 说明:该原子长整型用于生成唯一的锁ID,以确保锁的唯一性。
  1. MAX_READERS - int:
    • 最大允许的读者数。
    • 说明:指定在锁池中的每个块锁上允许的最大读者数。这有助于控制并发访问。

以上属性是BlockLockManager类中的主要成员,它们在管理块锁和维护锁记录时起着关键作用。这些属性用于确保在多线程环境中对块的访问是安全和有序的。

方法:

  1. acquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType): BlockLock
    • 获取指定会话对指定块的锁。返回一个BlockLock对象,允许稍后释放锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
  1. tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit): Optional<BlockLock>
    • 尝试在给定的时间内获取指定会话对指定块的锁。如果在给定时间内无法获取锁,返回一个空的Optional
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
      • time:最大等待时间
      • unit:时间单位
  1. lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, Long time, TimeUnit unit): OptionalLong
    • 内部方法,用于实现块锁的获取逻辑。支持阻塞和非阻塞锁获取,检查是否会话已经持有了块的锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
      • blockLockType:块锁类型,可以是读锁或写锁
      • blocking:是否阻塞等待获取锁
      • time:最大等待时间(仅在非阻塞情况下使用)
      • unit:时间单位(仅在非阻塞情况下使用)
  1. sessionHoldsLock(long sessionId, long blockId): boolean
    • 检查特定会话是否持有指定块的锁。
    • 参数:
      • sessionId:会话ID
      • blockId:块ID
    • 返回值:如果会话持有指定块的锁,则返回true;否则返回false
  1. cleanupSession(long sessionId): void
    • 清理特定会话持有的锁。它会删除与会话关联的锁记录并释放相应的块锁。
    • 参数:
      • sessionId:会话ID
  1. getLockedBlocks(): Set<Long>
    • 获取当前已锁定的块的快照,用于监控已锁定块的状态。
    • 返回值:包含已锁定块ID的集合
  1. unlockBlock(long lockId): void
    • 释放具有指定锁ID的锁。它查找锁记录,删除它,并解锁关联的块,如果锁不再使用。
    • 参数:
      • lockId:锁ID
  1. getBlockLock(long blockId): ClientRWLock
    • 获取指定块ID的块锁,如果不存在则获取一个。如果所有块锁已分配完毕,此方法将等待直到可以从锁池中获取一个。
    • 参数:
      • blockId:块ID
    • 返回值:块锁对象
  1. releaseBlockLockIfUnused(long blockId): void
    • 检查块锁是否不再使用,如果不再使用则释放它,将其返回到锁池。
    • 参数:
      • blockId:块ID
  1. validate(): void
    • 用于验证BlockLockManager内部状态的方法,通常用于测试目的。它检查块锁的引用计数是否与锁记录计数相匹配。
    • 说明:此方法在验证类的内部状态时使用,确保所有不变量都得到满足。

这些方法共同用于管理块的锁、确保数据的一致性、控制并发访问,并支持块锁的获取和释放。

下面重点分析其中的加锁方法

alluxio.worker.block.BlockLockManager#tryAcquireBlockLock:

/**
sessionId:会话ID
blockId:块ID
blockLockType:块锁类型(读锁 or 写锁)
time:时间
unit:时间单位。与time一起,用于表示在指定时间获取锁,超时返回null
*/
  public Optional<BlockLock> tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit) {
    //获取锁ID,其中false为是否阻塞获取锁,time&unit表示在指定的时间内获取锁
    OptionalLong lockId = lockBlockInternal(sessionId, blockId, blockLockType, false, time, unit);
    //返回Block锁对象
    return lockId.isPresent() ? Optional.of(new BlockLock(lockId.getAsLong(), this::unlockBlock)) : Optional.empty();
  }

其中,获取锁ID为重点步骤,调用的方法位于

alluxio.worker.block.BlockLockManager#lockBlockInternal:

  private OptionalLong lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, @Nullable Long time, @Nullable TimeUnit unit) {
    //获取客户端锁对象,重点!!!
    ClientRWLock blockLock = getBlockLock(blockId);
    //获取锁类型
    Lock lock = blockLockType == BlockLockType.READ ? blockLock.readLock() : blockLock.writeLock();   
    //当需要获取写锁且当前会话已获取锁时(即写锁需要与其他读写锁互斥),抛出异常
    if (blockLockType == BlockLockType.WRITE && sessionHoldsLock(sessionId, blockId)) {
      ...
      //抛出异常
    }
    if (blocking) {  //是否使用lock函数,阻塞获取锁
      lock.lock();
    } else {
      ...
      try {
        if (!lock.tryLock(time, unit)) {
          ...
          return OptionalLong.empty();
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return OptionalLong.empty();
      }
    }
    long lockId = LOCK_ID_GEN.getAndIncrement();
    LockRecord record = new LockRecord(sessionId, blockId, lockId, lock);

    //记录(锁、会话、block)的关联信息到mLockRecords记录中,此处通过设置信号量互斥执行
    try {
      try {
        mSessionCleaning.acquire();  //获取互斥锁
      } catch (InterruptedException e) {
        ...
      }
      mLockRecords.add(record);
      mSessionCleaning.release();
      return OptionalLong.of(lockId);
    } catch (Throwable e) {
      ...
    }
  }

下面重点看下获取Block锁的方法

alluxio.worker.block.BlockLockManager#getBlockLock:

  private ClientRWLock getBlockLock(long blockId) {
    //循环获取block锁
    while (true) {
      //在mLocks缓存中检查是否存在该block锁。如果有的话,可以直接复用
      ClientRWLock reuseExistingLock = mLocks.computeIfPresent(
          blockId, (blkid, lock) -> {
            lock.addReference();
            return lock;
          }
      );
      if (reuseExistingLock != null) {
        return reuseExistingLock;
      }
      //缓存没有的话,新建一个客户端锁对象ClientRWLock
      ClientRWLock newlyAcquiredLock = mLockPool.acquire(1, TimeUnit.SECONDS);
      //如果新建的锁对象不为null的话,继续执行如下代码;如果锁对象为null,则执行循环体
      if (newlyAcquiredLock != null) {
        int referenceCount = newlyAcquiredLock.getReferenceCount();
        if (referenceCount != 0) {
          ... //打印异常日志
        }
        ClientRWLock computed = mLocks.compute(blockId, (id, lock) -> {
          if (lock != null) {  //该block锁不为null时
            lock.addReference();
            return lock;
          } else {  //该block锁为null时,递增引用计数,返回新创建的block锁对象
            newlyAcquiredLock.addReference();
            return newlyAcquiredLock;
          }
        });
        if (computed != newlyAcquiredLock) { //当mLocks存在该锁时,将新锁释放到mLockPool中
          mLockPool.release(newlyAcquiredLock);
        }
        return computed;
      }
    }
  }
文章来自个人专栏
大数据探索
3 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
2
1