Hadoop table join


Create four classes: CustomerOrders, ReduceJoinMapper, ReduceJoinReducer, and ReduceJoinDriver.

CustomerOrders

Add getters and setters for all of the fields.

Override these three methods (compareTo, write, and readFields from WritableComparable):

    @Override
    public int compareTo(CustomerOrders o) {
        return 0;   // the bean is not used as a shuffle key here, so no real ordering is needed
    }

    @Override
    public void write(DataOutput dataOutput) throws IOException {
        // serialization: the field order must match readFields()
        dataOutput.writeInt(customerId);
        dataOutput.writeUTF(customerName);
        dataOutput.writeUTF(orderStatus);
        dataOutput.writeInt(orderId);
        dataOutput.writeUTF(flag);
    }

    @Override
    public void readFields(DataInput dataInput) throws IOException {
        // deserialization: read the fields back in the order they were written
        customerId = dataInput.readInt();
        customerName = dataInput.readUTF();
        orderStatus = dataInput.readUTF();
        orderId = dataInput.readInt();
        flag = dataInput.readUTF();
    }
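For context, here is a minimal sketch of how the rest of the CustomerOrders class might look. The field names and types are taken from the write/readFields methods above; the no-arg constructor, the implemented interface, and the exact toString() format are assumptions rather than code from the original post.

import org.apache.hadoop.io.WritableComparable;

// a minimal sketch, assuming the bean implements WritableComparable<CustomerOrders>
public class CustomerOrders implements WritableComparable<CustomerOrders> {
    private int customerId;
    private String customerName;
    private String orderStatus;
    private int orderId;
    private String flag;        // "0" marks a customer record, "1" marks an order record

    public CustomerOrders() {}  // Hadoop needs a no-arg constructor to deserialize the bean

    // TextOutputFormat prints the reduce output key via toString(), so this
    // controls the format of the joined lines (the field order here is an assumption)
    @Override
    public String toString() {
        return orderId + "," + orderStatus + "," + customerId + "," + customerName;
    }

    // getters and setters for all five fields, plus the compareTo/write/readFields
    // overrides shown above, complete the class
}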

ReduceJoinMapper

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    String name = "";
    CustomerOrders customerOrders = new CustomerOrders();

    @Override
    protected void setup(Mapper<LongWritable, Text, Text, CustomerOrders>.Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        System.out.println("setup method:" + inputSplit.getPath().toString());
        this.name = inputSplit.getPath().getName();//orders.csv / customers.csv
    }

    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CustomerOrders>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        if (name.startsWith("order")) {  // fields come from the orders file
            customerOrders.setOrderId(Integer.parseInt(fields[0]));
            customerOrders.setOrderStatus(fields[3]);
            customerOrders.setCustomerId(Integer.parseInt(fields[2]));
            customerOrders.setCustomerName("");
            customerOrders.setFlag("1");
        } else {    // fields come from the customers file
            customerOrders.setOrderId(0);
            customerOrders.setOrderStatus("");
            customerOrders.setCustomerId(Integer.parseInt(fields[0]));
            customerOrders.setCustomerName(fields[1] + "·" + fields[2]);
            customerOrders.setFlag("0");
        }
        Text text = new Text(String.valueOf(customerOrders.getCustomerId())); // the join key is the customer id
        context.write(text, customerOrders);
    }
}
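To make the tagging concrete, here is a hypothetical pair of input lines and the key/value pair the mapper would emit for each. The column layouts of orders.csv and customers.csv are assumptions inferred from the field indexes used above, and the values are made up for illustration.

// orders.csv (assumed layout order_id,order_date,customer_id,order_status):
//   101,2023-01-01,5,COMPLETE  ->  key "5", value {orderId=101, orderStatus="COMPLETE", customerId=5, flag="1"}
// customers.csv (assumed layout customer_id,first_name,last_name,...):
//   5,Jane,Doe,...             ->  key "5", value {customerId=5, customerName="Jane·Doe", flag="0"}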

ReduceJoinReducer

import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;

public class ReduceJoinReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {

    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Reducer<Text, CustomerOrders, CustomerOrders, NullWritable>.Context context) throws IOException, InterruptedException {
        CustomerOrders bean = new CustomerOrders(); // holds the customer info for this key
        ArrayList<CustomerOrders> orderList = new ArrayList<>(); // holds the order info for this key

        for (CustomerOrders customerOrderBean:values) {
            if (customerOrderBean.getFlag().equals("0")){ // this value is customer info
                try {
                    BeanUtils.copyProperties(bean,customerOrderBean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }else {                                       // this value is order info
                CustomerOrders orderbean = new CustomerOrders();
                try {
                    BeanUtils.copyProperties(orderbean,customerOrderBean);
                    orderList.add(orderbean);
                } catch (IllegalAccessException e) {
                    e.printStackTrace();
                } catch (InvocationTargetException e) {
                    e.printStackTrace();
                }
            }
        }
        //  orderList : orders info
        //  bean : customer info
        for (CustomerOrders orderBean : orderList) {
            orderBean.setCustomerName(bean.getCustomerName());
            context.write(orderBean,NullWritable.get());
        }
        // the join itself is completed here, in the reduce task
    }
}
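Because the reduce output key is the CustomerOrders bean itself (the value is NullWritable), the text written to the result file is whatever the bean's toString() returns, so an override like the one sketched earlier is needed for readable output. For the hypothetical records above, the joined line would look roughly like:

// 101,COMPLETE,5,Jane·Doe   (exact format depends on the assumed toString() above)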

ReduceJoinDriver

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class ReduceJoinDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);

        // class used to locate the job jar
        job.setJarByClass(ReduceJoinDriver.class);

        // mapper class and the map-phase output <key, value> types
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);

        // reducer class and the job's final output <key, value> types
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);

        // input path (a directory, not a single file)
        Path inPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopstu\\in\\demo3");
        FileInputFormat.setInputPaths(job,inPath);

        // output path
        Path outPath = new Path("D:\\kb21\\myworkspace\\njzb\\hadoopstu\\out3");

        // delete the output directory if it already exists, otherwise the job would fail
        FileSystem fs = FileSystem.get(outPath.toUri(), configuration);
        if (fs.exists(outPath))
            fs.delete(outPath,true);

        // where the job writes its results
        FileOutputFormat.setOutputPath(job,outPath);

        boolean result = job.waitForCompletion(true);
        System.out.println(result);

    }
}
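With the hard-coded Windows paths the driver is meant to be run in local mode from the IDE; the joined lines end up in out3/part-r-00000. A hedged variant, if the paths were read from the command line instead (hypothetical, not in the original post):

// Path inPath = new Path(args[0]);    // e.g. an HDFS input directory
// Path outPath = new Path(args[1]);   // e.g. an HDFS output directory
// packaged as a jar, the job could then be submitted with:
//   hadoop jar <jar file> ReduceJoinDriver <input dir> <output dir>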

Tips

Be careful to import the right packages: Text and NullWritable come from org.apache.hadoop.io, and FileInputFormat / FileOutputFormat must come from org.apache.hadoop.mapreduce.lib.input / org.apache.hadoop.mapreduce.lib.output, not the older org.apache.hadoop.mapred API.

Tags: Hadoop
