searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

Apache Arrow 进程间通信消息格式

2024-08-29 09:42:17
40
0

01 数据流格式

在 Apache Arrow 的进程间通信(Inter-Process Communication)中提供了 Streaming 和 RandomAccessFile 两种二进制格式,用于读取和写入 RecordBatch 数据

  • Streaming Format:这种格式用于发送任意长度的 RecordBatch 序列,但它必须从开始到结束顺序处理,不支持随机访问,所以 Streaming Format 适用于数据流或网络传输等应用场景
  • RandomAccessFile Format:这种格式用于序列化固定数量的 RecordBatch,由于该格式包含有关 RecordBatch 数量和位置的元数据,所以​支持随机访问​,因此在与内存映射一起使用时非常有用

Arrow IPC Stream 通常是按照 Schema 后面跟一个或多个 RecordBatch 的方式组织;如下图所示 Stream Format 中使用三种消息类型来传输数据:Schema, RecordBatch, DictionaryBatch,如果 Schema 中包含使用字段编码(dictionary-encoded)的数组,那么 Stream 中还会有 DictionaryBatch 格式的消息

image.png

Arrow IPC 传输数据时封装的消息格式 Stream Format 如下图所示,(1)消息开头是 8 个字节 0xFFFFFFFF表示有效消息的开始;(2)接着是一个表示 Schema 元数据信息长度的 32-bit 整数,该整数使用小端字节序编码(Little Endian Int);(3)然后是实际的 Schema 元数据;(4)接着是为了确保数据结构的对齐实现跨平台特性,可能包含填充字节使得整个消息的总长度是 8 字节的倍数;(5)最后就是包含要传输数据 RecordBatchs 的消息体

image.png

RandomAccessFile Format 的文件格式只是 Streaming Format 的一个扩展,如下图所示仅比 Streaming Format 多了表示文件开头结尾的 Magic String 和包含元数据信息的Footer,具体来说包括:

  1. ​​**Magic String (ARROW1​)**​:表示文件开头,其中 ARROW1是一个固定的字节序列在源码cpp\src\arrow\ipc\metadata_internal.h中被定义为kArrowMagicBytes,用于快速检查文件是否是 Arrow 格式
  2. Empty padding to 8-byte boundary​:为了确保文件结构的对齐,可能包含空的填充字节,使得从Magic String开始到实际数据部分的开始是 8 字节边界对齐的
  3. Data Using the Streaming Format with EOS indicator​:文件的主体,包含了使用 Streaming Format序列化的数据;数据最后面可能跟着一个 End Of Stream(EOS)指示器,表示没有更多的数据
  4. FlatBuffer Footer Message Bytes​:这部分是 FlatBuffer 形式的 Footer 消息,包含了Streaming Format中 Schema 的拷贝和文件中每个数据块在内存中的偏移量(Offset)和长度,这些元数据信息是 RandomAccessFile Format 文件格式实现随机访问的关键
  5. Footer Size​:一个 32 位的整数采用小端字节序编码,表示 Footer 消息的字节长度
  6. ​​**Magic String (ARROW1)**​:文件结尾标识,与文件开始处相同,用于标识文件的结束

image.png

02 元数据序列化

Arrow IPC Format 中可以看到消息格式中的元数据信息都是使用 FlatBuffers 来进行序列化的,为什么这么做呢?

对于进行间通信而言,消息在不同进程之间传递需要进行序列化和反序列化(serialization/deserialization),数据序列化和反序列化的速度越快,数据发送的速度就越快;所以,对于 IPC 过程来说,序列化和反序列化的开销十分重要

为了优化序列化和反序列化的开销,Apache Arrow IPC Format 采用了高性能序列化方式 FlatBuffers 来对元数据(metadata)进行序列化,对于消息体直接使用数据缓冲区(raw data buffers)传输数据而不需要进行序列化操作(那么对于跨节点的操作,无法直接使用内存的情况下是怎么传输的呢?)

在介绍物理布局时(article url)已经详细介绍过了 raw data buffers 的传输格式,下面详细介绍一下元数据的序列化方式 FlatBuffers

2.1 FlatBuffers 数据结构定义

