searchusermenu
  • 发布文章
  • 消息中心
点赞
收藏
评论
分享
原创

深入HDFS EC写流程

2023-05-30 06:28:33
105
0

一、背景

 由于正式进入DT时代,人们每天产生的数据量越来越多,而数据随着时间增长价值会不断降低,如果用低成本保存低价值的数据成为每个公司必须要考虑的问题。而早期的HDFS是使用多副本策略对数据进行保存,假设副本数为3,导致数据的价值度为33%。虽然多副本策略更能适应“移动计算,不移动数据”的分布式计算原则,读取数据时也不会引入额外的计算,但是由于冷数据的访问频率很低,多副本策略成本太高。在这种场景下,就需要利用时间去解决空间的问题。

 HDFS EC的引入就是为了解决存储效率过低的问题。目前EC有两种方式,一种是XOR方式,与磁盘高可用方式中的raid5类似,通过两块数据块生成一块校验块,该方式存储效率为67%(2/3),只能容忍一块数据块出故障;另外一种方式是Reed Solomon码,通过生成矩阵产生N个校验块,能够容忍N个块出现故障。

下面是不同副本策略的比较:

下面将以RS(6,3)为例,深入分析EC的流程

二、HDFS EC引入的概念

 HDFS为了引入EC,对数据块的布局进行了改造,并引入了新的对象

 之前的HDFS是以Block为单位写入数据的,写入一个Block,并通过PipeLine依次同步到其他Datanode上后,再写入写一个block

而HDFS EC改变了这个方式,它以cell为单位写入每一个目标Datanode,以RS(6,3)为例,写完6个cell后,会根据这些cell生成3个校验的cell,而所有9个cell会组成一个stripe。

可以看到副本策略中,同一个block中是相邻的数据,而EC策略中,同一个block中的不同cell的数据不相邻

为了使原来的代码兼容EC策略,HDFS源码中使用BlockinfoContiguous代表副本策略的Block,而BlockinfoStripe代表了EC策略的Block

三、HDFS EC对写流程的改造

(一)为某个路径设置EC策略

hdfs ec -setPolicy -path <path> -policy <policyName>

##根据EC策略的名字,创建EC相关xAttr
WritableUtils.writeString(dOut, ecPolicy.getName());
ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_POLICY,
          bOut.toByteArray());
xattrs.add(ecXAttr);

##将xAttr绑定到路径对应的inode上
inode.addXAttrFeature(new XAttrFeature(xAttrs), snapshotId);

(二)客户端启动写入相关对象

 熟悉HDFS写入过程都听见过DFSOutputStream和DataStreamer,其中DFSOutputStream是写数据的入口,而DataStreamer负责将数据写入pipeline,而HDFS EC分别引入了相关的子类DFSOutputStream->DFSStripedOutputStream,DataStreamer->StripedDataStreamer

DFSStripedOutputStream中保存了每个target对应的StripedDataStreamer,并且包含了MultipleBlockingQueue、Coordinator和CellBuffers。MultipleBlockingQueue用于保存需要发送给不同StripedDataStreamer的数据块或者其他信息,Coordinator用于管理多个MultipleBlockingQueue,CellBuffers用于缓存后续需要进行编码的数据以及编码后的数据。在DFSStripedOutputStream初始化中会设置CurrentStreamer为第一个StripedDataStreamer。

for (short i = 0; i < numAllBlocks; i++) {
  StripedDataStreamer streamer = new StripedDataStreamer(stat,
      dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
      favoredNodes, i, coordinator, getAddBlockFlags());
  streamers.add(streamer);
}
setCurrentStreamer(0)

客户端在调用create请求创建文件后,就会创建DFSStripedOutputStream以及StripedDataStreamer,并启动所有的StripedDataStreamer。

if(stat.getErasureCodingPolicy() != null) {
  out = new DFSStripedOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
} else {
  out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes, true);
}
out.start();
######
for (StripedDataStreamer streamer : streamers) {
  streamer.start();
}

(三)写入流程

EC的写入流程从DFSStripedOutputStream的writeChunk方法开始

1、先将数据写入cellBuffers中,用于后面计算校验块

