Arrow重要数据结构
1) GarrowType
GarrowType是Arrow中的数据类型,比较重要的两种数据类型为GArrowArray和GArrowTable。其中GArrowArray代表Arrow 数组,通常用于表示一维数据,如一个整型数组或一个字符串数组,在teledbx中向量化引擎中GArrowArray用来存放向量化batch中的一列数据;其中GArrowTable代表Arrow表,是二维数据,包含多列。
2) GArrowRecordBatch
GArrowRecordBatch是Arrow中的一种数据类型,代表一个固定大小的行集合,每行包含多个列。在teledbx中向量化执行引擎中GArrowRecordBatch为核心数据结构之一,每个执行算子会同时对一个GArrowRecordBatch的数据进行处理,然后返回一个GArrowRecordBatch数据,与非向量化执行引擎中的TupleTableSlot相对应。
3) GArrowSchema
GArrowSchema是Arrow中的一种数据类型,用于对GArrowRecordBatch结构进行描述,包括batch中各个列的名字、类型以及其他元数据信息。在teledbx中向量化执行引擎中GArrowSchema也是对一个batch进行描述,与非向量化引擎中的TupleDesc相对应。
4) GArrowField
GArrowField是Arrow中的一种数据类型,用户表示一个字段,在Arrow中每个字段都包含了字段的名称、数据类型以及是否为空等元信息。在teledbx向量化引擎中表中每列信息代表一个字段。
5) GArrowExecutePlan
GArrowExecutePlan用于表示Arrow中的执行计划,包含了输入数据、输出模式以及一些列操作节点。
typedef struct _GArrowExecutePlan {
GArrowSchema *output_schema; // 执行计划输出的模式
guint n_inputs; // 输入记录批次的数量
GArrowRecordBatch **inputs; // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输入记录批次
guint n_outputs; // 输出记录批次的数量
GArrowRecordBatch **outputs; // 指向GArrowRecordBatch对象的指针数组,每个元素代表一个输出记录批次
gboolean is_valid; // 标志位,指示执行计划是否有效
// 其他内部状态和辅助数据结构
} GArrowExecutePlan;
Arrow重要函数接口
1) GArrowExecutePlan *garrow_execute_plan_new(void)
创建一个执行计划对象。
2) garrow_source_node_options_new_record_batch_reader
创建一个新的GArrowSourceNodeOptions 对象,该对象用于指定从文件或其他数据源读取数据的方式。
GArrowSourceNodeOptions *
garrow_source_node_options_new_record_batch_reader(
const gchar *format,
const gchar *uri,
GArrowFileSource *file_source,
GError **error);
3) garrow_execute_plan_build_source_node
用于在执行计划中构建一个源节点。源节点负责从指定的数据源读取数据,并将其提供给执行计划中的后续节点进行处理。
GArrowSourceNode *
garrow_execute_plan_build_source_node(GArrowExecutePlan *plan,
GArrowSourceNodeOptions *options,
GError **error);
4) garrow_sort_node_options_new
用于创建一个新的排序节点选项对象 (GArrowSortNodeOptions)。这个选项对象用于指定如何对数据进行排序。
GArrowSortNodeOptions *
garrow_sort_node_options_new(GArrowExpression *key,
gboolean ascending,
GError **error);
5) garrow_sort_node_new
用于创建一个新的排序节点 (GArrowSortNode),该节点可以根据给定的排序选项对输入的数据进行排序。
GArrowSortNode *
garrow_sort_node_new(GArrowRecordBatch *input,
GArrowSortNodeOptions *options,
GError **error);
6) garrow_aggregate_node_options_new
用于创建一个新的聚合节点选项对象 (GArrowAggregateNodeOptions),这个选项对象用于指定如何对数据进行聚合操作
GArrowAggregateNodeOptions *
garrow_aggregate_node_options_new(
GArrowExpression *group_key,
GArrowExpression *aggregate_expr,
GArrowAggregateFunction aggregate_function,
GError **error);
7) garrow_aggregate_node_new
用于创建一个新的聚合节点 (GArrowAggregateNode),并将聚合节点添加到执行计划中。
GArrowAggregateNode *
garrow_aggregate_node_new(GArrowRecordBatch *input,
GArrowAggregateNodeOptions *options,
GError **error);
8) garrow_execute_plan_start
用于启动执行计划,启动执行计划意味着开始按照计划中定义的顺序执行这些步骤。
gboolean
garrow_execute_plan_start(GArrowExecutePlan *plan,
GError **error);
9) garrow_execute_plan_wait
用于等待执行计划中的所有操作完成。这意味着函数会阻塞调用线程,直到所有的数据处理步骤(如读取、转换、聚合等)都完成为止。
gboolean
garrow_execute_plan_wait(GArrowExecutePlan *plan,
GError **error);
10) garrow_record_batch_reader_options_new
用于创建一个新的 GArrowRecordBatchReaderOptions 对象。这个对象包含了配置 GArrowRecordBatchReader 所需的各种选项,比如输入数据的位置、格式等。
GArrowRecordBatchReaderOptions *
garrow_record_batch_reader_options_new(void);
11) garrow_record_batch_reader_new
用于创建一个新的 GArrowRecordBatchReader 对象。这个对象可以从指定的数据源中读取 Arrow RecordBatch 数据。
GArrowRecordBatchReader *
garrow_record_batch_reader_new(GArrowRecordBatchReaderOptions *options,
GError **error);
12) garrow_record_batch_reader_read_next
用于从 GArrowRecordBatchReader 中读取下一个数据批次(GArrowRecordBatch)。这个函数是迭代读取数据的关键部分,尤其适用于处理大数据集,因为它不需要一次性加载所有数据到内存中。
gboolean
garrow_record_batch_reader_read_next(GArrowRecordBatchReader *reader,
GArrowRecordBatch **out_batch,
GError **error);
13) g_object_unref
用于减少一个 GObject 的引用计数。当引用计数降到零时,该对象会被销毁(即调用其析构函数,并释放其占用的内存)。这是 GObject 引用计数机制的一部分,用于自动管理内存,防止内存泄漏。
Arrow实现数据查询实例
Apache Arrow支持多种执行算子,这些算子用于实现各种数据处理任务,例如读取数据、转换数据、聚合数据、排序数据等。以下是Apache Arrow支持的一些主要算子。
1. Source Nodes
算子名称 |
功能 |
File Source |
从文件读取数据 |
Memory Source |
从内存读取数据 |
HTTP Source |
从 HTTP/HTTPS URL 读取数据 |
Database Source |
从数据库读取数据 |
2. Transform Nodes
算子名称 |
功能 |
Filter |
根据条件过滤数据行 |
Project |
选择特定列,投影操作 |
Aggregate |
对数据进行聚合计算 |
Sort |
对数据进行排序 |
Union |
合并多个数据集 |
Join |
进行表连接操作 |
Limit |
限制输出行的数量 |
Distinct |
去除重复行 |
Cast |
类型转换 |
Fill Null |
填充空值 |
Resample |
时间序列数据的重采样 |
GroupBy |
分组操作 |
3. Sink Nodes
算子名称 |
功能 |
File Sink |
将数据写入文件 |
Memory Sink |
将数据写入内存 |
HTTP Sink |
将数据发送到 HTTP/HTTPS URL |
Database Sink |
将数据写入数据库 |
以下示例为通过Arrow扫描data.csv文件中的数据,并对数据进行排序和聚合操作。
#include <garrow-0.2/garrow.h>
int main() {
GError *error = NULL;
// 创建执行计划
GArrowExecutePlan *plan = garrow_execute_plan_new();
// 创建源节点选项
GArrowSourceNodeOptions *options = garrow_source_node_options_new_record_batch_reader(
"csv", // 数据格式
"/path/to/data.csv", // 数据文件的路径
NULL, // 使用默认的 RecordBatchReader
&error);
if (error != NULL) {
g_printerr("Error creating source node options: %s\n", error->message);
g_error_free(error);
return 1;
}
// 构建源节点并将其添加到执行计划中
GArrowSourceNode *source_node = garrow_execute_plan_build_source_node(
plan,
options,
&error);
if (error != NULL) {
g_printerr("Error building source node: %s\n", error->message);
g_error_free(error);
return 1;
}
// 创建排序节点选项
GArrowSortNodeOptions *sort_options = garrow_sort_node_options_new();
GArrowField *sort_field = garrow_field_new("age"); // 假设数据中有 "age" 字段
GArrowExpression *sort_expr = garrow_expression_new_field(sort_field);
garrow_sort_node_options_set_sort_key(sort_options, sort_expr);
garrow_sort_node_options_set_ascending(sort_options, TRUE); // 升序排序
// 构建排序节点并与源节点关联
GArrowSortNode *sort_node = garrow_sort_node_new(
source_node,
sort_options,
&error);
if (error != NULL) {
g_printerr("Error creating sort node: %s\n", error->message);
g_error_free(error);
return 1;
}
// 创建聚合节点选项
GArrowAggregateNodeOptions *agg_options = garrow_aggregate_node_options_new();
GArrowField *agg_field = garrow_field_new("salary"); // 假设数据中有 "salary" 字段
GArrowExpression *agg_expr = garrow_expression_new_field(agg_field);
GArrowAggregateFunction *agg_func = garrow_aggregate_function_new_sum(agg_expr);
garrow_aggregate_node_options_set_aggregate_function(agg_options, agg_func);
// 构建聚合节点并与排序节点关联
GArrowAggregateNode *agg_node = garrow_aggregate_node_new(
sort_node,
agg_options,
&error);
if (error != NULL) {
g_printerr("Error creating aggregate node: %s\n", error->message);
g_error_free(error);
return 1;
}
// 启动执行计划
gboolean start_success = garrow_execute_plan_start(plan, &error);
if (!start_success) {
g_printerr("Error starting execute plan: %s\n", error->message);
g_error_free(error);
return 1;
}
// 等待执行计划完成
gboolean wait_success = garrow_execute_plan_wait(plan, &error);
if (!wait_success) {
g_printerr("Error waiting for execute plan: %s\n", error->message);
g_error_free(error);
return 1;
}
// 创建 RecordBatchReader 选项
GArrowRecordBatchReaderOptions *reader_options = garrow_record_batch_reader_options_new();
garrow_record_batch_reader_options_set_input(agg_node, reader_options);
// 创建 RecordBatchReader
GArrowRecordBatchReader *reader = garrow_record_batch_reader_new(reader_options, &error);
if (error != NULL) {
g_printerr("Error creating record batch reader: %s\n", error->message);
g_error_free(error);
return 1;
}
// 读取数据批次
GArrowRecordBatch *record_batch = NULL;
gboolean has_next = TRUE;
while (has_next) {
has_next = garrow_record_batch_reader_read_next(reader, &record_batch, &error);
if (has_next) {
// 处理当前批次的数据
garrow_record_batch_print(record_batch);
g_object_unref(record_batch); // 释放当前批次
}
}
// 清理资源
g_object_unref(reader);
g_object_unref(reader_options);
g_object_unref(agg_node);
g_object_unref(agg_options);
g_object_unref(sort_node);
g_object_unref(sort_options);
g_object_unref(source_node);
g_object_unref(options);
g_object_unref(plan);
return 0;
}