Requirement
Count the occurrences of each word, and determine the number of splits from the number of lines in each input file rather than from the file size. In this case, every three lines go into one split.
Input data
Nu.txt
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang
banzhang ni hao
xihuan hadoop banzhang banzhang ni hao
xihuan hadoop banzhang
Expected output
Number of splits:4
Requirement analysis
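NLineInputFormat forms splits by line count rather than by HDFS block size. The sample file Nu.txt has 11 lines, so with setNumLinesPerSplit(job, 3) it is divided into 11 / 3 = 3.67, rounded up to 4 splits (three splits of 3 lines plus one split of 2 lines), and the job therefore launches 4 MapTasks. This matches the expected "Number of splits:4".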
Writing the code
Driver class
package com.mr.nline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class NlineWordCountDriver {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0 Specify the local input and output paths
        args = new String[]{"E:/Hadoop/src/main/resources/input/nline", "E:/Hadoop/src/main/resources/output/nline"};

        // 1 Get the configuration and create the job
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // Set the number of lines per split
        NLineInputFormat.setNumLinesPerSplit(job, 3);
        // Replace the default TextInputFormat with NLineInputFormat
        job.setInputFormatClass(NLineInputFormat.class);

        // 2 Set the jar by the Driver class (setJarByClass)
        job.setJarByClass(NlineWordCountDriver.class);

        // 3 Set the Mapper and Reducer classes
        job.setMapperClass(NlineWordCountMapper.class);
        job.setReducerClass(NlineWordCountReducer.class);

        // 4 Set the map output key/value types
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 5 Set the final output key/value types (the Reducer's output types)
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // 6 Set the input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // 7 Submit the job and wait for completion
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
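To sanity-check the split count without submitting a full job, the following is a minimal sketch (not part of the original example; the class name NlineSplitCheck is made up here, and it assumes your Hadoop version provides the static NLineInputFormat.getSplitsForFile helper) that asks NLineInputFormat how it would split the sample file at 3 lines per split.

package com.mr.nline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.NLineInputFormat;

import java.io.IOException;
import java.util.List;

public class NlineSplitCheck {

    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();

        // Local path of the 11-line sample file (same location the Driver reads from)
        Path input = new Path("E:/Hadoop/src/main/resources/input/nline/Nu.txt");
        FileSystem fs = input.getFileSystem(conf);
        FileStatus status = fs.getFileStatus(input);

        // Ask NLineInputFormat how it would split this file at 3 lines per split
        List<FileSplit> splits = NLineInputFormat.getSplitsForFile(status, conf, 3);
        System.out.println("Number of splits: " + splits.size()); // expected: 4 for the 11-line file
    }
}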
Mapper class
package com.mr.nline;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class NlineWordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    // 0. Create the output objects once as fields to avoid re-creating them on every map() call
    private Text text = new Text();
    private IntWritable i = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Convert Hadoop's Text type to a String for easier handling
        String string = value.toString();
        System.out.println("key:" + key);

        // 2. Split the line into words
        String[] split = string.split(" ");

        // 3. Iterate over the words and emit each one as (word, 1)
        for (String s : split) {
            text.set(s);
            context.write(text, i);
        }
    }
}
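Note that even with NLineInputFormat, the key handed to map() is still the byte offset of the line within the file, which is what the System.out.println above prints. As a standalone illustration of the map() body (no Hadoop required; the class name MapLogicDemo is invented for this sketch), the snippet below prints the (word, 1) pairs that would be emitted for the first sample line.

public class MapLogicDemo {

    public static void main(String[] args) {
        // One input line from Nu.txt, as the framework would hand it to map()
        String line = "banzhang ni hao";

        // Same split-and-emit logic as NlineWordCountMapper, printed instead of written to the context
        for (String word : line.split(" ")) {
            System.out.println("(" + word + ", 1)");
        }
    }
}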
Reducer class
package com.mr.nline;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class NlineWordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private IntWritable total = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Accumulate the values (all 1s) grouped under this key
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        total.set(sum);

        // Write the (word, count) pair to the output
        context.write(key, total);
    }
}
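Likewise, reduce() simply sums the 1s grouped under one key. The snippet below is illustrative only (the class name ReduceLogicDemo is invented, and the values array mirrors what the shuffle would deliver for the word "hadoop", which appears 6 times in the 11-line sample input).

public class ReduceLogicDemo {

    public static void main(String[] args) {
        // The shuffled values for one key, e.g. "hadoop" from the sample input
        int[] values = {1, 1, 1, 1, 1, 1};

        // Same accumulation as NlineWordCountReducer
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        System.out.println("hadoop\t" + sum); // hadoop	6
    }
}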