2、第一次写数据时,首先需要分配数据块,即确定需要写到哪些Datanode上,调用namenode.addblock方法分配数据块

1)确定目标target的数目,并选择Datanode,最后封装成LocatedBlock

1、计算目标Datanode的数量
FSDirWriteFileOp.ValidateAddBlockResult r =
          FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
                                            previous, onRetryBlock);
###########
if (blockType == BlockType.STRIPED) {
  ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
  numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
          + ecPolicy.getSchema().getNumParityUnits());
} else {
  numTargets = pendingFile.getFileReplication();
}
2、选择目标Datanode
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);
3、创建Block,并加入对应的inode,最后将location信息封装
LocatedBlock  lb = FSDirWriteFileOp.storeAllocatedBlock(
          this, src, fileId, clientName, previous, targets);

2)addblock调用返回的LocatedStripedBlock对应了BlockGroup,通过parseStripedBlockGroup解析LocatedStripedBlock,并构造internalBlock(6个DataBlock和3个parityBlock)

final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
        numAllBlocks - numDataBlocks);

3)将构造的内部块放到对应的followingBlocks的队列中

coordinator.getFollowingBlocks().offer(i, blocks[i])

4)StripedDataStreamer的run方法里,将从对应的followingBlocks的队列中获取新构造的内部块

coordinator.getFollowingBlocks().poll(index)

并通过createBlockOutputStream方法与相应的Datanode建立输入和输出流

createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,false);

3、调用DFSOutputStream的writeChunk方法写入数据

当前DFSStripedOutputStream中的currentStreamer指向了第一个StripedDataStreamer

1)将当前chunk对应的校验和和数据写入packet中

2)如果该packet中的chunk达到了一个packet能够包含的chunk的最大数量,就将该packet发送到currentStreamer对应的dataQueue中

3)DataStreamer的run方法中会从dataQueue中获取第二步放入的packet

4)首先会将待发送的packet发送到ackqueue中,用于后续保证数据块的发送,然后通过之前与Datanode建立的输出流将packet发往Datanode。

如果写满一个cell,DFSStripedOutputStream的currentStreamer将指向下一个StripedDataStreamer,继续写出数据。

4、如果写满第六个cell,即达到numDataBlocks,就会计算检验块,输入数据是Cellbuffer里的buffers数组,输出是Cellbuffer里的checksumArrays

5、调用DFSOutputStream的writeChunk方法写入校验数据,步骤与第三步一样

注意:这里有两种校验数据,一种是用于保证数据正确性的,最后会写入到block对应的meta文件里,另外一种是为了保证数据高可用而写出的校验块,校验块也有自己本身的数据和校验数据

输出完一组data+parity后,就产生了一个strip,然后又重新从第一个StripedDataStreamer开始写出新的strip对应的数据

6、结束一个BlockGroup

当写入的数据量满足一个BlockGroup的大小

currentBlockGroup != null &&currentBlockGroup.getNumBytes() == blockSize * numDataBlocks

1)向所有的StripedDataStreamer发送空的packet,表示一个数据块结束

2)每个StripedDataStreamer的run方法中,会获取空的packet,首先会向endBlocks所有队列发送数据块

coordinator.offerEndBlock(index, block.getCurrentBlock());

其次会关闭该StripedDataStreamer对应Datanode的输出输入流

7、当有新数据写入时会重新申请数据块组

当申请数据块组前,会等待之前的所有的内部数据块结束,并检查数据块

for (int i = 0; i < numAllBlocks; i++) {
    waitEndBlocks(i);
}
#######
ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i)
StripedBlockUtil.checkBlocks(currentBlockGroup, i, b)

0条评论
0 / 1000
王****聪
2文章数
0粉丝数
王****聪
2 文章 | 0 粉丝
王****聪
2文章数
0粉丝数
王****聪
2 文章 | 0 粉丝
原创

深入HDFS EC写流程

2023-05-30 06:28:33
105
0

