大数据之 Hadoop-6-MapReduce

一、MapReduce 简介

MapReduce 是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用” 的核心框架。

MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。

二、MapReduce优缺点

1、优点

  • 1.MapReduce 易于编程

它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

  • 2.良好的扩展性

当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

  • 3.高容错性

MapReduce设计的初衷就是使程序能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

  • 4.适合PB级以上海量数据的离线处理

这里加红字体离线处理,说明它适合离线处理而不适合在线处理。比如像毫秒级别的返回一个结果,MapReduce很难做到。

2、缺点

  • 不擅长实时计算
  • 不擅长流式计算
    流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。
  • 不擅长DAG(有向图)计算

MapReduce不擅长做实时计算、流式计算、DAG(有向图)计算

1.实时计算(Spark擅长)

MapReduce无法像Mysql一样,在毫秒或者秒级内返回结果。

2.流式计算(Spark Stream擅长)

流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

3.DAG(有向图)计算

多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

三、MapReduce核心思想

1、MapReduce核心编程思想

file

1)分布式的运算程序往往需要分成至少2个阶段。
2)第一个阶段的MapTask并发实例,完全并行运行,互不相干。
3)第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出。
4)MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。
总结:分析WordCount数据流走向深入理解MapReduce核心思想。

2、WordCount案例实操

file

file

代码实战:

package com.homay.hadoopstudy;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.StringTokenizer;

/**
 * 单词计数类
 *
 * @author: kaiyi
 * @Date 2021/7/16 0:30
 */
public class WordCount {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        // 初始化conf类
        Configuration conf = new Configuration();

        // 通过实例化对象GenericOptionsParser可以获得程序执行所传入的参数
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();

        if(otherArgs.length < 2){
            System.err.println("Usage: wordcount <in> [<in> ...] <out>");
            System.exit(2);
        }

        // 构建任务对象
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);

        // 设置输出结果的数据类型
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        for(int i = 0; i < otherArgs.length - 1; i++){
            // 设置需要统计的文件的输入路径
            FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
        }

        // 设置统计结果的输出路径
        FileOutputFormat.setOutputPath(job, new Path(
                otherArgs[(otherArgs.length - 1)]));

        // 提交任务给Hadoop集群
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

    public static class IntSumReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private IntWritable result = new IntWritable();   // IntWritable -> Int(java)

        @Override
        public void reduce(Text key, Iterable<IntWritable> values,
                           Reducer<Text, IntWritable, Text, IntWritable>.Context context)
            throws IOException, InterruptedException{
            // 统计单词总数
            int sum = 0;
            for(IntWritable val : values){
                sum += val.get();
            }
            this.result.set(sum);

            // 输出统计结果
            context.write(key, this.result);
        }
    }

    /**
     * 自定义Mapper 内部类
     */
    public static class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable>{
        private static final IntWritable one = new IntWritable(1);
        private Text word = new Text();

        @Override
        public void map(Object key, Text value, Mapper<Object, Text, Text, IntWritable>.Context context) throws IOException, InterruptedException {
            // 默认根据空格、制表符\t、换行符\n、回车符\r 分割字符串
            StringTokenizer itr = new StringTokenizer(value.toString());
            // 循环输出每个单词与数量
            while (itr.hasMoreTokens()){
                this.word.set(itr.nextToken());

                // 输出单词与数量
                context.write(this.word, one);
            }
        }

    }

}

为者常成,行者常至