首先简单介绍一下 FlatBuffers,它是一个由 Google 开发的跨平台序列化库,允许直接访问序列化的数据结构,无需解析成中间格式;支持​零拷贝反序列化​,即在反序列化过程中无需复制数据到内存的其他部分,从而避免了额外的内存分配和临时对象的创建;基于诸多高性能特性,访问 FlatBuffers 序列化的数据比访问 JSON、CSV 和Protocol Buffers 等需要解析和复制步骤的格式要高效得多

FlatBuffers 通过 Schema 定义语言来描述要序列化的数据结构,其语法和结构与 C 语言类似是一种类型安全的 IDL(接口描述语言)。下面结合 Apache Arrow 中定义 Schema 的 format\Schema.fbs实例来说明其基本的语法:

  1. namespace​:定义命名空间,用于组织和区分不同的数据结构
namespace org.apache.arrow.flatbuf;
  1. enum​:定义枚举类型,用于限定变量的取值范围
enum Endianness:short { Little, Big }
  1. struct​:定义结构体,类似于 C 语言中的struct,用于创建复杂的数据类型
struct Buffer {
  offset: long;
  length: long;
}
  1. table​:定义表,类似于数据库中的表,可以包含多个字段,并且字段的值可以是不同的类型
table Schema {
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];

  /// Features used in the stream/file.
  features : [ Feature ];
}
  1. root_type​:指定序列化文件的根类型,即文件中最顶层的数据类型
root_type Schema;
  1. file_identifier​:为文件定义一个唯一的标识符,用于文件格式的验证。
file_identifier:"SCHEMA";
  1. attribute​:定义属性,可以用于表或结构体,提供额外的元数据
table Person {
  id:int;
  [deprecated] name:string; // 表示 name 字段已经废弃
}
  1. array​:定义固定长度的数组
array Vec3[3] = [ v1, v2, v3 ];
  1. vector​:定义动态数组,用于存储一系列相同类型的元素
vector inventory:[ubyte];
  1. union​:定义联合体,类似于 C 语言的联合体,可以存储多种不同但兼容的类型
union Type {
  Null,
  Int,
  FloatingPoint,
  Binary,
  Utf8,
  Bool,
  Decimal,
  Date,
  Time,
  Timestamp,
  Interval,
  List,
  Struct_,
  Union,
  FixedSizeBinary,
  FixedSizeList,
  Map,
  Duration,
  LargeBinary,
  LargeUtf8,
  LargeList,
  RunEndEncoded,
  BinaryView,
  Utf8View,
  ListView,
  LargeListView,
}
  1. string​:定义字符串类型
table KeyValue {
  key: string;
  value: string;
}

通过这些关键字,可以定义复杂的数据结构,并使用 FlatBuffers 工具生成相应的序列化和反序列化代码;一个使用上述关键字定义的 Arrow 中 Schema 的部分数据结构示例如下:

// 定义命名空间
namespace org.apache.arrow.flatbuf;

// 定义枚举类型
enum Endianness:short { Little, Big }

// 定义结构体
struct Buffer {
  /// The relative offset into the shared memory page where the bytes for this
  /// buffer starts
  offset: long;

  /// The absolute length (in bytes) of the memory buffer. The memory is found
  /// from offset (inclusive) to offset + length (non-inclusive). When building
  /// messages using the encapsulated IPC message, padding bytes may be written
  /// after a buffer, but such padding bytes do not need to be accounted for in
  /// the size here.
  length: long;
}

// 定义表,包含字节序类型、字段、额外元数据等
table Schema {

  /// endianness of the buffer
  /// it is Little Endian by default
  /// if endianness doesn't match the underlying system then the vectors need to be converted
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];

  /// Features used in the stream/file.
  features : [ Feature ];
}

// 指定文件的根类型
root_type Schema;

2.2 FlatBuffers 序列化

FlatBuffers 序列化的主要逻辑是将对象数据存储于一个连续的一维字节数组 ByteBuffer 中;在该数组中,每个对象被划分为两个关键部分,元数据部分:包含必要的索引信息;真实数据部分:存储具体的数据值

