Flink 1.8's Hive support is not great: reading 3 million rows took two hours, so I planned to migrate the job to Spark. The code is pasted below. I later found that the SQL should not carry a WHERE predicate; after removing it, the speed was acceptable.
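The takeaway before the code: keep the statement sent to Hive as close to a bare SELECT as possible and do row filtering on the Flink side instead. A minimal sketch of the two variants, using a placeholder table name and a guessed predicate (the real sql constant is redacted in the listing below, and the reason the bare form is faster is my assumption, likely related to Hive's fetch-task conversion):

// Slow in this setup: the extra predicate made Hive plan a full job for the query.
String slowSql = "select user, name from user_map where length(user) >= 5 and dt='20190820'";
// Fast enough: keep only the partition filter (and the limit) in SQL, and move the
// rest into a Flink FilterFunction, as the filter() call in the job below does.
String fastSql = "select user, name from user_map where dt='20190820'";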
maven
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-jdbc</artifactId>
    <version>1.1.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.1.2</version>
</dependency>
<dependency>
    <groupId>jdk.tools</groupId>
    <artifactId>jdk.tools</artifactId>
    <version>1.8</version>
    <scope>system</scope>
    <systemPath>${JAVA_HOME}/lib/tools.jar</systemPath>
</dependency>
java
private final static String driverName = "org.apache.hive.jdbc.HiveDriver"; // JDBC driver class
private final static String url = ";"; // Hive JDBC URL + database name (redacted)
private final static String user = ""; // username (redacted)
private final static String password = "!"; // password (redacted)
private final static String table = "";
private final static String sql = " ";
public static void main(String[] arg) throws Exception {
    long time = System.currentTimeMillis();
    HttpClientUtil.sendDingMessage("start syncing hive-" + table + "; " + Utils.getTimeString());
    /**
     * Initialize the execution environment.
     */
    final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(4);
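    // Caveat (from reading the Flink 1.8 JDBCInputFormat source, so treat it as an
    // assumption): without a ParameterValuesProvider the input format yields a
    // single input split, so the Hive read itself runs in one task regardless of
    // the parallelism set above.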
    try {
        TypeInformation[] types = new TypeInformation[]{BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO};
        String[] colName = new String[]{"user", "name"};
        RowTypeInfo rowTypeInfo = new RowTypeInfo(types, colName);
        JDBCInputFormatBuilder builder = JDBCInputFormat.buildJDBCInputFormat()
                .setDrivername(driverName)
                .setDBUrl(url)
                .setUsername(user)
                .setPassword(password);
        Calendar calendar = Calendar.getInstance();
        calendar.setTime(new Date());
        calendar.add(Calendar.DATE, -1); // use yesterday's partition
        SimpleDateFormat sj = new SimpleDateFormat("yyyyMMdd");
        String d = sj.format(calendar.getTime());
        JDBCInputFormat jdbcInputFormat = builder
                .setQuery(sql + " and dt='" + d + "' limit 100000000")
                .setRowTypeInfo(rowTypeInfo)
                .finish();
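        // For reference, the statement sent to Hive ends up roughly as
        // "<sql> and dt='20190820' limit 100000000"; the base `sql` constant
        // is redacted above, so the exact final text is an assumption.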
        DataSource<Row> rowlist = env.createInput(jdbcInputFormat);
        DataSet<RedisDataModel> temp = rowlist.filter(new FilterFunction<Row>() {
            @Override
            public boolean filter(Row row) throws Exception {
                String key = row.getField(0).toString();
                String value = row.getField(1).toString();
                // drop rows whose key or value is too short or starts with "-" / "$"
                if (key.length() < 5 || key.startsWith("-") || key.startsWith("$")
                        || value.length() < 5 || value.startsWith("-") || value.startsWith("$")) {
                    return false;
                } else {
                    return true;
                }
            }
        }).map(new MapFunction<Row, RedisDataModel>() {
            @Override
            public RedisDataModel map(Row value) throws Exception {
                RedisDataModel m = new RedisDataModel();
                m.setExpire(-1); // no expiry
                m.setKey(JobConstants.REDIS_FLINK_IMEI_USER + value.getField(0).toString());
                m.setGlobal(true);
                m.setValue(value.getField(1).toString());
                return m;
            }
        });
HttpClientUtil.sendDingMessage("同步hive-"+table+"完成;開始推送模型,共有"+temp.count()+"條;"+Utils.getTimeString());
        RedisOutputFormat redisOutput = RedisOutputFormat.buildRedisOutputFormat()
                .setHostMaster(AppConfig.getProperty(JobConstants.REDIS_HOST_MASTER))
                .setHostSentinel(AppConfig.getProperty(JobConstants.REDIS_HOST_SENTINELS))
                .setMaxIdle(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXIDLE)))
                .setMaxTotal(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXTOTAL)))
                .setMaxWaitMillis(Integer.parseInt(AppConfig.getProperty(JobConstants.REDIS_MAXWAITMILLIS)))
                .setTestOnBorrow(Boolean.parseBoolean(AppConfig.getProperty(JobConstants.REDIS_TESTONBORROW)))
                .finish();
        temp.output(redisOutput);
        env.execute("hive-" + table + " sync");
        HttpClientUtil.sendDingMessage("hive-" + table + " sync finished, took " + (System.currentTimeMillis() - time) / 1000 + "s");
    } catch (Exception e) {
        logger.error("", e);
        HttpClientUtil.sendDingMessage("hive-" + table + " sync failed, timestamp: " + time + ", cause: " + e.toString());
    }
}
免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場,如果涉及侵權(quán)請(qǐng)聯(lián)系站長郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。