searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

向量化执行插件Arrow功能介绍

2024-10-10 02:06:30
13
0

 Arrow重要数据结构

1)   GarrowType

GarrowTypeArrow中的数据类型,比较重要的两种数据类型为GArrowArrayGArrowTable。其中GArrowArray代表Arrow 数组,通常用于表示一维数据,如一个整型数组或一个字符串数组,在teledbx中向量化引擎中GArrowArray用来存放向量化batch中的一列数据;其中GArrowTable代表Arrow表,是二维数据,包含多列。

2)   GArrowRecordBatch

GArrowRecordBatchArrow中的一种数据类型,代表一个固定大小的行集合,每行包含多个列。在teledbx中向量化执行引擎中GArrowRecordBatch为核心数据结构之一,每个执行算子会同时对一个GArrowRecordBatch的数据进行处理,然后返回一个GArrowRecordBatch数据,与非向量化执行引擎中的TupleTableSlot相对应。

3)   GArrowSchema

GArrowSchemaArrow中的一种数据类型,用于对GArrowRecordBatch结构进行描述,包括batch中各个列的名字、类型以及其他元数据信息。在teledbx中向量化执行引擎中GArrowSchema也是对一个batch进行描述,与非向量化引擎中的TupleDesc相对应。

4)   GArrowField

GArrowFieldArrow中的一种数据类型,用户表示一个字段,在Arrow中每个字段都包含了字段的名称、数据类型以及是否为空等元信息。在teledbx向量化引擎中表中每列信息代表一个字段。

5)   GArrowExecutePlan

GArrowExecutePlan用于表示Arrow中的执行计划,包含了输入数据、输出模式以及一些列操作节点。

typedef struct _GArrowExecutePlan {
  GArrowSchema *output_schema;   // 执行计划输出的模式
  guint n_inputs;                // 输入记录批次的数量
  GArrowRecordBatch **inputs;    // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输入记录批次
  guint n_outputs;               // 输出记录批次的数量
  GArrowRecordBatch **outputs;   // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输出记录批次
  gboolean is_valid;             // 标志位,指示执行计划是否有效
  // 其他内部状态和辅助数据结构
} GArrowExecutePlan;

 Arrow重要函数接口

1)   GArrowExecutePlan *garrow_execute_plan_new(void)

创建一个执行计划对象。

2)   garrow_source_node_options_new_record_batch_reader

创建一个新的GArrowSourceNodeOptions 对象,该对象用于指定从文件或其他数据源读取数据的方式。

GArrowSourceNodeOptions *
garrow_source_node_options_new_record_batch_reader(
const gchar *format,
  const gchar *uri,
  GArrowFileSource *file_source,
  GError **error);

3)    garrow_execute_plan_build_source_node

用于在执行计划中构建一个源节点。源节点负责从指定的数据源读取数据,并将其提供给执行计划中的后续节点进行处理。
GArrowSourceNode *
garrow_execute_plan_build_source_node(GArrowExecutePlan *plan,
                                      GArrowSourceNodeOptions *options,
                                      GError **error);

4)    garrow_sort_node_options_new

用于创建一个新的排序节点选项对象 (GArrowSortNodeOptions)。这个选项对象用于指定如何对数据进行排序。
GArrowSortNodeOptions *
garrow_sort_node_options_new(GArrowExpression *key,
                             gboolean ascending,
                                  GError **error);

5)    garrow_sort_node_new

用于创建一个新的排序节点 (GArrowSortNode),该节点可以根据给定的排序选项对输入的数据进行排序。
GArrowSortNode *
garrow_sort_node_new(GArrowRecordBatch *input,
                     GArrowSortNodeOptions *options,
                     GError **error);

6)    garrow_aggregate_node_options_new

用于创建一个新的聚合节点选项对象 (GArrowAggregateNodeOptions),这个选项对象用于指定如何对数据进行聚合操作
GArrowAggregateNodeOptions *
garrow_aggregate_node_options_new(
GArrowExpression *group_key,
      GArrowExpression *aggregate_expr,
      GArrowAggregateFunction aggregate_function,
      GError **error);

7)    garrow_aggregate_node_new

用于创建一个新的聚合节点 (GArrowAggregateNode),并将聚合节点添加到执行计划中。
GArrowAggregateNode *
garrow_aggregate_node_new(GArrowRecordBatch *input,
                          GArrowAggregateNodeOptions *options,
                              GError **error);

8)    garrow_execute_plan_start

用于启动执行计划,启动执行计划意味着开始按照计划中定义的顺序执行这些步骤。
gboolean
garrow_execute_plan_start(GArrowExecutePlan *plan,
                          GError **error);

