溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務條款》

如何把JSON文件轉化為DataFrame

發(fā)布時間:2021-12-16 15:15:57 來源:億速云 閱讀:222 作者:iii 欄目:云計算

這篇文章主要介紹“如何把JSON文件轉化為DataFrame ”,在日常操作中,相信很多人在如何把JSON文件轉化為DataFrame 問題上存在疑惑,小編查閱了各式資料,整理出簡單好用的操作方法,希望對大家解答”如何把JSON文件轉化為DataFrame ”的疑惑有所幫助!接下來,請跟著小編一起來學習吧!

一:簡單了解SparkSQL。

    Spark SQL 是結構化的數(shù)據(jù)處理一個Spark模塊。與基本的Spark RDD API不同,Spark SQL 所提供的接口為Spark 提供有關數(shù)據(jù)和正在執(zhí)行的計算的結構的詳細信息。Spark SQL內部使用這些額外的信息來執(zhí)行額外的優(yōu)化。有幾種方法與Spark SQL 包括 SQL、 DataFrames API 和數(shù)據(jù)集 API 進行交互。計算結果相同的執(zhí)行引擎在使用時,獨立的 API/語言使用的表達計算。這種統(tǒng)一意味著開發(fā)人員很容易可以提供最自然的方式來表達一個給定的轉換基于各種 Api 之間來回切換。

    Spark SQL是Spark中的一個模塊,主要用于進行結構化數(shù)據(jù)的處理。它提供的最核心的編程抽象,就是DataFrame。同時Spark SQL還可以作為分布式的SQL查詢引擎。Spark SQL最重要的功能之一,就是從Hive中查詢數(shù)據(jù)。

二:簡單了解DataFrame。

    DataFrame是一個以命名列方式組織的分布式數(shù)據(jù)集,等同于關系型數(shù)據(jù)庫中的一個表,也相當于R/Python中的data frames(但是進行了更多的優(yōu)化)。DataFrame可以通過很多來源進行構建,包括:結構化的數(shù)據(jù)文件,Hive中的表,外部的關系型數(shù)據(jù)庫,以及RDD。

接下來是對 結構化數(shù)據(jù)集 與 非結構化數(shù)據(jù)集 的操作。

三:結構化數(shù)據(jù)集: 如何把JSON文件轉化為DataFrame 

3.1.在HDFS上放置了兩個JSON文件,即

people.json, 文件內容如下:

{"id": "19","name": "berg","sex": "male","age": 19}
{"id": "20","name": "cccc","sex": "female","age": 20}
{"id": "21","name": "xxxx","sex": "male","age": 21}
{"id": "22","name": "jjjj","sex": "female","age": 21}

student.json,文件內容如下:

{"id": "1","name": "china","sex": "female","age": 100}
{"id": "19","name": "xujun","sex": "male","age": 22}

3.2 通過DataFrame的API來操作數(shù)據(jù),熟悉下DataFrame中方法的使用: 

public class SparkSqlDemo {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON文件轉化為DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//輸出表結構
		df.printSchema();

		//顯示DataFrame的內容。
		df.show();

		//選擇name
		df.select(df.col("name")).show();

		// 選擇所有年齡大于21歲的人,只保留name字段
		df.filter(df.col("age").lt(21)).select("name").show();

		// 選擇name,并把age字段自增 1
		df.select(df.col("name"), df.col("age").plus(1)).show();

		//按年齡分組計數(shù):
		df.groupBy("age").count().show();  // 應該有一條數(shù)據(jù)記錄為  2 

		//把另個JSON文件轉化為DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");

		df2.show();
		
		//表的關聯(lián)。
		df.join(df2,df.col("id").equalTo(df2.col("id"))).show();
		
		
		//以編程方式運行SQL:
		//把DataFrame對象轉化為一個虛擬的表
		df.registerTempTable("people");
		sqlContext.sql("select age,count(*) from people group by age").show();

		System.out.println(  "-------------" );
		sqlContext.sql("select * from people").show();

	}
}

