Before learning any Spark technology, make sure you understand Spark correctly first. See: Correctly Understanding Spark
The following example uses the Spark RDD Java API to read data from a relational database. It uses a local embedded Derby database, but MySQL, Oracle, or any other relational database works the same way:
```java
package com.twq.javaapi.java7;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.JdbcRDD;

import java.io.Serializable;
import java.sql.*;

public class JavaJdbcRDDSuite implements Serializable {

    public static void prepareData() throws ClassNotFoundException, SQLException {
        // Use the local embedded Derby database; MySQL or another relational database works too
        Class.forName("org.apache.derby.jdbc.EmbeddedDriver");
        Connection connection = DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;create=true");

        try {
            // Create table FOO: ID is an auto-incrementing identity column (1, 2, 3, ...), DATA is an INTEGER column
            Statement create = connection.createStatement();
            create.execute(
                "CREATE TABLE FOO(" +
                "ID INTEGER NOT NULL GENERATED ALWAYS AS IDENTITY (START WITH 1, INCREMENT BY 1)," +
                "DATA INTEGER)");
            create.close();

            // Insert the rows: DATA = 2, 4, 6, 8, 10 for IDs 1 to 5
            PreparedStatement insert = connection.prepareStatement("INSERT INTO FOO(DATA) VALUES(?)");
            for (int i = 1; i <= 5; i++) {
                insert.setInt(1, i * 2);
                insert.executeUpdate();
            }
            insert.close();
        } catch (SQLException e) {
            // SQLState X0Y32 means the table already exists (e.g. from a previous run):
            // ignore it and keep the existing data; rethrow any other error
            if (e.getSQLState().compareTo("X0Y32") != 0) {
                throw e;
            }
        } finally {
            connection.close();
        }
    }

    public static void shutdownDB() throws SQLException {
        try {
            DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb;shutdown=true");
        } catch (SQLException e) {
            // A normal single-database shutdown is reported as SQLState 08006; throw anything else
            // https://db.apache.org/derby/docs/10.2/ref/rrefexcept71493.html
            if (e.getSQLState().compareTo("08006") != 0) {
                throw e;
            }
        }
    }

    public static void main(String[] args) throws Exception {
        JavaSparkContext sc = new JavaSparkContext("local", "JavaAPISuite");

        // Prepare the data
        prepareData();

        // Build the JdbcRDD
        JavaRDD<Integer> rdd = JdbcRDD.create(
            sc,
            new JdbcRDD.ConnectionFactory() {
                @Override
                public Connection getConnection() throws SQLException {
                    return DriverManager.getConnection("jdbc:derby:target/JavaJdbcRDDSuiteDb");
                }
            },
            "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?",
            1, 5, 1, // lowerBound, upperBound (both inclusive), numPartitions
            new Function<ResultSet, Integer>() {
                @Override
                public Integer call(ResultSet r) throws Exception {
                    // Map each row of the result set to the value of its first column (DATA)
                    return r.getInt(1);
                }
            }
        );

        // Result: [2, 4, 6, 8, 10]
        System.out.println(rdd.collect());

        shutdownDB();
        sc.stop();
    }
}
```
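The `1, 5, 1` arguments to `JdbcRDD.create` are the lower bound, the upper bound (both inclusive) and the number of partitions. Spark runs the SQL once per partition, binding that partition's ID range to the two `?` placeholders; this is why the query must contain exactly those two parameters. The plain-Java sketch below illustrates the splitting step. It is modeled on our reading of the `getPartitions` logic in Spark's `JdbcRDD`, does not call Spark at all, and the class name `JdbcPartitionSketch` and method `partitions` are hypothetical. Details may differ between Spark versions.

```java
import java.math.BigInteger;

// Hypothetical sketch: splits [lowerBound, upperBound] into numPartitions inclusive ID ranges,
// modeled on (not copied from) how Spark's JdbcRDD computes its partitions.
public class JdbcPartitionSketch {

    // Returns one {start, end} pair per partition; both ends are inclusive.
    public static long[][] partitions(long lowerBound, long upperBound, int numPartitions) {
        BigInteger lower = BigInteger.valueOf(lowerBound);
        // Bounds are inclusive, hence the +1; BigInteger avoids overflow on very large ranges
        BigInteger length = BigInteger.valueOf(upperBound).subtract(lower).add(BigInteger.ONE);
        BigInteger n = BigInteger.valueOf(numPartitions);

        long[][] result = new long[numPartitions][];
        for (int i = 0; i < numPartitions; i++) {
            // Partition i covers offsets [i * length / n, (i + 1) * length / n - 1] from the lower bound
            BigInteger start = lower.add(BigInteger.valueOf(i).multiply(length).divide(n));
            BigInteger end = lower.add(BigInteger.valueOf(i + 1L).multiply(length).divide(n))
                    .subtract(BigInteger.ONE);
            result[i] = new long[] {start.longValue(), end.longValue()};
        }
        return result;
    }

    public static void main(String[] args) {
        String sql = "SELECT DATA FROM FOO WHERE ? <= ID AND ID <= ?";
        // Same bounds as the example above, but split across 2 partitions instead of 1
        long[][] parts = partitions(1, 5, 2);
        for (int i = 0; i < parts.length; i++) {
            // Each partition would execute the query once with its own pair of bound values
            System.out.println("partition " + i + ": " + sql
                    + " with ? = " + parts[i][0] + ", " + parts[i][1]);
        }
    }
}
```

With bounds 1 and 5 and two partitions, the first partition covers IDs 1 to 2 and the second covers IDs 3 to 5; with a single partition, as in the original example, one query covers IDs 1 to 5. Choosing a numeric, roughly evenly distributed column such as an identity ID for the placeholders keeps the partitions balanced.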
For a more detailed look at the RDD API, see: Spark Core RDD API Principles Explained