您好,登錄后才能下訂單哦!
現(xiàn)在有一批數(shù)據(jù):
01||zs||18
02||ls||19
03||jj||10
每一行的數(shù)據(jù)的分割符是||,是一個(gè)多字節(jié)的分隔符,默認(rèn)的hive只支持單字節(jié)的分隔符,上面的數(shù)據(jù)時(shí)||多字節(jié),不支持。
解決方案:
?method01:使用 RegexSerDe 通過正則表達(dá)式來抽取字段
#建表語句
create table t_bi_reg(id string,name string,age string ) row format serde 'org.apache.hadoop.hive.serde2.RegexSerDe' with serdeproperties('input.regex'='((.\*)\\\\|\\\\|(.\*)\\\\|\\\\|(.\*))','output.format.string'='%1$s %2$s %3s')
#input.regex:指定切分的正則
#output.format.string:切分之后輸出的字段
#加載數(shù)據(jù)
load data local inpath ‘/Hadoop/data/1.txt’ into table t_bi_reg
#查詢
select * from t_bi_reg;
? method2:修改源碼:
?原理是在 inputformat 讀取行的時(shí)候?qū)?shù)據(jù)中的“多字節(jié)分隔符”替換為 hive 默認(rèn)的分隔 符(ctrl+A 亦即 \001)或用于替代的單字符分隔符,以便 hive 在 serde 操作時(shí)按照默認(rèn)的 單字節(jié)分隔符進(jìn)行字段抽取。
代碼
com.zy.hive.delimit2.BiDelimiterInputFormat:
package com.zy.hive.delimit2;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;
public class BiDelimiterInputFormat extends TextInputFormat {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit,
JobConf job, Reporter reporter)throws IOException {
reporter.setStatus(genericSplit.toString());
BiRecordReader reader = new BiRecordReader(job,(FileSplit)genericSplit);
// MyRecordReader reader = new MyRecordReader(job,(FileSplit)genericSplit);
return reader;
}
}
com.zy.hive.delimit2.BiRecordReader
package com.zy.hive.delimit2;
import java.io.IOException;
import java.io.InputStream;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.Seekable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.CodecPool;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.Decompressor;
import org.apache.hadoop.io.compress.SplitCompressionInputStream;
import org.apache.hadoop.io.compress.SplittableCompressionCodec;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;
public class BiRecordReader implements RecordReader<LongWritable, Text> {
private static final Log LOG = LogFactory.getLog(LineRecordReader.class
.getName());
private CompressionCodecFactory compressionCodecs = null;
private long start;
private long pos;
private long end;
private LineReader in;
int maxLineLength;
private Seekable filePosition;
private CompressionCodec codec;
private Decompressor decompressor;
/**
* A class that provides a line reader from an input stream.
* @deprecated Use {@link org.apache.hadoop.util.LineReader} instead.
*/
@Deprecated
public static class LineReader extends org.apache.hadoop.util.LineReader {
LineReader(InputStream in) {
super(in);
}
LineReader(InputStream in, int bufferSize) {
super(in, bufferSize);
}
public LineReader(InputStream in, Configuration conf)
throws IOException {
Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
super(in, conf);
}
}
public BiRecordReader(Configuration job, FileSplit split) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
start = split.getStart();
end = start + split.getLength();
final Path file = split.getPath();
compressionCodecs = new CompressionCodecFactory(job);
codec = compressionCodecs.getCodec(file);
// open the file and seek to the start of the split
FileSystem fs = file.getFileSystem(job);
FSDataInputStream fileIn = fs.open(split.getPath());
if (isCompressedInput()) {
decompressor = CodecPool.getDecompressor(codec);
if (codec instanceof SplittableCompressionCodec) {
final SplitCompressionInputStream cIn = ((SplittableCompressionCodec) codec)
.createInputStream(fileIn, decompressor, start, end,
SplittableCompressionCodec.READ_MODE.BYBLOCK);
in = new LineReader(cIn, job);
start = cIn.getAdjustedStart();
end = cIn.getAdjustedEnd();
filePosition = cIn; // take pos from compressed stream
} else {
in = new LineReader(codec.createInputStream(fileIn,
decompressor), job);
filePosition = fileIn;
}
} else {
fileIn.seek(start);
in = new LineReader(fileIn, job);
filePosition = fileIn;
}
// If this is not the first split, we always throw away first record
// because we always (except the last split) read one extra line in
// next() method.
if (start != 0) {
start += in.readLine(new Text(), 0, maxBytesToConsume(start));
}
this.pos = start;
Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
}
private boolean isCompressedInput() {
return (codec != null);
}
private int maxBytesToConsume(long pos) {
return isCompressedInput() ? Integer.MAX_VALUE : (int) Math.min(
Integer.MAX_VALUE, end - pos);
}
private long getFilePosition() throws IOException {
long retVal;
if (isCompressedInput() && null != filePosition) {
retVal = filePosition.getPos();
} else {
retVal = pos;
}
return retVal;
}
public BiRecordReader(InputStream in, long offset, long endOffset,
int maxLineLength) {
this.maxLineLength = maxLineLength;
this.in = new LineReader(in);
this.start = offset;
this.pos = offset;
this.end = endOffset;
this.filePosition = null;
}
public BiRecordReader(InputStream in, long offset, long endOffset,
Configuration job) throws IOException {
this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
Integer.MAX_VALUE);
this.in = new LineReader(in, job);
this.start = offset;
this.pos = offset;
this.end = endOffset;
this.filePosition = null;
}
public LongWritable createKey() {
return new LongWritable();
Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
}
public Text createValue() {
return new Text();
}
/** Read a line. */
public synchronized boolean next(LongWritable key, Text value)
throws IOException {
// We always read one extra line, which lies outside the upper
// split limit i.e. (end - 1)
while (getFilePosition() <= end) {
key.set(pos);
int newSize = in.readLine(value,
maxLineLength,Math.max(maxBytesToConsume(pos), maxLineLength));
String str = value.toString().replaceAll("\\|\\|", "\\|");
value.set(str);
pos += newSize;
if (newSize == 0) {
return false;
}
if (newSize < maxLineLength) {
return true;
}
// line too long. try again
LOG.info("Skipped line of size " + newSize + " at pos "
+ (pos - newSize));
}
return false;
}
/**
* Get the progress within the split
*/
public float getProgress() throws IOException {
if (start == end) {
return 0.0f;
} else {
return Math.min(1.0f, (getFilePosition() - start)
Stay hungry Stay foolish -- http://blog.csdn.net/zhongqi2513
/ (float) (end - start));
}
}
public synchronized long getPos() throws IOException {
return pos;
}
public synchronized void close() throws IOException {
try {
if (in != null) {
in.close();
}
} finally {
if (decompressor != null) {
CodecPool.returnDecompressor(decompressor);
}
}
}
}
注意:上述代碼中的 api 全部使用 hadoop 的老 api 接口 org.apache.hadoop.mapred?. 然后將工程打包,并拷貝至 hive 安裝目錄的 lib 文件夾中,并重啟 hive,使用以下語句建表 即可
具體步驟:
hive> add jar /home/hadoop/apps/hive/lib/myinput.jar #將jar包導(dǎo)入hive的classpath中
hive> create table new_bi(id string,name string) #建表
row format delimited
fields terminated by '|'
stored as inputformat 'com.zy.hive.delimit2.BiDelimiterInputFormat' outputformat
outputformat
'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat';
hive> load data local inpath ‘/Hadoop/data/1.txt’ into table t_bi_reg #導(dǎo)入數(shù)據(jù)
hive>select * from t_bi_reg #查詢
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。