一、背景

 由于正式进入DT时代,人们每天产生的数据量越来越多,而数据随着时间增长价值会不断降低,如果用低成本保存低价值的数据成为每个公司必须要考虑的问题。而早期的HDFS是使用多副本策略对数据进行保存,假设副本数为3,导致数据的价值度为33%。虽然多副本策略更能适应“移动计算,不移动数据”的分布式计算原则,读取数据时也不会引入额外的计算,但是由于冷数据的访问频率很低,多副本策略成本太高。在这种场景下,就需要利用时间去解决空间的问题。

 HDFS EC的引入就是为了解决存储效率过低的问题。目前EC有两种方式,一种是XOR方式,与磁盘高可用方式中的raid5类似,通过两块数据块生成一块校验块,该方式存储效率为67%(2/3),只能容忍一块数据块出故障;另外一种方式是Reed Solomon码,通过生成矩阵产生N个校验块,能够容忍N个块出现故障。

下面是不同副本策略的比较:

下面将以RS(6,3)为例,深入分析EC的流程

二、HDFS EC引入的概念

 HDFS为了引入EC,对数据块的布局进行了改造,并引入了新的对象

 之前的HDFS是以Block为单位写入数据的,写入一个Block,并通过PipeLine依次同步到其他Datanode上后,再写入写一个block

而HDFS EC改变了这个方式,它以cell为单位写入每一个目标Datanode,以RS(6,3)为例,写完6个cell后,会根据这些cell生成3个校验的cell,而所有9个cell会组成一个stripe。

可以看到副本策略中,同一个block中是相邻的数据,而EC策略中,同一个block中的不同cell的数据不相邻

为了使原来的代码兼容EC策略,HDFS源码中使用BlockinfoContiguous代表副本策略的Block,而BlockinfoStripe代表了EC策略的Block

三、HDFS EC对写流程的改造

(一)为某个路径设置EC策略

hdfs ec -setPolicy -path <path> -policy <policyName>

##根据EC策略的名字,创建EC相关xAttr
WritableUtils.writeString(dOut, ecPolicy.getName());
ecXAttr = XAttrHelper.buildXAttr(XATTR_ERASURECODING_POLICY,
          bOut.toByteArray());
xattrs.add(ecXAttr);

##将xAttr绑定到路径对应的inode上
inode.addXAttrFeature(new XAttrFeature(xAttrs), snapshotId);

(二)客户端启动写入相关对象

 熟悉HDFS写入过程都听见过DFSOutputStream和DataStreamer,其中DFSOutputStream是写数据的入口,而DataStreamer负责将数据写入pipeline,而HDFS EC分别引入了相关的子类DFSOutputStream->DFSStripedOutputStream,DataStreamer->StripedDataStreamer

DFSStripedOutputStream中保存了每个target对应的StripedDataStreamer,并且包含了MultipleBlockingQueue、Coordinator和CellBuffers。MultipleBlockingQueue用于保存需要发送给不同StripedDataStreamer的数据块或者其他信息,Coordinator用于管理多个MultipleBlockingQueue,CellBuffers用于缓存后续需要进行编码的数据以及编码后的数据。在DFSStripedOutputStream初始化中会设置CurrentStreamer为第一个StripedDataStreamer。

for (short i = 0; i < numAllBlocks; i++) {
  StripedDataStreamer streamer = new StripedDataStreamer(stat,
      dfsClient, src, progress, checksum, cachingStrategy, byteArrayManager,
      favoredNodes, i, coordinator, getAddBlockFlags());
  streamers.add(streamer);
}
setCurrentStreamer(0)

客户端在调用create请求创建文件后,就会创建DFSStripedOutputStream以及StripedDataStreamer,并启动所有的StripedDataStreamer。

if(stat.getErasureCodingPolicy() != null) {
  out = new DFSStripedOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes);
} else {
  out = new DFSOutputStream(dfsClient, src, stat,
      flag, progress, checksum, favoredNodes, true);
}
out.start();
######
for (StripedDataStreamer streamer : streamers) {
  streamer.start();
}

(三)写入流程

EC的写入流程从DFSStripedOutputStream的writeChunk方法开始

1、先将数据写入cellBuffers中,用于后面计算校验块