9)    garrow_execute_plan_wait

用于等待执行计划中的所有操作完成。这意味着函数会阻塞调用线程,直到所有的数据处理步骤(如读取、转换、聚合等)都完成为止。
gboolean
garrow_execute_plan_wait(GArrowExecutePlan *plan,
                         GError **error);

10)    garrow_record_batch_reader_options_new

用于创建一个新的 GArrowRecordBatchReaderOptions 对象。这个对象包含了配置 GArrowRecordBatchReader 所需的各种选项,比如输入数据的位置、格式等。
GArrowRecordBatchReaderOptions *
garrow_record_batch_reader_options_new(void);

11)    garrow_record_batch_reader_new

用于创建一个新的 GArrowRecordBatchReader 对象。这个对象可以从指定的数据源中读取 Arrow RecordBatch 数据。
GArrowRecordBatchReader *
garrow_record_batch_reader_new(GArrowRecordBatchReaderOptions *options,
                               GError **error);

12)    garrow_record_batch_reader_read_next

用于从 GArrowRecordBatchReader 中读取下一个数据批次(GArrowRecordBatch)。这个函数是迭代读取数据的关键部分,尤其适用于处理大数据集,因为它不需要一次性加载所有数据到内存中。
gboolean
garrow_record_batch_reader_read_next(GArrowRecordBatchReader *reader,
                                     GArrowRecordBatch **out_batch,
                                     GError **error);

13)    g_object_unref

用于减少一个 GObject 的引用计数。当引用计数降到零时,该对象会被销毁(即调用其析构函数,并释放其占用的内存)。这是 GObject 引用计数机制的一部分,用于自动管理内存,防止内存泄漏。

 

Arrow实现数据查询实例

Apache Arrow支持多种执行算子,这些算子用于实现各种数据处理任务,例如读取数据、转换数据、聚合数据、排序数据等。以下是Apache Arrow支持的一些主要算子。

1.    Source Nodes

算子名称

功能

File Source

从文件读取数据

Memory Source

从内存读取数据

HTTP Source

HTTP/HTTPS URL 读取数据

Database Source

从数据库读取数据

 

2.   Transform Nodes

算子名称

功能

Filter

根据条件过滤数据行

Project

选择特定列,投影操作

Aggregate

对数据进行聚合计算

Sort

对数据进行排序

Union

合并多个数据集

Join

进行表连接操作

Limit

限制输出行的数量

Distinct

去除重复行

Cast

类型转换

Fill Null

填充空值

Resample

时间序列数据的重采样

GroupBy

分组操作

3.   Sink Nodes

算子名称

功能

File Sink

将数据写入文件

Memory Sink

将数据写入内存

HTTP Sink

将数据发送到 HTTP/HTTPS URL

Database Sink

将数据写入数据库

以下示例为通过Arrow扫描data.csv文件中的数据,并对数据进行排序和聚合操作。

#include <garrow-0.2/garrow.h>

