hadoop表连接
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
Hadoop 表连接(Reduce 端 Join)
建立四个类
CustomerOrders
为每个字段生成 getter/setter 方法,
然后重写下面这三个方法(compareTo、write、readFields):
/**
 * Orders records by customer id, then by order id.
 *
 * <p>The previous implementation returned 0 for every pair, which made all
 * records compare as equal and therefore made this type unusable as a sort
 * or grouping key.
 *
 * @param o the record to compare against
 * @return a negative value, zero, or a positive value as this record sorts
 *         before, equal to, or after {@code o}
 */
@Override
public int compareTo(CustomerOrders o) {
    // Integer.compare avoids the overflow that subtracting the ids could cause.
    int byCustomer = Integer.compare(customerId, o.customerId);
    if (byCustomer != 0) {
        return byCustomer;
    }
    return Integer.compare(orderId, o.orderId);
}
/**
 * Serializes this record to {@code dataOutput}.
 *
 * <p>Fields are written in the order customerId, customerName, orderStatus,
 * orderId, flag; {@code readFields} must read them back in the same order.
 *
 * @param dataOutput the sink the fields are written to
 * @throws IOException if writing to {@code dataOutput} fails
 */
@Override
public void write(DataOutput dataOutput) throws IOException {
    // Customer identity first.
    dataOutput.writeInt(this.customerId);
    dataOutput.writeUTF(this.customerName);

    // Then the order columns.
    dataOutput.writeUTF(this.orderStatus);
    dataOutput.writeInt(this.orderId);

    // Finally the marker telling which source table the record came from.
    dataOutput.writeUTF(this.flag);
}
/**
 * Restores this record from {@code dataInput}.
 *
 * <p>Fields are read in the order customerId, customerName, orderStatus,
 * orderId, flag, mirroring the order used by {@code write}.
 *
 * @param dataInput the source the fields are read from
 * @throws IOException if reading from {@code dataInput} fails
 */
@Override
public void readFields(DataInput dataInput) throws IOException {
    // Customer identity first.
    this.customerId = dataInput.readInt();
    this.customerName = dataInput.readUTF();

    // Then the order columns.
    this.orderStatus = dataInput.readUTF();
    this.orderId = dataInput.readInt();

    // Finally the marker telling which source table the record came from.
    this.flag = dataInput.readUTF();
}
ReduceJoinMapper
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
/**
 * Map side of a reduce-side join between an order file and a customer file.
 *
 * <p>Each input line is split on commas and turned into a
 * {@link CustomerOrders} record. Lines from a file whose name starts with
 * {@code "order"} are tagged with flag {@code "1"}; lines from any other file
 * are treated as customer rows and tagged with flag {@code "0"}. Every record
 * is emitted under its customer id so that a customer and all of that
 * customer's orders reach the same reduce call.
 */
public class ReduceJoinMapper extends Mapper<LongWritable, Text, Text, CustomerOrders> {
    /** Name of the file behind the current input split, e.g. orders.csv / customers.csv. */
    String name = "";
    /** Record instance that is refilled and emitted for every input line. */
    CustomerOrders customerOrders = new CustomerOrders();

    /**
     * Remembers the name of the file this mapper instance is reading.
     *
     * @param context task context giving access to the input split
     * @throws IOException          declared by the overridden method
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    protected void setup(Mapper<LongWritable, Text, Text, CustomerOrders>.Context context) throws IOException, InterruptedException {
        FileSplit inputSplit = (FileSplit) context.getInputSplit();
        System.out.println("setup method:" + inputSplit.getPath().toString());
        this.name = inputSplit.getPath().getName(); // orders.csv / customers.csv
    }

    /**
     * Parses one CSV line and emits it keyed by customer id.
     *
     * @param key     position key supplied by the input format (unused)
     * @param value   one line of the input file
     * @param context sink for the (customer id, record) pair
     * @throws IOException          if writing the output pair fails
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, CustomerOrders>.Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(",");
        boolean isOrderLine = name.startsWith("order");

        // Blank or truncated lines do not have the columns read below; skip
        // them instead of failing the task with ArrayIndexOutOfBoundsException.
        int requiredColumns = isOrderLine ? 4 : 3;
        if (fields.length < requiredColumns) {
            return;
        }

        if (isOrderLine) { // fields hold the columns of the order table
            customerOrders.setOrderId(Integer.parseInt(fields[0]));
            customerOrders.setOrderStatus(fields[3]);
            customerOrders.setCustomerId(Integer.parseInt(fields[2]));
            customerOrders.setCustomerName("");
            customerOrders.setFlag("1");
        } else { // fields hold the columns of the customer table
            customerOrders.setOrderId(0);
            customerOrders.setOrderStatus("");
            customerOrders.setCustomerId(Integer.parseInt(fields[0]));
            customerOrders.setCustomerName(fields[1] + "·" + fields[2]);
            customerOrders.setFlag("0");
        }

        // The join key must be the customer id. Keying by order id (as before)
        // put every customer row under "0" and every order under its own id,
        // so the two sides never met in the same reduce group.
        Text joinKey = new Text(String.valueOf(customerOrders.getCustomerId()));
        context.write(joinKey, customerOrders);
    }
}
ReduceJoinReducer
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
/**
 * Reduce side of the join: attaches the customer name to every order record
 * that arrived under the same key.
 *
 * <p>Records flagged {@code "0"} are customer rows; all other records are
 * treated as order rows. Each order row is written out once, with its
 * customer name replaced by the name taken from the customer row.
 */
public class ReduceJoinReducer extends Reducer<Text, CustomerOrders, CustomerOrders, NullWritable> {

