Hadoop3教程(十六):MapReduce中的OutputFormat

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

文章目录

105OutputFormat概述

我们之前讲过了Map阶段的InputFormat对应的Reduce阶段也有自己的OutputFormat。

Reducer在执行完reduce()之后接下来就会通过OutputFormat来将处理结果输出至外界环境。

Hadoop里默认使用的是TextOutputFormat即将reduce()的处理结果按行输出到文件。

而OutputFormat是MapReduce输出的基类所有实现了MR输出的程序都必须实现OutputFormat接口。

OutputFormat有几种官方自带的实现类具体功能就不展开了

  • NullOutputFormat
  • FileOutputFormat
    • MapFileOutputFormat
    • SequenceFileOutputFormat
    • TextOutputFormat默认
  • FilterOutputFormat
    • LazyOutputFormat
  • DBOutputFormat

OutputFormat类的核心方法public abstract RecordWriter<K,V> getRecordWriter(...)

最终结果怎么写以什么形式写写到哪儿等等这些都是在getRecordWriter()里控制的。

当然这些自带的实现类在日常的生产中肯定是不足以满足各种情况的所以多数情况下我们会实现自定义的OutputFormat类

自定义OutputFormat实现类需要

  • 继承FileOutputFormat
  • 改写RecordWriter具体改写输出数据的方法write()

106自定义OutputFormat案例需求分析

需求输入是一个日志文件即log.txt里面是罗列了一些访问过的网站现在需要把其中包含atguigu的网站输出到a.log不包含atguigu的网站输出到b.log。

输入数据形如

http://www.baidu.com
http://www.atguibu.com
...

我们需要自定义一个OutputFormat类即创建一个类LogRecordWriter继承RecordWriter然后创建两个文件输出流一个是atguiguOut一个是otherOut。如果输入数据包含atguigu就输出到atguiguOut反之则输出到otherOut流。

最后还需要在驱动类里注册一下

job.setOutputFormatClass(LogOutputFormat.class);

附注

其实这个需求从直观上来讲是可以通过分区来实现类似功能的但是很遗憾分区的话无法控制输出文件的名字所以没法严格符合需求。

107/108自定义OutputFormat案例实现

这里直接复制了教程里的代码来介绍一下如何针对上一小节提出的需求自定义OutputFormat。

自定义Mapper

首先需要创建一个自定义的Mapper类如class LogMapper extends Mapper<LongWritable, Text, Text, NullWritable>

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LogMapper extends Mapper<LongWritable, Text,Text, NullWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        //不做任何处理,直接写出一行log数据
        context.write(value,NullWritable.get());
    }
}

自定义Reducer

然后新建一个自定义Reducer类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LogReducer extends Reducer<Text, NullWritable,Text, NullWritable> {
    @Override
    protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        // 防止有相同的数据,迭代写出
        for (NullWritable value : values) {
            context.write(key,NullWritable.get());
        }
    }
}

自定义OutputFormat

这里是最重要的一步就是自定义一个OutputFormat类继承RecordWriter

  • 创建两个文件的输出流atguiguOut、otherOut
  • 如果输入数据中含有atguigu则输出至atguiguOut反之则输出到otherOut;

首先自定义OutputFormat类重写RecordWriter方法将我们自定义的LogRecordWriter放进去。

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogOutputFormat extends FileOutputFormat<Text, NullWritable> {
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
        //创建一个自定义的RecordWriter返回
        LogRecordWriter logRecordWriter = new LogRecordWriter(job);
        return logRecordWriter;
    }
}

然后编写LogRecordWriter类

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;

public class LogRecordWriter extends RecordWriter<Text, NullWritable> {

    private FSDataOutputStream atguiguOut;
    private FSDataOutputStream otherOut;

    public LogRecordWriter(TaskAttemptContext job) {
        try {
            //获取文件系统对象
            FileSystem fs = FileSystem.get(job.getConfiguration());
            //用文件系统对象创建两个输出流对应不同的目录
            atguiguOut = fs.create(new Path("d:/hadoop/atguigu.log"));
            otherOut = fs.create(new Path("d:/hadoop/other.log"));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void write(Text key, NullWritable value) throws IOException, InterruptedException {
        String log = key.toString();
        //根据一行的log数据是否包含atguigu,判断两条输出流输出的内容
        if (log.contains("atguigu")) {
            atguiguOut.writeBytes(log + "\n");
        } else {
            otherOut.writeBytes(log + "\n");
        }
    }

    @Override
    public void close(TaskAttemptContext context) throws IOException, InterruptedException {
        //关流
        IOUtils.closeStream(atguiguOut);
        IOUtils.closeStream(otherOut);
    }
}

Driver

最后编写LogDriver驱动类把我们前面自定义的的类统统在驱动类里注册上

package com.atguigu.mapreduce.outputformat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class LogDriver {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);

        job.setJarByClass(LogDriver.class);
        job.setMapperClass(LogMapper.class);
        job.setReducerClass(LogReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);

        //设置自定义的outputformat
        job.setOutputFormatClass(LogOutputFormat.class);

        FileInputFormat.setInputPaths(job, new Path("D:\\input"));
        //虽然我们自定义了outputformat但是因为我们的outputformat继承自fileoutputformat
        //而fileoutputformat要输出一个_SUCCESS文件所以在这还得指定一个输出目录
        FileOutputFormat.setOutputPath(job, new Path("D:\\logoutput"));

        boolean b = job.waitForCompletion(true);
        System.exit(b ? 0 : 1);
    }
}

至此需求完成。

参考文献

  1. 【尚硅谷大数据Hadoop教程hadoop3.x搭建到集群调优百万播放】
阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6
标签: Hadoop