Hadoop的HDFS和MapReduce都是针对大数据文件来设计的,在小文件的处理上不但效率低下,而且十分消耗内存资源。
在【Hadoop】HDFS高可用与高扩展原理分析(HA架构与Federation机制)这篇博客中有提到HDFS的小文件问题,集群启动时DataNode会向NameNode上报所有的Block块信息,每个块(无论大小)对象约占150byte,而NameNode的内存是有限的,那么有可能集群内部其实并没有存储多少数据,但是NameNode已经存满了,针对这个问题采用Federation机制可以缓解。
而MapReduce的小文件问题是指每个Map任务的启动需要消耗一定的时间,并且这个过程是非常消耗性能的,当小文件非常多的时候,就会切出大量InputSplit,而每个Split对应一个Map任务,这些任务的运行时间极短,但启动时间加起来却是一个大数字。
按照正常的逻辑来想,要解决这个问题其实也不难,你只需要把这些小文件组织起来合成一个大文件统一进行Map就可以了。同理,Hadoop针对这个问题,解决办法通常是选择一个容器,将这些小文件组织起来统一存储,HDFS提供了两种类型的容器,分别是SequenceFile 和 MapFile。
1. SequenceFile
要在本地机器上操作HDFS的话就必然要告诉本机集群地址,所以Configuration conf= new Configuration(); conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
这两行代码是必不可少的。因为输出文件地址必须是空的,所以若集群中在这个地址之前有文件的话先删除。且SequenceFile最终输出的结果只能通过reader逐个遍历浏览,不能直接打开查看(打开是乱码)。
SequenceFile对小文件进行合并后,形成键值对,键Key为小文件名称,值Value为小文件的内容。
package com.MR;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import sun.nio.cs.ext.GBK;
import java.io.File;
import java.io.IOException;
/**
* MapReduce小文件问题解决方案
*/
public class SmallFileSeq {
public static void main(String[] args) throws IOException {
write("D:\\smallFile", "/seqFile");
read("/seqFile");
}
private static void write(String inputDir, String outPutFile) throws IOException {
//创建配置对象
Configuration conf= new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出文件
fileSystem.delete(new Path(outPutFile), true);
//构造opt数组,有三个元素
/*
输出路径、Key类型、Value类型
*/
SequenceFile.Writer.Option[] opts= new SequenceFile.Writer.Option[]{
SequenceFile.Writer.file(new Path(outPutFile)),
SequenceFile.Writer.keyClass(Text.class),
SequenceFile.Writer.valueClass(Text.class)};
//创建一个Writer实例
SequenceFile.Writer writer = SequenceFile.createWriter(conf, opts);
//指定要压缩的小文件目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
for (File file:files) {
//获取全部文件内容
String content = FileUtils.readFileToString(file, "GBK");
//文件名作为Key
Text key = new Text(file.getName());
//文件内容作为value
Text value = new Text(content);
writer.append(key, value);
}
}
writer.close();
}
private static void read(String inputFile) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(new Path(inputFile)));
Text key = new Text();
Text value = new Text();
while(reader.next(key, value)){
System.out.print("FileName:" + key.toString()+", ");
System.out.println("Content:"+value.toString()+";");
}
reader.close();
}
}
2. MapFile
MapFile是排序后的SequenceFile,其由两部分组成,分别是index和data,index作为文件的数据索引,主要记录了每个数据记录的key值(也就是小文件名),以及该记录在文件中的偏移位置。在MapFile被访问的时候,索引文件会被加载到内存,通过索引映射关系可迅速定位到指定Record所在文件位置,因此,相对SequenceFile而言,MapFile的检索效率是高效的,缺点是会消耗一部分内存来存储index数据。MapFile与SequenceFile的代码部分基本一致,改动的很小。
package com.MR;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MapFile;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import java.io.File;
import java.io.IOException;
/**
* MapReduce小文件问题解决方案之MapFile
*/
public class SmallFileMap {
public static void main(String[] args) throws IOException {
write("D:\\smallFile", "/mapFile");
read("/mapFile");
}
private static void write(String inputDir, String outPutFile) throws IOException {
//创建配置对象
Configuration conf= new Configuration();
//指定HDFS的地址
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
//获取操作HDFS的对象
FileSystem fileSystem = FileSystem.get(conf);
//删除输出文件
fileSystem.delete(new Path(outPutFile), true);
//构造opt数组,有三个元素
/*
Key类型、Value类型
*/
SequenceFile.Writer.Option[] opts= new SequenceFile.Writer.Option[]{
MapFile.Writer.keyClass(Text.class),
MapFile.Writer.valueClass(Text.class)};
//创建一个Writer实例
MapFile.Writer writer = new MapFile.Writer(conf, new Path(outPutFile), opts);
//指定要压缩的小文件目录
File inputDirPath = new File(inputDir);
if(inputDirPath.isDirectory()){
File[] files = inputDirPath.listFiles();
for (File file:files) {
//获取全部文件内容
String content = FileUtils.readFileToString(file, "GBK");
//文件名作为Key
Text key = new Text(file.getName());
//文件内容作为value
Text value = new Text(content);
writer.append(key, value);
}
}
writer.close();
}
private static void read(String inputFile) throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://bigdata01:9000");
MapFile.Reader reader = new MapFile.Reader(new Path(inputFile), conf);
Text key = new Text();
Text value = new Text();
while(reader.next(key, value)){
System.out.print("FileName:" + key.toString()+", ");
System.out.println("Content:"+value.toString()+";");
}
reader.close();
}
}