searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Appache Arrow Compute 介绍

2024-09-18 09:21:20
19
0
  1. 介绍

Apache Arrow 是一个用于内存分析的开发平台。它包含了一系列技术,使大数据系统能够快速处理数据。它定义了一种标准化的、与语言无关的列式内存格式,适用于现代硬件上高效地进行分析操作,支持平面和层次化数据的组织。
Arrow 中的 Compute 实现了多种多样的计算功能,了解 Arrow 的 Compute 有利于我们使用 Arrow 的计算能力。
  1. Compute 使用

在了解 Arrow Compute 的实现原理之前,先简单的了解怎么使用的,这有助于我们对其实现有一个基本了解。
  1. 使用 arrow::compute::CallFunction()

下面的例子调用 add 函数
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;

ARROW_ASSIGN_OR_RAISE(incremented_datum,
                      arrow::compute::CallFunction("add", {numbers_array, increment}));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
这个示例使用了从std::shared_ptr<Array>Datum的隐式类型转换
  1. 直接调用 arrow 的 APIs

Compute 的输入以一个通用的Datum来表示,这是一个 std::variant(可变类型),包含多种类型,ScalarArray 和 ChunkedArray,如下所示:
enum Kind { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE };

std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
               std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
               std::shared_ptr<Table>>
大部分 compute functions 的输入即支持 Array(chunked or not)又支持 Scalar ,然而,有些函数则强制要求特定的输入类型。例如,array_sort_indices函数要求其第一个且唯一一个输入必须是数组(array),而泛化的sort_indices函数则接受数组(array)、chunked array, record batch 或者 table。
调用 arrow::compute::Add() 实现加法
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;

ARROW_ASSIGN_OR_RAISE(incremented_datum,
                      arrow::compute::Add(numbers_array, increment));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
一些函数接受或需要一个选项结构,这个结构决定了函数的确切语义:
ScalarAggregateOptions scalar_aggregate_options;
scalar_aggregate_options.skip_nulls = false;

std::shared_ptr<arrow::Array> array = ...;
arrow::Datum min_max;

ARROW_ASSIGN_OR_RAISE(min_max,
                      arrow::compute::CallFunction("min_max", {array},
                                                   &scalar_aggregate_options));

// Unpack struct scalar result (a two-field {"min", "max"} scalar)
std::shared_ptr<arrow::Scalar> min_value, max_value;
min_value = min_max.scalar_as<arrow::StructScalar>().value[0];
max_value = min_max.scalar_as<arrow::StructScalar>().value[1];
上述的 compute 计算涉及到 Datum,其结构如下, 其实就是 Arrow 的各种 Array 类型
std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
               std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
               std::shared_ptr<Table>>
  1. Function Registry

在了解了使用方式后,我们将进一步的分析 Compute 计算的功能和实现。Function Registry 管理了 Compute 中 所有 Arrow 用到的函数,其核心是存在两个 std::unordered_map 通过 name 来找对应的 Function 和 FunctinOptionsType。
class FunctionRegistry::FunctionRegistryImpl {
...
  std::unordered_map<std::string, std::shared_ptr<Function>> name_to_function_;
  std::unordered_map<std::string, const FunctionOptionsType*> name_to_options_type_;
};
关于 FunctionRegistry 的使用
auto default_registry = GetFunctionRegistry();

auto registry = FunctionRegistry::Make(default_registry);
for (std::string func_name : {"f1", "f2"}) {
  std::shared_ptr<Function> func = std::make_shared<ScalarFunction>(
      func_name, Arity::Unary(), /*doc=*/FunctionDoc::Empty());
  ASSERT_OK(registry->CanAddFunction(func));
  ASSERT_OK(registry->AddFunction(func));
  ASSERT_RAISES(KeyError, registry->CanAddFunction(func));
  ASSERT_RAISES(KeyError, registry->AddFunction(func));
  ASSERT_OK(default_registry->CanAddFunction(func));
}
  1. Function 与 Kernel

Arrow 的 Function 是所有函数的 base class,可以通过 Function Registry 获取到对应的 Function,然后调用 Execute 执行该函数。Kernel 是每一个 Function 的具体实现,例如 Add(int32_t, int32_t), Add(int64_t, int64_t) 的具体实现就属于 Kernel,而 Function 可以认为是 Add(T, T), 是一个模板。Function 可以通过 DispatchExact/DispatchBest 得到具体的 Kernel。
Function 的 kind 如下:
enum Kind {
    /// A function that performs scalar data operations on whole arrays of
    /// data. Can generally process Array or Scalar values. The size of the
    /// output will be the same as the size (or broadcasted size, in the case
    /// of mixing Array and Scalar inputs) of the input.
    SCALAR,

