1)InodeLockManager:用于创建Inode文件锁,并针对不用的锁进行垃圾回收。
2)BlockLockManager:创建worker访问Block锁,并提供针对锁的回收。
1、InodeLockManager
InodeLockManagher主要用于管理 Alluxio 文件系统的 inode 锁定。它维护了两个不同类型的锁池(LockPool):mInodeLocks 用于 inode 锁,mEdgeLocks 用于边锁。这些锁用于确保在多线程环境下对 inode 和边的并发访问操作是安全的。
该类还包括以下功能:
- 对 inode 执行读锁或写锁,可以使用 lockInode 方法,根据需要选择使用 LockMode 和 useTryLock 参数。这些锁用于管理 inode 的并发访问。
- 对边执行读锁或写锁,可以使用 lockEdge 方法,根据需要选择使用 LockMode 和 useTryLock 参数。这些锁用于管理文件系统中 inode 之间的关系。
- 获取持久化锁以确保对 inode 的持久化操作是线程安全的,可以使用 tryAcquirePersistingLock 方法。这个锁确保只有一个线程可以同时持久化一个 inode。
- 锁定父 inode 的最后修改时间和大小,以确保在重命名、创建或删除操作期间不会发生并发问题。
- 包含一些用于测试和调试的方法,例如 inodeReadLockedByCurrentThread 和 assertAllLocksReleased。
这个类是 Alluxio 文件系统中用于维护并发访问的关键组件之一,确保了多线程环境下的线程安全性。
属性:
(1)private final LockPool<Long> mInodeLocks(节点锁)
Inode锁池,其底层实现基于LockPool类。Inode锁定之前必须在该池中先尝试获取该Inode的读锁,可以回收不用的Inode锁。池中保存的值为Inode ID,类型为long。
(2)private final LockPool<Edge> mEdgeLocks(边锁)
Inode边(A -> B)之间的锁池。其类似于Inode锁池(底层实现均基于LockPool类)。池中保存的值为Edge类型,Edge类包含两个属性:mId、mName。
其中,mId属性为父Inode(A)的Inode ID值,mName属性为子Inode(B)的节点名称。
(3)private final Striped<Lock> mParentUpdateLocks(更新锁)
更新锁池,用于保护已被读取锁定的父Inode中的last modified time和size的更改的锁。其使用了guava中的Striped类用于保存这些锁池信息。
(4)private final LoadingCache<Long, AtomicBoolean> mPersistingLocks
持久化锁池,用于保存持久化到UFS中的Inode的锁。底层采用guava中cache实现,用于保存锁信息。其中键为Long类型的Inode ID,值为AtomicBoolean类型的参数,用于描述该Inode是否正在被持久化。
方法:
方法引用 |
说明 |
boolean inodeReadLockedByCurrentThread(long inodeId) |
检查当前线程是否持有Inode的读锁 |
boolean inodeWriteLockedByCurrentThread(long inodeId) |
检查当前线程是否持有Inode的写锁 |
boolean edgeReadLockedByCurrentThread(Edge edge) |
检查当前线程是否持有Edge的读锁 |
boolean edgeWriteLockedByCurrentThread(Edge edge) |
检查当前线程是否持有Edge的写锁 |
public void assertAllLocksReleased() |
确认当前所有锁是否被释放 |
public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock) |
获取Inode锁 |
public RWLockResource lockInode(Long inodeId, LockMode mode) |
基于Lock#lock方法获取锁(强制获取锁) |
public Optional<RWLockResource> tryLockInode(Long inodeId, LockMode mode) |
尝试去获取Inode锁,存在无法获取Inode锁的可能 |
public RWLockResource lockEdge(Edge edge, LockMode mode, boolean useTryLock) |
获取Edge锁 |
public Optional<RWLockResource> tryLockEdge(Edge edge, LockMode mode) |
尝试去获取Edge锁,存在无法获取Edge锁的可能 |
public Optional<Scoped> tryAcquirePersistingLock(long inodeId) |
尝试获取锁,用于持久化指定的Inode |
public LockResource lockUpdate(long inodeId) |
获取锁,用于修改Inode的最后修改时间、大小等信息时。调用该方法的前提条件:需要先获取该Inode的读锁。 |
public void close() |
关闭锁池 |
下面重点分析Inode锁的获取方法:
方法入口: public RWLockResource lockInode(InodeView inode, LockMode mode, boolean useTryLock)。用于获取Inode的读/写锁,底层调用alluxio.collections.LockPool#get(K, alluxio.concurrent.LockMode, boolean)
public RWLockResource get(K key, LockMode mode, boolean useTryLock) {
//step 1: 调用本地alluxio.collections.LockPool#getResource
Resource resource = getResource(key);
//step 2: 调用远程alluxio.resource.RefCountLockResource#RefCountLockResource的构造器方法,用于初始化该类
return new RefCountLockResource(resource.mLock, mode, true, resource.mRefCount, useTryLock);
}
其中第一步实现的思路比较直接,直接查询并更新ConcurrentHashMap中的对应key的resource 值,用于直接复用池中的对象。若没有该对象,则新建一个。
第二步的最终执行在RefCountLockResource的父类LockResource中(继承关系为:RefCountLockResource >> RWLockResource >> LockResource),初始化方法为lock,最终执行ReentrantReadWriteLock.lock( )方法
其中在LockPool层面实现了引用计数器,当超过pool的高水位时,唤醒回收线程(Evictor)释放引用计数为0的读/写锁
2、BlockLockManager
Alluxio 中的 BlockLockManager 类,用于处理worker读取底层数据块锁(block locks)。它提供了获取、释放、清理块锁的功能,并确保在并发环境下不会出现竞争条件。
属性:
- mLockPool - ResourcePool<ClientRWLock>:
- 用于管理块锁的资源池。
- 说明:该资源池用于维护可重用的块锁,以限制锁的数量。块锁在获取和释放时会从该池中进行管理。
- mLocks - ConcurrentHashMap<Long, ClientRWLock>:
- 从块ID到块锁的映射。
- 说明:这个并发哈希映射用于将块ID与相应的块锁关联起来,以便管理和共享块锁。
- mSessionCleaning - Semaphore:
- 用于控制会话清理操作的信号量。
- 说明:这个信号量用于确保在进行会话清理操作时,不会并发添加新的锁记录。会话清理是为了移除特定会话持有的锁记录,以确保数据的一致性。
- mLockRecords - IndexedSet<LockRecord>:
- 记录当前由客户端持有的锁。
- 说明:这是一个用于记录当前由客户端持有的锁的数据结构。新的锁记录将被添加到此集合,并在会话清理操作期间受到互斥保护。它使用索引以支持快速查询和检索记录。
- LOCK_ID_GEN - AtomicLong:
- 用于生成唯一的锁ID。
- 说明:该原子长整型用于生成唯一的锁ID,以确保锁的唯一性。
- MAX_READERS - int:
- 最大允许的读者数。
- 说明:指定在锁池中的每个块锁上允许的最大读者数。这有助于控制并发访问。
以上属性是BlockLockManager类中的主要成员,它们在管理块锁和维护锁记录时起着关键作用。这些属性用于确保在多线程环境中对块的访问是安全和有序的。
方法:
- acquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType): BlockLock
- 获取指定会话对指定块的锁。返回一个BlockLock对象,允许稍后释放锁。
- 参数:
- sessionId:会话ID
- blockId:块ID
- blockLockType:块锁类型,可以是读锁或写锁
- tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit): Optional<BlockLock>
- 尝试在给定的时间内获取指定会话对指定块的锁。如果在给定时间内无法获取锁,返回一个空的Optional。
- 参数:
- sessionId:会话ID
- blockId:块ID
- blockLockType:块锁类型,可以是读锁或写锁
- time:最大等待时间
- unit:时间单位
- lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, Long time, TimeUnit unit): OptionalLong
- 内部方法,用于实现块锁的获取逻辑。支持阻塞和非阻塞锁获取,检查是否会话已经持有了块的锁。
- 参数:
- sessionId:会话ID
- blockId:块ID
- blockLockType:块锁类型,可以是读锁或写锁
- blocking:是否阻塞等待获取锁
- time:最大等待时间(仅在非阻塞情况下使用)
- unit:时间单位(仅在非阻塞情况下使用)
- sessionHoldsLock(long sessionId, long blockId): boolean
- 检查特定会话是否持有指定块的锁。
- 参数:
- sessionId:会话ID
- blockId:块ID
- 返回值:如果会话持有指定块的锁,则返回true;否则返回false。
- cleanupSession(long sessionId): void
- 清理特定会话持有的锁。它会删除与会话关联的锁记录并释放相应的块锁。
- 参数:
- sessionId:会话ID
- getLockedBlocks(): Set<Long>
- 获取当前已锁定的块的快照,用于监控已锁定块的状态。
- 返回值:包含已锁定块ID的集合
- unlockBlock(long lockId): void
- 释放具有指定锁ID的锁。它查找锁记录,删除它,并解锁关联的块,如果锁不再使用。
- 参数:
- lockId:锁ID
- getBlockLock(long blockId): ClientRWLock
- 获取指定块ID的块锁,如果不存在则获取一个。如果所有块锁已分配完毕,此方法将等待直到可以从锁池中获取一个。
- 参数:
- blockId:块ID
- 返回值:块锁对象
- releaseBlockLockIfUnused(long blockId): void
- 检查块锁是否不再使用,如果不再使用则释放它,将其返回到锁池。
- 参数:
- blockId:块ID
- validate(): void
- 用于验证BlockLockManager内部状态的方法,通常用于测试目的。它检查块锁的引用计数是否与锁记录计数相匹配。
- 说明:此方法在验证类的内部状态时使用,确保所有不变量都得到满足。
这些方法共同用于管理块的锁、确保数据的一致性、控制并发访问,并支持块锁的获取和释放。
下面重点分析其中的加锁方法
alluxio.worker.block.BlockLockManager#tryAcquireBlockLock:
/**
sessionId:会话ID
blockId:块ID
blockLockType:块锁类型(读锁 or 写锁)
time:时间
unit:时间单位。与time一起,用于表示在指定时间获取锁,超时返回null
*/
public Optional<BlockLock> tryAcquireBlockLock(long sessionId, long blockId, BlockLockType blockLockType, long time, TimeUnit unit) {
//获取锁ID,其中false为是否阻塞获取锁,time&unit表示在指定的时间内获取锁
OptionalLong lockId = lockBlockInternal(sessionId, blockId, blockLockType, false, time, unit);
//返回Block锁对象
return lockId.isPresent() ? Optional.of(new BlockLock(lockId.getAsLong(), this::unlockBlock)) : Optional.empty();
}
其中,获取锁ID为重点步骤,调用的方法位于
alluxio.worker.block.BlockLockManager#lockBlockInternal:
private OptionalLong lockBlockInternal(long sessionId, long blockId, BlockLockType blockLockType, boolean blocking, @Nullable Long time, @Nullable TimeUnit unit) {
//获取客户端锁对象,重点!!!
ClientRWLock blockLock = getBlockLock(blockId);
//获取锁类型
Lock lock = blockLockType == BlockLockType.READ ? blockLock.readLock() : blockLock.writeLock();
//当需要获取写锁且当前会话已获取锁时(即写锁需要与其他读写锁互斥),抛出异常
if (blockLockType == BlockLockType.WRITE && sessionHoldsLock(sessionId, blockId)) {
...
//抛出异常
}
if (blocking) { //是否使用lock函数,阻塞获取锁
lock.lock();
} else {
...
try {
if (!lock.tryLock(time, unit)) {
...
return OptionalLong.empty();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return OptionalLong.empty();
}
}
long lockId = LOCK_ID_GEN.getAndIncrement();
LockRecord record = new LockRecord(sessionId, blockId, lockId, lock);
//记录(锁、会话、block)的关联信息到mLockRecords记录中,此处通过设置信号量互斥执行
try {
try {
mSessionCleaning.acquire(); //获取互斥锁
} catch (InterruptedException e) {
...
}
mLockRecords.add(record);
mSessionCleaning.release();
return OptionalLong.of(lockId);
} catch (Throwable e) {
...
}
}
下面重点看下获取Block锁的方法
alluxio.worker.block.BlockLockManager#getBlockLock:
private ClientRWLock getBlockLock(long blockId) {
//循环获取block锁
while (true) {
//在mLocks缓存中检查是否存在该block锁。如果有的话,可以直接复用
ClientRWLock reuseExistingLock = mLocks.computeIfPresent(
blockId, (blkid, lock) -> {
lock.addReference();
return lock;
}
);
if (reuseExistingLock != null) {
return reuseExistingLock;
}
//缓存没有的话,新建一个客户端锁对象ClientRWLock
ClientRWLock newlyAcquiredLock = mLockPool.acquire(1, TimeUnit.SECONDS);
//如果新建的锁对象不为null的话,继续执行如下代码;如果锁对象为null,则执行循环体
if (newlyAcquiredLock != null) {
int referenceCount = newlyAcquiredLock.getReferenceCount();
if (referenceCount != 0) {
... //打印异常日志
}
ClientRWLock computed = mLocks.compute(blockId, (id, lock) -> {
if (lock != null) { //该block锁不为null时
lock.addReference();
return lock;
} else { //该block锁为null时,递增引用计数,返回新创建的block锁对象
newlyAcquiredLock.addReference();
return newlyAcquiredLock;
}
});
if (computed != newlyAcquiredLock) { //当mLocks存在该锁时,将新锁释放到mLockPool中
mLockPool.release(newlyAcquiredLock);
}
return computed;
}
}
}