用Google cloud跑一个Hadoop MapReduce 的 job, 想建一个 inverted index。
输入文件5个,第一个词是文件的ID,后面就是文件内容,ID 和内容之间用 tab 隔开,内容词与此之间用空格隔开,如下:
我期望的结果是找出每个词出现在哪些文件中,以及出现的频率。map 结束后,输出应该是这样:
Reduce 完成后应该得到这样的结果:
我的代码如下:
import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import java.lang.StringBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class BuildIndex {
public static class IndexMapper extends Mapper<Object, Text, Text, Text>{
private Text word = new Text();
private Text docID = new Text();
public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] pair = value.toString().split("\t");
docID.set(pair[0]);
StringTokenizer itr = new StringTokenizer(pair[1]);
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, docID);
}
}
}
public static class IndexReducer extends Reducer<Text,Text,Text,Text> {
private Text result = new Text();
public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
Map<String, Integer> map = new HashMap<>();
for (Text val : values) {
Integer counter = map.get(val.toString());
if (counter == null) {
counter = 1;
} else {
counter += 1;
}
map.put(val.toString(), counter);
}
StringBuilder sb = new StringBuilder();
for (String s : map.keySet()) {
sb.append(s + ":" + map.get(s) + " ");
}
result.set(sb.toString());
context.write(key, result);
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "build inverted index");
job.setJarByClass(BuildIndex.class);
job.setMapperClass(IndexMapper.class);
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
但是我的输出结果却是这样:
感觉reducer完全没有工作,请问这是为什么?
假设a.txt文件里数据是{tom jack tom jack rose tom}
那么map的输出就是
<tom,1> <jack,1> <tom,1> <jack,1> <rose,1> <tom,1>
然后汇总工作到reduce去做.reduce接收的就是这样的数据
<tom,[1,1,1]><jack,[1,1]><rose,[1]>
reducer输出的是<tom,3><jack,2><rose,1>, 这样就统计出来了. 当然这是普通的做法
你现在想要实现的是同级单词在a文件和b文件...中各出现的次数.
假设有两个文件a.txt和b.txt.
a文件里面的数据是{tom jack tom jack rose tom}
b文件里面的数据是{google apple tom google rose}
把编号换成了文件名.
map的输出就是这样<tom:a.txt,1> <jack:a.txt,1> <tom:a.txt,1><jack:a.txt,1> <rose:a.txt,1>
这样的数据给到reducer,reducer统计不了.因为key不相同.key有的是<tom:a.txt,1><tom:b.txt,1>.
为了解决这个问题,map输入的内容不要直接到reducer中,中间加一层combiner来处理汇总数据
combiner接收<tom:a.txt,1><tom:b.txt,1>
combiner把key做一下切割 .切割成<tom, a.txt:1 ><tom , b.txt:2> ,这样key相同了.就可以统计了
下面把代码贴上 注[我用的是文件的名称,不是文件里的开头编号,要用的话还得把文件名换成编号,这样做有写问题,你可以下去试一试.我找到解决办法在补充.]
mapper类
combiner类
reducer类:
注[本地链接的linux环境hadoop] ,要在本机的/etc/hosts文件中添加 export HADOOP_USER_NAME=hdfs
a.txt和b.txt都是单词,以空格分割,你可以做假数据测试一下.
这是测试结果.