searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Hadoop RBF ConnectionManager

2023-05-30 06:26:09
18
0

一、概述

ConnectionManager为Router内管理每个用户+NN的连接的服务,它通过一个map<ConnectionPoolId, ConnectionPool>对象pools存储所有pool。下面从服务启动为起点追溯整个服务工作流程:
 
Router启动时,会执行ConnectionManager.start启动用户连接池的管理 :
 

二、启动流程

2.1 creator启动 

  1. 启动creator:用来给用户创建新的connection,但是需满足以下条件:
  • 该用户连接池未满
  • 已有连接total中 active > dfs.federation.router.connection.min-active-ratio * total (已创建的connection数)
       循环从queue中获取ConnectionPool:
   
 

2.1.1 pool.newConnection()

三个参数(绑定到对应的connection):nnAddress、ugi、proto
            1、创建对应nnAddress的socket,
            2、执行RPC.getProtocolProxy  获取ProtocolProxy
                  >. 先获取RpcEngine(客户端会缓存这些Engine,下次调用从缓存直接获取)
                  >.(这里可以看到 一个用户给的pool内 connectionId <同一用户+nnAddress+protocol>这也就造成router性能瓶颈),获取ProtocolProxy<包含一个"连接服务的connection"和"服务支持的methods">
            3、传入ProtocolProxy创建ProtoClient (invoke调用方法传入的就是这个对象);这里跟踪下
           
            这里使用反射构造方法Constructor,传入ProtocolProxy反射生成实例 
           
            4、传入ProtoClient创建ProxyAndInfo clientProxy<protoclient, socket> (同一个proxy使用一个socket)
            5、传入clientProxy创建ConnectionContext:(当客户端使用连接时,它会增加一个计数器<numThreads>以将其标记为活动的。一旦客户端完成连接,它就会减少计数器。它还负责在不活动时关闭连接)
           
 

2.1.2 pool.addConnection(conn)

把返回的ConnectionContext添加到pool.connections<ConnectionContext>中
ConnectionContext.getClient 获取ProxyAndInfo
 

2.2 连接池清理

调度周期:min('dfs.federation.router.connection.clean.ms', 'dfs.federation.router.connection.pool.clean.ms'),执行CleanupTask 清理连接:(pool.lastActiveTime 当调用getConnection和addConnection时都会被更新为当前时间)
  1. 若满足pool remove (条件:连接池内所有连接超过 dfs.federation.router.connection.pool.clean.ms 这么久未使用),则把该pool添加到(list)toRemove中,等所有pool遍历完后,执行pools.remove,移除连接池
  2. 若pool remove不满足,则判断执行cleanup(pool),清理连接池内老旧Connection

2.2.1  cleanup(pool) 

执行pool.removeConnections(1) 移除 最新创建的一个connection 

conn.close: 可以看到该conn会被标记为closed,也就是不会在接收请求,直至处理请求完毕后(numThreads==0),会调用stopProxy关闭连接

0条评论
0 / 1000
付朝洪
3文章数
0粉丝数
付朝洪
3 文章 | 0 粉丝
付朝洪
3文章数
0粉丝数
付朝洪
3 文章 | 0 粉丝
原创

Hadoop RBF ConnectionManager

2023-05-30 06:26:09
18
0

一、概述

ConnectionManager为Router内管理每个用户+NN的连接的服务,它通过一个map<ConnectionPoolId, ConnectionPool>对象pools存储所有pool。下面从服务启动为起点追溯整个服务工作流程:
 
Router启动时,会执行ConnectionManager.start启动用户连接池的管理 :
 

二、启动流程

2.1 creator启动 

  1. 启动creator:用来给用户创建新的connection,但是需满足以下条件:
  • 该用户连接池未满
  • 已有连接total中 active > dfs.federation.router.connection.min-active-ratio * total (已创建的connection数)
       循环从queue中获取ConnectionPool:
   
 

2.1.1 pool.newConnection()

三个参数(绑定到对应的connection):nnAddress、ugi、proto
            1、创建对应nnAddress的socket,
            2、执行RPC.getProtocolProxy  获取ProtocolProxy
                  >. 先获取RpcEngine(客户端会缓存这些Engine,下次调用从缓存直接获取)
                  >.(这里可以看到 一个用户给的pool内 connectionId <同一用户+nnAddress+protocol>这也就造成router性能瓶颈),获取ProtocolProxy<包含一个"连接服务的connection"和"服务支持的methods">
            3、传入ProtocolProxy创建ProtoClient (invoke调用方法传入的就是这个对象);这里跟踪下
           
            这里使用反射构造方法Constructor,传入ProtocolProxy反射生成实例 
           
            4、传入ProtoClient创建ProxyAndInfo clientProxy<protoclient, socket> (同一个proxy使用一个socket)
            5、传入clientProxy创建ConnectionContext:(当客户端使用连接时,它会增加一个计数器<numThreads>以将其标记为活动的。一旦客户端完成连接,它就会减少计数器。它还负责在不活动时关闭连接)
           
 

2.1.2 pool.addConnection(conn)

把返回的ConnectionContext添加到pool.connections<ConnectionContext>中
ConnectionContext.getClient 获取ProxyAndInfo
 

2.2 连接池清理

调度周期:min('dfs.federation.router.connection.clean.ms', 'dfs.federation.router.connection.pool.clean.ms'),执行CleanupTask 清理连接:(pool.lastActiveTime 当调用getConnection和addConnection时都会被更新为当前时间)
  1. 若满足pool remove (条件:连接池内所有连接超过 dfs.federation.router.connection.pool.clean.ms 这么久未使用),则把该pool添加到(list)toRemove中,等所有pool遍历完后,执行pools.remove,移除连接池
  2. 若pool remove不满足,则判断执行cleanup(pool),清理连接池内老旧Connection

2.2.1  cleanup(pool) 

执行pool.removeConnections(1) 移除 最新创建的一个connection 

conn.close: 可以看到该conn会被标记为closed,也就是不会在接收请求,直至处理请求完毕后(numThreads==0),会调用stopProxy关闭连接

文章来自个人专栏
Hadoop-RBF
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0