输入数据
group.txt
0000001 Pdt_01 222.8
0000002 Pdt_05 722.4
0000001 Pdt_02 33.8
0000003 Pdt_06 232.8
0000003 Pdt_02 33.8
0000002 Pdt_03 522.8
0000002 Pdt_04 122.4
期望输出结果
1 222.8
2 722.4
3 232.8
需求分析
自定义OrderBean
package com.mr.groupcomparator;
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {

    /** Order id, parsed from the first input column. */
    private int orderId;
    /** Transaction amount, parsed from the third input column. */
    private double money;

    /** No-arg constructor required by Hadoop's Writable deserialization. */
    public OrderBean() {
    }

    public OrderBean(int orderId, double money) {
        this.orderId = orderId;
        this.money = money;
    }

    public int getOrderId() {
        return orderId;
    }

    public void setOrderId(int orderId) {
        this.orderId = orderId;
    }

    public double getMoney() {
        return money;
    }

    public void setMoney(double money) {
        this.money = money;
    }

    @Override
    public String toString() {
        // Keeps the original " <orderId> <money>" output format (leading space
        // included) so the job's text output is unchanged.
        return
                " " + orderId +
                        " " + money
                ;
    }

    /**
     * Sort key order: ascending by orderId, then descending by money, so that
     * within each order group the highest amount reaches the reducer first.
     */
    @Override
    public int compareTo(OrderBean orderBean) {
        int compare = Integer.compare(this.orderId, orderBean.orderId);
        if (compare == 0) {
            // Arguments deliberately swapped: larger money sorts first.
            return Double.compare(orderBean.money, this.money);
        } else {
            return compare;
        }
    }

    /** Consistent with {@link #compareTo}: equal iff orderId and money match. */
    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof OrderBean)) {
            return false;
        }
        OrderBean other = (OrderBean) o;
        return orderId == other.orderId
                && Double.compare(money, other.money) == 0;
    }

    /**
     * Hash on orderId only, so the default HashPartitioner routes every record
     * of the same order to the same reduce task — required for the grouping
     * comparator to see the whole order when more than one reducer runs.
     * Equal beans share an orderId, so the equals/hashCode contract holds.
     */
    @Override
    public int hashCode() {
        return Integer.hashCode(orderId);
    }

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(orderId);
        out.writeDouble(money);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        // Must read fields in the exact order write() emitted them.
        this.orderId = in.readInt();
        this.money = in.readDouble();
    }
}
自定义OrderComparator
package com.mr.groupcomparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.io.WritableComparable;
public class OrderComparator extends WritableComparator {

    /** Registers OrderBean and asks the parent to instantiate keys for us. */
    protected OrderComparator() {
        super(OrderBean.class, true);
    }

