pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.huang</groupId>
    <artifactId>Hadoop</artifactId>
    <version>1.0-SNAPSHOT</version>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <source>7</source>
                    <target>7</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

    <dependencies>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>2.8.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.7.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.7.2</version>
        </dependency>
    </dependencies>
</project>
Input data
order.txt
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
pd.txt
01 小米
02 华为
03 格力
Expected result
1001 小米 1
1002 华为 2
1003 格力 3
1004 小米 4
1005 华为 5
1006 格力 6
Requirement analysis
Each line of order.txt is an order record (order id, pid, amount), and each line of pd.txt maps a pid to a product name. pd.txt is small, so instead of a reduce-side join we can do a map-side join: ship pd.txt to every map task via the distributed cache, load it into a HashMap in setup(), and replace the pid of each order record with its product name directly in map(). No Reducer is needed, so the job runs with zero reduce tasks.
Mapper class
package com.mr.mapjoin;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {

    HashMap<String, String> pdMap = new HashMap<>();
    Text text = new Text();

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // The cache files are returned as an array of URIs
        URI[] cacheFiles = context.getCacheFiles();
        // Only pd.txt was cached, so the first element is the one we need
        URI pdUri = cacheFiles[0];
        // Local path of the cached file
        String path = pdUri.getPath();
        // Wrap the file in a BufferedReader (buffered character stream)
        BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(new FileInputStream(path)));
        // Read pd.txt line by line, split each line, and store pid -> pname in the map
        String line;
        while (StringUtils.isNotEmpty(line = bufferedReader.readLine())) {
            String[] split = line.split("\t");
            pdMap.put(split[0], split[1]);
        }
        // Close the stream
        IOUtils.closeStream(bufferedReader);
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Read each line of order.txt and split it
        String[] split = value.toString().split("\t");
        // The join key pid is the second column
        String pid = split[1];
        // Look up the product name for this pid in the cached map
        String pname = pdMap.get(pid);
        // Assemble the joined record: order id, product name, amount
        text.set(split[0] + "\t" + pname + "\t" + split[2]);
        context.write(text, NullWritable.get());
    }
}
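One design note on setup(): opening the cached file with a plain FileInputStream works here because the driver below registers the cache file with a file:/// URI and the job runs locally. If the cached file lived on HDFS, it would have to be opened through Hadoop's FileSystem API instead. Below is a minimal sketch of that variant; the CacheFileReader class and its open() method are made up for illustration and are not part of the original code.

package com.mr.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: open a cached file through the FileSystem API so the
// same reading code works for both file:// and hdfs:// cache URIs.
public class CacheFileReader {
    public static BufferedReader open(Configuration conf, URI cacheUri) throws IOException {
        FileSystem fs = FileSystem.get(cacheUri, conf);
        return new BufferedReader(
                new InputStreamReader(fs.open(new Path(cacheUri)), StandardCharsets.UTF_8));
    }
}

Inside setup() this could be called as CacheFileReader.open(context.getConfiguration(), cacheFiles[0]) in place of the FileInputStream line.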
Reducer class
None. A Reducer is not needed for a map-side join; just set the number of reduce tasks to 0 in the driver (job.setNumReduceTasks(0)).
Driver class
package com.mr.mapjoin;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class MapJoinDriver {
    public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
        // Hard-coded local input/output paths for running inside the IDE
        args = new String[]{"E:/Hadoop/src/main/resources/input/jointest", "E:/Hadoop/src/main/resources/output/join"};

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        job.setJarByClass(MapJoinDriver.class);

        // Only the mapper output types matter, since there is no reduce phase
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setMapperClass(MapJoinMapper.class);

        // Distribute the small table pd.txt to every map task via the distributed cache
        job.addCacheFile(new URI("file:///E:/Hadoop/src/main/resources/input/tablenum/pd.txt"));
        // Map-only job: skip shuffle/sort and write the mapper output directly
        job.setNumReduceTasks(0);

        FileInputFormat.setInputPaths(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}
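After a local run, the joined records land in the output directory as a single map output file. A quick way to eyeball the result is a tiny reader like the one below; this is only a sketch, the part file name assumes Hadoop's default part-m-00000 for a single map task, and the PrintJoinOutput class is not part of the original project.

package com.mr.mapjoin;

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

// Hypothetical verification class: print the map-only output of the join job.
public class PrintJoinOutput {
    public static void main(String[] args) throws IOException {
        // Default name of the first (and only) map task's output file
        String partFile = "E:/Hadoop/src/main/resources/output/join/part-m-00000";
        try (BufferedReader reader = new BufferedReader(new FileReader(partFile))) {
            String line;
            while ((line = reader.readLine()) != null) {
                System.out.println(line); // e.g. 1001	小米	1
            }
        }
    }
}

Its output should match the expected result listed above.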