Hadoop MapReduce Job 构建 Inverted index, Reducer 没有按照预期工作

用Google cloud跑一个Hadoop MapReduce 的 job, 想建一个 inverted index。
输入文件5个,第一个词是文件的ID,后面就是文件内容,ID 和内容之间用 tab 隔开,内容词与此之间用空格隔开,如下:

图片描述

我期望的结果是找出每个词出现在哪些文件中,以及出现的频率。map 结束后,输出应该是这样:

图片描述

Reduce 完成后应该得到这样的结果:

图片描述

我的代码如下:

import java.io.IOException;
import java.util.StringTokenizer;
import java.util.Map;
import java.util.HashMap;
import java.lang.StringBuilder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BuildIndex {

  public static class IndexMapper extends Mapper<Object, Text, Text, Text>{

    private Text word = new Text();
    private Text docID = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
      String[] pair = value.toString().split("\t");
      docID.set(pair[0]);
      StringTokenizer itr = new StringTokenizer(pair[1]);
      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, docID);
      }
    }
  }

  public static class IndexReducer extends Reducer<Text,Text,Text,Text> {
    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
      Map<String, Integer> map = new HashMap<>();
      for (Text val : values) {
        Integer counter = map.get(val.toString());
        if (counter == null) {
          counter = 1;
        } else {
          counter += 1;
        }
        map.put(val.toString(), counter);
      }
      StringBuilder sb = new StringBuilder();
      for (String s : map.keySet()) {
        sb.append(s + ":" + map.get(s) + " ");
      }
      result.set(sb.toString());
      context.write(key, result);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "build inverted index");
    job.setJarByClass(BuildIndex.class);
    job.setMapperClass(IndexMapper.class);
    job.setReducerClass(IndexReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}

但是我的输出结果却是这样:

图片描述

感觉reducer完全没有工作,请问这是为什么?

阅读 3.2k
1 个回答
  1. 一个map和reduce完成不了. 中间要再加一个combiner.
  2. 捋一下思路
    假设a.txt文件里数据是{tom jack tom jack rose tom}
    那么map的输出就是
    <tom,1> <jack,1> <tom,1> <jack,1> <rose,1> <tom,1>
    然后汇总工作到reduce去做.reduce接收的就是这样的数据
    <tom,[1,1,1]><jack,[1,1]><rose,[1]>
    reducer输出的是<tom,3><jack,2><rose,1>, 这样就统计出来了. 当然这是普通的做法

你现在想要实现的是同级单词在a文件和b文件...中各出现的次数.
假设有两个文件a.txtb.txt.
a文件里面的数据是{tom jack tom jack rose tom}
b文件里面的数据是{google apple tom google rose}
把编号换成了文件名.
map的输出就是这样<tom:a.txt,1> <jack:a.txt,1> <tom:a.txt,1><jack:a.txt,1> <rose:a.txt,1>

           <google:b.txt,1><apple:b.txt,1><tom:b.txt,1>....

这样的数据给到reducer,reducer统计不了.因为key不相同.key有的是<tom:a.txt,1><tom:b.txt,1>.
为了解决这个问题,map输入的内容不要直接到reducer中,中间加一层combiner来处理汇总数据
combiner接收<tom:a.txt,1><tom:b.txt,1>
combiner把key做一下切割 .切割成<tom, a.txt:1 ><tom , b.txt:2> ,这样key相同了.就可以统计了
下面把代码贴上 注[我用的是文件的名称,不是文件里的开头编号,要用的话还得把文件名换成编号,这样做有写问题,你可以下去试一试.我找到解决办法在补充.]
mapper类

public class WCMapper extends Mapper<LongWritable,Text,Text,Text>{

    Text text = new Text();
    Text val = new Text( "1" );

    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

        String line = value.toString();
        String [] strings = line.split( " " ); //根据空格切割

        FileSplit fileSplit = (FileSplit) context.getInputSplit();// 得到这行数据所在的文件切片
        String fileName = fileSplit.getPath().getName();// 根据文件切片得到文件名

        for (String s : strings){
            text.set(s + ":" + fileName);
            context.write(text,val);
        }
    }
}

combiner类

public class WCCombiner extends Reducer<Text,Text,Text,Text> {
    Text text = new Text( );
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        //map传进来的是 <apple:2 , 1> <google:2 ,1>

        int sum = 0; //统计词频
        for (Text val : values){
            sum += Integer.parseInt(val.toString());
        }

        //切割key
        int index = key.toString().indexOf( ":" );
        text.set(key.toString().substring( index + 1 ) + ":" + sum); // value ---> 2:1
        key.set( key.toString().substring( 0,index )); // key --> apple
        context.write( key,text );
    }
}

reducer类:

public class WCReduce  extends Reducer<Text,Text,Text,Text>{


    Text result = new Text(  );
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

        String file = new String();

        for (Text t : values){
            file += t.toString();
        }

        result.set( file );

        context.write( key,result);
    }
}

注[本地链接的linux环境hadoop] ,要在本机的/etc/hosts文件中添加 export HADOOP_USER_NAME=hdfs
a.txt和b.txt都是单词,以空格分割,你可以做假数据测试一下.
这是测试结果.

clipboard.png

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
宣传栏