List<StructField> fields = new ArrayList<>();
String schemaString = “name,age”;
fields.add(DataTypes.createStructField(“name”,
DataTypes.StringType, true));
fields.add(DataTypes.createStructField(“age”,
DataTypes.IntegerType, true));
StructType schema = DataTypes.createStructType(fields);
// Convert records of the RDD (people) to Rows
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new
RandomStringField();
randomStringField.setLength(10); BinaryIntLabelField
binaryIntLabelField = new
BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(),
binaryIntLabelField.gen());
});
Dataset dataset =spark.createDataFrame(rowRDD, schema);
dataset.persist();
dataset.show();
DataFrameWriter writer = new DataFrameWriter(dataset);
writer.mode(SaveMode.Overwrite).partitionBy(“age”).
parquet(“/Users/sungaofei/gaofei”);
dataframe中每一個(gè)數(shù)據(jù)都是一行 , 也就是一個(gè)Row對(duì)象 , 而且dataframe對(duì)于每一列也就是每個(gè)schema有著嚴(yán)格的要求 。因?yàn)樗且粋€(gè)表么 。所以跟數(shù)據(jù)庫(kù)的表或者pandas中的表是一樣的 。要規(guī)定好每一列的schema以及每一行的數(shù)據(jù) 。所以首先我們先定義好schema , 定義每個(gè)schema的列名和數(shù)據(jù)類型 。然后通過DataTypes的API創(chuàng)建schema 。這樣我們的列信息就有了 。然后是關(guān)鍵的我們?nèi)绾伟岩粋€(gè)RDD轉(zhuǎn)換成dataframe需要的Row并且填充好每一行的數(shù)據(jù) 。這里我們使用RDD的map方法 , 其實(shí)dataframe也是一個(gè)特殊的RDD , 這個(gè)RDD里的每一行都是一個(gè)ROW對(duì)象而已 。所以我們使用RDD的map方法來填充我們每一行的數(shù)據(jù)并把這一行數(shù)據(jù)轉(zhuǎn)換成Row對(duì)象 。
JavaRDD rowRDD = distData.map( record ->{
RandomStringField randomStringField = new RandomStringField();
randomStringField.setLength(10);
BinaryIntLabelField binaryIntLabelField = new BinaryIntLabelField();
return RowFactory.create(randomStringField.gen(), binaryIntLabelField.gen());
});
因?yàn)橹岸xschema的時(shí)候只定義了兩列 , 分別是name和age 。所以在這里我分別用一個(gè)隨機(jī)生成String類型的類和隨機(jī)生成int類型的類來填充數(shù)據(jù) 。最后使用RowFactory.create方法來把這兩個(gè)數(shù)據(jù)生成一個(gè)Row 。map方法其實(shí)就是讓使用者處理每一行數(shù)據(jù)的方法 , record這個(gè)參數(shù)就是把行數(shù)據(jù)作為參數(shù)給我們使用 。當(dāng)然這個(gè)例子里原始RDD的每一行都是當(dāng)初生成List的時(shí)候初始化的index序號(hào) 。而我們現(xiàn)在不需要它 , 所以也就沒有使用 。直接返回隨機(jī)字符串和int類型的數(shù) 。然后我們有了這個(gè)每一行數(shù)據(jù)都是Row對(duì)象的RDD后 。就可以通過調(diào)用下面的API來生成dataframe 。
Dataset dataset =spark.createDataFrame(rowRDD, schema);
分別把row和schema傳遞進(jìn)去 , 生成dataframe的表 。最后利用DataFrameWriter保存數(shù)據(jù) 。
好了 , 這就是造數(shù)的基本原理了 , 其實(shí)也是蠻簡(jiǎn)單的 。當(dāng)然要做到嚴(yán)格控制數(shù)據(jù)分布 , 數(shù)據(jù)類型 , 特征維度等等就需要做很多特殊的處理 。這里就不展開細(xì)節(jié)了 。
測(cè)試ETL處理的正確性
輸入一份數(shù)據(jù) , 然后判斷輸出的數(shù)據(jù)是否是正確的 。只不過我們這是在大數(shù)據(jù)量下的處理和測(cè)試 , 輸入的數(shù)據(jù)是大數(shù)據(jù) , ELT輸出的也是大數(shù)據(jù) , 所以就需要一些新的測(cè)試手段 。其實(shí)這個(gè)測(cè)試手段也沒什么新奇的了 , 是我們剛才一直在講的技術(shù) , 也就是spark這種分布式計(jì)算框架 。我們以spark任務(wù)來測(cè)試這些ETL程序 , 這同樣也是為了測(cè)試自身的效率和性能 。如果單純使用hdfs client來讀取文件的話 , 掃描那么大的數(shù)據(jù)量是很耗時(shí)的 , 這是我們不能接受的 。所以我們利用大數(shù)據(jù)技術(shù)來測(cè)試大數(shù)據(jù)功能就成為了必然 。當(dāng)然也許有些同學(xué)會(huì)認(rèn)為我只是測(cè)試功能么 , 又不是測(cè)試算法的處理性能 , 沒必要使用那么大的數(shù)據(jù)量 。我們用小一點(diǎn)的數(shù)據(jù) , 比如一百行的數(shù)據(jù)就可以了 。但其實(shí)這也是不對(duì)的 , 因?yàn)樵诜植际接?jì)算中 , 大數(shù)量和小數(shù)據(jù)量的處理結(jié)果可能不是完全一致的 , 比如隨機(jī)拆分?jǐn)?shù)據(jù)這種場(chǎng)景在大數(shù)據(jù)量下可能才能測(cè)試出bug 。而且大數(shù)據(jù)測(cè)試還有另外一種場(chǎng)景就是數(shù)據(jù)監(jiān)控 , 定期的掃描線上數(shù)據(jù) , 驗(yàn)證線上數(shù)據(jù)是否出現(xiàn)異常 。這也是一種測(cè)試場(chǎng)景 , 而且線上的數(shù)據(jù)一定是海量的 。
以上關(guān)于本文的內(nèi)容,僅作參考!溫馨提示:如遇健康、疾病相關(guān)的問題,請(qǐng)您及時(shí)就醫(yī)或請(qǐng)專業(yè)人士給予相關(guān)指導(dǎo)!
「愛刨根生活網(wǎng)」www.malaban59.cn小編還為您精選了以下內(nèi)容,希望對(duì)您有所幫助:- 男人如何處理婆媳關(guān)系 聰明男人處理婆媳關(guān)系
- 目前最好的cpu排名 電腦cpu處理器最新排名
- iphone手機(jī)丟失解決措施 蘋果手機(jī)被盜怎么找回處理
- 該如何處理婆媳關(guān)系 怎么解決婆媳關(guān)系
- 老婆出軌了該怎么處理才是最好的 出軌的婚姻能撐多久
- 老婆外遇怎樣處理 怎樣處理老婆出軌的情況
- ?老公外遇怎么處理 老公有外遇怎么辦
- 老公出軌后的真實(shí)心理 男人出軌后女人錯(cuò)誤的處理方法
- 二婚夫妻怎么處理財(cái)務(wù)問題 二婚夫妻誰(shuí)管錢好
- ubuntu上不了網(wǎng)處理方法 ubuntu上不了網(wǎng)怎么辦
