I. Reading and Writing Sequence Files
For an introduction to Sequence Files, see the article on HDFS file storage formats and compression algorithms in this column; if you need more detail, consult other resources on your own.
1. Sequence File formats
Depending on the compression type, there are three different Sequence File formats: uncompressed, record-compressed, and block-compressed.
A Sequence File consists of a header followed by one or more records.
All three formats use the same header structure, described below:
The first 3 bytes are SEQ, marking the file as a sequence file, followed by one byte holding the actual version number (e.g. SEQ4 or SEQ6).
The header also contains the key and value class names, compression details, metadata, and a sync marker. The sync marker is what allows a reader to start reading from an arbitrary position in the file (a small sketch after this format overview shows how to inspect the header).
1) Uncompressed format
An uncompressed Sequence File consists of three parts: header, records, and sync markers.
Each record contains 4 fields: record length, key length, key, and value.
A sync marker is written every few records (roughly every few kilobytes of output).
2) Record-compressed format
A record-compressed Sequence File also consists of three parts: header, records, and sync markers.
Each record contains 4 fields: record length, key length, key, and the compressed value.
As in the uncompressed format, a sync marker is written every few records.
3) Block-compressed format
A block-compressed Sequence File consists of three parts: header, blocks, and sync markers.
Here a block means a record block, i.e. a group of records packed together; it is a different concept from the HDFS storage block (128 MB by default).
Each block contains: the number of records, the compressed key lengths, the compressed keys, the compressed value lengths, and the compressed values. There is a sync marker for every block.
Block compression achieves a better compression ratio than record compression, so block compression is usually preferred when using Sequence Files.
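To make the header layout concrete, here is a minimal sketch (not part of the original code; the local path /tmp/test.seq and the class name SeqHeaderCheck are placeholder assumptions) that reads the first four bytes of a sequence file and prints the magic bytes and the version number:
import java.io.DataInputStream;
import java.io.FileInputStream;
import java.nio.charset.StandardCharsets;
public class SeqHeaderCheck {
	public static void main(String[] args) throws Exception {
		// open a sequence file that has been copied to the local filesystem (placeholder path)
		try (DataInputStream in = new DataInputStream(new FileInputStream("/tmp/test.seq"))) {
			byte[] magic = new byte[3];
			in.readFully(magic);// the first 3 bytes should be 'S', 'E', 'Q'
			byte version = in.readByte();// the 4th byte is the format version, e.g. 6
			System.out.println(new String(magic, StandardCharsets.US_ASCII) + ", version=" + version);
		}
	}
}
For a file written by a recent Hadoop release this should print something like SEQ, version=6.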
2. Reading and writing a Sequence File
1) pom.xml
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
</dependencies>
2) Implementation class
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFileRW {
private static final String[] DATA = {
"One, two, buckle my shoe",
"Three, four, shut the door",
"Five, six, pick up sticks",
"Seven, eight, lay them straight",
"Nine, ten, a big fat hen"
};
public static void main(String[] args) throws Exception {
// set the user identity the client uses when accessing HDFS
System.setProperty("HADOOP_USER_NAME", "alanchan");
// Configuration is used to specify the relevant configuration properties
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://HadoopHAcluster");
conf.set("dfs.nameservices", "HadoopHAcluster");
conf.set("dfs.ha.namenodes.HadoopHAcluster", "nn1,nn2");
conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn1", "server1:8020");
conf.set("dfs.namenode.rpc-address.HadoopHAcluster.nn2", "server2:8020");
conf.set("dfs.client.failover.proxy.provider.HadoopHAcluster","org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");
Path path = new Path("/testseq/test.seq");
write(conf, path);
read(conf, path);
}
public static void write(Configuration conf, Path path) throws Exception {
// key and value objects for the sequence file records
IntWritable key = new IntWritable();
Text value = new Text();
// build the Writer options
SequenceFile.Writer writer = null;
CompressionCodec codec = new GzipCodec();
SequenceFile.Writer.Option optPath = SequenceFile.Writer.file(path);
SequenceFile.Writer.Option optKey = SequenceFile.Writer.keyClass(key.getClass());
SequenceFile.Writer.Option optVal = SequenceFile.Writer.valueClass(value.getClass());
SequenceFile.Writer.Option optCom = SequenceFile.Writer.compression(SequenceFile.CompressionType.RECORD, codec);
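// Note (not in the original code): block compression usually yields a better ratio than
// record compression; to use it, build the option as
// SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK, codec) instead.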
try {
writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s", writer.getLength(), key, value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
System.out.println("写完了");
}
public static void read(Configuration conf, Path path) throws Exception {
SequenceFile.Reader.Option option1 = SequenceFile.Reader.file(path);
SequenceFile.Reader.Option option2 = SequenceFile.Reader.length(374);// read only the first 374 bytes of the file
SequenceFile.Reader reader = null;
try {
reader = new SequenceFile.Reader(conf, option1, option2);
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
long position = reader.getPosition();
while (reader.next(key, value)) {
String syncSeen = reader.syncSeen() ? "*" : "";// whether a sync marker was seen while reading this record
System.out.printf("[%s%s]\t%s\t%s\n", position, syncSeen, key, value);
position = reader.getPosition(); // beginning of next record
}
} finally {
IOUtils.closeStream(reader);
}
}
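// A minimal sketch (not part of the original example): because of the sync markers,
// a reader can start from an arbitrary byte offset. Reader.sync(pos) advances to the
// first sync point at or after pos, after which records are read as usual.
public static void readFrom(Configuration conf, Path path, long pos) throws Exception {
	SequenceFile.Reader reader = null;
	try {
		reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
		reader.sync(pos);// position the reader at the next sync point after pos
		IntWritable key = new IntWritable();
		Text value = new Text();
		while (reader.next(key, value)) {
			System.out.printf("[%s]\t%s\t%s\n", reader.getPosition(), key, value);
		}
	} finally {
		IOUtils.closeStream(reader);
	}
}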
}
II. Merging Small Files
Write all of the small files into a single Sequence File: each file's path is used as the key and the file's content as the value, and the pairs are serialized into one large Sequence File.
import java.io.File;
import java.io.FileInputStream;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
public class SmallFilesMergeBySequenceFile {
private List<String> smallFilePaths = new ArrayList<String>();
public void addInputPath(String path) throws Exception {
File file = new File(path);
if (file.isDirectory()) {
File[] files = FileUtil.listFiles(file);
for (File sFile : files) {
smallFilePaths.add(sFile.getPath());
System.out.println("添加小文件路径:" + sFile.getPath());
}
} else {
smallFilePaths.add(file.getPath());
System.out.println("添加小文件路径:" + file.getPath());
}
}
public void mergeFile(Configuration configuration, Path path) throws Exception {
Writer.Option bigFile = Writer.file(path);
Writer.Option keyClass = Writer.keyClass(Text.class);
Writer.Option valueClass = Writer.valueClass(BytesWritable.class);
Writer writer = SequenceFile.createWriter(configuration, bigFile, keyClass, valueClass);
Text key = new Text();
for (String sfps : smallFilePaths) {
File file = new File(sfps);
long fileSize = file.length();
byte[] fileContent = new byte[(int) fileSize];
// read the whole small file into memory, closing the stream when done
try (FileInputStream inputStream = new FileInputStream(file)) {
	inputStream.read(fileContent, 0, (int) fileSize);
}
String md5Str = DigestUtils.md5Hex(fileContent);
System.out.println("merge小文件:" + sfps + ",md5:" + md5Str);
key.set(sfps);
// use the file path as the key and the file content as the value in the sequence file
writer.append(key, new BytesWritable(fileContent));
}
writer.hflush();
writer.close();
}
public void readMergedFile(Configuration configuration, Path path) throws Exception {
Reader.Option file = Reader.file(path);
Reader reader = new Reader(configuration, file);
Text key = new Text();
BytesWritable value = new BytesWritable();
while (reader.next(key, value)) {
byte[] bytes = value.copyBytes();
String md5 = DigestUtils.md5Hex(bytes);
String content = new String(bytes, Charset.forName("GBK"));
System.out.println("读取到文件:" + key + ",md5:" + md5 + ",content:" + content);
}
reader.close();
}
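// A minimal sketch (not part of the original code) of the reverse operation:
// restore each small file from the merged sequence file into a local output
// directory. The outputDir parameter is a placeholder supplied by the caller.
public void restoreFiles(Configuration configuration, Path path, String outputDir) throws Exception {
	Reader reader = new Reader(configuration, Reader.file(path));
	try {
		Text key = new Text();
		BytesWritable value = new BytesWritable();
		while (reader.next(key, value)) {
			// the key holds the original file path; keep only the file name here
			File outFile = new File(outputDir, new File(key.toString()).getName());
			try (java.io.FileOutputStream out = new java.io.FileOutputStream(outFile)) {
				out.write(value.copyBytes());
			}
			System.out.println("restored file: " + outFile.getPath());
		}
	} finally {
		reader.close();
	}
}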
public static void main(String[] args) throws Exception {
Configuration configuration = new Configuration();
SmallFilesMergeBySequenceFile msf = new SmallFilesMergeBySequenceFile();
Path path = new Path("");
msf.addInputPath("");//
msf.mergeFile(configuration, path);
msf.readMergedFile(configuration, path);
}
}