溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

不使用Sqoop流程,利用CacheManager直接完成SparkSQL數據流直接回寫Oracle

發布時間:2020-07-28 07:16:19 來源:網絡 閱讀:6748 作者:Rawirm 欄目:大數據

以前都是使用Sqoop來完成數據從生成的hdfs數據存儲上來抽取至oracle的數據庫:sqoop抽取語句:
sqoop export --connect "jdbc:oracle:thin:@ip:port:sid" --username 用戶名 --password 密碼 --table sid.表名 --export-dir hdfs://nameservice1/user/XXX(hdfs地址) --fields-terminated-by "\001" --null-non-string '' --null-string '' -m 10;

 由于項目需求我們現在要完成在代碼中生成所需字段之后,直接回寫到oracle中,因為數據量每天都很大,用實例或者List存有很大的局限性,可能會出現內存異常等不可預料的問題,所以我通過緩存器機制來存儲數據,然后進行生成結果的臨時表直接回寫(后面做的hbase接口封裝批量提交也比較類似)
 廢話不多說,直接上代碼:
 1、建立緩存實體
 package usi.java.oracle;

/**
 * A single cache entry: a key, an arbitrary value, a timeout and an "expired" flag.
 *
 * <p>This class is a plain mutable holder and performs no expiry logic itself.
 * {@code CacheManager} stores the expiry instant (epoch milliseconds) in
 * {@code timeOut} and treats a negative {@code timeOut} as "never expires".
 *
 * @author HK
 * @date 2011-2-15 06:45:57 PM
 */
public class Cache {
    private String key;
    private Object value;
    private long timeOut;
    private boolean expired;

    /** Creates an empty entry; all fields keep their Java default values. */
    public Cache() {
        super();
    }

    /**
     * Creates a fully populated entry.
     *
     * @param key     key under which the entry is stored
     * @param value   cached payload; declared as {@code Object} so that it accepts the same
     *                values as {@link #setValue(Object)} (previously restricted to
     *                {@code String}, which did not match the field type)
     * @param timeOut timeout value for this entry
     * @param expired initial state of the expired flag
     */
    public Cache(String key, Object value, long timeOut, boolean expired) {
        this.key = key;
        this.value = value;
        this.timeOut = timeOut;
        this.expired = expired;
    }

    /** @return the key of this entry */
    public String getKey() {
        return key;
    }

    /** @return the timeout value of this entry */
    public long getTimeOut() {
        return timeOut;
    }

    /** @return the cached payload */
    public Object getValue() {
        return value;
    }

    /** @param string the new key */
    public void setKey(String string) {
        key = string;
    }

    /** @param l the new timeout value */
    public void setTimeOut(long l) {
        timeOut = l;
    }

    /** @param object the new cached payload */
    public void setValue(Object object) {
        value = object;
    }

    /** @return the current state of the expired flag */
    public boolean isExpired() {
        return expired;
    }

    /** @param b the new state of the expired flag */
    public void setExpired(boolean b) {
        expired = b;
    }
}

2、建立緩存控制器
package usi.java.oracle;

import java.util.Date;
import java.util.HashMap;