int main() {
  GError *error = NULL;

  // 创建执行计划
  GArrowExecutePlan *plan = garrow_execute_plan_new();

  // 创建源节点选项
  GArrowSourceNodeOptions *options = garrow_source_node_options_new_record_batch_reader(
    "csv",  // 数据格式
    "/path/to/data.csv",  // 数据文件的路径
    NULL,  // 使用默认的 RecordBatchReader
    &error);
  if (error != NULL) {
    g_printerr("Error creating source node options: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 构建源节点并将其添加到执行计划中
  GArrowSourceNode *source_node = garrow_execute_plan_build_source_node(
    plan,
    options,
    &error);
  if (error != NULL) {
    g_printerr("Error building source node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建排序节点选项
  GArrowSortNodeOptions *sort_options = garrow_sort_node_options_new();
  GArrowField *sort_field = garrow_field_new("age");  // 假设数据中有 "age" 字段
  GArrowExpression *sort_expr = garrow_expression_new_field(sort_field);
  garrow_sort_node_options_set_sort_key(sort_options, sort_expr);
  garrow_sort_node_options_set_ascending(sort_options, TRUE);  // 升序排序

  // 构建排序节点并与源节点关联
  GArrowSortNode *sort_node = garrow_sort_node_new(
    source_node,
    sort_options,
    &error);
  if (error != NULL) {
    g_printerr("Error creating sort node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建聚合节点选项
  GArrowAggregateNodeOptions *agg_options = garrow_aggregate_node_options_new();
  GArrowField *agg_field = garrow_field_new("salary");  // 假设数据中有 "salary" 字段
  GArrowExpression *agg_expr = garrow_expression_new_field(agg_field);
  GArrowAggregateFunction *agg_func = garrow_aggregate_function_new_sum(agg_expr);
  garrow_aggregate_node_options_set_aggregate_function(agg_options, agg_func);

  // 构建聚合节点并与排序节点关联
  GArrowAggregateNode *agg_node = garrow_aggregate_node_new(
    sort_node,
    agg_options,
    &error);
  if (error != NULL) {
    g_printerr("Error creating aggregate node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 启动执行计划
  gboolean start_success = garrow_execute_plan_start(plan, &error);
  if (!start_success) {
    g_printerr("Error starting execute plan: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 等待执行计划完成
  gboolean wait_success = garrow_execute_plan_wait(plan, &error);
  if (!wait_success) {
    g_printerr("Error waiting for execute plan: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建 RecordBatchReader 选项
  GArrowRecordBatchReaderOptions *reader_options = garrow_record_batch_reader_options_new();
  garrow_record_batch_reader_options_set_input(agg_node, reader_options);

  // 创建 RecordBatchReader
  GArrowRecordBatchReader *reader = garrow_record_batch_reader_new(reader_options, &error);
  if (error != NULL) {
    g_printerr("Error creating record batch reader: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 读取数据批次
  GArrowRecordBatch *record_batch = NULL;
  gboolean has_next = TRUE;
  while (has_next) {
    has_next = garrow_record_batch_reader_read_next(reader, &record_batch, &error);
    if (has_next) {
      // 处理当前批次的数据
      garrow_record_batch_print(record_batch);
      g_object_unref(record_batch);  // 释放当前批次
    }
  }

  // 清理资源
  g_object_unref(reader);
  g_object_unref(reader_options);
  g_object_unref(agg_node);
  g_object_unref(agg_options);
  g_object_unref(sort_node);
  g_object_unref(sort_options);
  g_object_unref(source_node);
  g_object_unref(options);
  g_object_unref(plan);

  return 0;
}

 

0条评论
作者已关闭评论
c****0
2文章数
0粉丝数
c****0
2 文章 | 0 粉丝
c****0
2文章数
0粉丝数
c****0
2 文章 | 0 粉丝
原创

向量化执行插件Arrow功能介绍

2024-10-10 02:06:30
13
0

 Arrow重要数据结构

1)   GarrowType

GarrowTypeArrow中的数据类型,比较重要的两种数据类型为GArrowArrayGArrowTable。其中GArrowArray代表Arrow 数组,通常用于表示一维数据,如一个整型数组或一个字符串数组,在teledbx中向量化引擎中GArrowArray用来存放向量化batch中的一列数据;其中GArrowTable代表Arrow表,是二维数据,包含多列。

2)   GArrowRecordBatch

GArrowRecordBatchArrow中的一种数据类型,代表一个固定大小的行集合,每行包含多个列。在teledbx中向量化执行引擎中GArrowRecordBatch为核心数据结构之一,每个执行算子会同时对一个GArrowRecordBatch的数据进行处理,然后返回一个GArrowRecordBatch数据,与非向量化执行引擎中的TupleTableSlot相对应。

3)   GArrowSchema

GArrowSchemaArrow中的一种数据类型,用于对GArrowRecordBatch结构进行描述,包括batch中各个列的名字、类型以及其他元数据信息。在teledbx中向量化执行引擎中GArrowSchema也是对一个batch进行描述,与非向量化引擎中的TupleDesc相对应。

4)   GArrowField

GArrowFieldArrow中的一种数据类型,用户表示一个字段,在Arrow中每个字段都包含了字段的名称、数据类型以及是否为空等元信息。在teledbx向量化引擎中表中每列信息代表一个字段。

5)   GArrowExecutePlan

GArrowExecutePlan用于表示Arrow中的执行计划,包含了输入数据、输出模式以及一些列操作节点。

typedef struct _GArrowExecutePlan {
  GArrowSchema *output_schema;   // 执行计划输出的模式
  guint n_inputs;                // 输入记录批次的数量
  GArrowRecordBatch **inputs;    // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输入记录批次
  guint n_outputs;               // 输出记录批次的数量
  GArrowRecordBatch **outputs;   // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输出记录批次
  gboolean is_valid;             // 标志位,指示执行计划是否有效
  // 其他内部状态和辅助数据结构
} GArrowExecutePlan;

 Arrow重要函数接口

1)   GArrowExecutePlan *garrow_execute_plan_new(void)

创建一个执行计划对象。

2)   garrow_source_node_options_new_record_batch_reader

创建一个新的GArrowSourceNodeOptions 对象,该对象用于指定从文件或其他数据源读取数据的方式。

GArrowSourceNodeOptions *
garrow_source_node_options_new_record_batch_reader(
const gchar *format,
  const gchar *uri,
  GArrowFileSource *file_source,
  GError **error);

3)    garrow_execute_plan_build_source_node

用于在执行计划中构建一个源节点。源节点负责从指定的数据源读取数据,并将其提供给执行计划中的后续节点进行处理。
GArrowSourceNode *
garrow_execute_plan_build_source_node(GArrowExecutePlan *plan,
                                      GArrowSourceNodeOptions *options,
                                      GError **error);

4)    garrow_sort_node_options_new

用于创建一个新的排序节点选项对象 (GArrowSortNodeOptions)。这个选项对象用于指定如何对数据进行排序。
GArrowSortNodeOptions *
garrow_sort_node_options_new(GArrowExpression *key,
                             gboolean ascending,
                                  GError **error);

5)    garrow_sort_node_new

用于创建一个新的排序节点 (GArrowSortNode),该节点可以根据给定的排序选项对输入的数据进行排序。
GArrowSortNode *
garrow_sort_node_new(GArrowRecordBatch *input,
                     GArrowSortNodeOptions *options,
                     GError **error);

6)    garrow_aggregate_node_options_new

用于创建一个新的聚合节点选项对象 (GArrowAggregateNodeOptions),这个选项对象用于指定如何对数据进行聚合操作
GArrowAggregateNodeOptions *
garrow_aggregate_node_options_new(
GArrowExpression *group_key,
      GArrowExpression *aggregate_expr,
      GArrowAggregateFunction aggregate_function,
      GError **error);

7)    garrow_aggregate_node_new

用于创建一个新的聚合节点 (GArrowAggregateNode),并将聚合节点添加到执行计划中。
GArrowAggregateNode *
garrow_aggregate_node_new(GArrowRecordBatch *input,
                          GArrowAggregateNodeOptions *options,
                              GError **error);

8)    garrow_execute_plan_start

用于启动执行计划,启动执行计划意味着开始按照计划中定义的顺序执行这些步骤。
gboolean
garrow_execute_plan_start(GArrowExecutePlan *plan,
                          GError **error);

9)    garrow_execute_plan_wait