3.3 以編程方式運行 SQL 查詢并返回作為綜合結果,通過注冊表,操作sql的方式來操作數(shù)據(jù):

public class SparkSqlDemo1 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//本地的JSON文件轉化為DataFrame
		DataFrame df = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");

		//把另一個JSON文件轉化為DataFrame
		DataFrame df2 = sqlContext.read().json("hdfs://192.168.226.129:9000/txt/sparkshell/student.json");		

		//以編程方式運行SQL:
		//把DataFrame對象轉化為一個虛擬的表
		df.registerTempTable("people");
		df2.registerTempTable("student");

		// 查詢虛擬表 people 中所有數(shù)據(jù)
		sqlContext.sql("select * from people").show();

		//查看某個字段  
		sqlContext.sql("select name from people ").show();

		//查看多個字段  
		sqlContext.sql("select name,age+1 from people ").show();  

		//過濾某個字段的值  
		sqlContext.sql("select name, age from people where age>=21").show();

		//count group 某個字段的值  
		sqlContext.sql("select age,count(*) countage from people group by age").show();


		//關聯(lián): 內聯(lián) 。 
		sqlContext.sql("select * from people inner join student on people.id = student.id ").show();
		/*
	    +---+---+----+----+---+---+-----+----+
		|age| id|name| sex|age| id| name| sex|
		+---+---+----+----+---+---+-----+----+
		| 19| 19|berg|male| 22| 19|xujun|male|
		+---+---+----+----+---+---+-----+----+ 
		 */
	}
}

四:非結構化數(shù)據(jù)集:

    第一種方法使用反射來推斷架構 RDD 包含特定類型的對象。
    這種基于反射方法導致更簡潔的代碼和工程好當您已經知道該Schema編寫Spark應用程序時。

    創(chuàng)建 DataFrames 的第二個方法是通過允許您構建一個Schema,然后將它應用于現(xiàn)有 RDD 的編程接口。
    雖然這種方法更為詳細,它允許您構建 DataFrames 時直到運行時才知道的列和它們的類型。

4.1  非結構化的數(shù)據(jù)集文件,user.txt,內容如下:

1,"Hadoop",20
2,"HBase", 21
3,"Zookeeper",22
4,"Hive",23
5,"Spark",24
6,"Berg",22
7,"Xujun",23

4.2  通過 class反射來注冊一張表。

        Spark SQL 支持 JavaBeans RDD 自動轉換分布式數(shù)據(jù)集。BeanInfo,使用反射來獲取定義表的架構。目前,Spark SQL 不支持包含嵌套的 JavaBeans 或包含復雜的類型,例如列表或數(shù)組。您可以通過創(chuàng)建一個類,實現(xiàn)可序列化并有 getter 和 setter 方法的所有其字段創(chuàng)建 JavaBean。