    /// A function with array input and output whose behavior depends on the
    /// values of the entire arrays passed, rather than the value of each scalar
    /// value.
    VECTOR,

    /// A function that computes scalar summary statistics from array input.
    SCALAR_AGGREGATE,

    /// A function that computes grouped summary statistics from array input
    /// and an array of group identifiers.
    HASH_AGGREGATE,

    /// A function that dispatches to other functions and does not contain its
    /// own kernels.
    META
  };
 
FunctionImpl 内部会保留所有 Add 进来的 Kernel
template <typename KernelType>
class FunctionImpl : public Function {
...
  std::vector<KernelType> kernels_;
};
上面 kind 中指定的 5 中 Function 都是通过下面的 AddKernel 函数来保存到 FunctionImpl::kernels_ 内部。
function.h

class ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
...
  Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
                   ArrayKernelExec exec, KernelInit init = NULLPTR);
  Status AddKernel(ScalarKernel kernel);
};

class VectorFunction : public detail::FunctionImpl<VectorKernel> {
...
  Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
                   ArrayKernelExec exec, KernelInit init = NULLPTR);
  Status AddKernel(VectorKernel kernel);
};

class HashAggregateFunction : public detail::FunctionImpl<HashAggregateKernel> {
...
  Status AddKernel(HashAggregateKernel kernel);
};

class MetaFunction : public Function {
// 无 AddKernel
};
  1. Kernel

Kernel 是某个函数的具体实现,并且还会保留函数计算过程的状态,相对 Function 会复杂一些。与 Kernel 先关的关键几个类,ExecContext, SelectionVector,ExecBatch(ExecSpan),Executor,KernelState, KernelContext
Kernel 的部分 uml 如下所示:
KernelState 的部分 uml 如下所示:
 
Kernel 保存的是具体的函数的实现,而 KernelState 保留的是该函数计算的中间状态和结果。
由于对同一种算法会根据输入数据的类型有不同 Kernel(例如对 加法,输入参数有 int32, int64 等),因此,Kernel 在构造的时候就参考了不同算法的输入类型和输出类型,KernelSignature 这个类其实也是根据不同算法的 输入类型和输出类型 来生成不同的对象。
 
  1. 如何搜索具体的 Function

InputType::Matches 会根据当前 kind 的类型选择不同的比较方式,如果 kind == EXACT_TYPE 则会走 TypeEquals 利用 Arrow DataTypeEqualityComparable 类定义的 operator== 来比较,否则 根据typeid 来匹配输入参数是否一样。对于嵌套类型,则会走 VisitTypeInline 来逐个比较。
 
如果 kind == USE_TYPE_MATCHER则会走 xxxMatcher::Matches() 来比较。
  1. Function 的执行

Function 的执行涉及到两个核心类 FunctionExecutor, KernelExecutor根据前面介绍 Kernel 才是某个函数执行的具体实现,因此 FunctionExecutor 只是封装了调用 KernelExecutor 的相关接口
KernelExecutor 的 uml 图如下所示:
具体的 kernel 保存在 KernelExecutorImpl::kernel_ 与 KernelExecutorImpl::kernel_ctx_。
函数的执行过程的流程图:

目前只有 ScalarKernelVectorKernelArrayKernelExec exec 因此 kernels 下面以 scaler_xxx.cc 和 vector_xxx.cc 里面包含了具体的 exec 的实现。
Init 的过程

 

  1. Function 与 Kernel 的关系

Function 与 kernel 的关系如下图所示,Arrow 将 Function 和 Kernel 等同起来,一个 Function 就是一个抽象的函数模板,而每个具体的 Kernel 就是该 Function 对应的具体类型的模板的特化

 

  1. 总结

FunctionKernel 共同组成 Arrow 中的 Compute 中的函数,Function Registry 则管理了这些函数,开发人员可以通过在 Function Registry 中找到计算某种类型的函数就能使用。
0条评论
作者已关闭评论
张****强
2文章数
0粉丝数
张****强
2 文章 | 0 粉丝
张****强
2文章数
0粉丝数
张****强
2 文章 | 0 粉丝
原创

Appache Arrow Compute 介绍

2024-09-18 09:21:20
19
0
  1. 介绍

