AttemptManager 属于dremio kernel部分的,主要用来管理对于一个查询的所有fragments(分段)(本地以及远程)
参考处理流程(内容来自官方说明)
- AttemptManager 做为一个runnable 提交(实际上内部使用了线程调度)
- runnable 执行查询计划
- 状态从pending 到running
- runnable 发送fragments 启动
- runnable 运行完成,但是AttemptManager 依然存在
- AttemptManager 监听状态变动消息
- 状态消息可能是从失败到取消
- 当所有fragments 都完成之后,状态标记为完成
依赖的外部服务
构造函数
public AttemptManager(
final SabotContext sabotContext, // sabotcontext
final AttemptId attemptId, // id
final UserRequest queryRequest, // 用户请求
final AttemptObserver observer,
final OptionProvider options,
final Cache<Long, PreparedPlan> preparedPlans,
final QueryContext queryContext, // 查询上下文
final CommandPool commandPool, // 命令执行
final MaestroService maestroService, // 大师服务,具体的执行操作
final JobTelemetryClient jobTelemetryClient, // 任务监控的
final RuleBasedEngineSelector ruleBasedEngineSelector, // 资源调度服务
final boolean runInSameThread
)
类图
使用AttemptManager的服务
主要是Foreman 使用(apache drill 以及dremio 的核心任务处理),Foreman 由ForemenWorkManager集成使用
Foreman 集成部分
protected AttemptManager newAttemptManager(SabotContext context, AttemptId attemptId, UserRequest queryRequest,
AttemptObserver observer, UserSession session, OptionProvider options,
Cache<Long, PreparedPlan> preparedPlans, PlanCache planCache,
Predicate<DatasetConfig> datasetValidityChecker, CommandPool commandPool) {
final QueryContext queryContext = new QueryContext(session, context, attemptId.toQueryId(),
queryRequest.getPriority(), queryRequest.getMaxAllocation(), datasetValidityChecker, planCache);
return new AttemptManager(context, attemptId, queryRequest, observer, options, preparedPlans,
queryContext, commandPool, maestroService, jobTelemetryClient, ruleBasedEngineSelector,
queryRequest.runInSameThread());
}
ForemenWorkManager 集成foreman部分
public void submit(
final ExternalId externalId,
final QueryObserver observer,
final UserSession session,
final UserRequest request,
final TerminationListenerRegistry registry,
final OptionProvider config,
final ReAttemptHandler attemptHandler) {
final DelegatingCompletionListener delegate = new DelegatingCompletionListener();
final Foreman foreman = newForeman(pool, commandPool.get(), delegate, externalId, observer, session, request,
config, attemptHandler, preparedHandles, planCache);
final ManagedForeman managed = new ManagedForeman(registry, foreman);
externalIdToForeman.put(foreman.getExternalId(), managed);
delegate.setListener(managed);
foreman.start();
}
参考资料
sabot/kernel/src/main/java/com/dremio/exec/work/foreman/AttemptManager.java