一、概述
ConnectionManager为Router内管理每个用户+NN的连接的服务,它通过一个map<ConnectionPoolId, ConnectionPool>对象pools存储所有pool。下面从服务启动为起点追溯整个服务工作流程:
Router启动时,会执行ConnectionManager.start启动用户连接池的管理 :
二、启动流程
2.1 creator启动
-
启动creator:用来给用户创建新的connection,但是需满足以下条件:
-
该用户连接池未满
-
已有连接total中 active > dfs.federation.router.connection.min-active-ratio * total (已创建的connection数)
循环从queue中获取ConnectionPool:
2.1.1 pool.newConnection()
三个参数(绑定到对应的connection):nnAddress、ugi、proto
1、创建对应nnAddress的socket,
2、执行RPC.getProtocolProxy 获取ProtocolProxy
>. 先获取RpcEngine(客户端会缓存这些Engine,下次调用从缓存直接获取)
>.(这里可以看到 一个用户给的pool内 connectionId <同一用户+nnAddress+protocol>这也就造成router性能瓶颈),获取ProtocolProxy<包含一个"连接服务的connection"和"服务支持的methods">
3、传入ProtocolProxy创建ProtoClient (invoke调用方法传入的就是这个对象);这里跟踪下
这里使用反射构造方法Constructor,传入ProtocolProxy反射生成实例
4、传入ProtoClient创建ProxyAndInfo clientProxy<protoclient, socket> (同一个proxy使用一个socket)
5、传入clientProxy创建ConnectionContext:(当客户端使用连接时,它会增加一个计数器<numThreads>以将其标记为活动的。一旦客户端完成连接,它就会减少计数器。它还负责在不活动时关闭连接)
2.1.2 pool.addConnection(conn)
把返回的ConnectionContext添加到pool.connections<ConnectionContext>中
ConnectionContext.getClient 获取ProxyAndInfo
2.2 连接池清理
调度周期:min('dfs.federation.router.connection.clean.ms', 'dfs.federation.router.connection.pool.clean.ms'),执行CleanupTask 清理连接:(pool.lastActiveTime 当调用getConnection和addConnection时都会被更新为当前时间)
-
若满足pool remove (条件:连接池内所有连接超过 dfs.federation.router.connection.pool.clean.ms 这么久未使用),则把该pool添加到(list)toRemove中,等所有pool遍历完后,执行pools.remove,移除连接池
-
若pool remove不满足,则判断执行cleanup(pool),清理连接池内老旧Connection
2.2.1 cleanup(pool)
执行pool.removeConnections(1) 移除 最新创建的一个connection
conn.close: 可以看到该conn会被标记为closed,也就是不会在接收请求,直至处理请求完毕后(numThreads==0),会调用stopProxy关闭连接