Apache Arrow 是一个用于内存分析的开发平台。它包含了一系列技术,使大数据系统能够快速处理数据。它定义了一种标准化的、与语言无关的列式内存格式,适用于现代硬件上高效地进行分析操作,支持平面和层次化数据的组织。
Arrow 中的 Compute 实现了多种多样的计算功能,了解 Arrow 的 Compute 有利于我们使用 Arrow 的计算能力。
  1. Compute 使用

在了解 Arrow Compute 的实现原理之前,先简单的了解怎么使用的,这有助于我们对其实现有一个基本了解。
  1. 使用 arrow::compute::CallFunction()

下面的例子调用 add 函数
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;

ARROW_ASSIGN_OR_RAISE(incremented_datum,
                      arrow::compute::CallFunction("add", {numbers_array, increment}));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
这个示例使用了从std::shared_ptr<Array>Datum的隐式类型转换
  1. 直接调用 arrow 的 APIs

Compute 的输入以一个通用的Datum来表示,这是一个 std::variant(可变类型),包含多种类型,ScalarArray 和 ChunkedArray,如下所示:
enum Kind { NONE, SCALAR, ARRAY, CHUNKED_ARRAY, RECORD_BATCH, TABLE };

std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
               std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
               std::shared_ptr<Table>>
大部分 compute functions 的输入即支持 Array(chunked or not)又支持 Scalar ,然而,有些函数则强制要求特定的输入类型。例如,array_sort_indices函数要求其第一个且唯一一个输入必须是数组(array),而泛化的sort_indices函数则接受数组(array)、chunked array, record batch 或者 table。
调用 arrow::compute::Add() 实现加法
std::shared_ptr<arrow::Array> numbers_array = ...;
std::shared_ptr<arrow::Scalar> increment = ...;
arrow::Datum incremented_datum;

ARROW_ASSIGN_OR_RAISE(incremented_datum,
                      arrow::compute::Add(numbers_array, increment));
std::shared_ptr<Array> incremented_array = std::move(incremented_datum).make_array();
一些函数接受或需要一个选项结构,这个结构决定了函数的确切语义:
ScalarAggregateOptions scalar_aggregate_options;
scalar_aggregate_options.skip_nulls = false;

std::shared_ptr<arrow::Array> array = ...;
arrow::Datum min_max;

ARROW_ASSIGN_OR_RAISE(min_max,
                      arrow::compute::CallFunction("min_max", {array},
                                                   &scalar_aggregate_options));

// Unpack struct scalar result (a two-field {"min", "max"} scalar)
std::shared_ptr<arrow::Scalar> min_value, max_value;
min_value = min_max.scalar_as<arrow::StructScalar>().value[0];
max_value = min_max.scalar_as<arrow::StructScalar>().value[1];
上述的 compute 计算涉及到 Datum,其结构如下, 其实就是 Arrow 的各种 Array 类型
std::variant<Empty, std::shared_ptr<Scalar>, std::shared_ptr<ArrayData>,
               std::shared_ptr<ChunkedArray>, std::shared_ptr<RecordBatch>,
               std::shared_ptr<Table>>
  1. Function Registry

在了解了使用方式后,我们将进一步的分析 Compute 计算的功能和实现。Function Registry 管理了 Compute 中 所有 Arrow 用到的函数,其核心是存在两个 std::unordered_map 通过 name 来找对应的 Function 和 FunctinOptionsType。
class FunctionRegistry::FunctionRegistryImpl {
...
  std::unordered_map<std::string, std::shared_ptr<Function>> name_to_function_;
  std::unordered_map<std::string, const FunctionOptionsType*> name_to_options_type_;
};
关于 FunctionRegistry 的使用
auto default_registry = GetFunctionRegistry();

auto registry = FunctionRegistry::Make(default_registry);
for (std::string func_name : {"f1", "f2"}) {
  std::shared_ptr<Function> func = std::make_shared<ScalarFunction>(
      func_name, Arity::Unary(), /*doc=*/FunctionDoc::Empty());
  ASSERT_OK(registry->CanAddFunction(func));
  ASSERT_OK(registry->AddFunction(func));
  ASSERT_RAISES(KeyError, registry->CanAddFunction(func));
  ASSERT_RAISES(KeyError, registry->AddFunction(func));
  ASSERT_OK(default_registry->CanAddFunction(func));
}
  1. Function 与 Kernel

