Hadoop三大框架之MapReduce工作流程

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一、MapReduce基础

MapReduce的思想核心是“分而治之”适用于大量复杂的任务处理场景(大规模数据处理场景)。

  • Map负责“分”把复杂的任务分解为若干个“简单的任务”来并行处理。可以进行拆分的前提是这些小任务可以并行计算彼此间几乎没有依赖关系。

  • Reduce负责“合”即对map阶段的结果进行全局汇总。

  • MapReduce运行在yarn集群。ResourceManager+NodeManager这两个阶段合起来就是MapReduce思想的体现。

1.1 MapReduce设计构思

MapReduce是一个分布式运算程序的编程框架核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序并发运行在Hadoop集群上。

MapReduce设计并提供了一个同意的计算框架为程序员隐藏了绝大多数系统层面的处理细节为程序员提供了一个抽象和高层的编程接口和框架。程序员仅需要关心应用层的具体计算问题仅需要编写少量的处理应用本身计算问题的程序代码。

Map和Reduce为程序员提供了一个清晰的操作接口抽象描述。MapReduce中定义了如下的Map和Reduce两个抽象的编程接口由用户去编程实现Map和ReduceMapReduce处理的数据类型是<key,value>键值对。

一个完整的MapReduce程序在分布式运行时有三类实例进程:

  1. MRAppMaster 负责整个程序的过程调度及状态协调

  1. MapTask 负责map阶段的整个数据处理流程

  1. ReduceTask 负责reduce阶段的整个数据处理流程

1.2 MapReduce工作原理

1、分片操作:

FileInputstream首先要计算切片大小FileInputstream是一个抽象类继承InputFormat接口真正完成工作的是它的实现类默认为是TextInputFormatTextInputFormat是读取文件的,默认为一行一行读取将输入文件切分为逻辑上的多个input splitinput split是MapReduce对文件进行处理和运算的输入单位只是一个逻辑概念。

在进行Map计算之前MapReduce会根据输入文件计算的切片数开启map任务一个输入切片对应一个maptask输入分片存储的并非数据本身而是一个分片长度和一个记录数据位置的集合每个input spilt中存储着该分片的数据信息如:文件块信息、起始位置、数据长度、所在节点列表等并不是对文件实际分割成多个小文件输入切片大小往往与hdfs的block关系密切默认一个切片对应一个block大小为128M;注意:尽管我们可以使用默认块大小或自定义的方式来定义分片的大小但一个文件的大小如果是在切片大小的1.1倍以内仍作为一个片存储而不会将那多出来的0.1单独分片。

2、数据格式化操作:

TextInputFormat 会创建RecordReader去读取数据通过getCurrentkeygetCurrentvaluenextkeyvalue等方法来读取读取结果会形成keyvalue形式返回给maptaskkey为偏移量value为每一行的内容此操作的作用为在分片中每读取一条记录就调用一次map方法反复这一过程直到将整个分片读取完毕。

3、map阶段操作:

此阶段就是程序员通过需求偏写了map函数将数据格式化的<KV>键值对通过Mapper的map()方法逻辑处理形成新的<kv>键值对通过Context.write输出到OutPutCollector收集器

map端的shuffle(数据混洗)过程:溢写(分区排序合并归并)

溢写:

由map处理的结果并不会直接写入磁盘而是会在内存中开启一个环形内存缓冲区先将map结果写入缓冲区这个缓冲区默认大小为100M并且在配置文件里为这个缓冲区设了一个阀值默认为0.8同时map还会为输出操作启动一个守护线程如果缓冲区内存达到了阀值0.8这个线程会将内容写入到磁盘上这个过程叫作spill(溢写)。

分区Partition

当数据写入内存时决定数据由哪个Reduce处理从而需要分区默认分区方式采用hash函数对key进行哈布后再用Reduce任务数量进行取模表示为hash(key)modR这样就可以把map输出结果均匀分配给Reduce任务处理Partition与Reduce是一一对应关系类似于一个分片对应一个map task最终形成的形式为(分区号keyvalue)

排序Sort:

在溢出的数据写入磁盘前会对数据按照key进行排序默认采用快速排序第一关键字为分区号第二关键字为key。

合并combiner:

程序员可选是否合并数据合并在Reduce计算前对相同的key数据、value值合并减少输出量如(“a”1)(“a”1)合并之后(“a”2)

归并menge

每块溢写会成一个溢写文件这些溢写文件最终需要被归并为一个大文件生成key对应的value-list会进行归并排序<"a",1><"a"1>归并后<"a",<1,1>>。

Reduce 端的shffle

数据copy:map端的shffle结束后所有map的输出结果都会保存在map节点的本地磁盘上文件都经过分区不同的分区会被copy到不同的Recuce任务并进行并行处理每个Reduce任务会不断通过RPC向JobTracker询问map任务是否完成JobTracker检测到map位务完成后就会通过相关Reduce任务去aopy拉取数据Recluce收到通知就会从Map任务节点Copy自己分区的数据此过程一般是Reduce任务采用写个线程从不同map节点拉取