与大多数内存中的数据结构不同的是,FlatBuffers 遵循严格的内存对齐规则和字节顺序序规范,确保了序列化数据的跨平台兼容性。此外,对于 table 类型的对象,FlatBuffers 还提供了前向和后向兼容性,以及对 optional字段的支持,从而适应数据格式的演变

除了解析效率以外,二进制格式还带来了另一个优势,数据的二进制表示通常更具有效率;可以使用 4 字节的 uint 而不是 10 个字符来存储 10 位数字的整数

FlatBuffers 序列化基本使用原则:

  • 小端模式:FlatBuffers 对各种基本数据的存储都是按照小端模式来进行的,因为这种模式目前和大部分处理器的存储模式是一致的,可以加快数据读写的数据。
  • 写入数据方向和读取数据方向不同

image.png

FlatBuffers 向 ByteBuffer 中写入数据的顺序是从 ByteBuffer 的尾部向头部填充,由于这种增长方向和 ByteBuffer 默认的增长方向不同,因此 FlatBuffers 在向 ByteBuffer 中写入数据的时候就不能依赖 ByteBuffer 的 position 来标记有效数据位置,而是自己维护了一个 space 变量来指明有效数据的位置,在分析 FlatBuffersBuilder 的时候要特别注意这个变量的增长特点

但是,和数据的写入方向不同的是,FlatBuffers 从 ByteBuffer 中解析数据的时候又是按照 ByteBuffer 正常的顺序来进行的。FlatBuffers 这样组织数据存储的好处是,在从左到右解析数据的时候,能够保证最先读取到的就是整个 ByteBuffer 的概要信息(例如 Table 类型的 vtable 字段),方便解析。

2.3 FlatBuffers 反序列化

基于序列化阶段的严格处理,FlatBuffers 的反序列化过程就很简单了;由于在序列化阶段每个字段的偏移量 offset 已被精确记录,因此反序列化本质上是从 ByteBuffer 中的指定偏移量处提取数据

反序列化过程从 root table 开始,沿着二进制流逐步向后读取:

首先,从包含字段偏移量信息的 vtable 中检索所需的偏移量 offset

然后,根据对应的 offset 在相应的 object 中定位到具体的字段,如果字段是引用类型(string、vector、table 等),则读取出 offset 并解析这些引用的 offset,进一步检索它们指向的实际偏移量;对于非引用类型,则直接根据 vtable 中的 offset 定位到数据直接读取

对于标量字段需要区分默认值和非默认值两种情况:对于默认值字段,反序列化时会直接采用由 flatc编译器后生成的文件中记录的默认值读取出来;而对于非默认值字段,二进制流中就会记录该字段的 offset,值也会存储在二进制流中,反序列化时直接根据 offset 读取字段值即可

整个反序列化过程是零拷贝(zero-copy)的,这意味着它不会消耗额外的内存资源,也不会创建数据的副本。并且 FlatBuffers 可以读取任意字段,而不是像 Json 和 protocol buffer 需要读取整个对象以后才能获取某个字段。FlatBuffers 的主要优势就在反序列化这里了。所以 FlatBuffers 可以做到解码速度极快,或者说无需解码直接读取。

03 IPC example

下面通过一个简单的 IPC 实例来加深对 Arrow 中二进制数据流传输的理解,该例子没有真正意义上的多进程之间通信,而是简单模拟了向 Arrow IPC Stream 中写入数据,然后从中读取数据并打印,具体过程包括:

  • 根据 JSON 字符串创建一个 RecordBatch
  • 将数据写入到 Arrow IPC Stream 中
  • 从 Arrow IPC Stream 读取该数据并打印
  1. 引用必要的库文件

引用与 Arrow IPC Stream 操作相关的 arrow 头文件,以及 json 数据处理等库文件

#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>
  1. 根据 JSON 字符串创建一个 RecordBatch

在介绍 RecordBatch 时提到其由元数据 Schema 和具体数据 Arrays 组成,其中 Schema 又由表示 Arrays 名称和类型的 Fields 构成,下面创建 RecordBatch 也按这样的组织方式进行构建

首先,使用arrow::field函数创建一个字段,接受字段名和字段类型作为参数,并将将创建的字段添加到字段类型列表 fields