2、第一次写数据时,首先需要分配数据块,即确定需要写到哪些Datanode上,调用namenode.addblock方法分配数据块

1)确定目标target的数目,并选择Datanode,最后封装成LocatedBlock

1、计算目标Datanode的数量
FSDirWriteFileOp.ValidateAddBlockResult r =
          FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
                                            previous, onRetryBlock);
###########
if (blockType == BlockType.STRIPED) {
  ecPolicy = FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
  numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
          + ecPolicy.getSchema().getNumParityUnits());
} else {
  numTargets = pendingFile.getFileReplication();
}
2、选择目标Datanode
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
        blockManager, src, excludedNodes, favoredNodes, flags, r);
3、创建Block,并加入对应的inode,最后将location信息封装
LocatedBlock  lb = FSDirWriteFileOp.storeAllocatedBlock(
          this, src, fileId, clientName, previous, targets);

2)addblock调用返回的LocatedStripedBlock对应了BlockGroup,通过parseStripedBlockGroup解析LocatedStripedBlock,并构造internalBlock(6个DataBlock和3个parityBlock)

final LocatedBlock[] blocks = StripedBlockUtil.parseStripedBlockGroup(
        (LocatedStripedBlock) lb, cellSize, numDataBlocks,
        numAllBlocks - numDataBlocks);

3)将构造的内部块放到对应的followingBlocks的队列中

coordinator.getFollowingBlocks().offer(i, blocks[i])

4)StripedDataStreamer的run方法里,将从对应的followingBlocks的队列中获取新构造的内部块

coordinator.getFollowingBlocks().poll(index)

并通过createBlockOutputStream方法与相应的Datanode建立输入和输出流

createBlockOutputStream(nodes, storageTypes, storageIDs, 0L,false);

3、调用DFSOutputStream的writeChunk方法写入数据

当前DFSStripedOutputStream中的currentStreamer指向了第一个StripedDataStreamer

1)将当前chunk对应的校验和和数据写入packet中

2)如果该packet中的chunk达到了一个packet能够包含的chunk的最大数量,就将该packet发送到currentStreamer对应的dataQueue中

3)DataStreamer的run方法中会从dataQueue中获取第二步放入的packet

4)首先会将待发送的packet发送到ackqueue中,用于后续保证数据块的发送,然后通过之前与Datanode建立的输出流将packet发往Datanode。

如果写满一个cell,DFSStripedOutputStream的currentStreamer将指向下一个StripedDataStreamer,继续写出数据。

4、如果写满第六个cell,即达到numDataBlocks,就会计算检验块,输入数据是Cellbuffer里的buffers数组,输出是Cellbuffer里的checksumArrays

5、调用DFSOutputStream的writeChunk方法写入校验数据,步骤与第三步一样

注意:这里有两种校验数据,一种是用于保证数据正确性的,最后会写入到block对应的meta文件里,另外一种是为了保证数据高可用而写出的校验块,校验块也有自己本身的数据和校验数据

输出完一组data+parity后,就产生了一个strip,然后又重新从第一个StripedDataStreamer开始写出新的strip对应的数据

6、结束一个BlockGroup

当写入的数据量满足一个BlockGroup的大小

currentBlockGroup != null &&currentBlockGroup.getNumBytes() == blockSize * numDataBlocks

1)向所有的StripedDataStreamer发送空的packet,表示一个数据块结束

2)每个StripedDataStreamer的run方法中,会获取空的packet,首先会向endBlocks所有队列发送数据块

coordinator.offerEndBlock(index, block.getCurrentBlock());

其次会关闭该StripedDataStreamer对应Datanode的输出输入流

7、当有新数据写入时会重新申请数据块组

当申请数据块组前,会等待之前的所有的内部数据块结束,并检查数据块

for (int i = 0; i < numAllBlocks; i++) {
    waitEndBlocks(i);
}
#######
ExtendedBlock b = coordinator.endBlocks.takeWithTimeout(i)
StripedBlockUtil.checkBlocks(currentBlockGroup, i, b)

文章来自个人专栏
大数据组件内核
2 文章 | 1 订阅
0条评论
0 / 1000
请输入你的评论
1
1