public class SparkSqlDemo2 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把加載的文本文件 并 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		JavaRDD<User> userRDD = rdd.map( new Function<String, User>() {

			private static final long serialVersionUID = 1L;

			public User call(String line) throws Exception {
				String[] parts = line.split(",");
				User user = new User();
				user.setId(Integer.parseInt(parts[0].trim()));
				user.setName(parts[1].trim());
				user.setAge(Integer.parseInt(parts[2].trim()));
				return user;
			}
		});

		// collect 屬于行動算子Action 提交作業(yè)并觸發(fā)運算。
		List<User> list = userRDD.collect();
		for (User user : list) {
			System.out.println(  user );
		}

		//通過 class 反射注冊一張表
		DataFrame df = sqlContext.createDataFrame(userRDD, User.class);
		df.registerTempTable("user");

		DataFrame df1 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 通過sql 查詢的結果是 DataFrame 即df1 它還是支持 RDD的所有正常操作。
		df1.show();
		
		//并且 結果中的行列可以按序號訪問。
		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {
			
			private static final long serialVersionUID = 1L;

			public String call(Row row) {
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();
		
		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

4.3   以編程方式指定 schema, 通過字段反射來映射注冊臨時表

    在某些情況下不能提前定義 JavaBean 類 (例如,記錄的結構編碼的字符串,或將解析文本數(shù)據(jù)集和領域預計將以不同的方式為不同的用戶),
    三個步驟,可以以編程方式創(chuàng)建分布式數(shù)據(jù)集。

    1. 從原始 RDD; 創(chuàng)建行 RDD

    2. 創(chuàng)建由 StructType 中 RDD 在步驟 1 中創(chuàng)建的行結構相匹配的schema。

    3.適用于行 RDD 通過 createDataFrame 方法由 SQLContext 提供的schema。

public class SparkSqlDemo3 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//創(chuàng)建了 sqlContext的上下文,注意,它是DataFrame的起點。
		SQLContext sqlContext = new SQLContext(sc);

		//把加載的文本文件 并 每行轉換 JavaBean
		JavaRDD<String> rdd = sc.textFile("hdfs://192.168.226.129:9000/txt/sparkshell/user.txt");

		// schema 以字符串形式編碼
		String schemaString = "id name age";

		// 基于 字符串的schema生成 schema。
		List<StructField> fields = new ArrayList<StructField>();
		
		String[] str = schemaString.split(" ");
		fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));
		fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));
		fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

		StructType schema = DataTypes.createStructType(fields);  //  id name age

		JavaRDD<Row> rowRDD = rdd.map( new Function<String, Row>() {

			private static final long serialVersionUID = 1L;
			public Row call(String record) throws Exception {
				String[] fields = record.split(",");
				return RowFactory.create(Integer.parseInt(fields[0].trim()), fields[1].trim(),Integer.parseInt(fields[2].trim()));
			}
		});

		List<Row> list = rowRDD.collect();
		for (Row row : list) {
			System.out.println(  row.getInt(0) + "\t"+ row.getString(1) + "\t"+row.getInt(2)  );
		}

		//對RDD應用schema 并注冊一張表:
		DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
		System.out.println( "df : " + df);
		df.registerTempTable("user");

		df.show();
		DataFrame df2 = sqlContext.sql("SELECT id,name,age FROM user WHERE age >= 21 AND age <= 23");

		// 通過sql 查詢的結果是 DataFrame 即df1 它還是支持 RDD的所有正常操作。
		df2.show();
		// 并且 結果中的行列可以按序號訪問。
		List<String> listString = df2.javaRDD().map(new Function<Row, String>() {

			private static final long serialVersionUID = 1L;
			public String call(Row row) {
				System.out.println( row );
				return "Id: " + row.getInt(0) + ", Name: "+row.getString(1) + ", Age: " + row.getInt(2);
			}
		}).collect();

		for (String string : listString) {
			System.out.println(  string );
		}

	}
}

注意如果將上述代碼段中的一段,即:

        String[] str = schemaString.split(" ");

        fields.add(DataTypes.createStructField(str[0], DataTypes.IntegerType, true));

        fields.add(DataTypes.createStructField(str[1], DataTypes.StringType, true));

        fields.add(DataTypes.createStructField(str[2], DataTypes.IntegerType, true));

改為下面這段代碼:

   
     for (String fieldName: schemaString.split(" ")) {


            fields.add(DataTypes.createStructField(fieldName, DataTypes.StringType, true));

        }

將會出現(xiàn)以下錯誤:

        Caused by: scala.MatchError: 1 (of class java.lang.Integer)

到此,關于“如何把JSON文件轉化為DataFrame ”的學習就結束了,希望能夠解決大家的疑惑。理論與實踐的搭配能更好的幫助大家學習,快去試試吧!若想繼續(xù)學習更多相關知識,請繼續(xù)關注億速云網站,小編會繼續(xù)努力為大家?guī)砀鄬嵱玫奈恼拢?/p>

向AI問一下細節(jié)

免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關證據(jù),一經查實,將立刻刪除涉嫌侵權內容。

AI