然后,使用字段列表 fields 创建一个 Arrow 模式(Schema)对象 schema

接着,使用 Arrow JSON 读取器解析 JSON 数据,并根据 JSON 数据创建一个 Arrow 数组 array

最后,准备好了构建 RecordBatch 的所有组成部分,使用schemaarray创建一个 RecordBatch

std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
    arrow::DataTypeVector fields;
    auto field = arrow::field("list", arrow::list(arrow::binary()));
    fields.push_back(field);
    auto schema = std::make_shared<arrow::Schema>(fields);

    // Create a JSON reader from a string of data
    auto input_data = R"([
        ["index1"], 
        ["index3", "tag_int"], ["index5", "tag_int"],
        ["index6", "tag_int"], ["index7", "tag_int"], 
        ["index7", "tag_int"],
        ["index8"]
    ])";
    auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);

    // Read the data into an Array
    std::shared_ptr<arrow::Array> array;
    ARROW_ASSIGN_OR_RAISE(array, reader->Read());

    // Create a RecordBatch from the Array
    auto record = arrow::RecordBatch::Make(schema, array->length(), {array});
  1. 将数据写入到 Arrow IPC Stream 中

使用 arrow::ipc::MakeStreamWriter 创建的 IPC 流写入器,将使用 Slice 从 RecordBatch 提取出的部分数据序列化并写入到流中

// Create a slice of the RecordBatch
    auto slice = record->Slice(1, 2);

    // Write the sliced RecordBatch to an IPC stream
    ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
    output->WriteRecordBatch(*slice);
  1. 从 Arrow IPC Stream 读取该数据并打印

创建一个的 arrow::ipc::RecordBatchStreamReader对象 reader2,该读取器从 output 对象的序列化输出中创建的,output->Finish()方法返回一个包含序列化数据的 Buffer;然后从 reader2 中读取数据并打印输出

// Read the IPC stream back into a RecordBatch
    std::shared_ptr<arrow::RecordBatchReader> reader2;
    ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));

    // Print the contents of the RecordBatch
    std::shared_ptr<arrow::RecordBatch> readbatch;
    ARROW_ASSIGN_OR_RAISE(readbatch, reader2->ReadRecordBatch());
    std::cout << readbatch->ToString() << std::endl;

上述实例的完整代码实现如下:

#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>

int main() {
    std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
    arrow::DataTypeVector fields;
    auto field = arrow::field("list", arrow::list(arrow::binary()));
    fields.push_back(field);
    auto schema = std::make_shared<arrow::Schema>(fields);

    // Create a JSON reader from a string of data
    auto input_data = R"([
        ["index1"], 
        ["index3", "tag_int"], ["index5", "tag_int"],
        ["index6", "tag_int"], ["index7", "tag_int"], 
        ["index7", "tag_int"],
        ["index8"]
    ])";
    auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);

    // Read the data into an Array
    std::shared_ptr<arrow::Array> array;
    ARROW_ASSIGN_OR_RAISE(array, reader->Read());

    // Create a RecordBatch from the Array
    auto record = arrow::RecordBatch::Make(schema, array->length(), {array});

    // Create a slice of the RecordBatch
    auto slice = record->Slice(1, 2);

    // Write the sliced RecordBatch to an IPC stream
    ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
    output->WriteRecordBatch(*slice);

    // Read the IPC stream back into a RecordBatch
    std::shared_ptr<arrow::RecordBatchReader> reader2;
    ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));

    // Print the contents of the RecordBatch
    std::cout << slice->ToString() << std::endl;

    return 0;
}
0条评论
0 / 1000
王清欢
3文章数
0粉丝数
王清欢
3 文章 | 0 粉丝
王清欢
3文章数
0粉丝数
王清欢
3 文章 | 0 粉丝
原创

Apache Arrow 进程间通信消息格式

2024-08-29 09:42:17
40
0

01 数据流格式