归并数据

Map端接取的数据会被存放到 Reduce端的缓存中如果缓存被占满就会溢写到磁盘上缓存数据来自不同的Map节点会存在很多合并的键值对当溢写启动时相同的keg会被归并最终各个溢写文件会被归并为一个大类件归并时会进行排序磁盘中多个溢写文许归并为一个大文许可能需要多次归并一次归并溢写文件默认为10个

4、Reduce阶段:

Reduce任务会执行Reduce函数中定义的各种映射输出结果存在分布式文件系统中。

二、 MapReduce编程模型

2.1 编程模型概述

2.1.1 Map阶段2个步骤

  1. 设置InputFormat类将数据切分为Key-Value(K1和V1)对输入到第二步

  1. 自定义Map逻辑将第一步的结果转换为另外的Key-Value(K2和V2)对输出结果

2.1.2 Shuffle阶段4个步骤

  1. 对输出的Key-Value进行分区

  1. 对不同分区的数据按照相同的key排序

  1. (可选)对分组过的数据初步规约降低数据的网络拷贝

  1. 对数据进行分组相同的Key和Value放入一个集合中

2.1.3 Reduce阶段2个步骤

  1. 对多个Map任务的结果进行排序以及合并编写Reduce函数实现自己的逻辑对输入的Key-Value进行处理转为新的Key-Value(K3和V3)输出

  1. 设置OutputFromat处理并保存Reduce输出的Key-Value数据

2.2 编程模型三部曲

(1)Input:一系列(K1,V1)。

(2)Map和Reduce:

Map:(K1,V1) -> list(K2,V2) (其中K2/V2是中间结果对)

Reduce:(K2,list(v2)) -> list(K3,V3)

(3)Output:一系列(K3,V3)。

三、词频统计WordCount

3.1 添加Maven依赖

# 将maven版本改为1.8
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>

# 添加如下依赖
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>${hadoop-version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-common</artifactId>
    <version>${hadoop-version}</version>
</dependency>

3.2 代码实现

《wordcount.txt》

hello java
hello hadoop
hello java
hello java hadoop
java hadoop
hadoop java

Mapper类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        Text text = new Text();
        IntWritable intWritable = new IntWritable();

        System.out.println("WordCountMap stage Key:"+key+" Value:"+value);
        String[] words = value.toString().split(" "); // "hello world"->[hello,world]
        for (String word : words) {
            text.set(word);
            intWritable.set(1);
            context.write(text,intWritable); // <hello,1> <word,1>
        }
    }
}

Reduce类

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReduce extends Reducer<Text, IntWritable,Text, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        System.out.println("Reduce stage Key:"+key+" Values"+values.toString());
        int count=0;
        for (IntWritable intWritable : values) {
            count += intWritable.get();
        }

        LongWritable longWritable = new LongWritable(count);
        System.out.println("Key:"+key+" ResultValue:"+longWritable.get());

        context.write(key,longWritable);
    }
}

Driver类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;


import java.io.IOException;

public class WordCountDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        job.setJarByClass(WordCountDriver.class);

        // 设置mapper类
        job.setMapperClass(WordCountMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // 设置Reduce类
        job.setReducerClass(WordCountReduce.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 指定map输入的文件路径
        FileInputFormat.
                setInputPaths(job,new Path("D:\\Servers\\hadoopstu\\in\\wordcount.txt"));
        // 指定reduce结果的输出路径
        Path path = new Path("D:\\Servers\\hadoopstu\\out1");

        FileSystem fileSystem = FileSystem.get(path.toUri(), configuration);
        if(fileSystem.exists(path)){
            fileSystem.delete(path,true);
        }

        FileOutputFormat.setOutputPath(job,path);

        job.waitForCompletion(true);

    }
}

3.3 代码说明

3.3.1 对于map函数的方法

protected void map(LongWritable key, Text value, Context context)

继承Mapper类实现map方法重写的map方法中包含三个参数key、value就是输入的key、value键值对(<K1,V1>)context记录的是整个上下文可以通过context将数据写出去。

3.3.2 对于reduce函数的方法

protected void reduce(Text key, Iterable<IntWritable> values, Context context)

继承Reduce类实现reduce方法reduce函数输入的是一个<K,V>形式但是这里的value是以迭代器的形式Iterable<IntWritable> value。即reduce的输入是一个key对应一组value。

reduce中context参数与map中的reduce参数作用一致。

3.3.3 对于main函数的调用

  1. 创建configuration()类作用是读取MapReduce系统配置信息

  1. 创建job类

  1. 设置map函数、map函数输出的key、value类型

  1. 设置reduce函数、reduce函数输出的key、value类型即最终存储再HDFS结果文件中的key、value类型

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Hadoop