1. Use case:
joining one very large table with one very small table
2. Approach:
cache the small table on the map side and complete the join there. This shifts the join work into the map phase, relieves the reduce phase of data pressure, and minimizes data skew.
3. Method: use the distributed cache
(1) In the mapper's setup() phase, read the cached file into an in-memory collection.
(2) In the driver, register the file with the cache: job.addCacheFile(new URI("file:/e:/mapjoincache/pd.txt")); // ships an ordinary file to every task node.
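The steps above amount to an in-memory hash join: build a HashMap from the small table once, then stream the big table's records through it. As a standalone sketch outside Hadoop (class and method names are illustrative, not part of any API):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// A minimal sketch of the map-side join idea: the small table lives in a
// HashMap, and each big-table record is joined by a single lookup.
public class HashJoinSketch {

    // smallTable: productID -> productName (assumed to fit in memory)
    // bigTableLine: one tab-separated order record "orderID\tproductID\tamount"
    public static String joinLine(Map<String, String> smallTable, String bigTableLine) {
        String[] fields = bigTableLine.split("\t");
        String productName = smallTable.getOrDefault(fields[1], "UNKNOWN");
        return fields[0] + "\t" + productName + "\t" + fields[2];
    }

    public static List<String> join(Map<String, String> smallTable, List<String> bigTable) {
        List<String> out = new ArrayList<>();
        for (String line : bigTable) {
            out.add(joinLine(smallTable, line));
        }
        return out;
    }
}
```

Because every lookup is O(1) and no shuffle is involved, this is why a map join avoids the reduce-side pressure entirely.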
4. Example
//order.txt
orderID productID amount
1001 01 1
1002 02 2
1003 03 3
1004 01 4
1005 02 5
1006 03 6
//pd.txt
productID productName
01 Xiaomi
02 Huawei
03 Gree
The goal is to replace each product ID in order.txt with its product name, caching the small table pd.txt.
mapper:
package MapJoin;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.*;
import java.util.HashMap;
import java.util.Map;
public class MapJoinMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
Map<String, String> productMap = new HashMap<String, String>();
Text k = new Text();
/**
* Load pd.txt into the HashMap; setup() runs only once per map task.
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void setup(Context context) throws IOException, InterruptedException {
// The file registered with job.addCacheFile() is localized to every task
// node and symlinked into the task's working directory, so it can be
// opened by its plain file name rather than an absolute local path.
BufferedReader productReader = new BufferedReader(new InputStreamReader(new FileInputStream("pd.txt"), "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = productReader.readLine())) {
String[] fields = line.split("\t");
productMap.put(fields[0], fields[1]); // productID -> productName
}
productReader.close();
}
}
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
String productName = productMap.get(fields[1]);
k.set(fields[0] + "\t" + productName + "\t" + fields[2]);
context.write(k, NullWritable.get());
}
}
driver:
package MapJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
public class MapJoinDriver {
public static void main(String[] args) throws IOException, URISyntaxException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\test\\A\\mapjoin\\order.txt", "G:\\test\\A\\mapjoin\\join2\\"}; // hardcoded local paths for testing; the output directory must not exist yet
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(MapJoinDriver.class);
job.setMapperClass(MapJoinMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
//cache the small, repeatedly-read file so every map task can load it locally
job.addCacheFile(new URI("file:///G:/test/A/mapjoin/pd.txt"));
//map-only job: the join finishes in the map phase, so no reducers are needed
job.setNumReduceTasks(0);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}
1. Approach (reduce join)
Use the join condition, i.e. the product ID, as the map output key. Each map task emits every record that can participate in the join, tagged with the file it came from; the shuffle then delivers all records sharing a product ID to the same reduce task, where the two sides are stitched together.
The input data are the same as for the map join above, and the output is similar.
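Before the full Hadoop code, the idea can be sketched in plain Java: tag each record with a source flag, group by the join key (which is what the shuffle does), then merge within each group. All class and method names here are illustrative.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// A standalone sketch of the reduce-side join. Tagged records are String
// arrays: order = {"0", productID, orderID, amount},
// product = {"1", productID, productName}.
public class ReduceJoinSketch {

    // Simulate the shuffle: group tagged records by productID (rec[1]).
    public static Map<String, List<String[]>> shuffle(List<String[]> tagged) {
        Map<String, List<String[]>> groups = new TreeMap<>();
        for (String[] rec : tagged) {
            groups.computeIfAbsent(rec[1], k -> new ArrayList<>()).add(rec);
        }
        return groups;
    }

    // Simulate the reducer: within each group, pick out the product record's
    // name and emit one joined line per order record.
    public static List<String> reduce(Map<String, List<String[]>> groups) {
        List<String> out = new ArrayList<>();
        for (List<String[]> group : groups.values()) {
            String productName = "";
            List<String[]> orders = new ArrayList<>();
            for (String[] rec : group) {
                if ("1".equals(rec[0])) productName = rec[2]; // product record
                else orders.add(rec);                          // order record
            }
            for (String[] o : orders) {
                out.add(o[2] + "\t" + productName + "\t" + o[3]); // orderID, name, amount
            }
        }
        return out;
    }
}
```

Note that every order record crosses the network to a reducer, which is why this pattern is more skew-prone than the map join above.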
bean:
package ReduceJoin;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class OrderBean implements Writable {
private String orderID;
private String productID;
private int amount;
private String productName;
private String flag;
@Override
public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeUTF(this.orderID);
dataOutput.writeUTF(this.productID);
dataOutput.writeInt(this.amount);
dataOutput.writeUTF(this.productName);
dataOutput.writeUTF(this.flag);
}
@Override
public void readFields(DataInput dataInput) throws IOException {
this.orderID = dataInput.readUTF();
this.productID = dataInput.readUTF();
this.amount = dataInput.readInt();
this.productName = dataInput.readUTF();
this.flag = dataInput.readUTF();
}
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(this.orderID);
sb.append("\t");
sb.append(this.productName);
sb.append("\t");
sb.append(this.amount);
sb.append("\t");
sb.append(this.flag);
return sb.toString();
}
}
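The write()/readFields() pair above must serialize and deserialize the fields in exactly the same order and with the same types. That contract can be checked outside Hadoop with plain DataOutputStream/DataInputStream; the class below is a simplified stand-in for OrderBean, without Lombok or the Writable interface:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// A standalone sketch of the Writable field-order contract: the read side
// must mirror the write side exactly, field for field.
public class WritableRoundTrip {

    public static byte[] serialize(String orderID, String productID, int amount) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeUTF(orderID);     // same order as OrderBean.write()
        out.writeUTF(productID);
        out.writeInt(amount);
        out.flush();
        return buf.toByteArray();
    }

    public static String[] deserialize(byte[] bytes) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
        String orderID = in.readUTF();   // must mirror the write order exactly
        String productID = in.readUTF();
        int amount = in.readInt();
        return new String[]{orderID, productID, String.valueOf(amount)};
    }
}
```

Swapping any two reads relative to the writes would corrupt every deserialized bean, which is the most common Writable mistake.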
map:
package ReduceJoin;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import java.io.IOException;
public class OrderMapper extends Mapper<LongWritable, Text, Text, OrderBean> {
Text k = new Text();
OrderBean v = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split("\t");
FileSplit inputSplit = (FileSplit)context.getInputSplit();
String fileName = inputSplit.getPath().getName();
//use the product ID as the map output key
if (fileName.startsWith("order")) {
k.set(fields[1]);
v.setOrderID(fields[0]);
v.setProductID(fields[1]);
v.setAmount(Integer.parseInt(fields[2]));
v.setFlag("0");
v.setProductName("");
} else {
k.set(fields[0]);
v.setOrderID("");
v.setAmount(0);
v.setProductID(fields[0]);
v.setProductName(fields[1]);
v.setFlag("1");
}
context.write(k, v);
}
}
reduce:
package ReduceJoin;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.ArrayList;
public class OrderReducer extends Reducer<Text, OrderBean, OrderBean, NullWritable> {
@Override
protected void reduce(Text key, Iterable<OrderBean> values, Context context) throws IOException, InterruptedException {
//key is the productID: order records and the product record that share a
//productID arrive grouped into the same reduce() call
//<productID, [product record, order record 1, order record 2, ...]>
//the reducer emits one output line per order record
ArrayList<OrderBean> orderBeans = new ArrayList<>();
OrderBean pdBean = new OrderBean();
for (OrderBean bean : values) {
if ("0".equals(bean.getFlag())) {
//Hadoop reuses the same OrderBean instance for every element of 'values',
//so each order record must be copied into a fresh object before being
//stored; adding 'bean' directly, or reusing a single tmp object created
//outside the loop, would fill the list with references to one object
//holding only the last record's data.
OrderBean tmp = new OrderBean();
try {
BeanUtils.copyProperties(tmp, bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
orderBeans.add(tmp);
} else {
//this is the product-table record: keep it for its productName
try {
BeanUtils.copyProperties(pdBean, bean);
} catch (IllegalAccessException | InvocationTargetException e) {
e.printStackTrace();
}
}
}
//copy the productName onto every buffered order record, then emit
for (OrderBean o : orderBeans) {
o.setProductName(pdBean.getProductName());
context.write(o, NullWritable.get());
}
}
}
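The copy-before-store step in the reducer matters because Hadoop hands the loop one mutable value object and overwrites it on each iteration. The effect can be reproduced in plain Java (class names are illustrative):

```java
import java.util.ArrayList;
import java.util.List;

// Demonstrates why the reducer must copy each value into a fresh object:
// if one mutable instance is reused and added repeatedly, every list slot
// points at the same object and only the last state survives.
public class ObjectReuseDemo {

    public static class MutableRecord {
        public String orderID;
    }

    // buggy pattern: reuse a single instance, as Hadoop's value iterator does
    public static List<String> reuseOneInstance(String[] ids) {
        List<MutableRecord> list = new ArrayList<>();
        MutableRecord shared = new MutableRecord();
        for (String id : ids) {
            shared.orderID = id;   // overwrites the same object every time
            list.add(shared);
        }
        return snapshot(list);
    }

    // correct pattern: copy into a fresh object per iteration
    public static List<String> copyPerIteration(String[] ids) {
        List<MutableRecord> list = new ArrayList<>();
        for (String id : ids) {
            MutableRecord fresh = new MutableRecord();
            fresh.orderID = id;
            list.add(fresh);
        }
        return snapshot(list);
    }

    private static List<String> snapshot(List<MutableRecord> list) {
        List<String> out = new ArrayList<>();
        for (MutableRecord r : list) out.add(r.orderID);
        return out;
    }
}
```

This is exactly what BeanUtils.copyProperties into a fresh OrderBean guards against in the reducer above.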
driver:
package ReduceJoin;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.io.IOException;
public class OrderDriver {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
args = new String[]{"G:\\test\\A\\mapjoin\\", "G:\\test\\A\\reducejoin12\\"}; // hardcoded local paths for testing; the input directory holds both order.txt and pd.txt
Configuration conf = new Configuration();
Job job = Job.getInstance(conf);
job.setJarByClass(OrderDriver.class);
job.setMapperClass(OrderMapper.class);
job.setReducerClass(OrderReducer.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderBean.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
}