在 Apache Arrow 的进程间通信(Inter-Process Communication)中提供了 Streaming 和 RandomAccessFile 两种二进制格式,用于读取和写入 RecordBatch 数据

  • Streaming Format:这种格式用于发送任意长度的 RecordBatch 序列,但它必须从开始到结束顺序处理,不支持随机访问,所以 Streaming Format 适用于数据流或网络传输等应用场景
  • RandomAccessFile Format:这种格式用于序列化固定数量的 RecordBatch,由于该格式包含有关 RecordBatch 数量和位置的元数据,所以​支持随机访问​,因此在与内存映射一起使用时非常有用

Arrow IPC Stream 通常是按照 Schema 后面跟一个或多个 RecordBatch 的方式组织;如下图所示 Stream Format 中使用三种消息类型来传输数据:Schema, RecordBatch, DictionaryBatch,如果 Schema 中包含使用字段编码(dictionary-encoded)的数组,那么 Stream 中还会有 DictionaryBatch 格式的消息

image.png

Arrow IPC 传输数据时封装的消息格式 Stream Format 如下图所示,(1)消息开头是 8 个字节 0xFFFFFFFF表示有效消息的开始;(2)接着是一个表示 Schema 元数据信息长度的 32-bit 整数,该整数使用小端字节序编码(Little Endian Int);(3)然后是实际的 Schema 元数据;(4)接着是为了确保数据结构的对齐实现跨平台特性,可能包含填充字节使得整个消息的总长度是 8 字节的倍数;(5)最后就是包含要传输数据 RecordBatchs 的消息体

image.png

RandomAccessFile Format 的文件格式只是 Streaming Format 的一个扩展,如下图所示仅比 Streaming Format 多了表示文件开头结尾的 Magic String 和包含元数据信息的Footer,具体来说包括:

  1. ​​**Magic String (ARROW1​)**​:表示文件开头,其中 ARROW1是一个固定的字节序列在源码cpp\src\arrow\ipc\metadata_internal.h中被定义为kArrowMagicBytes,用于快速检查文件是否是 Arrow 格式
  2. Empty padding to 8-byte boundary​:为了确保文件结构的对齐,可能包含空的填充字节,使得从Magic String开始到实际数据部分的开始是 8 字节边界对齐的
  3. Data Using the Streaming Format with EOS indicator​:文件的主体,包含了使用 Streaming Format序列化的数据;数据最后面可能跟着一个 End Of Stream(EOS)指示器,表示没有更多的数据
  4. FlatBuffer Footer Message Bytes​:这部分是 FlatBuffer 形式的 Footer 消息,包含了Streaming Format中 Schema 的拷贝和文件中每个数据块在内存中的偏移量(Offset)和长度,这些元数据信息是 RandomAccessFile Format 文件格式实现随机访问的关键
  5. Footer Size​:一个 32 位的整数采用小端字节序编码,表示 Footer 消息的字节长度
  6. ​​**Magic String (ARROW1)**​:文件结尾标识,与文件开始处相同,用于标识文件的结束

image.png

02 元数据序列化

Arrow IPC Format 中可以看到消息格式中的元数据信息都是使用 FlatBuffers 来进行序列化的,为什么这么做呢?

对于进行间通信而言,消息在不同进程之间传递需要进行序列化和反序列化(serialization/deserialization),数据序列化和反序列化的速度越快,数据发送的速度就越快;所以,对于 IPC 过程来说,序列化和反序列化的开销十分重要

为了优化序列化和反序列化的开销,Apache Arrow IPC Format 采用了高性能序列化方式 FlatBuffers 来对元数据(metadata)进行序列化,对于消息体直接使用数据缓冲区(raw data buffers)传输数据而不需要进行序列化操作(那么对于跨节点的操作,无法直接使用内存的情况下是怎么传输的呢?)

在介绍物理布局时(article url)已经详细介绍过了 raw data buffers 的传输格式,下面详细介绍一下元数据的序列化方式 FlatBuffers

2.1 FlatBuffers 数据结构定义

首先简单介绍一下 FlatBuffers,它是一个由 Google 开发的跨平台序列化库,允许直接访问序列化的数据结构,无需解析成中间格式;支持​零拷贝反序列化​,即在反序列化过程中无需复制数据到内存的其他部分,从而避免了额外的内存分配和临时对象的创建;基于诸多高性能特性,访问 FlatBuffers 序列化的数据比访问 JSON、CSV 和Protocol Buffers 等需要解析和复制步骤的格式要高效得多

