• MapReduce编程实例:单词计数

    本节介绍如何编写基本的 MapReduce 程序实现数据分析。本节代码是基于 Hadoop 2.7.3 开发的。

    任务准备

    单词计数(WordCount)的任务是对一组输入文档中的单词进行分别计数。假设文件的量比较大,每个文档又包含大量的单词,则无法使用传统的线性程序进行处理,而这类问题正是 MapReduce 可以发挥优势的地方。

    在前面《MapReduce实例分析:单词计数》教程中已经介绍了用 MapReduce 实现单词计数的基本思路和具体执行过程。下面将介绍如何编写具体实现代码及如何运行程序。

    首先,在本地创建 3 个文件:file00l、file002 和 file003,文件具体内容如表 1 所示。

    表 1 单词计数输入文件
    文件名 file001 file002 file003
    文件内容 Hello world
    Connected world
    One world 
    One dream
    Hello Hadoop 
    Hello Map 
    Hello Reduce

    再使用 HDFS 命令创建一个 input 文件目录。

    hadoop fs -mkdir input

    然后,把 file001、file002 和 file003 上传到 HDFS 中的 input 目录下。

    hadoop fs -put file001 input
    hadoop fs -put file002 input
    hadoop fs -put file003 input

    编写 MapReduce 程序的第一个任务就是编写 Map 程序。在单词计数任务中,Map 需要完成的任务就是把输入的文本数据按单词进行拆分,然后以特定的键值对的形式进行输出。

    编写 Map 程序

    Hadoop MapReduce 框架已经在类 Mapper 中实现了 Map 任务的基本功能。为了实现 Map 任务,开发者只需要继承类 Mapper,并实现该类的 Map 函数。

    为实现单词计数的 Map 任务,首先为类 Mapper 设定好输入类型和输出类型。这里,Map 函数的输入是 <key,value> 形式,其中,key 是输入文件中一行的行号,value 是该行号对应的一行内容。

    所以,Map 函数的输入类型为 <LongWritable,Text>。Map 函数的功能为完成文本分割工作,Map 函数的输出也是 <key,value> 形式,其中,key 是单词,value 为该单词出现的次数。所以,Map 函数的输出类型为 <Text,LongWritable>。

    以下是单词计数程序的 Map 任务的实现代码。

    public static class CoreMapper extends Mapper<Object,Text,Text,IntWritable> {
        private static final IntWritable one = new IntWritable(1);
        private static Text label = new Text();
        public void map(Object key,Text value,Mapper<Object,Text,Text,IntWritable> Context context)throws IOException,InterruptedException {
            StringTokenizer tokenizer = new StringTokenizer(value.toString());
            while(tokenizer.hasMoreTokens()) {
                label.set(tokenizer.nextToken());
                context.write(label,one);
            }
        }
    }

    在上述代码中,实现 Map 任务的类为 CoreMapper。该类首先将需要输出的两个变量 one 和 label 进行初始化。

    • 变量 one 的初始值直接设置为 1,表示某个单词在文本中出现过。
    • Map 函数的前两个参数是函数的输入参数,value 为 Text 类型,是指每次读入文本的一行,key 为 Object 类型,是指输入的行数据在文本中的行号。

    StringTokenizer 类机器方法将 value 变量中文本的一行文字进行拆分,拆分后的单词放在 tokenizer 列表中。然后程序通过循环对每一个单词进行处理,把单词放在 label 中,把 one 作为单词计数。

    在函数的整个执行过程中,one 的值一直是 1。在该实例中,key 没有被明显地使用到。context 是 Map 函数的一种输出方式,通过使用该变量,可以直接将中间结果存储在其中。

    根据上述代码,Map 任务结束后,3 个文件的输出结果如表 2 所示。

    表 2 单词计数 Map 任务输出结果
    文件名/Map file001/Map1 file002/Map2 file003/Map3
    Map 任务输出结果  <"Hello",1>
    <"world",1>
    <"Connected",1>
    <"world",1>
    <"One",1>
    <"world",1>
    <"One",1>
    <"dream",1> 
    <"Hello",1>
    <"Hadoop",1>
    <"Hello",1>
    <"Map",1>
    <"Hello", 1> 
    <"Reduce",1>

    编写 Reduce 程序

    编写 MapReduce 程序的第二个任务就是编写 Reduce 程序。在单词计数任务中,Reduce 需要完成的任务就是把输入结果中的数字序列进行求和,从而得到每个单词的出现次数。

    在执行完 Map 函数之后,会进入 Shuffle 阶段,在这个阶段中,MapReduce 框架会自动将 Map 阶段的输出结果进行排序和分区,然后再分发给相应的 Reduce 任务去处理。经过 Map 端 Shuffle 阶段后的结果如表 3 所示。

    表 3 单词计数 Map 端 Shuffle 阶段输出结果
    文件名/Map file001/Map1 file002/Map2 file003/Map3
    Map 端
    Shuffle 阶段输出结果
    <"Connected",1> 
    <"Hello", 1>
    <"world",<1,1>>
    <"dream",1>
    <"One", <1, 1>>
    <"world", 1>

    <"Map", 1>
    <"Hadoop",1>
    <"Hello",<1,1,1>>
    <"Reduce", 1>

    Reduce 端接收到各个 Map 端发来的数据后,会进行合并,即把同一个 key,也就是同一单词的键值对进行合并,形成<key, <V1, V2, .. Vn>> 形式的输出。经过 Map 端 Shuffle 阶段后的结果如表 4 所示。

    表 4 单词计数 Reduce端Shuffle阶段输出结果
    Reduce 端 <"Connected",1>

    Shuffle 阶段输出结果 
    < "dream",1> 
    <"Hadoop",1> 
    <"Hello",<1,1,1,1>> 
    <"Map",1>
    <"One",<1,1>>
    <"world", <1,1,1>>
    <"Reduce", 1>

    Reduce 阶段需要对上述数据进行处理从而得到每个单词的出现次数。从 Reduce 函数的输入已经可以理解 Reduce 函数需要完成的工作,就是首先对输入数据 value 中的数字序列进行求 和。以下是单词计数程序的 Reduce 任务的实现代码。

    public static class CoreReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
    
        private IntWritable count = new IntWritable ();
        public void reduce(Text key,Iterable<IntWritable> values,Reducer<Text,IntWritable, Text,IntWritable> Context context)throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable intWritable : values){
                sum += intWritable.get();
            }
            count.set(sum);
            context.write(key, count);
        }
    }

    与 Map 任务实现相似,Reduce 任务也是继承 Hadoop 提供的类 Reducer 并实现其接口。 Reduce 函数的输入、输出类型与 Map 函数的输出类型本质上是相同的。

    在 Reduce 函数的开始部分,首先设置 sum 参数用来记录每个单词的出现次数,然后遍历 value 列表,并对其中的数字进行累加,最终就可以得到每个单词总的出现次数。在输出的时候,仍然使用 context 类型的变量存储信息。当 Reduce 阶段结束时,就可以得到最终需要的结果,如表 5 所示。

    表 5 单词计数 Reduce 任务输出结果
    Reduce 任务输出结果 <"Connected", 1>
      <"dream", 1>
      <"Hadoop", 1>
      <"Hello", 4>
      <"Map", 1>
      <"One", 2>
      <"world", 3>
      <"Reduce", 1>


    编写 main 函数

    为了使用 CoreMapper 和 CoreReducer 类进行真正的数据处理,还需要在 main 函数中通过 Job 类设置 Hadoop MapReduce 程序运行时的环境变量,以下是具体代码。

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf,args).getRemainingArgs();
        if (otherArgs.length != 2) {
            System.err.printIn("Usage:wordcount <in> <out>");
            System.exit(2);
        }
        Job job = new Job (conf, "WordCount"); //设置环境参数
        job.setJarByClass (WordCount.class); //设置程序的类名
        job.setMapperClass(CoreMapper.class); //添加 Mapper 类
        job.setReducerClass(CoreReducer.class); //添加 Reducer类
        job.setOutputKeyClass (Text.class); //设置输出 key 的类型
        job.setOutputValueClass (IntWritable.class);  
        //设置输出 value 的类型
        FileInputFormat.addInputPath (job, new Path (otherArgs [0]));
        //设置输入文件路径
        FileOutputFormat.setOutputPath (job,new Path (otherArgs [1]));
        //设置输入文件路径
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    代码首先检查参数是不是正确,如果不正确就提醒用户。随后,通过 Job 类设置环境参数,并设置程序的类、Mapper 类和 Reducer 类。然后,设置了程序的输出类型,也就是 Reduce 函数的输出结果 <key,value> 中 key 和 value 各自的类型。最后,根据程序运行时的参数,设置输入、输出文件路径。

    核心代码包

    编写 MapReduce 程序需要引用 Hadoop 的以下几个核心组件包,它们实现了 Hadoop MapReduce 框架。

    import java.io.IOException;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper; 
    import org.apache.hadoop.mapreduce.Reducer;
    
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;

    这些核心组件包的基本功能描述如表 6 所示。

    表 6 Hadoop MapReduce 核心组件包的基本功能
    功能
    org.apache.hadoop.conf 定义了系统参数的配置文件处理方法
    org.apache.hadoop.fs 定义了抽象的文件系统 API
    org.apache.hadoop.mapreduce Hadoop MapReduce 框架的实现,包括任务的分发调度等
    org.apache.hadoop.io 定义了通用的 I/O API,用于网络、数据库和文件数据对象 进行读写操作

    运行代码

    在运行代码前,需要先把当前工作目录设置为 /user/local/Hadoop。编译 WordCount 程序需要以下 3 个 Jar,为了简便起见,把这 3 个 Jar 添加到 CLASSPATH 中。

    $export
    CLASSPATH=/usr/local/hadoop/share/hadoop/common/hadoop-common-2.7.3.jar:$CLASSPATH
    $export
    CLASSPATH=/usr/local/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-2.7.3.jar:$CLAS SPATH
    $export
    CLASSPATH=/usr/local/hadoop/share/hadoop/common/lib/common-cli-1.2.jar:$CLASSPATH

    使用 JDK 包中的工具对代码进行编译。

    $ javac WordCount.java

    编译之后,在文件目录下可以发现有 3 个“.class”文件,这是 Java 的可执行文件,将它们打包并命名为 wordcount.jar。

    $ jar -cvf wordcount.jar *.class

    这样就得到了单词计数程序的 Jar 包。在运行程序之前,需要启动 Hadoop 系统,包括启动 HDFS 和 MapReduce。然后,就可以运行程序了。

    $ ./bin/Hadoop jar wordcount.jar WordCount input output

    最后,可以运行下面的命令查看结果。

    $ ./bin/Hadoop fs -cat output/*

更多...

加载中...