Arrow 的 Function 是所有函数的 base class,可以通过 Function Registry 获取到对应的 Function,然后调用 Execute 执行该函数。Kernel 是每一个 Function 的具体实现,例如 Add(int32_t, int32_t), Add(int64_t, int64_t) 的具体实现就属于 Kernel,而 Function 可以认为是 Add(T, T), 是一个模板。Function 可以通过 DispatchExact/DispatchBest 得到具体的 Kernel。
Function 的 kind 如下:
enum Kind {
    /// A function that performs scalar data operations on whole arrays of
    /// data. Can generally process Array or Scalar values. The size of the
    /// output will be the same as the size (or broadcasted size, in the case
    /// of mixing Array and Scalar inputs) of the input.
    SCALAR,

    /// A function with array input and output whose behavior depends on the
    /// values of the entire arrays passed, rather than the value of each scalar
    /// value.
    VECTOR,

    /// A function that computes scalar summary statistics from array input.
    SCALAR_AGGREGATE,

    /// A function that computes grouped summary statistics from array input
    /// and an array of group identifiers.
    HASH_AGGREGATE,

    /// A function that dispatches to other functions and does not contain its
    /// own kernels.
    META
  };
 
FunctionImpl 内部会保留所有 Add 进来的 Kernel
template <typename KernelType>
class FunctionImpl : public Function {
...
  std::vector<KernelType> kernels_;
};
上面 kind 中指定的 5 中 Function 都是通过下面的 AddKernel 函数来保存到 FunctionImpl::kernels_ 内部。
function.h

class ScalarFunction : public detail::FunctionImpl<ScalarKernel> {
...
  Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
                   ArrayKernelExec exec, KernelInit init = NULLPTR);
  Status AddKernel(ScalarKernel kernel);
};

class VectorFunction : public detail::FunctionImpl<VectorKernel> {
...
  Status AddKernel(std::vector<InputType> in_types, OutputType out_type,
                   ArrayKernelExec exec, KernelInit init = NULLPTR);
  Status AddKernel(VectorKernel kernel);
};

class HashAggregateFunction : public detail::FunctionImpl<HashAggregateKernel> {
...
  Status AddKernel(HashAggregateKernel kernel);
};

class MetaFunction : public Function {
// 无 AddKernel
};
  1. Kernel

Kernel 是某个函数的具体实现,并且还会保留函数计算过程的状态,相对 Function 会复杂一些。与 Kernel 先关的关键几个类,ExecContext, SelectionVector,ExecBatch(ExecSpan),Executor,KernelState, KernelContext
Kernel 的部分 uml 如下所示:
KernelState 的部分 uml 如下所示:
 
Kernel 保存的是具体的函数的实现,而 KernelState 保留的是该函数计算的中间状态和结果。
由于对同一种算法会根据输入数据的类型有不同 Kernel(例如对 加法,输入参数有 int32, int64 等),因此,Kernel 在构造的时候就参考了不同算法的输入类型和输出类型,KernelSignature 这个类其实也是根据不同算法的 输入类型和输出类型 来生成不同的对象。
 
  1. 如何搜索具体的 Function

InputType::Matches 会根据当前 kind 的类型选择不同的比较方式,如果 kind == EXACT_TYPE 则会走 TypeEquals 利用 Arrow DataTypeEqualityComparable 类定义的 operator== 来比较,否则 根据typeid 来匹配输入参数是否一样。对于嵌套类型,则会走 VisitTypeInline 来逐个比较。
 
如果 kind == USE_TYPE_MATCHER则会走 xxxMatcher::Matches() 来比较。
  1. Function 的执行

Function 的执行涉及到两个核心类 FunctionExecutor, KernelExecutor根据前面介绍 Kernel 才是某个函数执行的具体实现,因此 FunctionExecutor 只是封装了调用 KernelExecutor 的相关接口
KernelExecutor 的 uml 图如下所示:
具体的 kernel 保存在 KernelExecutorImpl::kernel_ 与 KernelExecutorImpl::kernel_ctx_。
函数的执行过程的流程图:

目前只有 ScalarKernelVectorKernelArrayKernelExec exec 因此 kernels 下面以 scaler_xxx.cc 和 vector_xxx.cc 里面包含了具体的 exec 的实现。
Init 的过程

 

  1. Function 与 Kernel 的关系

Function 与 kernel 的关系如下图所示,Arrow 将 Function 和 Kernel 等同起来,一个 Function 就是一个抽象的函数模板,而每个具体的 Kernel 就是该 Function 对应的具体类型的模板的特化

 

  1. 总结

FunctionKernel 共同组成 Arrow 中的 Compute 中的函数,Function Registry 则管理了这些函数,开发人员可以通过在 Function Registry 中找到计算某种类型的函数就能使用。
文章来自个人专栏
学习大数据
2 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0