FlatBuffers 通过 Schema 定义语言来描述要序列化的数据结构,其语法和结构与 C 语言类似是一种类型安全的 IDL(接口描述语言)。下面结合 Apache Arrow 中定义 Schema 的 format\Schema.fbs实例来说明其基本的语法:

  1. namespace​:定义命名空间,用于组织和区分不同的数据结构
namespace org.apache.arrow.flatbuf;
  1. enum​:定义枚举类型,用于限定变量的取值范围
enum Endianness:short { Little, Big }
  1. struct​:定义结构体,类似于 C 语言中的struct,用于创建复杂的数据类型
struct Buffer {
  offset: long;
  length: long;
}
  1. table​:定义表,类似于数据库中的表,可以包含多个字段,并且字段的值可以是不同的类型
table Schema {
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];

  /// Features used in the stream/file.
  features : [ Feature ];
}
  1. root_type​:指定序列化文件的根类型,即文件中最顶层的数据类型
root_type Schema;
  1. file_identifier​:为文件定义一个唯一的标识符,用于文件格式的验证。
file_identifier:"SCHEMA";
  1. attribute​:定义属性,可以用于表或结构体,提供额外的元数据
table Person {
  id:int;
  [deprecated] name:string; // 表示 name 字段已经废弃
}
  1. array​:定义固定长度的数组
array Vec3[3] = [ v1, v2, v3 ];
  1. vector​:定义动态数组,用于存储一系列相同类型的元素
vector inventory:[ubyte];
  1. union​:定义联合体,类似于 C 语言的联合体,可以存储多种不同但兼容的类型
union Type {
  Null,
  Int,
  FloatingPoint,
  Binary,
  Utf8,
  Bool,
  Decimal,
  Date,
  Time,
  Timestamp,
  Interval,
  List,
  Struct_,
  Union,
  FixedSizeBinary,
  FixedSizeList,
  Map,
  Duration,
  LargeBinary,
  LargeUtf8,
  LargeList,
  RunEndEncoded,
  BinaryView,
  Utf8View,
  ListView,
  LargeListView,
}
  1. string​:定义字符串类型
table KeyValue {
  key: string;
  value: string;
}

通过这些关键字,可以定义复杂的数据结构,并使用 FlatBuffers 工具生成相应的序列化和反序列化代码;一个使用上述关键字定义的 Arrow 中 Schema 的部分数据结构示例如下:

// 定义命名空间
namespace org.apache.arrow.flatbuf;

// 定义枚举类型
enum Endianness:short { Little, Big }

// 定义结构体
struct Buffer {
  /// The relative offset into the shared memory page where the bytes for this
  /// buffer starts
  offset: long;

  /// The absolute length (in bytes) of the memory buffer. The memory is found
  /// from offset (inclusive) to offset + length (non-inclusive). When building
  /// messages using the encapsulated IPC message, padding bytes may be written
  /// after a buffer, but such padding bytes do not need to be accounted for in
  /// the size here.
  length: long;
}

// 定义表,包含字节序类型、字段、额外元数据等
table Schema {

  /// endianness of the buffer
  /// it is Little Endian by default
  /// if endianness doesn't match the underlying system then the vectors need to be converted
  endianness: Endianness=Little;

  fields: [Field];
  // User-defined metadata
  custom_metadata: [ KeyValue ];

  /// Features used in the stream/file.
  features : [ Feature ];
}

// 指定文件的根类型
root_type Schema;

2.2 FlatBuffers 序列化

FlatBuffers 序列化的主要逻辑是将对象数据存储于一个连续的一维字节数组 ByteBuffer 中;在该数组中,每个对象被划分为两个关键部分,元数据部分:包含必要的索引信息;真实数据部分:存储具体的数据值

与大多数内存中的数据结构不同的是,FlatBuffers 遵循严格的内存对齐规则和字节顺序序规范,确保了序列化数据的跨平台兼容性。此外,对于 table 类型的对象,FlatBuffers 还提供了前向和后向兼容性,以及对 optional字段的支持,从而适应数据格式的演变