/**

  • @author HK
  • @date 2011-2-15 下午09:40:00
    */
    public class CacheManager {

    private static HashMap cacheMap = new HashMap();

    /**

    • This class is singleton so private constructor is used.
      */
      private CacheManager() {
      super();
      }

      /**

    • returns cache item from hashmap
    • @param key
    • @return Cache
      */
      private synchronized static Cache getCache(String key) {
      return (Cache)cacheMap.get(key);
      }

      /**

    • Looks at the hashmap if a cache item exists or not
    • @param key
    • @return Cache
      */
      private synchronized static boolean hasCache(String key) {
      return cacheMap.containsKey(key);
      }

      /**

    • Invalidates all cache
      */
      public synchronized static void invalidateAll() {
      cacheMap.clear();
      }

      /**

    • Invalidates a single cache item
    • @param key
      */
      public synchronized static void invalidate(String key) {
      cacheMap.remove(key);
      }

      /**

    • Adds new item to cache hashmap
    • @param key
    • @return Cache
      */
      private synchronized static void putCache(String key, Cache object) {
      cacheMap.put(key, object);
      }

      /**

    • Reads a cache item's content
    • @param key
    • @return
      */
      public static Cache getContent(String key) {
      if (hasCache(key)) {
      Cache cache = getCache(key);
      if (cacheExpired(cache)) {
      cache.setExpired(true);
      }
      return cache;
      } else {
      return null;
      }
      }

      /**

    • @param key
    • @param content
    • @param ttl
      */
      public static void putContent(String key, Object content, long ttl) {
      Cache cache = new Cache();
      cache.setKey(key);
      cache.setValue(content);
      cache.setTimeOut(ttl + new Date().getTime());
      cache.setExpired(false);
      putCache(key, cache);
      }

      /* @modelguid {172828D6-3AB2-46C4-96E2-E72B34264031} /
      private static boolean cacheExpired(Cache cache) {
      if (cache == null) {
      return false;
      }
      long milisNow = new Date().getTime();
      long milisExpire = cache.getTimeOut();
      if (milisExpire < 0) { // Cache never expires
      return false;
      } else if (milisNow >= milisExpire) {
      return true;
      } else {
      return false;
      }
      }

}

3、建立需要導出數據對象
package usi.java.oracle;

/**
 * Mutable bean holding the five string columns of one task row:
 * {@code mme_eid}, {@code mme_editor}, {@code entitytype_eid}, {@code project_eid}
 * and {@code resource_eid}. All fields start out as {@code null}.
 */
public class TaskAll {

    private String mme_eid;
    private String mme_editor;
    private String entitytype_eid;
    private String project_eid;
    private String resource_eid;

    // ---- mme_eid ----

    /** @return the {@code mme_eid} value */
    public String getMme_eid() {
        return this.mme_eid;
    }

    /** @param mme_eid the new {@code mme_eid} value */
    public void setMme_eid(String mme_eid) {
        this.mme_eid = mme_eid;
    }

    // ---- mme_editor ----

    /** @return the {@code mme_editor} value */
    public String getMme_editor() {
        return this.mme_editor;
    }

    /** @param mme_editor the new {@code mme_editor} value */
    public void setMme_editor(String mme_editor) {
        this.mme_editor = mme_editor;
    }

    // ---- entitytype_eid ----

    /** @return the {@code entitytype_eid} value */
    public String getEntitytype_eid() {
        return this.entitytype_eid;
    }

    /** @param entitytype_eid the new {@code entitytype_eid} value */
    public void setEntitytype_eid(String entitytype_eid) {
        this.entitytype_eid = entitytype_eid;
    }

    // ---- project_eid ----

    /** @return the {@code project_eid} value */
    public String getProject_eid() {
        return this.project_eid;
    }

    /** @param project_eid the new {@code project_eid} value */
    public void setProject_eid(String project_eid) {
        this.project_eid = project_eid;
    }

    // ---- resource_eid ----

    /** @return the {@code resource_eid} value */
    public String getResource_eid() {
        return this.resource_eid;
    }

    /** @param resource_eid the new {@code resource_eid} value */
    public void setResource_eid(String resource_eid) {
        this.resource_eid = resource_eid;
    }

}
4、執行邏輯主體,回寫數據,批量提交

package usi.java.oracle;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
//import java.sql.ResultSet;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.hive.HiveContext;

/**
 * Spark driver program that copies rows from the Hive table {@code usi_odso.c_taskall}
 * into the Oracle table {@code c_taskall_test} over JDBC, without going through Sqoop.
 *
 * <p>Flow: run the Hive query, collect the result to the driver, stage every row as a
 * {@code TaskAll} in {@code CacheManager} under its running index, then read the staged
 * rows back and insert them into Oracle in batches.
 */
public class redict_to_171ora {

    /** Number of rows sent to Oracle per executeBatch()/commit() round trip. */
    private static final int BATCH_SIZE = 20000;

    /** Time to live, in milliseconds, given to every staged row. */
    private static final long CACHE_TTL_MILLIS = 30000000L;

    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        SparkConf sc = new SparkConf().setAppName("redict_to_171ora");
        SparkContext jsc = new SparkContext(sc);
        try {
            HiveContext hc = new HiveContext(jsc);
            String hivesql1 = "select t.mme_eid,t.mme_editor,t.entitytype_eid,t.project_eid,t.resource_eid from usi_odso.c_taskall t limit 150000";

            DataFrame taskRows = hc.sql(hivesql1);
            // collect() materialises the whole result on the driver.
            List<Row> collect = taskRows.javaRDD().collect();

            int rowCount = stageRows(collect);
            System.out.println(rowCount);

            writeToOracle(rowCount);
        } finally {
            // Stop the context even if the export fails; it was never stopped before.
            jsc.stop();
        }
    }

    /**
     * Converts each collected row into a {@code TaskAll} and stores it in
     * {@code CacheManager} under the keys "0", "1", ... in iteration order.
     *
     * @param rows rows returned by the Hive query
     * @return the number of rows staged
     */
    private static int stageRows(List<Row> rows) {
        int staged = 0;
        for (Row row : rows) {
            TaskAll task = new TaskAll();
            task.setMme_eid(row.getString(0));
            task.setMme_editor(row.getString(1));
            task.setEntitytype_eid(row.getString(2));
            task.setProject_eid(row.getString(3));
            task.setResource_eid(row.getString(4));
            CacheManager.putContent(String.valueOf(staged), task, CACHE_TTL_MILLIS);
            staged++;
        }
        return staged;
    }

    /**
     * Inserts the staged rows with keys "0" .. "rowCount - 1" into {@code c_taskall_test},
     * committing after every {@link #BATCH_SIZE} rows and once more for the remainder.
     *
     * <p>Failures are printed and the uncommitted part of the work is rolled back;
     * batches that were already committed stay in the table.
     *
     * @param rowCount number of staged rows to write
     */
    private static void writeToOracle(int rowCount) {
        Connection con = null;
        PreparedStatement pre = null;
        try {
            Class.forName("oracle.jdbc.driver.OracleDriver"); // load the Oracle driver
            System.out.println("開始嘗試連接數(shù)據(jù)庫!");
            // Placeholder connection settings: replace ip/sid and the credentials.
            String url = "jdbc:oracle:" + "thin:@ip:1521:sid";
            String user = "user";
            String password = "password";
            con = DriverManager.getConnection(url, user, password);
            System.out.println("連接成功!");

            // commit() is only defined for connections with auto-commit disabled; new
            // JDBC connections start in auto-commit mode, so switch it off explicitly.
            con.setAutoCommit(false);

            String sql = "insert into c_taskall_test(mme_eid,mme_editor,entitytype_eid,project_eid,resource_eid) values(?,?,?,?,?)";
            pre = con.prepareStatement(sql);

            int pending = 0;
            for (int i = 0; i < rowCount; i++) {
                TaskAll task = (TaskAll) CacheManager.getContent(String.valueOf(i)).getValue();
                pre.setString(1, task.getMme_eid());
                pre.setString(2, task.getMme_editor());
                pre.setString(3, task.getEntitytype_eid());
                pre.setString(4, task.getProject_eid());
                pre.setString(5, task.getResource_eid());
                pre.addBatch();
                pending++;
                // Flush on full batches only. The previous test (i % 20000 == 0) fired at
                // i == 0, sending a one-row batch and shifting every later boundary.
                if (pending == BATCH_SIZE) {
                    pre.executeBatch();
                    con.commit();
                    pre.clearBatch();
                    pending = 0;
                }
            }
            if (pending > 0) {
                pre.executeBatch();
                con.commit();
                pre.clearBatch();
            }
        } catch (Exception e) {
            e.printStackTrace();
            // Discard the partially sent batch explicitly.
            // NOTE(review): Oracle's driver is documented to commit pending work on
            // close(); confirm against the driver version in use.
            rollbackQuietly(con);
        } finally {
            // Close in reverse order of creation. Each close is isolated so that a
            // failure closing the statement cannot leak the connection.
            closeQuietly(pre);
            closeQuietly(con);
        }
    }

    /** Rolls back the connection if it exists, printing (not propagating) any failure. */
    private static void rollbackQuietly(Connection con) {
        if (con == null) {
            return;
        }
        try {
            con.rollback();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the statement if it exists, printing (not propagating) any failure. */
    private static void closeQuietly(PreparedStatement statement) {
        if (statement == null) {
            return;
        }
        try {
            statement.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /** Closes the connection if it exists, printing (not propagating) any failure. */
    private static void closeQuietly(Connection con) {
        if (con == null) {
            return;
        }
        try {
            con.close();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}

向AI問一下細節

免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。

AI