摘要:
逐行访问数据库中的元素涉及对磁盘IO的操作, 读取pack后又涉及解压和数据转换的操作,单线程处理时会产生大量的耗时。
一个简单的做法便是将数据拆分成不同的子集,然后利用多核CPU去处理不同的子集,最后将结果汇总。
本文分析这样的做法的一般性。
拆分成多线程处理面临的问题:
一. 需要有一个线程池任务处理模块
如果没有的话,那就需要写一个这样的多线程任务处理的模块,需要包含以下内容:
- 包含对要处理的任务的基本的数据结构的包装
- 一个任务必须可以独立的运行
- 任务与线程之间的关系,由线程池去消费任务
- 可以线程池中添加任务
- 任务与线程的模型为生产者与消费者
- 是否可以在任意时刻向线程池添加任务,一方面是需求驱动,不过更多的情况是受限于系统内原有的线程池模型
- 启动线程可以等待线程池处理完毕
- 类似于协程中的yield概念
- 目的在于不破坏原有的上下文的逻辑,同步的逻辑,在经过多线程的工作池处理后,返回调用处依然是同步的语义
二. 明确出临界区的范围
要明确出临界区的范围,原因就在于多线程的内存可见性,以及由此引发的数据安全的问题。
将原有的单个集合,拆分成不同的独立的子集,这就相当于打破了原有的数据间的交互逻辑。从上下文的场景看,可以分为以下几种类型的数据:
2.1 要遍历访问的元组数据
元组的数据是相对容易拆分的,因为访问的时候元组和元组间其实并没有交互,仅是按照顺序逐个访问。只要将整个表的元组,切分好起始位置, 划分成子集,就可以独立的访问每个子集元组的数据。
2.2 在遍历访问元组时的一些状态控制的数据
状态数据的目的一方面是为了统计,一方面是在遍历访问过程中对后序遍历做控制。
这些数据需要在具体场景中具体分析, 如果只是控制是否访问本pack,那么可以处理成任务内数据处理,而非全局的。
2.3 遍历访问元组获取的符合条件的结果
对元组访问结果的集合必然是一个全局的,但是对于结果集的处理有不同的做法:
- 在每个任务内先获取本任务的结果集,最后将所有任务的结果集合并
- 每个任务使用全局的结果集,但是在修改全局结果集时对数据加锁
- 使用线程安全的数据结构,每个任务向其中填充数据。(其实内部也是要加锁,区别在于加锁的范围)
第一个做法最为简单, 也没有锁的性能问题,但是,不足在于需要2倍的内存占用,以及内存拷贝的开销。不过在业务的测试中,内存拷贝相比其他的场景还可以接受(未对结果产生数量级影响)。
第二和第三个做法需要慎重,需要对底层的数据结构有深刻的理解,并且需要有实际的测试数据。
三. 理解底层模块间的数据关系, 避免在多线程处理时破坏底层的访问
听起来像是一句废话,如果不理解底层模块的数据的关系,那怎么能保证用多线程并行的去访问时不会产生破坏呢?
问题就在于要做到这一点并不容易, 模块间耦合严重的最大的恶果,就在于维护时无法独立的对某个模块进行维护,而必须将所有模块间的交互关系都理清。
要修改代码时,也无法单独的对特定的模块进行修改,必须将所涉及到的逻辑,经过重新设计后,一并修改。
以上还是基于工程化的设计思想上的,如果是基于数学模型实现的代码,那么可以说整个代码库是数学模型的一个具体实现,导致:
- 难以从代码推导出逻辑,必须首先理解其数学模型
- 修改时, 也必须从数学模型入手, 重新设计数学模型后,将其用代码实现
对顺序遍历元组进行多线程拆分的一般做法:
一. 必须读懂原有代码的逻辑
也是一句废话, 问题就在于考虑到代码的耦合程度,需要花费相当大的精力。必要时还必须理解AP系统的一些概念,和数据库系统的常规的数学模型。
二. 提取出每个任务所必须的数据
包含起始行号,起始pack号, 以及存放每个任务处理结果的数据结构。
例如:
int packnum = 0;
int curtuple_index = 0;
std::unordered_map<int, int> pack2cur;
while (mit.IsValid()) {
pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
int64_t packrow_length = mit.GetPackSizeLeft();
curtuple_index += packrow_length;
packnum++;
mit.NextPackrow();
}
pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
int loopcnt = (packnum < m_threads) ? packnum : m_threads;
int mod = packnum % loopcnt;
int num = packnum / loopcnt;
for (int i = 0; i < loopcnt; ++i) {
res.insert(
ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW));
int pack_start = i * num;
int pack_end = 0;
int dwPackNum = 0;
if (i == (loopcnt - 1)) {
pack_end = packnum;
dwPackNum = packnum;
} else {
pack_end = (i + 1) * num - 1;
dwPackNum = pack_end + 1;
}
int cur_start = pack2cur[pack_start];
int cur_end = pack2cur[pack_end] - 1;
CTask tmp;
tmp.dwTaskId = i;
tmp.dwPackNum = dwPackNum;
tmp.dwStartPackno = pack_start;
tmp.dwEndPackno = pack_end;
tmp.dwStartTuple = cur_start;
tmp.dwEndTuple = cur_end;
tmp.dwTuple = cur_start;
tmp.dwPack2cur = &pack2cur;
vTask.push_back(tmp);
}
三. 在单独的任务内访问元组子集
前提是能正确的分割元组,以及处理临界区, 可以看个例子:
utils::result_set<void> res1;
for (uint i = 0; i < vTask.size(); ++i) {
if (dims.NoDimsUsed() == 0) dims.SetAll();
auto &mii = taskIterator.emplace_back(mit, true);
mii.SetTaskNum(vTask.size());
mii.SetTaskId(i);
}
for (size_t i = 0; i < vTask.size(); ++i) {
GroupByWrapper *gbw = i == 0 ? gb_main : vGBW[i].get();
res1.insert(ha_rcengine_->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i],
&dims, &mit, &vTask[i], gbw, conn));
}
res1.get_all_with_except();
void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
[[maybe_unused]] MIIterator *mit, [[maybe_unused]] CTask *task,
GroupByWrapper *gbw, Transaction *ci) {
taskIterator->Rewind();
int task_pack_num = 0;
while (taskIterator->IsValid()) {
if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
int cur_tuple = (*task->dwPack2cur)[task_pack_num];
MIInpackIterator mii(*taskIterator);
AggregaGroupingResult grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple);
if (grouping_result == AggregaGroupingResult::AGR_FINISH) break;
if (grouping_result == AggregaGroupingResult::AGR_KILLED) throw common::KilledException();
if (grouping_result == AggregaGroupingResult::AGR_OVERFLOW ||
grouping_result == AggregaGroupingResult::AGR_OTHER_ERROR)
throw common::NotImplementedException("Aggregation overflow.");
}
taskIterator->NextPackrow();
++task_pack_num;
}
}
四. 合并每个任务的结果集
这也不是一个简单的事情, 在保证正确性的前提下, 避免内存拷贝.
具体的处理需要根据业务场景的不同做调整。
例子:
for (size_t i = 0; i < vTask.size(); ++i) {
// Merge aggreation data together
if (i != 0) {
aa->MultiDimensionalDistinctScan(*(vGBW[i]), mit);
gb_main->Merge(*(vGBW[i]));
}
}
void GroupByWrapper::Merge(GroupByWrapper &sec) {
int64_t old_groups = gt.GetNoOfGroups();
gt.Merge(sec.gt, m_conn);
if (tuple_left) tuple_left->And(*(sec.tuple_left));
packrows_omitted += sec.packrows_omitted;
packrows_part_omitted += sec.packrows_part_omitted;
// note that no_groups may be different than gt->..., because it is global
no_groups += gt.GetNoOfGroups() - old_groups;
}
参考:
《POSIX多线程编程》
《数据库系统实现》
《数据库查询优化器的艺术》