    /**
     * Grouping rule: two keys belong to the same reduce group exactly when
     * they carry the same order id; the money field is ignored on purpose.
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Downcast to OrderBean so the order ids are accessible.
        OrderBean left = (OrderBean) a;
        OrderBean right = (OrderBean) b;
        return Integer.compare(left.getOrderId(), right.getOrderId());
    }
}
Mapper类
package com.mr.groupcomparator;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    // Reused across map() calls to avoid allocating one bean per record.
    OrderBean orderBean = new OrderBean();

    /**
     * Parses one tab-separated line (order id in column 0, price in column 2)
     * and emits the (orderId, money) composite key with a null value.
     */
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");
        orderBean.setOrderId(Integer.parseInt(fields[0]));
        orderBean.setMoney(Double.parseDouble(fields[2]));
        context.write(orderBean, NullWritable.get());
    }
}
Reducer类
（注：原文此处遗留了编辑器占位符，Reducer 实现代码缺失。其逻辑为：每个分组只取迭代器的第一个 key（即该订单金额最大的记录）输出即可。）
Driver类
package com.mr.groupcomparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {

    /**
     * Job driver: sorts OrderBean keys (orderId asc, money desc) and groups
     * them by orderId via OrderComparator so each reduce group's first key
     * carries the maximum price for that order.
     *
     * @param args optional {inputDir, outputDir}; local defaults are used
     *             when fewer than two paths are supplied
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // 0 Resolve paths: honor command-line arguments instead of always
        //   clobbering them (the old code made the CLI arguments unusable).
        if (args.length < 2) {
            args = new String[]{"E:/Hadoop/src/main/resources/input/groupcomparator", "E:/Hadoop/src/main/resources/ouput/groupcomparator"};
        }
        // 1 Build the configuration and the job wrapper
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // 2 Class used to locate the job jar
        job.setJarByClass(OrderDriver.class);
        // 3 Mapper and Reducer classes
        job.setMapperClass(OrderMapper.class);
        job.setReducerClass(OrderReducer.class);
        // 4 Map-side output key/value types
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);
        // 5 Final (reducer) output key/value types
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);
        // 6 Input and output paths
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // The grouping comparator must be set explicitly — otherwise the sort
        // comparator is reused and every distinct (orderId, money) key would
        // form its own reduce group.
        job.setGroupingComparatorClass(OrderComparator.class);
        // 7 Submit and wait for completion
        boolean completion = job.waitForCompletion(true);
        System.exit(completion ? 0 : 1);
    }
}
执行结果
第二种方法(比较简单)
Mapper类
package com.mr.nogroupcomparator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<Text, Text, Text, DoubleWritable> {

    // Output key/value holders, reused for every record.
    Text k = new Text();
    DoubleWritable v = new DoubleWritable();

    /**
     * KeyValueTextInputFormat has already split each line at the first tab:
     * the key holds the order id and the value holds the remaining columns.
     * Emits (orderId, price), where the price is the second tab field of the
     * value.
     */
    @Override
    protected void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        k.set(key.toString());
        String[] rest = value.toString().split("\t");
        v.set(Double.parseDouble(rest[1]));
        context.write(k, v);
    }
}
Reducer类
package com.mr.nogroupcomparator;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<Text, DoubleWritable, Text, DoubleWritable> {

    // Output value holder, reused for every group.
    DoubleWritable doubleWritable = new DoubleWritable();

    /**
     * Emits the maximum price observed within one order-id group.
     *
     * Fixes over the original: the running maximum now starts at
     * NEGATIVE_INFINITY instead of 0, so a group consisting only of negative
     * values is no longer misreported as 0; and the output object is set once
     * after the loop instead of on every iteration.
     */
    @Override
    protected void reduce(Text key, Iterable<DoubleWritable> values, Context context)
            throws IOException, InterruptedException {
        double max = Double.NEGATIVE_INFINITY;
        for (DoubleWritable value : values) {
            max = Math.max(max, value.get());
        }
        doubleWritable.set(max);
        context.write(key, doubleWritable);
    }
}
Driver类
package com.mr.nogroupcomparator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class GroupDriver {

    /**
     * Driver for the simpler (no grouping comparator) variant: reads
     * tab-separated key/value lines via KeyValueTextInputFormat and reduces
     * each order id to its maximum price.
     *
     * @param args optional {inputDir, outputDir}; local defaults are used
     *             when fewer than two paths are supplied
     */
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        // Honor command-line paths instead of always overwriting them.
        if (args.length < 2) {
            args = new String[]{"E:/Hadoop/src/main/resources/input/twogroup", "E:/Hadoop/src/main/resources/ouput/twogroup"};
        }
        Configuration conf = new Configuration();
        // Split each input line at the first tab into the mapper's (key, value).
        conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
        // Build the job
        Job job = Job.getInstance(conf);
        // Input format must match the mapper's Text/Text input signature
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setJarByClass(GroupDriver.class);
        job.setMapperClass(GroupMapper.class);
        job.setReducerClass(GroupReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(DoubleWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        boolean wait = job.waitForCompletion(true);
        System.exit(wait ? 0 : 1);
    }
}