摘要:
将元组访问拆分到多个线程并行执行后,只有在所有数据访问的入口处整体加锁(即让并行线程实际上串行执行)时结果才正确;一旦缩小临界区,访问底层数据时就会出错。
本文记录遇到的问题。
核心函数:
JoinerGeneral::ExecuteInnerJoinPackRow
// Evaluates the join conditions `cond` row by row over the packrow that `mii`
// currently points at, and records matching tuples in `new_mind`.
//
// A single call processes at most one packrow. It returns when `mii` reaches the
// start of the next packrow. That happens either after the last row of the current
// packrow, or right after skipping a packrow that rough evaluation rules out. Both
// exit paths leave `mii` in the same state. Distributing packrows among tasks is
// the caller's job.
//
// Parameters:
//   mii              - iterator inside (presumably at the start of) a packrow; advanced in place.
//   cond             - conditions every output tuple must satisfy.
//   new_mind         - receives matching tuples unless `count_only` is set.
//   all_dims         - dimensions whose current values are copied into `new_mind`.
//   pack_desc_locked - per-condition flag: source packs already locked for this packrow.
//   tuples_in_output - incremented for every matching row.
//   limit            - stop once this many tuples are produced; -1 disables the limit.
//   count_only       - only count matches, do not materialize them.
//   stop_execution   - checked before every row; set to true once `limit` is reached.
//   rows_passed      - incremented for every row visited, including rows of skipped packrows.
//   rows_omitted     - incremented by the size of packrows skipped by rough evaluation.
//
// Throws common::KilledException when the connection has been killed.
//
// Concurrency: the whole body runs under `mtx`. Narrowing this critical section
// has led to failures inside the underlying pack access (those structures do not
// appear to support concurrent use), so keep it this wide until that is resolved.
void JoinerGeneral::ExecuteInnerJoinPackRow(MIIterator *mii, Condition *cond, MINewContents *new_mind,
                                            DimensionVector *all_dims, std::vector<bool> *pack_desc_locked,
                                            int64_t *tuples_in_output, int64_t limit, bool count_only,
                                            bool *stop_execution, int64_t *rows_passed, int64_t *rows_omitted) {
  std::scoped_lock guard(mtx);
  int no_desc = (*cond).Size();
  while (mii->IsValid() && !*stop_execution) {
    if (mii->PackrowStarted()) {
      // Rough (pack-level) evaluation: if any condition yields RS_NONE for this
      // packrow, the whole packrow is skipped without reading its rows.
      bool omit_this_packrow = false;
      for (int i = 0; (i < no_desc && !omit_this_packrow); i++)
        if ((*cond)[i].EvaluateRoughlyPack(*mii) == common::RSValue::RS_NONE) omit_this_packrow = true;
      // Delay locking: source packs are locked lazily on the first row that needs them.
      for (int i = 0; i < no_desc; i++) (*pack_desc_locked)[i] = false;
      if (new_mind->NoMoreTuplesPossible())
        break;  // stop the join if nothing new may be obtained in some optimized cases
      if (omit_this_packrow) {
        (*rows_omitted) += mii->GetPackSizeLeft();
        (*rows_passed) += mii->GetPackSizeLeft();
        mii->NextPackrow();
        // The iterator now stands at the next packrow, which this call must not
        // process. Stop here (previously `continue`, which leaked into the next
        // packrow) so both exit paths leave `mii` in the same state.
        break;
      }
    }
    {
      bool loc_result = true;
      for (int i = 0; (i < no_desc && loc_result); i++) {
        if (!(*pack_desc_locked)[i]) {  // delayed locking - maybe will not be locked at all?
          (*cond)[i].LockSourcePacks(*mii);
          (*pack_desc_locked)[i] = true;
        }
        if (types::RequiresUTFConversions((*cond)[i].GetCollation())) {
          if ((*cond)[i].CheckCondition_UTF(*mii) == false) loc_result = false;
        } else {
          if ((*cond)[i].CheckCondition(*mii) == false) loc_result = false;
        }
      }
      if (loc_result) {
        if (!count_only) {
          for (int i = 0; i < mind->NumOfDimensions(); i++)
            if ((*all_dims)[i]) new_mind->SetNewTableValue(i, (*mii)[i]);
          new_mind->CommitNewTableValues();
        }
        (*tuples_in_output)++;
      }
    }
    (*rows_passed)++;
    if (m_conn->Killed()) throw common::KilledException();
    if (limit > -1 && *tuples_in_output >= limit) *stop_execution = true;
    mii->Increment();
    // Reaching the start of a new packrow ends this call (one packrow per call).
    if (mii->PackrowStarted()) break;
  }
}
函数分析:
- std::scoped_lock guard(mtx); 对临界区进行加锁,使用RAII来进行加锁和解锁
- 该函数应该只处理当前的pack,不能进入下一个pack
- 由外部调用方将全部pack划分给各个任务,每个任务只处理分配给自己的pack子集
Descriptor::LockSourcePacks
// Prepares the descriptor tree (when one exists) for locking, then locks the
// source packs at the position of `mit` for each virtual column this descriptor
// references: attr, val1 and val2, in that order. Absent (null) columns are skipped.
void Descriptor::LockSourcePacks(const MIIterator &mit) {
  if (tree) tree->root->PrepareToLock(0);

  const auto lock_column = [&mit](const auto &column) {
    if (column) column->LockSourcePacks(mit);
  };
  lock_column(attr.vc);
  lock_column(val1.vc);
  lock_column(val2.vc);
}
问题分析:
根本问题在于底层数据结构不支持多线程并行访问,并发访问时会出现异常。
一. Tianmu::core::RCAttr::GetNotNullValueInt64异常
问题描述:
(gdb) bt
#0 0x0000000002dc0c0e in Tianmu::mm::TraceableObject::IsLocked (this=0x0) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/mm/traceable_object.h:91
#1 0x0000000002de9cd4 in Tianmu::core::RCAttr::GetNotNullValueInt64 (this=0x7f242803f280, obj=327680)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/rc_attr.h:131
#2 0x0000000002eac933 in Tianmu::vcolumn::SingleColumn::GetNotNullValueInt64 (this=0x7f24280561d0, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/vc/single_column.h:62
#3 0x0000000003062914 in Tianmu::core::Descriptor::CheckSetCondition (this=0x7f2428a913a0, mit=..., op=Tianmu::common::O_IN)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/descriptor.cpp:1354
#4 0x000000000305fd79 in Tianmu::core::Descriptor::CheckCondition (this=0x7f2428a913a0, mit=...)
at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/descriptor.cpp:1069
#5 0x000000000314b35e in Tianmu::core::JoinerGeneral::ExecuteInnerJoinPackRow (this=0x7f2428a91770, mii=0x7f45627facf0, cond=0x7f47989178c0, new_mind=0x7f4798917590,
all_dims=0x7f4798917770, pack_desc_locked=0x7f4798917740, tuples_in_output=0x7f4798917588, limit=-1, count_only=false, stop_execution=0x7f479891742f, rows_passed=0x7f4798917438,
rows_omitted=0x7f4798917430) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/joiner_general.cpp:123
#6 0x000000000314b6ed in Tianmu::core::JoinerGeneral::TaskInnerJoinPacks (this=0x7f2428a91770, taskIterator=0x7f2428a92548, task=0x7f2428a91f48, cond=0x7f47989178c0,
new_mind=0x7f4798917590, all_dims=0x7f4798917770, pack_desc_locked=0x7f4798917740, tuples_in_output=0x7f4798917588, limit=-1, count_only=false, stop_execution=0x7f479891742f,
rows_passed=0x7f4798917438, rows_omitted=0x7f4798917430) at /home/jenkins/workspace/stonedb5.7-zsl-centos7.9-30-119-20220805/storage/tianmu/core/joiner_general.cpp:164
// Returns the int64 value stored for row `obj`; the caller already knows the value is not NULL.
//
// Trivial packs (per the DPN) are answered from dpn.min_i alone, as the uniform case.
// Non-trivial packs are read from the pack data. The caller must have loaded and
// locked the pack beforehand (presumably via the LockSourcePacks path).
//
// get_pack() can return nullptr when the pack pointer has not been set. The
// assertion therefore checks for null explicitly. Otherwise it would dereference
// the null pointer inside IsLocked() and crash with SIGSEGV instead of reporting
// a readable assertion failure.
int64_t GetNotNullValueInt64(int64_t obj) const override {
  int pack = row2pack(obj);
  const auto &dpn = get_dpn(pack);
  if (!dpn.Trivial()) {
    DEBUG_ASSERT(get_pack(pack) != nullptr && get_pack(pack)->IsLocked());
    int64_t res = get_packN(pack)->GetValInt(row2offset(obj));  // 2-level encoding
    // Natural encoding
    if (ATI::IsRealType(TypeName())) return res;
    res += dpn.min_i;
    return res;
  }
  // the only possibility: uniform
  return dpn.min_i;
}
// This is the assertion that crashed in the backtrace above: IsLocked() was
// called with this == nullptr, i.e. get_pack(pack) returned a null pointer for
// the pack being read.
DEBUG_ASSERT(get_pack(pack)->IsLocked());
// Returns the in-memory Pack for pack index `i`, or whatever address the DPN
// currently holds (which may be null).
// The DPN's pointer word is masked with tag_mask before use; presumably the
// masked-off bits carry tag/state information rather than address bits.
Pack *RCAttr::get_pack(size_t i) {
  auto const untagged = get_dpn(i).GetPackPtr() & tag_mask;
  return reinterpret_cast<Pack *>(untagged);
}
// Returns the DPN for this attribute's pack number `i`.
// `i` is bounds-checked against m_idx, then translated through m_idx into the
// slot used to fetch the DPN from the shared storage (m_share).
DPN &get_dpn(size_t i) {
  const auto num_packs = m_idx.size();
  ASSERT(i < num_packs, "bad dpn index " + std::to_string(i) + "/" + std::to_string(num_packs));
  const auto &slot = m_idx[i];
  return *m_share->get_dpn_ptr(slot);
}
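问题分析:
栈帧#0中 IsLocked 的 this=0x0,说明 GetNotNullValueInt64 中的 DEBUG_ASSERT 调用 get_pack(pack) 时得到了空指针。也就是说,读取该行时对应 pack 的指针尚未被设置(很可能该 pack 尚未被加载/加锁)。
以下代码逻辑问题仍有待厘清:
- 在MIIterator递增的过程中,会对底层模块的数据产生哪些影响?
- VCPackGuardian::LockPackrow是如何加载数据的?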