除了解析效率以外,二进制格式还带来了另一个优势,数据的二进制表示通常更具有效率;可以使用 4 字节的 uint 而不是 10 个字符来存储 10 位数字的整数

FlatBuffers 序列化基本使用原则:

  • 小端模式:FlatBuffers 对各种基本数据的存储都是按照小端模式来进行的,因为这种模式目前和大部分处理器的存储模式是一致的,可以加快数据读写的数据。
  • 写入数据方向和读取数据方向不同

image.png

FlatBuffers 向 ByteBuffer 中写入数据的顺序是从 ByteBuffer 的尾部向头部填充,由于这种增长方向和 ByteBuffer 默认的增长方向不同,因此 FlatBuffers 在向 ByteBuffer 中写入数据的时候就不能依赖 ByteBuffer 的 position 来标记有效数据位置,而是自己维护了一个 space 变量来指明有效数据的位置,在分析 FlatBuffersBuilder 的时候要特别注意这个变量的增长特点

但是,和数据的写入方向不同的是,FlatBuffers 从 ByteBuffer 中解析数据的时候又是按照 ByteBuffer 正常的顺序来进行的。FlatBuffers 这样组织数据存储的好处是,在从左到右解析数据的时候,能够保证最先读取到的就是整个 ByteBuffer 的概要信息(例如 Table 类型的 vtable 字段),方便解析。

2.3 FlatBuffers 反序列化

基于序列化阶段的严格处理,FlatBuffers 的反序列化过程就很简单了;由于在序列化阶段每个字段的偏移量 offset 已被精确记录,因此反序列化本质上是从 ByteBuffer 中的指定偏移量处提取数据

反序列化过程从 root table 开始,沿着二进制流逐步向后读取:

首先,从包含字段偏移量信息的 vtable 中检索所需的偏移量 offset

然后,根据对应的 offset 在相应的 object 中定位到具体的字段,如果字段是引用类型(string、vector、table 等),则读取出 offset 并解析这些引用的 offset,进一步检索它们指向的实际偏移量;对于非引用类型,则直接根据 vtable 中的 offset 定位到数据直接读取

对于标量字段需要区分默认值和非默认值两种情况:对于默认值字段,反序列化时会直接采用由 flatc编译器后生成的文件中记录的默认值读取出来;而对于非默认值字段,二进制流中就会记录该字段的 offset,值也会存储在二进制流中,反序列化时直接根据 offset 读取字段值即可

整个反序列化过程是零拷贝(zero-copy)的,这意味着它不会消耗额外的内存资源,也不会创建数据的副本。并且 FlatBuffers 可以读取任意字段,而不是像 Json 和 protocol buffer 需要读取整个对象以后才能获取某个字段。FlatBuffers 的主要优势就在反序列化这里了。所以 FlatBuffers 可以做到解码速度极快,或者说无需解码直接读取。

03 IPC example

下面通过一个简单的 IPC 实例来加深对 Arrow 中二进制数据流传输的理解,该例子没有真正意义上的多进程之间通信,而是简单模拟了向 Arrow IPC Stream 中写入数据,然后从中读取数据并打印,具体过程包括:

  • 根据 JSON 字符串创建一个 RecordBatch
  • 将数据写入到 Arrow IPC Stream 中
  • 从 Arrow IPC Stream 读取该数据并打印
  1. 引用必要的库文件

引用与 Arrow IPC Stream 操作相关的 arrow 头文件,以及 json 数据处理等库文件

#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>
  1. 根据 JSON 字符串创建一个 RecordBatch

在介绍 RecordBatch 时提到其由元数据 Schema 和具体数据 Arrays 组成,其中 Schema 又由表示 Arrays 名称和类型的 Fields 构成,下面创建 RecordBatch 也按这样的组织方式进行构建

首先,使用arrow::field函数创建一个字段,接受字段名和字段类型作为参数,并将将创建的字段添加到字段类型列表 fields

然后,使用字段列表 fields 创建一个 Arrow 模式(Schema)对象 schema

接着,使用 Arrow JSON 读取器解析 JSON 数据,并根据 JSON 数据创建一个 Arrow 数组 array

