01 数据流格式
在 Apache Arrow 的进程间通信(Inter-Process Communication)中提供了 Streaming 和 RandomAccessFile 两种二进制格式,用于读取和写入 RecordBatch 数据
- Streaming Format:这种格式用于发送任意长度的 RecordBatch 序列,但它必须从开始到结束顺序处理,不支持随机访问,所以 Streaming Format 适用于数据流或网络传输等应用场景
- RandomAccessFile Format:这种格式用于序列化固定数量的 RecordBatch,由于该格式包含有关 RecordBatch 数量和位置的元数据,所以支持随机访问,因此在与内存映射一起使用时非常有用
Arrow IPC Stream 通常是按照 Schema 后面跟一个或多个 RecordBatch 的方式组织;如下图所示 Stream Format 中使用三种消息类型来传输数据:Schema, RecordBatch, DictionaryBatch
,如果 Schema 中包含使用字段编码(dictionary-encoded)的数组,那么 Stream 中还会有 DictionaryBatch 格式的消息
Arrow IPC 传输数据时封装的消息格式 Stream Format 如下图所示,(1)消息开头是 8 个字节 0xFFFFFFFF
表示有效消息的开始;(2)接着是一个表示 Schema 元数据信息长度的 32-bit 整数,该整数使用小端字节序编码(Little Endian Int);(3)然后是实际的 Schema 元数据;(4)接着是为了确保数据结构的对齐实现跨平台特性,可能包含填充字节使得整个消息的总长度是 8 字节的倍数;(5)最后就是包含要传输数据 RecordBatchs 的消息体
RandomAccessFile Format 的文件格式只是 Streaming Format 的一个扩展,如下图所示仅比 Streaming Format 多了表示文件开头结尾的 Magic String
和包含元数据信息的Footer
,具体来说包括:
- **Magic String (ARROW1)**:表示文件开头,其中
ARROW1
是一个固定的字节序列在源码cpp\src\arrow\ipc\metadata_internal.h
中被定义为kArrowMagicBytes
,用于快速检查文件是否是 Arrow 格式 - Empty padding to 8-byte boundary:为了确保文件结构的对齐,可能包含空的填充字节,使得从
Magic String
开始到实际数据部分的开始是 8 字节边界对齐的 - Data Using the Streaming Format with EOS indicator:文件的主体,包含了使用
Streaming Format
序列化的数据;数据最后面可能跟着一个End Of Stream(EOS)
指示器,表示没有更多的数据 - FlatBuffer Footer Message Bytes:这部分是 FlatBuffer 形式的 Footer 消息,包含了
Streaming Format
中 Schema 的拷贝和文件中每个数据块在内存中的偏移量(Offset)和长度,这些元数据信息是RandomAccessFile Format
文件格式实现随机访问的关键 - Footer Size:一个 32 位的整数采用小端字节序编码,表示 Footer 消息的字节长度
- **Magic String (ARROW1)**:文件结尾标识,与文件开始处相同,用于标识文件的结束
02 元数据序列化
Arrow IPC Format 中可以看到消息格式中的元数据信息都是使用 FlatBuffers 来进行序列化的,为什么这么做呢?
对于进行间通信而言,消息在不同进程之间传递需要进行序列化和反序列化(serialization/deserialization),数据序列化和反序列化的速度越快,数据发送的速度就越快;所以,对于 IPC 过程来说,序列化和反序列化的开销十分重要
为了优化序列化和反序列化的开销,Apache Arrow IPC Format 采用了高性能序列化方式 FlatBuffers 来对元数据(metadata)进行序列化,对于消息体直接使用数据缓冲区(raw data buffers)传输数据而不需要进行序列化操作(那么对于跨节点的操作,无法直接使用内存的情况下是怎么传输的呢?)
在介绍物理布局时(article url)已经详细介绍过了 raw data buffers 的传输格式,下面详细介绍一下元数据的序列化方式 FlatBuffers
2.1 FlatBuffers 数据结构定义
首先简单介绍一下 FlatBuffers,它是一个由 Google 开发的跨平台序列化库,允许直接访问序列化的数据结构,无需解析成中间格式;支持零拷贝反序列化,即在反序列化过程中无需复制数据到内存的其他部分,从而避免了额外的内存分配和临时对象的创建;基于诸多高性能特性,访问 FlatBuffers 序列化的数据比访问 JSON、CSV 和Protocol Buffers 等需要解析和复制步骤的格式要高效得多
FlatBuffers 通过 Schema 定义语言来描述要序列化的数据结构,其语法和结构与 C 语言类似是一种类型安全的 IDL(接口描述语言)。下面结合 Apache Arrow 中定义 Schema 的 format\Schema.fbs
实例来说明其基本的语法:
- namespace:定义命名空间,用于组织和区分不同的数据结构
namespace org.apache.arrow.flatbuf;
- enum:定义枚举类型,用于限定变量的取值范围
enum Endianness:short { Little, Big }
- struct:定义结构体,类似于 C 语言中的
struct
,用于创建复杂的数据类型
struct Buffer {
offset: long;
length: long;
}
- table:定义表,类似于数据库中的表,可以包含多个字段,并且字段的值可以是不同的类型
table Schema {
endianness: Endianness=Little;
fields: [Field];
// User-defined metadata
custom_metadata: [ KeyValue ];
/// Features used in the stream/file.
features : [ Feature ];
}
- root_type:指定序列化文件的根类型,即文件中最顶层的数据类型
root_type Schema;
- file_identifier:为文件定义一个唯一的标识符,用于文件格式的验证。
file_identifier:"SCHEMA";
- attribute:定义属性,可以用于表或结构体,提供额外的元数据
table Person {
id:int;
[deprecated] name:string; // 表示 name 字段已经废弃
}
- array:定义固定长度的数组
array Vec3[3] = [ v1, v2, v3 ];
- vector:定义动态数组,用于存储一系列相同类型的元素
vector inventory:[ubyte];
- union:定义联合体,类似于 C 语言的联合体,可以存储多种不同但兼容的类型
union Type {
Null,
Int,
FloatingPoint,
Binary,
Utf8,
Bool,
Decimal,
Date,
Time,
Timestamp,
Interval,
List,
Struct_,
Union,
FixedSizeBinary,
FixedSizeList,
Map,
Duration,
LargeBinary,
LargeUtf8,
LargeList,
RunEndEncoded,
BinaryView,
Utf8View,
ListView,
LargeListView,
}
- string:定义字符串类型
table KeyValue {
key: string;
value: string;
}
通过这些关键字,可以定义复杂的数据结构,并使用 FlatBuffers 工具生成相应的序列化和反序列化代码;一个使用上述关键字定义的 Arrow 中 Schema 的部分数据结构示例如下:
// 定义命名空间
namespace org.apache.arrow.flatbuf;
// 定义枚举类型
enum Endianness:short { Little, Big }
// 定义结构体
struct Buffer {
/// The relative offset into the shared memory page where the bytes for this
/// buffer starts
offset: long;
/// The absolute length (in bytes) of the memory buffer. The memory is found
/// from offset (inclusive) to offset + length (non-inclusive). When building
/// messages using the encapsulated IPC message, padding bytes may be written
/// after a buffer, but such padding bytes do not need to be accounted for in
/// the size here.
length: long;
}
// 定义表,包含字节序类型、字段、额外元数据等
table Schema {
/// endianness of the buffer
/// it is Little Endian by default
/// if endianness doesn't match the underlying system then the vectors need to be converted
endianness: Endianness=Little;
fields: [Field];
// User-defined metadata
custom_metadata: [ KeyValue ];
/// Features used in the stream/file.
features : [ Feature ];
}
// 指定文件的根类型
root_type Schema;
2.2 FlatBuffers 序列化
FlatBuffers 序列化的主要逻辑是将对象数据存储于一个连续的一维字节数组 ByteBuffer 中;在该数组中,每个对象被划分为两个关键部分,元数据部分:包含必要的索引信息;真实数据部分:存储具体的数据值
与大多数内存中的数据结构不同的是,FlatBuffers 遵循严格的内存对齐规则和字节顺序序规范,确保了序列化数据的跨平台兼容性。此外,对于 table 类型的对象,FlatBuffers 还提供了前向和后向兼容性,以及对 optional
字段的支持,从而适应数据格式的演变
除了解析效率以外,二进制格式还带来了另一个优势,数据的二进制表示通常更具有效率;可以使用 4 字节的 uint
而不是 10 个字符来存储 10 位数字的整数
FlatBuffers 序列化基本使用原则:
- 小端模式:FlatBuffers 对各种基本数据的存储都是按照小端模式来进行的,因为这种模式目前和大部分处理器的存储模式是一致的,可以加快数据读写的数据。
- 写入数据方向和读取数据方向不同
FlatBuffers 向 ByteBuffer 中写入数据的顺序是从 ByteBuffer 的尾部向头部填充,由于这种增长方向和 ByteBuffer 默认的增长方向不同,因此 FlatBuffers 在向 ByteBuffer 中写入数据的时候就不能依赖 ByteBuffer 的 position 来标记有效数据位置,而是自己维护了一个 space 变量来指明有效数据的位置,在分析 FlatBuffersBuilder 的时候要特别注意这个变量的增长特点
但是,和数据的写入方向不同的是,FlatBuffers 从 ByteBuffer 中解析数据的时候又是按照 ByteBuffer 正常的顺序来进行的。FlatBuffers 这样组织数据存储的好处是,在从左到右解析数据的时候,能够保证最先读取到的就是整个 ByteBuffer 的概要信息(例如 Table 类型的 vtable 字段),方便解析。
2.3 FlatBuffers 反序列化
基于序列化阶段的严格处理,FlatBuffers 的反序列化过程就很简单了;由于在序列化阶段每个字段的偏移量 offset 已被精确记录,因此反序列化本质上是从 ByteBuffer 中的指定偏移量处提取数据
反序列化过程从 root table 开始,沿着二进制流逐步向后读取:
首先,从包含字段偏移量信息的 vtable 中检索所需的偏移量 offset
然后,根据对应的 offset 在相应的 object 中定位到具体的字段,如果字段是引用类型(string、vector、table 等),则读取出 offset 并解析这些引用的 offset,进一步检索它们指向的实际偏移量;对于非引用类型,则直接根据 vtable 中的 offset 定位到数据直接读取
对于标量字段需要区分默认值和非默认值两种情况:对于默认值字段,反序列化时会直接采用由 flatc
编译器后生成的文件中记录的默认值读取出来;而对于非默认值字段,二进制流中就会记录该字段的 offset,值也会存储在二进制流中,反序列化时直接根据 offset 读取字段值即可
整个反序列化过程是零拷贝(zero-copy)的,这意味着它不会消耗额外的内存资源,也不会创建数据的副本。并且 FlatBuffers 可以读取任意字段,而不是像 Json 和 protocol buffer 需要读取整个对象以后才能获取某个字段。FlatBuffers 的主要优势就在反序列化这里了。所以 FlatBuffers 可以做到解码速度极快,或者说无需解码直接读取。
03 IPC example
下面通过一个简单的 IPC 实例来加深对 Arrow 中二进制数据流传输的理解,该例子没有真正意义上的多进程之间通信,而是简单模拟了向 Arrow IPC Stream 中写入数据,然后从中读取数据并打印,具体过程包括:
- 根据 JSON 字符串创建一个 RecordBatch
- 将数据写入到 Arrow IPC Stream 中
- 从 Arrow IPC Stream 读取该数据并打印
- 引用必要的库文件
引用与 Arrow IPC Stream 操作相关的 arrow 头文件,以及 json 数据处理等库文件
#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>
- 根据 JSON 字符串创建一个 RecordBatch
在介绍 RecordBatch 时提到其由元数据 Schema 和具体数据 Arrays 组成,其中 Schema 又由表示 Arrays 名称和类型的 Fields 构成,下面创建 RecordBatch 也按这样的组织方式进行构建
首先,使用arrow::field
函数创建一个字段,接受字段名和字段类型作为参数,并将将创建的字段添加到字段类型列表 fields
中
然后,使用字段列表 fields 创建一个 Arrow 模式(Schema)对象 schema
接着,使用 Arrow JSON 读取器解析 JSON 数据,并根据 JSON 数据创建一个 Arrow 数组 array
最后,准备好了构建 RecordBatch 的所有组成部分,使用schema
和array
创建一个 RecordBatch
std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
arrow::DataTypeVector fields;
auto field = arrow::field("list", arrow::list(arrow::binary()));
fields.push_back(field);
auto schema = std::make_shared<arrow::Schema>(fields);
// Create a JSON reader from a string of data
auto input_data = R"([
["index1"],
["index3", "tag_int"], ["index5", "tag_int"],
["index6", "tag_int"], ["index7", "tag_int"],
["index7", "tag_int"],
["index8"]
])";
auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);
// Read the data into an Array
std::shared_ptr<arrow::Array> array;
ARROW_ASSIGN_OR_RAISE(array, reader->Read());
// Create a RecordBatch from the Array
auto record = arrow::RecordBatch::Make(schema, array->length(), {array});
- 将数据写入到 Arrow IPC Stream 中
使用 arrow::ipc::MakeStreamWriter
创建的 IPC 流写入器,将使用 Slice 从 RecordBatch 提取出的部分数据序列化并写入到流中
// Create a slice of the RecordBatch
auto slice = record->Slice(1, 2);
// Write the sliced RecordBatch to an IPC stream
ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
output->WriteRecordBatch(*slice);
- 从 Arrow IPC Stream 读取该数据并打印
创建一个的 arrow::ipc::RecordBatchStreamReader
对象 reader2,该读取器从 output 对象的序列化输出中创建的,output->Finish()
方法返回一个包含序列化数据的 Buffer;然后从 reader2 中读取数据并打印输出
// Read the IPC stream back into a RecordBatch
std::shared_ptr<arrow::RecordBatchReader> reader2;
ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));
// Print the contents of the RecordBatch
std::shared_ptr<arrow::RecordBatch> readbatch;
ARROW_ASSIGN_OR_RAISE(readbatch, reader2->ReadRecordBatch());
std::cout << readbatch->ToString() << std::endl;
上述实例的完整代码实现如下:
#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>
int main() {
std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
arrow::DataTypeVector fields;
auto field = arrow::field("list", arrow::list(arrow::binary()));
fields.push_back(field);
auto schema = std::make_shared<arrow::Schema>(fields);
// Create a JSON reader from a string of data
auto input_data = R"([
["index1"],
["index3", "tag_int"], ["index5", "tag_int"],
["index6", "tag_int"], ["index7", "tag_int"],
["index7", "tag_int"],
["index8"]
])";
auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);
// Read the data into an Array
std::shared_ptr<arrow::Array> array;
ARROW_ASSIGN_OR_RAISE(array, reader->Read());
// Create a RecordBatch from the Array
auto record = arrow::RecordBatch::Make(schema, array->length(), {array});
// Create a slice of the RecordBatch
auto slice = record->Slice(1, 2);
// Write the sliced RecordBatch to an IPC stream
ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
output->WriteRecordBatch(*slice);
// Read the IPC stream back into a RecordBatch
std::shared_ptr<arrow::RecordBatchReader> reader2;
ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));
// Print the contents of the RecordBatch
std::cout << slice->ToString() << std::endl;
return 0;
}