用于等待执行计划中的所有操作完成。这意味着函数会阻塞调用线程,直到所有的数据处理步骤(如读取、转换、聚合等)都完成为止。
gboolean
garrow_execute_plan_wait(GArrowExecutePlan *plan,
                         GError **error);

10)    garrow_record_batch_reader_options_new

用于创建一个新的 GArrowRecordBatchReaderOptions 对象。这个对象包含了配置 GArrowRecordBatchReader 所需的各种选项,比如输入数据的位置、格式等。
GArrowRecordBatchReaderOptions *
garrow_record_batch_reader_options_new(void);

11)    garrow_record_batch_reader_new

用于创建一个新的 GArrowRecordBatchReader 对象。这个对象可以从指定的数据源中读取 Arrow RecordBatch 数据。
GArrowRecordBatchReader *
garrow_record_batch_reader_new(GArrowRecordBatchReaderOptions *options,
                               GError **error);

12)    garrow_record_batch_reader_read_next

用于从 GArrowRecordBatchReader 中读取下一个数据批次(GArrowRecordBatch)。这个函数是迭代读取数据的关键部分,尤其适用于处理大数据集,因为它不需要一次性加载所有数据到内存中。
gboolean
garrow_record_batch_reader_read_next(GArrowRecordBatchReader *reader,
                                     GArrowRecordBatch **out_batch,
                                     GError **error);

13)    g_object_unref

用于减少一个 GObject 的引用计数。当引用计数降到零时,该对象会被销毁(即调用其析构函数,并释放其占用的内存)。这是 GObject 引用计数机制的一部分,用于自动管理内存,防止内存泄漏。

 

Arrow实现数据查询实例

Apache Arrow支持多种执行算子,这些算子用于实现各种数据处理任务,例如读取数据、转换数据、聚合数据、排序数据等。以下是Apache Arrow支持的一些主要算子。

1.    Source Nodes

算子名称

功能

File Source

从文件读取数据

Memory Source

从内存读取数据

HTTP Source

HTTP/HTTPS URL 读取数据

Database Source

从数据库读取数据

 

2.   Transform Nodes

算子名称

功能

Filter

根据条件过滤数据行

