-
介绍
Apache Arrow 是一个用于内存分析的开发平台。它包含了一系列技术,使大数据系统能够快速处理数据。它定义了一种标准化的、与语言无关的列式内存格式,适用于现代硬件上高效地进行分析操作,支持平面和层次化数据的组织。
Arrow 中的 Compute 实现了多种多样的计算功能,了解 Arrow 的 Compute 有利于我们使用 Arrow 的计算能力。
-
Compute 使用
在了解 Arrow Compute 的实现原理之前,先简单的了解怎么使用的,这有助于我们对其实现有一个基本了解。
-
使用 arrow::compute::CallFunction()
下面的例子调用 add 函数
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;
ARROW_ASSIGN_OR_RAISE(incremented_datum,
arrow::compute::CallFunction("add", {numbers_array, increment}));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
这个示例使用了从
std::shared_ptr<Array>
到Datum
的隐式类型转换-
直接调用 arrow 的 APIs
Compute 的输入以一个通用的
Datum
来表示,这是一个 std::variant(可变类型),包含多种类型,ScalarArray 和 ChunkedArray,如下所示:enum Kind { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE };
std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
std::shared_ptr<Table>>
大部分 compute functions 的输入即支持 Array(chunked or not)又支持 Scalar ,然而,有些函数则强制要求特定的输入类型。例如,
array_sort_indices
函数要求其第一个且唯一一个输入必须是数组(array),而泛化的sort_indices
函数则接受数组(array)、chunked array, record batch 或者 table。调用 arrow::compute::Add() 实现加法
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;
ARROW_ASSIGN_OR_RAISE(incremented_datum,
arrow::compute::Add(numbers_array, increment));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
一些函数接受或需要一个选项结构,这个结构决定了函数的确切语义:
ScalarAggregateOptions scalar_aggregate_options;
scalar_aggregate_options.skip_nulls = false;
std::shared_ptr<arrow::Array> array = ...;
arrow::Datum min_max;
ARROW_ASSIGN_OR_RAISE(min_max,
arrow::compute::CallFunction("min_max", {array},
&scalar_aggregate_options));
// Unpack struct scalar result (a two-field {"min", "max"} scalar)
std::shared_ptr<arrow::Scalar> min_value, max_value;
min_value = min_max.scalar_as<arrow::StructScalar>().value[0];
max_value = min_max.scalar_as<arrow::StructScalar>().value[1];
上述的 compute 计算涉及到 Datum,其结构如下, 其实就是 Arrow 的各种 Array 类型
std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
std::shared_ptr<Table>>
-
Function Registry
在了解了使用方式后,我们将进一步的分析 Compute 计算的功能和实现。Function Registry 管理了 Compute 中 所有 Arrow 用到的函数,其核心是存在两个 std::unordered_map 通过 name 来找对应的 Function 和 FunctinOptionsType。
class FunctionRegistry::FunctionRegistryImpl {
...
std::unordered_map<std::string, std::shared_ptr<Function>> name_to_function_;
std::unordered_map<std::string, const FunctionOptionsType*> name_to_options_type_;
};
关于 FunctionRegistry 的使用
auto default_registry = GetFunctionRegistry();
auto registry = FunctionRegistry::Make(default_registry);
for (std::string func_name : {"f1", "f2"}) {
std::shared_ptr<Function> func = std::make_shared<ScalarFunction>(
func_name, Arity::Unary(), /*doc=*/FunctionDoc::Empty());
ASSERT_OK(registry->CanAddFunction(func));
ASSERT_OK(registry->AddFunction(func));
ASSERT_RAISES(KeyError, registry->CanAddFunction(func));
ASSERT_RAISES(KeyError, registry->AddFunction(func));
ASSERT_OK(default_registry->CanAddFunction(func));
}
-
Function 与 Kernel
Arrow 的 Function 是所有函数的 base class,可以通过 Function Registry 获取到对应的 Function,然后调用 Execute 执行该函数。Kernel 是每一个 Function 的具体实现,例如 Add(int32_t, int32_t), Add(int64_t, int64_t) 的具体实现就属于 Kernel,而 Function 可以认为是 Add(T, T), 是一个模板。Function 可以通过 DispatchExact/DispatchBest 得到具体的 Kernel。
Function 的 kind 如下:
enum Kind {
/// A function that performs scalar data operations on whole arrays of
/// data. Can generally process Array or Scalar values. The size of the
/// output will be the same as the size (or broadcasted size, in the case
/// of mixing Array and Scalar inputs) of the input.
SCALAR,
/// A function with array input and output whose behavior depends on the
/// values of the entire arrays passed, rather than the value of each scalar
/// value.
VECTOR,
/// A function that computes scalar summary statistics from array input.
SCALAR_AGGREGATE,
/// A function that computes grouped summary statistics from array input
/// and an array of group identifiers.
HASH_AGGREGATE,
/// A function that dispatches to other functions and does not contain its
/// own kernels.
META
};
FunctionImpl 内部会保留所有 Add 进来的 Kernel
template <typename KernelType>
class FunctionImpl : public Function {
...
std::vector<KernelType> kernels_;
};
上面 kind 中指定的 5 中 Function 都是通过下面的 AddKernel 函数来保存到 FunctionImpl::kernels_ 内部。
function.h
class ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
...
Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
ArrayKernelExec exec, KernelInit init = NULLPTR);
Status AddKernel(ScalarKernel kernel);
};
class VectorFunction : public detail::FunctionImpl<VectorKernel> {
...
Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
ArrayKernelExec exec, KernelInit init = NULLPTR);
Status AddKernel(VectorKernel kernel);
};
class HashAggregateFunction : public detail::FunctionImpl<HashAggregateKernel> {
...
Status AddKernel(HashAggregateKernel kernel);
};
class MetaFunction : public Function {
// 无 AddKernel
};
-
Kernel
Kernel 是某个函数的具体实现,并且还会保留函数计算过程的状态,相对 Function 会复杂一些。与 Kernel 先关的关键几个类,
ExecContext
, SelectionVector
,ExecBatch(ExecSpan)
,Executor
,KernelState
, KernelContext
。Kernel 的部分 uml 如下所示:
KernelState 的部分 uml 如下所示:
Kernel 保存的是具体的函数的实现,而 KernelState 保留的是该函数计算的中间状态和结果。
由于对同一种算法会根据输入数据的类型有不同 Kernel(例如对 加法,输入参数有 int32, int64 等),因此,Kernel 在构造的时候就参考了不同算法的输入类型和输出类型,
KernelSignature
这个类其实也是根据不同算法的 输入类型和输出类型 来生成不同的对象。-
如何搜索具体的 Function
InputType::Matches
会根据当前 kind
的类型选择不同的比较方式,如果 kind == EXACT_TYPE
则会走 TypeEquals
利用 Arrow DataType
的 EqualityComparable
类定义的 operator==
来比较,否则 根据typeid
来匹配输入参数是否一样。对于嵌套类型,则会走 VisitTypeInline
来逐个比较。如果
kind == USE_TYPE_MATCHER
则会走 xxxMatcher::Matches() 来比较。-
Function 的执行
Function 的执行涉及到两个核心类
FunctionExecutor
, KernelExecutor
根据前面介绍 Kernel 才是某个函数执行的具体实现,因此 FunctionExecutor
只是封装了调用 KernelExecutor
的相关接口KernelExecutor
的 uml 图如下所示:具体的 kernel 保存在 KernelExecutorImpl::kernel_ 与 KernelExecutorImpl::kernel_ctx_。
函数的执行过程的流程图:
目前只有
ScalarKernel
和 VectorKernel
有 ArrayKernelExec exec
因此 kernels 下面以 scaler_xxx.cc 和 vector_xxx.cc 里面包含了具体的 exec 的实现。Init 的过程
-
Function 与 Kernel 的关系
Function 与 kernel 的关系如下图所示,Arrow 将 Function 和 Kernel 等同起来,一个 Function 就是一个抽象的函数模板,而每个具体的 Kernel 就是该 Function 对应的具体类型的模板的特化
-
总结
Function
和 Kernel
共同组成 Arrow 中的 Compute 中的函数,Function Registry 则管理了这些函数,开发人员可以通过在 Function Registry 中找到计算某种类型的函数就能使用。