This post looks at data stream programming in Apache Flink, and specifically at the different ways to define a DataStream source.
A data source is created with StreamExecutionEnvironment.addSource(sourceFunction). Flink also ships several built-in sources for convenience, such as readTextFile(path) and readFile(). You can also write a custom source: either implement the SourceFunction interface (such a source cannot run in parallel), or implement ParallelSourceFunction or extend RichParallelSourceFunction, both of which can run in parallel.
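The relationship between the three kinds of custom source can be modeled in plain Java. The sketch below uses simplified, hypothetical stand-ins (SimpleSourceFunction, SimpleParallelSourceFunction, SimpleRichParallelSourceFunction, SourceKinds) instead of Flink's real interfaces. It assumes, as Flink 1.x's addSource does with ParallelSourceFunction, that a source counts as parallel exactly when it implements the parallel sub-interface.

```java
import java.util.function.Consumer;

// Hypothetical stand-in for SourceFunction: emit records in run(), stop when cancel() is called.
// (Flink's real run() receives a SourceContext, not a Consumer.)
interface SimpleSourceFunction<T> {
    void run(Consumer<T> ctx) throws Exception;
    void cancel();
}

// Hypothetical marker sub-interface, mirroring ParallelSourceFunction extends SourceFunction.
interface SimpleParallelSourceFunction<T> extends SimpleSourceFunction<T> {}

// Hypothetical stand-in for RichParallelSourceFunction: parallel, plus lifecycle hooks.
abstract class SimpleRichParallelSourceFunction<T> implements SimpleParallelSourceFunction<T> {
    public void open() throws Exception {}   // called once before run() in the real API
    public void close() throws Exception {}  // called once at shutdown in the real API
}

final class SourceKinds {
    // A source may run with parallelism > 1 only if it implements the parallel sub-interface.
    static boolean isParallel(SimpleSourceFunction<?> fn) {
        return fn instanceof SimpleParallelSourceFunction;
    }
}
```

Because the rich variant inherits the parallel marker, it is classified as parallel without declaring anything extra.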
Let's start with a simple example, DataStreamSourceApp:
import org.apache.flink.streaming.api.scala._

object DataStreamSourceApp {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    socketFunction(env)
    env.execute("DataStreamSourceApp")
  }

  // Reads lines of text from a socket and prints each one
  def socketFunction(env: StreamExecutionEnvironment): Unit = {
    val data = env.socketTextStream("192.168.152.45", 9999)
    data.print()
  }
}
This program reads data from a socket, so first start a server on 192.168.152.45:
nc -lk 9999
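socketTextStream(hostname, port) turns the incoming characters into records by splitting on a delimiter, "\n" by default, which is why each line typed into nc arrives as one record. Below is a plain-Java sketch of that splitting, modeled on Flink's SocketTextStreamFunction (including dropping a trailing "\r" for the default delimiter); LineSplitter and feed are hypothetical names, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: buffers received text and cuts it into '\n'-terminated records.
final class LineSplitter {
    private final StringBuilder buffer = new StringBuilder();

    // Feed one received chunk; returns every record completed by it (possibly none).
    List<String> feed(String chunk) {
        buffer.append(chunk);
        List<String> records = new ArrayList<>();
        int idx;
        while ((idx = buffer.indexOf("\n")) != -1) {
            String record = buffer.substring(0, idx);
            // Lines sent with Windows line endings end in '\r'; drop it
            if (record.endsWith("\r")) {
                record = record.substring(0, record.length() - 1);
            }
            records.add(record);
            buffer.delete(0, idx + 1); // discard the record and its delimiter
        }
        return records; // an unterminated tail stays buffered for the next chunk
    }
}
```

A chunk boundary can fall in the middle of a word; the partial text simply waits in the buffer until its newline arrives.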
Then run DataStreamSourceApp and type the following on the server:
iie4bu@swarm-manager:~$ nc -lk 9999
apache
flink
spark
The console prints:
3> apache
4> flink
1> spark
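The 3, 4 and 1 in front of the lines are not the parallelism itself: each one is the index (counting from 1) of the parallel print() subtask that emitted that line, and the prefix appears because the sink runs with parallelism greater than 1. You can set the sink's parallelism with setParallelism; with parallelism 1 the prefix is no longer printed: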
data.print().setParallelism(1)
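The prefix rule can be made concrete with a small plain-Java sketch, modeled on how Flink's print() sink builds each output line when no sink identifier is given. PrintPrefix and format are hypothetical names for illustration, not Flink API.

```java
// Hypothetical helper mirroring the print() sink's line format (no sink identifier set).
final class PrintPrefix {
    // subtaskIndex is 0-based, but the printed prefix counts from 1.
    // With a sink parallelism of 1 no prefix is printed at all.
    static String format(Object value, int subtaskIndex, int sinkParallelism) {
        String prefix = sinkParallelism > 1 ? (subtaskIndex + 1) + "> " : "";
        return prefix + value;
    }
}
```

For example, format("apache", 2, 4) yields "3> apache", while format("flink", 0, 1) yields just "flink".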
The same program in Java:

import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class JavaDataStreamSourceApp {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
        socketFunction(environment);
        environment.execute("JavaDataStreamSourceApp");
    }

    public static void socketFunction(StreamExecutionEnvironment executionEnvironment) {
        DataStreamSource<String> data = executionEnvironment.socketTextStream("192.168.152.45", 9999);
        // Sink parallelism 1: the printed lines carry no "n>" prefix
        data.print().setParallelism(1);
    }
}
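The first custom source implements SourceFunction. A source built this way cannot run in parallel. Create CustomNonParallelSourceFunction: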
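import org.apache.flink.streaming.api.functions.source.SourceFunction

class CustomNonParallelSourceFunction extends SourceFunction[Long] {
  var count = 1L
  // cancel() is called from a different thread than run(), so the flag must be volatile
  @volatile var isRunning = true

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}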
This source starts with count = 1L. Its run method emits count, increments it and sleeps for one second, looping until cancel() sets isRunning to false. Use it like this:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  nonParallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def nonParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomNonParallelSourceFunction())
  data.print()
}
The console keeps printing the incrementing count values.
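The run()/cancel() contract can be tried out without a cluster. In this plain-Java stand-in (CountingSource and collectFirst are hypothetical names), a Consumer plays the role of ctx.collect and cancels the source after a fixed number of values, so the loop ends deterministically. In Flink, cancel() is invoked by the runtime from another thread, which is why the flag is volatile.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for CustomNonParallelSourceFunction, without Flink.
final class CountingSource {
    private volatile boolean isRunning = true; // written by cancel(), read by run()
    private long count = 1L;
    private final long sleepMillis;

    CountingSource(long sleepMillis) {
        this.sleepMillis = sleepMillis;
    }

    void run(Consumer<Long> ctx) throws InterruptedException {
        while (isRunning) {
            ctx.accept(count); // corresponds to ctx.collect(count)
            count += 1;
            if (sleepMillis > 0) {
                Thread.sleep(sleepMillis);
            }
        }
    }

    void cancel() {
        isRunning = false;
    }

    // Runs a source with no sleep and cancels it once `limit` values were collected.
    static List<Long> collectFirst(int limit) {
        List<Long> out = new ArrayList<>();
        CountingSource source = new CountingSource(0);
        try {
            source.run(v -> {
                out.add(v);
                if (out.size() >= limit) {
                    source.cancel(); // the loop exits at its next isRunning check
                }
            });
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return out;
    }
}
```

collectFirst(5) returns the values 1 through 5: after the fifth value the flag flips and the while loop stops.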
Its parallelism cannot be set to anything other than 1. For example, if you write:
val data=env.addSource(new CustomNonParallelSourceFunction()).setParallelism(3)
the program fails with:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at com.vincent.course05.DataStreamSourceApp$.nonParallelSourceFunction(DataStreamSourceApp.scala:16)
    at com.vincent.course05.DataStreamSourceApp$.main(DataStreamSourceApp.scala:11)
    at com.vincent.course05.DataStreamSourceApp.main(DataStreamSourceApp.scala)
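The stack trace shows the exception is thrown by DataStreamSource.setParallelism. Below is a plain-Java sketch of the rule it enforces, assuming the check is simply "a parallelism other than 1 on a non-parallel source is rejected" (real Flink also validates the parallelism value itself). SourceParallelismCheck and validate are hypothetical names.

```java
// Hypothetical model of the check behind "Source: 1 is not a parallel source".
final class SourceParallelismCheck {
    static void validate(int sourceId, boolean isParallel, int parallelism) {
        // A non-parallel source (plain SourceFunction) only accepts parallelism 1
        if (parallelism != 1 && !isParallel) {
            throw new IllegalArgumentException(
                "Source: " + sourceId + " is not a parallel source");
        }
    }
}
```

Parallelism 1 is always accepted, which is why the non-parallel source still runs when its parallelism is left at 1.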
A source that implements ParallelSourceFunction can run in parallel:

import org.apache.flink.streaming.api.functions.source.{ParallelSourceFunction, SourceFunction}

class CustomParallelSourceFunction extends ParallelSourceFunction[Long] {
  // Set from another thread by cancel(), hence volatile
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}
It behaves exactly like the previous source. The main method:
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  // nonParallelSourceFunction(env)
  parallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def parallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomParallelSourceFunction()).setParallelism(3)
  data.print()
}
This time setParallelism(3) is accepted, and the output looks like this:
2> 1
1> 1
2> 1
2> 2
3> 2
3> 2
3> 3
4> 3
4> 3
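Every value shows up three times because each of the three parallel subtasks runs its own instance of CustomParallelSourceFunction, with its own count starting at 1. The plain-Java simulation below (ParallelCountSimulation and tally are hypothetical names; it ignores timing and which print subtask receives each record) tallies the values produced by independent instances.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical simulation: `parallelism` independent source instances,
// each emitting its first `perSubtask` count values.
final class ParallelCountSimulation {
    static Map<Long, Integer> tally(int parallelism, int perSubtask) {
        Map<Long, Integer> counts = new TreeMap<>();
        for (int subtask = 0; subtask < parallelism; subtask++) {
            long count = 1L; // fresh state per instance, like `var count = 1L` in the source
            for (int i = 0; i < perSubtask; i++) {
                counts.merge(count, 1, Integer::sum); // record one emission of this value
                count += 1;
            }
        }
        return counts;
    }
}
```

tally(3, 3) maps each of 1, 2 and 3 to 3, matching the nine lines above.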
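A source can also extend RichParallelSourceFunction. It runs in parallel like the previous one and additionally provides lifecycle methods such as open() and close() and access to getRuntimeContext():

import org.apache.flink.streaming.api.functions.source.{RichParallelSourceFunction, SourceFunction}

class CustomRichParallelSourceFunction extends RichParallelSourceFunction[Long] {
  // Set from another thread by cancel(), hence volatile
  @volatile var isRunning = true
  var count = 1L

  override def run(ctx: SourceFunction.SourceContext[Long]): Unit = {
    while (isRunning) {
      ctx.collect(count)
      count += 1
      Thread.sleep(1000)
    }
  }

  override def cancel(): Unit = {
    isRunning = false
  }
}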
def main(args: Array[String]): Unit = {
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  // socketFunction(env)
  // nonParallelSourceFunction(env)
  // parallelSourceFunction(env)
  richParallelSourceFunction(env)
  env.execute("DataStreamSourceApp")
}

def richParallelSourceFunction(env: StreamExecutionEnvironment): Unit = {
  val data = env.addSource(new CustomRichParallelSourceFunction()).setParallelism(3)
  data.print()
}
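One thing the rich variant's getRuntimeContext() makes possible, as an illustration that goes beyond the original code: each subtask can read its own index and the total number of subtasks in open() (in Flink 1.x via getRuntimeContext().getIndexOfThisSubtask() and getNumberOfParallelSubtasks()) and emit disjoint values instead of duplicates. The plain-Java sketch shows only the striding arithmetic; StridedCounter and firstValues are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical striding scheme: subtask i of n emits i+1, i+1+n, i+1+2n, ...
final class StridedCounter {
    static List<Long> firstValues(int subtaskIndex, int parallelism, int howMany) {
        List<Long> out = new ArrayList<>();
        long value = subtaskIndex + 1L; // 0-based index, values start at 1
        for (int i = 0; i < howMany; i++) {
            out.add(value);
            value += parallelism; // skip the values owned by the other subtasks
        }
        return out;
    }
}
```

With three subtasks, subtask 0 produces 1, 4, 7 and subtask 2 produces 3, 6, 9, so together they cover each number exactly once.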
The Java versions follow the same pattern. First, the non-parallel source:

import org.apache.flink.streaming.api.functions.source.SourceFunction;

public class JavaCustomNonParallelSourceFunction implements SourceFunction<Long> {
    // Written by cancel() on another thread, hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    nonParallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void nonParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction());
    data.print().setParallelism(1);
}
If you set its parallelism:
DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomNonParallelSourceFunction()).setParallelism(2);
the same exception is thrown:
Exception in thread "main" java.lang.IllegalArgumentException: Source: 1 is not a parallel source
    at org.apache.flink.streaming.api.datastream.DataStreamSource.setParallelism(DataStreamSource.java:55)
    at com.vincent.course05.JavaDataStreamSourceApp.nonParallelSourceFunction(JavaDataStreamSourceApp.java:16)
    at com.vincent.course05.JavaDataStreamSourceApp.main(JavaDataStreamSourceApp.java:10)
A parallel source implements ParallelSourceFunction:

import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;

public class JavaCustomParallelSourceFunction implements ParallelSourceFunction<Long> {
    // Written by cancel() on another thread, hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    // nonParallelSourceFunction(environment);
    parallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void parallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomParallelSourceFunction()).setParallelism(2);
    data.print().setParallelism(1);
}
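Now the parallelism can be set. Because the print() sink runs with parallelism 1, the lines carry no prefix, and each value appears twice because each of the two source instances keeps its own count: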
1
1
2
2
3
3
4
4
5
5
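The rich variant extends RichParallelSourceFunction:

import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class JavaCustomRichParallelSourceFunction extends RichParallelSourceFunction<Long> {
    // Written by cancel() on another thread, hence volatile
    private volatile boolean isRunning = true;
    private long count = 1;

    @Override
    public void run(SourceContext<Long> ctx) throws Exception {
        while (isRunning) {
            ctx.collect(count);
            count += 1;
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}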
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();
    // socketFunction(environment);
    // nonParallelSourceFunction(environment);
    // parallelSourceFunction(environment);
    richParallelSourceFunction(environment);
    environment.execute("JavaDataStreamSourceApp");
}

public static void richParallelSourceFunction(StreamExecutionEnvironment executionEnvironment) {
    DataStreamSource<Long> data = executionEnvironment.addSource(new JavaCustomRichParallelSourceFunction()).setParallelism(2);
    data.print().setParallelism(1);
}
Output:
1
1
2
2
3
3
4
4
5
5
6
6