MapperTask -> Shuffle (partition, sort, group) -> ReducerTask
MapReduce execution steps
- Map task
  - Read each line of the input file, parse it into a <key, value> pair, and call the map function
  - The map logic processes the key/value pair and produces new key/value pairs
  - Partition the map output (see the partitioner sketch after this list)
- Reduce task
  - Copy the map task output to the reduce node, then merge and sort it
  - The reduce logic processes the key/value pairs and produces new key/value pairs
  - Save the result to a file
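A minimal sketch of the partitioning step (the class name PartitionDemo and its registration are illustrative additions, not part of the walkthrough): a Partitioner decides which reducer receives each key, and Hadoop's default HashPartitioner behaves like this:
package mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/**
 * Illustrative sketch, equivalent to Hadoop's default HashPartitioner.
 */
public class PartitionDemo extends Partitioner<Text, LongWritable> {
    @Override
    public int getPartition(Text key, LongWritable value, int numPartitions) {
        // The same key always lands in the same reduce partition
        return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
    }
}
A custom partitioner would be registered with job.setPartitionerClass(PartitionDemo.class); with a single reducer (the default) every key goes to partition 0.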
- Prepare the input file
vim word.txt
hello Jack
hello Tom
hello Jimi
hello Mili
hello Make
- Upload the file to HDFS
hadoop fs -put word.txt /word.txt
hadoop fs -ls / # verify the upload
- Run the job
cd hadoop-2.8.5/share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.8.5.jar wordcount /word.txt /wcout
- Check the job output (each reducer writes one part-r-NNNNN file)
hadoop fs -ls /wcout
hadoop fs -cat /wcout/part-r-00000
Jack 1
Jimi 1
Make 1
Mili 1
Tom 1
hello 5
Java example
- mapper
package mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Extends Mapper to implement the map step.
 * The key/value types must be serializable (Writable) so they can be sent over the network.
 */
public class MapDemo extends Mapper<LongWritable, Text, Text, LongWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Receive one line of input
        String line = value.toString();
        // Split the line into words
        String[] words = line.split(" ");
        // Emit <word, 1> for each word
        for (String word : words) {
            context.write(new Text(word), new LongWritable(1));
        }
    }
}
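For the first line of word.txt above, this mapper emits <hello, 1> and <Jack, 1>; the shuffle then groups such pairs by key before they reach the reducer.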
- reducer
package mr;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
/**
 * Extends Reducer to implement the reduce step.
 */
public class ReduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Context context)
            throws IOException, InterruptedException {
        // Counter for this key
        long count = 0;
        // Sum all the 1s emitted by the mappers
        for (LongWritable counter : values) {
            count += counter.get();
        }
        // Emit <word, total>
        context.write(key, new LongWritable(count));
    }
}
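Because the shuffle has already grouped values by key, the reduce call for the key hello receives the iterable [1, 1, 1, 1, 1] and writes <hello, 5>.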
- job
package mr;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Counts word occurrences.
 * Run: hadoop jar hdfsdemo.jar <input path> <output path>
 * Specify the input and output paths for your environment.
 */
public class WordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Build the Job object
        Job job = Job.getInstance(new Configuration());
        // Note: the class that contains the main method
        job.setJarByClass(WordCount.class);
        // Input file path
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        // Mapper settings
        job.setMapperClass(MapDemo.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);
        // Reducer settings
        job.setReducerClass(ReduceDemo.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);
        // Output file path (must not exist yet)
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // Submit the job and wait for completion
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
Package the WordCount class into a jar, upload it to the server, and run:
hadoop jar hdfsdemo.jar /word.txt /out
Check the output file; it matches the output of Hadoop's built-in wordcount:
Jack 1
Jimi 1
Make 1
Mili 1
Tom 1
hello 5
Summary
Import the dependency jars from
hadoop-2.8.5/share/hadoop/mapreduce/
Writing a custom job
- Analyze the business logic and determine the input/output formats
- Extend Mapper
- Extend Reducer
- Assemble the Mapper and Reducer through the Job object (an optional combiner sketch follows this list)
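A minimal optional optimization, sketched here as an addition rather than part of the walkthrough above: because word counting is an associative aggregation, the reducer class can also be registered as a combiner, pre-aggregating <word, 1> pairs on the map side to cut shuffle traffic:
job.setMapperClass(MapDemo.class);
// Optional: reuse ReduceDemo as a combiner (assumes the classes defined above)
job.setCombinerClass(ReduceDemo.class);
job.setReducerClass(ReduceDemo.class);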