最后,准备好了构建 RecordBatch 的所有组成部分,使用schemaarray创建一个 RecordBatch

std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
    arrow::DataTypeVector fields;
    auto field = arrow::field("list", arrow::list(arrow::binary()));
    fields.push_back(field);
    auto schema = std::make_shared<arrow::Schema>(fields);

    // Create a JSON reader from a string of data
    auto input_data = R"([
        ["index1"], 
        ["index3", "tag_int"], ["index5", "tag_int"],
        ["index6", "tag_int"], ["index7", "tag_int"], 
        ["index7", "tag_int"],
        ["index8"]
    ])";
    auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);

    // Read the data into an Array
    std::shared_ptr<arrow::Array> array;
    ARROW_ASSIGN_OR_RAISE(array, reader->Read());

    // Create a RecordBatch from the Array
    auto record = arrow::RecordBatch::Make(schema, array->length(), {array});
  1. 将数据写入到 Arrow IPC Stream 中

使用 arrow::ipc::MakeStreamWriter 创建的 IPC 流写入器,将使用 Slice 从 RecordBatch 提取出的部分数据序列化并写入到流中

// Create a slice of the RecordBatch
    auto slice = record->Slice(1, 2);

    // Write the sliced RecordBatch to an IPC stream
    ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
    output->WriteRecordBatch(*slice);
  1. 从 Arrow IPC Stream 读取该数据并打印

创建一个的 arrow::ipc::RecordBatchStreamReader对象 reader2,该读取器从 output 对象的序列化输出中创建的,output->Finish()方法返回一个包含序列化数据的 Buffer;然后从 reader2 中读取数据并打印输出

// Read the IPC stream back into a RecordBatch
    std::shared_ptr<arrow::RecordBatchReader> reader2;
    ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));

    // Print the contents of the RecordBatch
    std::shared_ptr<arrow::RecordBatch> readbatch;
    ARROW_ASSIGN_OR_RAISE(readbatch, reader2->ReadRecordBatch());
    std::cout << readbatch->ToString() << std::endl;

上述实例的完整代码实现如下:

#include <arrow/api.h>
#include <arrow/ipc/writer.h>
#include <arrow/json/reader.h>
#include <arrow/memory_pool.h>
#include <arrow/result.h>
#include <arrow/status.h>
#include <iostream>
#include <string>

int main() {
    std::shared_ptr<arrow::MemoryPool> mem = arrow::default_memory_pool();
    arrow::DataTypeVector fields;
    auto field = arrow::field("list", arrow::list(arrow::binary()));
    fields.push_back(field);
    auto schema = std::make_shared<arrow::Schema>(fields);

    // Create a JSON reader from a string of data
    auto input_data = R"([
        ["index1"], 
        ["index3", "tag_int"], ["index5", "tag_int"],
        ["index6", "tag_int"], ["index7", "tag_int"], 
        ["index7", "tag_int"],
        ["index8"]
    ])";
    auto reader = arrow::json::Reader::Make(arrow::default_memory_pool(), schema, input_data);

    // Read the data into an Array
    std::shared_ptr<arrow::Array> array;
    ARROW_ASSIGN_OR_RAISE(array, reader->Read());

    // Create a RecordBatch from the Array
    auto record = arrow::RecordBatch::Make(schema, array->length(), {array});

    // Create a slice of the RecordBatch
    auto slice = record->Slice(1, 2);

    // Write the sliced RecordBatch to an IPC stream
    ARROW_ASSIGN_OR_RAISE(auto output, arrow::ipc::MakeStreamWriter(arrow::default_memory_pool(), &slice->schema()));
    output->WriteRecordBatch(*slice);

    // Read the IPC stream back into a RecordBatch
    std::shared_ptr<arrow::RecordBatchReader> reader2;
    ARROW_ASSIGN_OR_RAISE(reader2, arrow::ipc::RecordBatchStreamReader::Open(std::make_shared<arrow::io::BufferReader>(output->Finish())));

    // Print the contents of the RecordBatch
    std::cout << slice->ToString() << std::endl;

    return 0;
}
文章来自个人专栏
数据库研发
3 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
0
0