    /**
     * Joins the customer row and the order rows of one key group.
     *
     * @param key     the join key shared by all {@code values}
     * @param values  customer and order records for this key
     * @param context sink for the joined order records
     * @throws IOException          if a record cannot be copied or written
     * @throws InterruptedException if the task is interrupted while writing
     */
    @Override
    protected void reduce(Text key, Iterable<CustomerOrders> values, Reducer<Text, CustomerOrders, CustomerOrders, NullWritable>.Context context) throws IOException, InterruptedException {
        CustomerOrders bean = new CustomerOrders(); // holds the customer info
        ArrayList<CustomerOrders> orderList = new ArrayList<>(); // holds the order info

        for (CustomerOrders customerOrderBean : values) {
            if ("0".equals(customerOrderBean.getFlag())) { // customer info
                // Only the name is read from the customer row further down, so
                // copying that single property is enough (no reflection needed).
                bean.setCustomerName(customerOrderBean.getCustomerName());
            } else { // order info
                // Hadoop reuses the value object while iterating, so each order
                // must be copied before it is stored in the list.
                orderList.add(copyOf(customerOrderBean));
            }
        }

        // orderList : orders info
        // bean      : customer info
        for (CustomerOrders orderBean : orderList) {
            orderBean.setCustomerName(bean.getCustomerName());
            context.write(orderBean, NullWritable.get());
        }
        // The reduce-side join for this key is complete.
    }

    /**
     * Returns a new record carrying the same bean properties as {@code source}.
     *
     * @param source the record to copy
     * @return an independent copy of {@code source}
     * @throws IOException if the property copy fails; the original failure is
     *                     kept as the cause so the record is not silently lost
     */
    private static CustomerOrders copyOf(CustomerOrders source) throws IOException {
        CustomerOrders copy = new CustomerOrders();
        try {
            BeanUtils.copyProperties(copy, source);
        } catch (IllegalAccessException | InvocationTargetException e) {
            throw new IOException("Failed to copy order record for join", e);
        }
        return copy;
    }
}
ReduceJoinDriver
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
/**
 * Configures and launches the reduce-side join job.
 *
 * <p>Usage: {@code ReduceJoinDriver [inputDir [outputDir]]}. When an argument
 * is omitted the corresponding built-in default path is used.
 */
public class ReduceJoinDriver {
    /** Input directory used when no first argument is given (a directory, not a file). */
    private static final String DEFAULT_INPUT_DIR = "D:\\kb21\\myworkspace\\njzb\\hadoopstu\\in\\demo3";
    /** Output directory used when no second argument is given. */
    private static final String DEFAULT_OUTPUT_DIR = "D:\\kb21\\myworkspace\\njzb\\hadoopstu\\out3";

    /**
     * Builds the job, removes any existing output directory, runs the job and
     * prints whether it succeeded.
     *
     * @param args optional input directory and output directory
     * @throws IOException            if the job cannot be created or the file system fails
     * @throws InterruptedException   if the thread is interrupted while waiting for the job
     * @throws ClassNotFoundException if a job class cannot be found
     */
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        String inputDir = (args != null && args.length > 0) ? args[0] : DEFAULT_INPUT_DIR;
        String outputDir = (args != null && args.length > 1) ? args[1] : DEFAULT_OUTPUT_DIR;

        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration);
        // Class used to locate the job jar.
        job.setJarByClass(ReduceJoinDriver.class);
        // Map task and the <key, value> types it emits.
        job.setMapperClass(ReduceJoinMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(CustomerOrders.class);
        // Reduce task and the <key, value> types it emits.
        job.setReducerClass(ReduceJoinReducer.class);
        job.setOutputKeyClass(CustomerOrders.class);
        job.setOutputValueClass(NullWritable.class);
        // Input path.
        Path inPath = new Path(inputDir);
        FileInputFormat.setInputPaths(job, inPath);
        // Output path; an existing directory is deleted first so the job can be re-run.
        Path outPath = new Path(outputDir);
        FileSystem fs = FileSystem.get(outPath.toUri(), configuration);
        if (fs.exists(outPath)) {
            fs.delete(outPath, true);
        }
        // Where the job writes its result.
        FileOutputFormat.setOutputPath(job, outPath);
        boolean result = job.waitForCompletion(true);
        System.out.println(result);
        // Report failure through the exit status so scripts can detect it.
        if (!result) {
            System.exit(1);
        }
    }
}