Project

选择特定列,投影操作

Aggregate

对数据进行聚合计算

Sort

对数据进行排序

Union

合并多个数据集

Join

进行表连接操作

Limit

限制输出行的数量

Distinct

去除重复行

Cast

类型转换

Fill Null

填充空值

Resample

时间序列数据的重采样

GroupBy

分组操作

3.   Sink Nodes

算子名称

功能

File Sink

将数据写入文件

Memory Sink

将数据写入内存

HTTP Sink

将数据发送到 HTTP/HTTPS URL

Database Sink

将数据写入数据库

以下示例为通过Arrow扫描data.csv文件中的数据,并对数据进行排序和聚合操作。

#include <garrow-0.2/garrow.h>

int main() {
  GError *error = NULL;

  // 创建执行计划
  GArrowExecutePlan *plan = garrow_execute_plan_new();

  // 创建源节点选项
  GArrowSourceNodeOptions *options = garrow_source_node_options_new_record_batch_reader(
    "csv",  // 数据格式
    "/path/to/data.csv",  // 数据文件的路径
    NULL,  // 使用默认的 RecordBatchReader
    &error);
  if (error != NULL) {
    g_printerr("Error creating source node options: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 构建源节点并将其添加到执行计划中
  GArrowSourceNode *source_node = garrow_execute_plan_build_source_node(
    plan,
    options,
    &error);
  if (error != NULL) {
    g_printerr("Error building source node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建排序节点选项
  GArrowSortNodeOptions *sort_options = garrow_sort_node_options_new();
  GArrowField *sort_field = garrow_field_new("age");  // 假设数据中有 "age" 字段
  GArrowExpression *sort_expr = garrow_expression_new_field(sort_field);
  garrow_sort_node_options_set_sort_key(sort_options, sort_expr);
  garrow_sort_node_options_set_ascending(sort_options, TRUE);  // 升序排序

  // 构建排序节点并与源节点关联
  GArrowSortNode *sort_node = garrow_sort_node_new(
    source_node,
    sort_options,
    &error);
  if (error != NULL) {
    g_printerr("Error creating sort node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建聚合节点选项
  GArrowAggregateNodeOptions *agg_options = garrow_aggregate_node_options_new();
  GArrowField *agg_field = garrow_field_new("salary");  // 假设数据中有 "salary" 字段
  GArrowExpression *agg_expr = garrow_expression_new_field(agg_field);
  GArrowAggregateFunction *agg_func = garrow_aggregate_function_new_sum(agg_expr);
  garrow_aggregate_node_options_set_aggregate_function(agg_options, agg_func);

  // 构建聚合节点并与排序节点关联
  GArrowAggregateNode *agg_node = garrow_aggregate_node_new(
    sort_node,
    agg_options,
    &error);
  if (error != NULL) {
    g_printerr("Error creating aggregate node: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 启动执行计划
  gboolean start_success = garrow_execute_plan_start(plan, &error);
  if (!start_success) {
    g_printerr("Error starting execute plan: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 等待执行计划完成
  gboolean wait_success = garrow_execute_plan_wait(plan, &error);
  if (!wait_success) {
    g_printerr("Error waiting for execute plan: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 创建 RecordBatchReader 选项
  GArrowRecordBatchReaderOptions *reader_options = garrow_record_batch_reader_options_new();
  garrow_record_batch_reader_options_set_input(agg_node, reader_options);

  // 创建 RecordBatchReader
  GArrowRecordBatchReader *reader = garrow_record_batch_reader_new(reader_options, &error);
  if (error != NULL) {
    g_printerr("Error creating record batch reader: %s\n", error->message);
    g_error_free(error);
    return 1;
  }

  // 读取数据批次
  GArrowRecordBatch *record_batch = NULL;
  gboolean has_next = TRUE;
  while (has_next) {
    has_next = garrow_record_batch_reader_read_next(reader, &record_batch, &error);
    if (has_next) {
      // 处理当前批次的数据
      garrow_record_batch_print(record_batch);
      g_object_unref(record_batch);  // 释放当前批次
    }
  }

  // 清理资源
  g_object_unref(reader);
  g_object_unref(reader_options);
  g_object_unref(agg_node);
  g_object_unref(agg_options);
  g_object_unref(sort_node);
  g_object_unref(sort_options);
  g_object_unref(source_node);
  g_object_unref(options);
  g_object_unref(plan);

  return 0;
}

 

文章来自个人专栏
simulinks
2 文章 | 1 订阅
0条评论
作者已关闭评论
作者已关闭评论
0
0