本文介绍MapReduce 工作流。
本文前提:hadoop环境可用。
一、MapReduce 工作流介绍
多个MR作业,先后依次执行来计算得出最终结果。这类作业类似于DAG的任务,各个作业之间有依赖关系,比如说,这一个作业的输入,依赖上一个作业的输出等等。
一般实际的业务场景中,可能使用定时调度工具进行调度,但本示例仅仅说明mapreduce自身也可以做到。
- JobControl类:工作流job控制器,一次可以提交、管理多个job。JobControl类实现了线程Runnable接口。需要实例化一个线程来让它启动。
- ControlledJob类:可以将普通作业包装成受控作业。并且支持设置依赖关系。Hadoop会根据依赖的关系,先后执行job任务,每个任务的运行都是独立的。
二、使用示例
MapReduce的join操作 将上述的Reduce side join 的例子连续起来运行,即第一步未排序输出,第二步针对上一步的输出进行排序。
1、实现
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.hadoop.mr.join.reducerside.ReduceSideSortDriver;
import org.hadoop.mr.join.reducerside.ReduceSideSortMapper;
import org.hadoop.mr.join.reducerside.ReduceSideSortReducer;
import org.hadoop.mr.join.reducerside.ReducerSideJoinDriver;
import org.hadoop.mr.join.reducerside.ReducerSideJoinMapper;
import org.hadoop.mr.join.reducerside.ReducerSideJoinReducer;
public class MRFlowDriver {
static String in = "D:/workspace/bigdata-component/hadoop/test/in/join";
static String tempOut = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/unsortjoin";
static String out = "D:/workspace/bigdata-component/hadoop/test/out/reduceside/joinsort";
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(out))) {
fs.delete(new Path(out), true);
}
// 第一个作业的配置
Job unSortjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ unSortjob", ReducerSideJoinDriver.class,
ReducerSideJoinMapper.class, Text.class, Text.class, ReducerSideJoinReducer.class, Text.class,
NullWritable.class, 1, in, tempOut);
// 将普通作业包装成受控作业
ControlledJob unSortControlledJob = new ControlledJob(conf);
unSortControlledJob.setJob(unSortjob);
// 第二个作业的配置
Job sortedjob = getJob(conf, "Reduce Side Join DependingJob Testing ------ sortedjob", ReduceSideSortDriver.class,
ReduceSideSortMapper.class, Text.class, Text.class, ReduceSideSortReducer.class, Text.class,
NullWritable.class, 1, tempOut, out);
ControlledJob sortedControlledJob = new ControlledJob(conf);
sortedControlledJob.setJob(sortedjob);
// 设置job的依赖关系
sortedControlledJob.addDependingJob(unSortControlledJob);
// 主控制容器
JobControl jobControl = new JobControl("jobControl");
// 添加到总的JobControl里,进行控制
jobControl.addJob(unSortControlledJob);
jobControl.addJob(sortedControlledJob);
// 在线程启动
Thread t = new Thread(jobControl);
t.start();
while (true) {
if (jobControl.allFinished()) {
System.out.println("jobControl" + jobControl.getSuccessfulJobList());
jobControl.stop();
break;
}
}
}
/**
*
* @param conf
* @param jobName
* @param cls
* @param clsMapper
* @param clsMapOutKey
* @param clsMapOutValue
* @param clsReducer
* @param clsReducerOutKey
* @param clsReducerOutValue
* @param tasks
* @return
* @throws Exception
*/
static Job getJob(Configuration conf, String jobName, Class<?> cls, Class<? extends Mapper> clsMapper,
Class<?> clsMapOutKey, Class<?> clsMapOutValue, Class<? extends Reducer> clsReducer,
Class<?> clsReducerOutKey, Class<?> clsReducerOutValue, int tasks, String in, String out) throws Exception {
Job job = Job.getInstance(conf, jobName);
// 设置作业驱动类
job.setJarByClass(cls);
// 设置mapper相关信息
job.setMapperClass(clsMapper);
job.setMapOutputKeyClass(clsMapOutKey);
job.setMapOutputValueClass(clsMapOutValue);
// 设置reducer相关信息
job.setReducerClass(clsReducer);
job.setOutputKeyClass(clsReducerOutKey);
job.setOutputValueClass(clsReducerOutValue);
job.setNumReduceTasks(tasks);
// 设置输入的文件的路径
FileInputFormat.setInputPaths(job, new Path(in));
FileSystem fs = FileSystem.get(conf);
if (fs.exists(new Path(out))) {
fs.delete(new Path(out), true);
}
FileOutputFormat.setOutputPath(job, new Path(out));
return job;
}
}
2、验证
运行日志
jobControl[job name: Reduce Side Join DependingJob Testing ------ unSortjob
job id: jobControl0
job state: SUCCESS
job mapred id: job_local1023947416_0001
job message: just initialized
job has no depending job:
, job name: Reduce Side Join DependingJob Testing ------ sortedjob
job id: jobControl1
job state: SUCCESS
job mapred id: job_local1967863010_0002
job message: just initialized
job has 1 dependeng jobs:
depending job 0: Reduce Side Join DependingJob Testing ------ unSortjob
]
实际的功能与本示例中对应的链接示例结果一致,不再赘述。 至此,MapReduce的工作流示例介绍结束。