您好,登錄后才能下訂單哦!
Beam剛剛開源不是很久,快2個月了。目前的版本是0.5.0版本。官方的源碼中提供了4個examples.無奈這四個案例都只是WordCount的四種不同的實現。作為一個從Spark進入大數據殿堂的筆者來說,用過n多次的SparkPi的我,怎么能忍受竟然沒有Pi實現的example呢。假如有了這個案例,可以非常方便的無論在開發(fā)工具中還是在集群中進行測試。于是便有了下文。筆者的文筆和技術有限。不足之處,還望朋友多多提建議。Let us come on 。
我們先來講講Pi的實現原理。我們是用概率統(tǒng)計的方法來實現的。先來想象一下,以一個單位為半徑畫圓,再畫一個圓的外切正方形。假設一個杯子的底部就被這個正方形和正方形內切圓全部填滿。做n次試驗,往杯子中扔石頭,落在圓內的次數除以總次數是不是Pi*r*r/2r*2r也就是Pi/4.
因此,Pi就是4倍的此概率。
以下是代碼的試驗。僅供參考。
可以復制代碼
package org.tongfang.beam.examples;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.spark.SparkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.*;
public class BeamPi {
public static void main(String[] args) {
// Beam Pi的自定義實現方式
//第一步驟:創(chuàng)建options,
//通過該對象可以選擇使用哪個計算框架來計算,并且設置應用的名稱
PipelineOptions options = PipelineOptionsFactory.create();
//設置job(應用)名稱
options.setJobName("Beam Pi");
//設置runner為Spark
options.setRunner(SparkRunner.class);
//創(chuàng)建管道 p
Pipeline p = Pipeline.create(options);
//100000000次的隨機試驗的次數,如果資源,
//足夠的大可以進行更多次的試驗,用大數據的理論來說,
//理論上可以進行無數次的試驗(只要不斷的橫向擴展計算的資源)。
List<Integer> list = new ArrayList<Integer>();
for(int i = 0;i<1000000;i++){
list.add(i);
}
//相當于Spark從內存中讀取數據,并通過map迭代訪問每一個元素,
//這里迭代1000000的訪問每個依次增大的數字,
//沒迭代一次,做一次試驗,當點落到圓內,計數增加1,否則不計數
//也就是什么也不做
//然后再近些Count計數,最后計數結果除以試驗次數,就是概率。
//從數學角度來看,PI的值就是4倍這個概率。從而計算出PI的值。
p.apply(Create.of(list)).apply(ParDo.of(new DoFn<Integer, Integer>() {
double x = 0;
double y = 0;
private static final long serialVersionUID = 1L;
@ProcessElement
public void processElement(ProcessContext c) {
x = Math.random() * 2 - 1;;
y = Math.random() * 2 - 1;;
if((x*x+y*y)<1){
c.output(1);
}else{
}
}
})).apply( Count.<Integer>globally()).
apply(MapElements.via(new SimpleFunction<Long, Void>() {
/**
*
*/
private static final long serialVersionUID = 1L;
public Void apply(Long input) {
Float res = (float) (4.0*(float)input / 1000000f);
System.out.println(input);
System.out.println("PI : "+res);
return null;
}
}));
//這是運行計算的關鍵,如果這個代碼不寫,
//整個代碼都是懶加載,并非真正計算。
p.run().waitUntilFinish();
}
}
免責聲明:本站發(fā)布的內容(圖片、視頻和文字)以原創(chuàng)、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。