Summary:
Data-access problems encountered when the aggregation is processed by multiple worker threads.
Log analysis:
Original single-threaded aggregation:
[2022-08-26 10:34:52.539318] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 10:34:52.539323] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1540
[2022-08-26 10:34:52.539330] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 10:34:52.539335] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1541
[2022-08-26 10:34:52.539342] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 10:34:52.539347] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1542
[2022-08-26 10:34:52.539354] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 10:34:52.539358] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1543
Multi-threaded aggregation with only one worker thread handling the aggregation:
[2022-08-26 11:29:10.026887] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 11:29:10.026891] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1540
[2022-08-26 11:29:10.026898] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 11:29:10.026904] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1541
[2022-08-26 11:29:10.026910] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 11:29:10.026915] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1542
[2022-08-26 11:29:10.026921] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-26 11:29:10.026926] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1543
Multi-threaded aggregation hitting NULL values (rows 1540 and 1541, which returned values in the runs above, now read a NULL pack via get_packN):
[2022-08-28 08:51:22.791406] [51081] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-28 08:51:22.791411] [51081] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1540
[2022-08-28 08:51:22.791421] [51081] [INFO] [rc_attr.h:111] MSG: rc_attr:GetValueInt64 get_packN NULL, obj: 1540 pack: 0
[2022-08-28 08:51:22.791429] [51081] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 347
[2022-08-28 08:51:22.791434] [51081] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1541
[2022-08-28 08:51:22.791439] [51081] [INFO] [rc_attr.h:111] MSG: rc_attr:GetValueInt64 get_packN NULL, obj: 1541 pack: 0
Core functions:
AggregationWorkerEnt::DistributeAggreTaskAverage
/* Distribute packs evenly across the worker tasks. */
void AggregationWorkerEnt::DistributeAggreTaskAverage(MIIterator &mit) {
  TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage start");
  Transaction *conn = current_tx;
  DimensionVector dims(mind->NumOfDimensions());
  std::vector<CTask> vTask;
  std::vector<std::unique_ptr<GroupByWrapper>> vGBW;
  vGBW.reserve(m_threads);
  vTask.reserve(m_threads);
  if (rccontrol.isOn())
    rccontrol.lock(conn->GetThreadID()) << "Prepare data for parallel aggregation" << system::unlock;
  int packnum = 0;
  int curtuple_index = 0;
  // pack2cur maps a pack number to the global index of its first tuple.
  std::unordered_map<int, int> pack2cur;
  while (mit.IsValid()) {
    int64_t packrow_length = mit.GetPackSizeLeft();
    pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
    TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage pack_no: %d curtuple_index: %d", packnum,
               curtuple_index);
    curtuple_index += packrow_length;
    packnum++;
    mit.NextPackrow();
  }
  // Sentinel entry: pack2cur[packnum] holds the total tuple count.
  pack2cur.emplace(std::pair<int, int>(packnum, curtuple_index));
  TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage pack_no: %d curtuple_index: %d", packnum, curtuple_index);
  int loopcnt = (packnum < m_threads) ? packnum : m_threads;
  int mod = packnum % loopcnt;
  int num = packnum / loopcnt;
  TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage mod: %d num: %d loopcnt: %d packnum: %d", mod, num,
             loopcnt, packnum);
  utils::result_set<void> res;
  for (int i = 0; i < loopcnt; ++i) {
    res.insert(rceng->query_thread_pool.add_task(&AggregationWorkerEnt::PrepShardingCopy, this, &mit, gb_main, &vGBW));
    int pack_start = i * num;
    int pack_end = 0;
    int dwPackNum = 0;
    if (i == (loopcnt - 1)) {
      // The last task also absorbs the remainder (mod) packs.
      pack_end = packnum;
      dwPackNum = packnum;
    } else {
      pack_end = (i + 1) * num - 1;
      dwPackNum = pack_end + 1;
    }
    int cur_start = pack2cur[pack_start];
    int cur_end = pack2cur[pack_end] - 1;
    CTask tmp;
    tmp.dwTaskId = i;
    tmp.dwPackNum = dwPackNum;
    tmp.dwStartPackno = pack_start;
    tmp.dwEndPackno = pack_end;
    tmp.dwStartTuple = cur_start;
    tmp.dwEndTuple = cur_end;
    tmp.dwTuple = cur_start;
    tmp.dwPack2cur = pack2cur;
    vTask.push_back(tmp);
    TIANMU_LOG(LogCtl_Level::INFO,
               "DistributeAggreTaskAverage task_id: %d pack_start: %d pack_end: %d dwPackNum: %d dwStartTuple: %d "
               "dwEndTuple: %d dwTuple: %d",
               tmp.dwTaskId, pack_start, pack_end, tmp.dwPackNum, tmp.dwStartTuple, tmp.dwEndTuple, tmp.dwTuple);
  }
  res.get_all_with_except();
  mit.Rewind();
  std::vector<MultiIndex> mis;
  mis.reserve(vTask.size());
  std::vector<MIIterator> taskIterator;
  taskIterator.reserve(vTask.size());
  utils::result_set<void> res1;
  for (uint i = 0; i < vTask.size(); ++i) {
    auto &mi = mis.emplace_back(*mind, true);
    if (dims.NoDimsUsed() == 0) dims.SetAll();
    auto &mii = taskIterator.emplace_back(mit, true);
    mii.SetTaskNum(vTask.size());
    mii.SetTaskId(i);
    // mii.SetNoPacksToGo(vTask[i].dwEndPackno);
    // mii.RewindToPack(vTask[i].dwStartPackno);
  }
  // Task 0 aggregates directly into gb_main; the other tasks aggregate into
  // their own GroupByWrapper copies, which are merged below.
  res1.insert(rceng->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[0], &dims,
                                                &mit, &vTask[0], gb_main, conn));
  for (size_t i = 1; i < vTask.size(); ++i) {
    res1.insert(rceng->query_thread_pool.add_task(&AggregationWorkerEnt::TaskAggrePacks, this, &taskIterator[i], &dims,
                                                  &mit, &vTask[i], vGBW[i].get(), conn));
  }
  res1.get_all_with_except();
  TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage threads over");
  for (size_t i = 0; i < vTask.size(); ++i) {
    // Merge the per-task aggregation results back into gb_main.
    if (i != 0) {
      TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage Merge task: %d", i);
      gb_main->Merge(*(vGBW[i]));
    }
  }
  TIANMU_LOG(LogCtl_Level::INFO, "DistributeAggreTaskAverage merge over, gbw_row_num: %d", gb_main->GetRowsNo());
}
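The pack-splitting arithmetic above is easier to check in isolation. Below is a minimal standalone sketch (the pack count, pack size, and thread count are made-up values; the names mirror the variables in the function) that prints each task's pack range and the tuple range derived from pack2cur:

#include <cstdio>
#include <unordered_map>

int main() {
  // Toy input: 10 packs of 65536 rows, 4 worker threads (illustrative values).
  const int packnum = 10, m_threads = 4;
  const int pack_rows = 65536;

  // pack2cur maps a pack number to the global index of its first tuple,
  // plus a sentinel entry pack2cur[packnum] = total tuple count.
  std::unordered_map<int, int> pack2cur;
  int curtuple_index = 0;
  for (int p = 0; p <= packnum; ++p) {
    pack2cur.emplace(p, curtuple_index);
    curtuple_index += pack_rows;
  }

  // Same averaging as DistributeAggreTaskAverage: the last task takes the
  // remainder (mod) packs on top of its even share.
  int loopcnt = (packnum < m_threads) ? packnum : m_threads;
  int num = packnum / loopcnt;
  for (int i = 0; i < loopcnt; ++i) {
    int pack_start = i * num;
    int pack_end = (i == loopcnt - 1) ? packnum : (i + 1) * num - 1;
    int cur_start = pack2cur[pack_start];
    int cur_end = pack2cur[pack_end] - 1;
    std::printf("task %d: packs [%d, %d] tuples [%d, %d]\n", i, pack_start, pack_end, cur_start, cur_end);
  }
  return 0;
}

With these toy numbers the last task absorbs the two remainder packs, and each non-last task's tuple range ends at pack2cur[pack_end] - 1, i.e. just before the first tuple of its own last pack, even though TaskAggrePacks aggregates pack dwEndPackno inclusively; the TIANMU_LOG lines above make it possible to compare these ranges against the cur_tuple values observed in the logs.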
AggregationWorkerEnt::TaskAggrePacks
void AggregationWorkerEnt::TaskAggrePacks(MIIterator *taskIterator, [[maybe_unused]] DimensionVector *dims,
                                          [[maybe_unused]] MIIterator *mit, [[maybe_unused]] CTask *task,
                                          GroupByWrapper *gbw, Transaction *ci) {
  int id = taskIterator->GetTaskId();
  // Debugging restriction: only let task 0 and task 1 run.
  if ((0 != id) && (1 != id)) {
    return;
  }
  TIANMU_LOG(LogCtl_Level::INFO,
             "TaskAggrePacks dwTaskId: %d dwStartPackno: %d dwEndPackno: %d dwPackNum: %d dwStartTuple: %d "
             "dwEndTuple: %d dwTuple: %d",
             task->dwTaskId, task->dwStartPackno, task->dwEndPackno, task->dwPackNum, task->dwStartTuple,
             task->dwEndTuple, task->dwTuple);
  int task_pack_num = 0;
  int i = 0;  // number of packs actually aggregated (result != 5)
  int tuple_left = 0;
  taskIterator->Rewind();
  while (taskIterator->IsValid()) {
    // Only aggregate the packs assigned to this task.
    if ((task_pack_num >= task->dwStartPackno) && (task_pack_num <= task->dwEndPackno)) {
      // Translate the pack number into the global index of its first tuple.
      int cur_tuple = task->dwPack2cur[task_pack_num];
      MIInpackIterator mii(*taskIterator);
      int grouping_result = aa->AggregatePackrow(*gbw, &mii, cur_tuple);
      if (grouping_result != 5) i++;
      if (grouping_result == 1) break;
      if (grouping_result == 2) throw common::KilledException();
      if (grouping_result == 3 || grouping_result == 4) throw common::NotImplementedException("Aggregation overflow.");
      tuple_left = gbw->TuplesLeftBetween(task->dwStartTuple, task->dwEndTuple);
      TIANMU_LOG(LogCtl_Level::INFO,
                 "TaskAggrePacks id: %d gbw_row_num: %d tuple_left: %d task_pack_num: %d cur_tuple: %d",
                 task->dwTaskId, gbw->GetRowsNo(), tuple_left, task_pack_num, cur_tuple);
    }
    taskIterator->NextPackrow();
    ++task_pack_num;
  }
  tuple_left = gbw->TuplesLeftBetween(task->dwStartTuple, task->dwEndTuple);
  TIANMU_LOG(LogCtl_Level::INFO, "TaskAggrePacks id: %d gbw_row_num: %d tuple_left: %d ", task->dwTaskId,
             gbw->GetRowsNo(), tuple_left);
}
### Where it goes wrong (multi-threaded)
In this run the group row advances one tuple too early: tuple 1091 lands in row 243 and tuples 1092-1095 in row 244, whereas the single-threaded run below keeps 1091 in row 242 and 1092-1095 in row 243.
[2022-08-26 10:24:47.165501] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:24:47.165506] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1088
[2022-08-26 10:24:47.165512] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:24:47.165516] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1089
[2022-08-26 10:24:47.165522] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:24:47.165526] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1090
[2022-08-26 10:24:47.165533] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 10:24:47.165537] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1091
[2022-08-26 10:24:47.165543] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:24:47.165548] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1092
[2022-08-26 10:24:47.165554] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:24:47.165558] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1093
[2022-08-26 10:24:47.165564] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:24:47.165568] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1094
[2022-08-26 10:24:47.165574] [32679] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:24:47.165578] [32679] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1095
### Where it goes right (single-threaded)
The dashed markers below single out tuple 1091, which still belongs to group row 242 here.
[2022-08-26 10:34:52.533638] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:34:52.533647] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1088
[2022-08-26 10:34:52.533720] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:34:52.533729] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1089
[2022-08-26 10:34:52.533736] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:34:52.533740] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1090
-------------------------------------------------------------------------------------------------------------------------------------
[2022-08-26 10:34:52.533746] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 10:34:52.533750] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1091
-------------------------------------------------------------------------------------------------------------------------------------
[2022-08-26 10:34:52.533756] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 10:34:52.533760] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1092
[2022-08-26 10:34:52.533766] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 10:34:52.533770] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1093
[2022-08-26 10:34:52.533776] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 10:34:52.533780] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1094
[2022-08-26 10:34:52.533785] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 10:34:52.533789] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1095
[2022-08-26 10:34:52.533796] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:34:52.533800] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1100
[2022-08-26 10:34:52.533806] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:34:52.533809] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1101
[2022-08-26 10:34:52.533815] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:34:52.533819] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1102
[2022-08-26 10:34:52.533824] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 10:34:52.533828] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1103
[2022-08-26 10:34:54.741657] [35759] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 57560
[2022-08-26 10:34:54.741707] [35759] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 301976
### Multi-threaded aggregation with a single worker thread
When only one worker thread processes the whole aggregation, the row/tuple mapping matches the single-threaded run exactly:
[2022-08-26 11:29:10.021256] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 11:29:10.021261] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1088
[2022-08-26 11:29:10.021267] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 11:29:10.021272] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1089
[2022-08-26 11:29:10.021279] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 11:29:10.021283] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1090
[2022-08-26 11:29:10.021294] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 242
[2022-08-26 11:29:10.021302] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1091
[2022-08-26 11:29:10.021314] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 11:29:10.021321] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1092
[2022-08-26 11:29:10.021332] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 11:29:10.021340] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1093
[2022-08-26 11:29:10.021350] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 11:29:10.021451] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1094
[2022-08-26 11:29:10.021468] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 243
[2022-08-26 11:29:10.021473] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1095
[2022-08-26 11:29:10.021481] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 11:29:10.021486] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1100
[2022-08-26 11:29:10.021492] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 11:29:10.021497] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1101
[2022-08-26 11:29:10.021506] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 11:29:10.021511] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1102
[2022-08-26 11:29:10.021518] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 244
[2022-08-26 11:29:10.021523] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 1103
[2022-08-26 11:29:12.182563] [38928] [INFO] [group_table.cpp:569] MSG: PutAggregatedValue col: 3 row: 57560
[2022-08-26 11:29:12.182599] [38928] [INFO] [single_column.h:80] MSG: GetValueInt64Impl dim: 0 mit_cur_row: 301976
AggregationAlgorithm::AggregatePackrow
int AggregationAlgorithm::AggregatePackrow(GroupByWrapper &gbw, MIIterator *mit, int64_t cur_tuple) {
  int64_t packrow_length = mit->GetPackSizeLeft();
  if (!gbw.AnyTuplesLeft(cur_tuple, cur_tuple + packrow_length - 1)) {
    mit->NextPackrow();
    TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow 5");
    return 5;
  }
  int64_t uniform_pos = common::NULL_VALUE_64;
  bool skip_packrow = false;
  bool packrow_done = false;
  bool part_omitted = false;
  bool stop_all = false;
  bool aggregations_not_changeable = false;
  bool require_locking_ag = true;  // a new packrow, so locking will be needed
  bool require_locking_gr = true;  // do not lock if the grouping row is uniform
  if (require_locking_gr) {
    for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
      gbw.LockPackAlways(gr_a, *mit);  // note: ColumnNotOmitted checked inside;
                                       // this loads and decompresses the GROUP BY column packs
    require_locking_gr = false;
  }
  if (require_locking_ag) {
    for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
      gbw.LockPackAlways(gr_a, *mit);  // note: ColumnNotOmitted checked inside
    require_locking_ag = false;
  }
  gbw.ResetPackrow();
  int64_t rows_in_pack = gbw.TuplesLeftBetween(cur_tuple, cur_tuple + packrow_length - 1);
  DEBUG_ASSERT(rows_in_pack > 0);
  // TIANMU_LOG(LogCtl_Level::INFO,
  //            "AggregatePackrow TuplesLeftBetween cur_tuple: %d packrow_length: %d rows_in_pack: %d gbw_row_num: %d",
  //            cur_tuple, packrow_length, rows_in_pack, gbw.GetRowsNo());
  skip_packrow = AggregateRough(gbw, *mit, packrow_done, part_omitted, aggregations_not_changeable, stop_all,
                                uniform_pos, rows_in_pack, factor);
  if (t->NumOfObj() + gbw.NumOfGroups() == gbw.UpperApproxOfGroups()) {  // no more groups!
    gbw.SetAllGroupsFound();
    if (skip_packrow)  // no aggr. changeable and no new groups possible?
      packrow_done = true;
    if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()  // just DISTINCT without grouping
        || stop_all) {                                // or aggregation already done on rough level
      gbw.TuplesResetAll();                           // no more rows needed, just produce output
      TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow 1");
      return 1;  // aggregation finished
    }
  }
  if (skip_packrow)
    gbw.packrows_omitted++;
  else if (part_omitted)
    gbw.packrows_part_omitted++;
  if (packrow_done) {  // This packrow will not be needed any more
    gbw.TuplesResetBetween(cur_tuple, cur_tuple + packrow_length - 1);
  }
  if (packrow_done || skip_packrow) {
    mit->NextPackrow();
    TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow next 0");
    return 0;  // success - roughly omitted
  }
  // bool require_locking_ag = true;  // a new packrow, so locking will be needed
  // bool require_locking_gr = (uniform_pos == common::NULL_VALUE_64);  // do not lock if the grouping row is uniform
  while (mit->IsValid()) {   // becomes invalid on pack end
    if (m_conn->Killed()) {  // killed
      TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow 2");
      return 2;
    }
    if (gbw.TuplesGet(cur_tuple)) {
      if (require_locking_gr) {
        for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++)
          gbw.LockPack(gr_a, *mit);  // note: ColumnNotOmitted checked inside
        require_locking_gr = false;
      }
      mit->SetCurUsedPos(cur_tuple);
      int64_t pos = 0;
      bool existed = true;
      if (uniform_pos != common::NULL_VALUE_64)  // either uniform because of KNs, or = 0,
                                                 // because there are no grouping columns
        pos = uniform_pos;                       // existed == true, as above
      else {
        for (int gr_a = 0; gr_a < gbw.NumOfGroupingAttrs(); gr_a++) {
          if (gbw.ColumnNotOmitted(gr_a)) {
            gbw.PutGroupingValue(gr_a, *mit);
          }
        }
        existed = gbw.FindCurrentRow(pos);
      }
      if (pos == common::NULL_VALUE_64) {
        TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow pos==NULL_VALUE_64 cur_tuple: %d existed: %s", cur_tuple,
                   existed ? "true" : "false");
      }
      if (pos != common::NULL_VALUE_64) {  // Any place left? If not, just omit the tuple.
        gbw.TuplesReset(cur_tuple);        // internally delayed for optimization
                                           // purposes - must be committed at the end
        if (!existed) {
          aggregations_not_changeable = false;
          gbw.AddGroup();  // successfully added
          if (t->NumOfObj() + gbw.NumOfGroups() == gbw.UpperApproxOfGroups()) {  // no more groups!
            gbw.SetAllGroupsFound();
            if (gbw.NumOfGroupingAttrs() == gbw.NumOfAttrs()) {  // just DISTINCT without grouping
              gbw.TuplesResetAll();  // no more rows needed, just produce output
              TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow 1");
              return 1;  // aggregation finished
            }
          }
        }
        if (!aggregations_not_changeable) {
          // Lock packs if needed
          if (require_locking_ag) {
            for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
              gbw.LockPack(gr_a, *mit);  // note: ColumnNotOmitted checked inside
            require_locking_ag = false;
          }
          // Prepare packs for aggregated columns
          for (int gr_a = gbw.NumOfGroupingAttrs(); gr_a < gbw.NumOfAttrs(); gr_a++)
            if (gbw.ColumnNotOmitted(gr_a)) {
              bool value_successfully_aggregated = gbw.PutAggregatedValue(gr_a, pos, *mit, factor);
              if (!value_successfully_aggregated) {
                gbw.DistinctlyOmitted(gr_a, cur_tuple);
              }
            }
        }
      }
    }
    cur_tuple++;
    mit->Increment();
    if (mit->PackrowStarted()) break;
  }
  gbw.CommitResets();
  // TIANMU_LOG(LogCtl_Level::INFO, "AggregatePackrow ok 0");
  return 0;  // success
}
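For reference, AggregatePackrow reports its outcome through bare integer return codes, which TaskAggrePacks interprets above. A minimal sketch of that contract follows; the enum names are hypothetical labels for those integers, and the std exceptions stand in for the engine's common::KilledException and common::NotImplementedException:

#include <stdexcept>

// Hypothetical names for AggregatePackrow's integer return codes,
// matching how TaskAggrePacks reacts to each value.
enum AggrePackResult {
  kOk = 0,            // pack aggregated (or roughly omitted); keep going
  kFinished = 1,      // all groups found; stop iterating packs
  kKilled = 2,        // query was killed
  kOverflow = 3,      // aggregation overflow ...
  kOverflow2 = 4,     // ... both treated as overflow by the caller
  kNoTuplesLeft = 5   // nothing left in this pack's tuple range
};

void ReactToResult(int grouping_result) {
  if (grouping_result == kFinished) return;  // caller breaks out of the pack loop
  if (grouping_result == kKilled) throw std::runtime_error("killed");
  if (grouping_result == kOverflow || grouping_result == kOverflow2)
    throw std::runtime_error("Aggregation overflow.");
  // kOk and kNoTuplesLeft fall through; kNoTuplesLeft is additionally excluded
  // from the processed-pack counter (grouping_result != 5) in TaskAggrePacks.
}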
GroupTable::PutAggregatedValue
bool GroupTable::PutAggregatedValue(int col, int64_t row, MIIterator &mit, int64_t factor, bool as_string) {
  if (distinct[col]) {
    // Repetition? Return without action.
    DEBUG_ASSERT(gdistinct[col]);
    if (vc[col]->IsNull(mit)) return true;  // omit nulls
    GDTResult res = gdistinct[col]->Add(row, mit);
    if (res == GDTResult::GDT_EXISTS) return true;  // value found, do not aggregate it again
    if (res == GDTResult::GDT_FULL) {
      if (gdistinct[col]->AlreadyFull())
        not_full = false;  // disable also the main grouping table (if it is a persistent rejection)
      return false;        // value not found in DISTINCT buffer, which is already full
    }
    factor = 1;  // ignore repetitions for distinct
  }
  TIANMUAggregator *cur_aggr = aggregator[col];
  if (factor == common::NULL_VALUE_64 && cur_aggr->FactorNeeded())
    throw common::NotImplementedException("Aggregation overflow.");
  if (as_string) {
    types::BString v;
    vc[col]->GetValueString(v, mit);
    if (v.IsNull() && cur_aggr->IgnoreNulls()) return true;  // null omitted
    cur_aggr->PutAggregatedValue(vm_tab->GetAggregationRow(row) + aggregated_col_offset[col], v, factor);
  } else {
    // note: it is too costly to check nulls separately (e.g. for complex expressions)
    TIANMU_LOG(LogCtl_Level::INFO, "PutAggregatedValue col: %d row: %d", col, row);
    int64_t v = vc[col]->GetValueInt64(mit);
    if (v == common::NULL_VALUE_64 && cur_aggr->IgnoreNulls()) return true;
    cur_aggr->PutAggregatedValue(vm_tab->GetAggregationRow(row) + aggregated_col_offset[col], v, factor);
  }
  return true;
}
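The numeric branch above is where the multi-threaded NULL logs come from: the rc_attr log line suggests get_packN returned a null pack, so GetValueInt64 yields common::NULL_VALUE_64 and the aggregator silently skips the row when IgnoreNulls() is true. The wrong result therefore surfaces as missing values rather than a crash. A minimal standalone sketch of that skip behavior (kNullValue64, the SumAggregator type, and its methods are stand-ins for the engine's own):

#include <cstdint>
#include <iostream>
#include <limits>
#include <vector>

// Stand-in for common::NULL_VALUE_64 (the engine's 64-bit NULL marker).
constexpr int64_t kNullValue64 = std::numeric_limits<int64_t>::min();

// Toy SUM aggregator mirroring the IgnoreNulls() skip in PutAggregatedValue.
struct SumAggregator {
  int64_t sum = 0;
  bool IgnoreNulls() const { return true; }
  void Put(int64_t v, int64_t factor) { sum += v * factor; }
};

bool PutAggregatedValue(SumAggregator &aggr, int64_t v, int64_t factor) {
  if (v == kNullValue64 && aggr.IgnoreNulls()) return true;  // null silently omitted
  aggr.Put(v, factor);
  return true;
}

int main() {
  SumAggregator aggr;
  // If a worker reads from a pack that was never loaded, the value comes back
  // as NULL and is dropped: the sum is quietly smaller instead of failing loudly.
  std::vector<int64_t> values = {10, kNullValue64, 20, kNullValue64};
  for (int64_t v : values) PutAggregatedValue(aggr, v, /*factor=*/1);
  std::cout << "sum: " << aggr.sum << "\n";  // prints 30
  return 0;
}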