溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊(cè)×
其他方式登錄
點(diǎn)擊 登錄注冊(cè) 即表示同意《億速云用戶服務(wù)條款》

flink中zk引起的重啟怎么解決

發(fā)布時(shí)間:2021-12-31 13:45:44 來(lái)源:億速云 閱讀:1763 作者:iii 欄目:大數(shù)據(jù)

這篇文章主要介紹“flink中zk引起的重啟怎么解決”,在日常操作中,相信很多人在flink中zk引起的重啟怎么解決問(wèn)題上存在疑惑,小編查閱了各式資料,整理出簡(jiǎn)單好用的操作方法,希望對(duì)大家解答”flink中zk引起的重啟怎么解決”的疑惑有所幫助!接下來(lái),請(qǐng)跟著小編一起來(lái)學(xué)習(xí)吧!

背景

最近用flink on k8s跑程序的過(guò)程中,發(fā)現(xiàn)某個(gè)時(shí)刻經(jīng)常導(dǎo)致程序重啟,定時(shí)任務(wù)每天加載一次緩存,該緩存有大量數(shù)據(jù),加載時(shí)長(zhǎng)需要60-90s左右。這個(gè)定時(shí)任務(wù)經(jīng)常會(huì)導(dǎo)致k8s重啟程序,使其極不穩(wěn)定,于是各種調(diào)優(yōu)。

內(nèi)存相關(guān)

  1. 懷疑可能是算子的sender和receiver之間因?yàn)榧虞d緩存導(dǎo)致某種通信不可達(dá),默認(rèn)的心跳時(shí)間是50s,于是修改參數(shù):heartbeat.timeout: 180000,heartbeat.interval: 20000。

  2. jobmanager和taskmanager是用akka通信,修改參數(shù)akka.ask.timeout: 240s。

這些操作之后,偶爾還是會(huì)在加載緩存的時(shí)候發(fā)現(xiàn)異常,日志截取如下

2020-10-16 17:05:05,939 WARN org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f
2020-10-16 17:05:05,948 INFO org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn - Client session timed out, have not heard from server in 29068ms for sessionid 0x30135fa8005449f, closing socket connection and attempting reconnect
2020-10-16 17:05:07,609 INFO org.apache.flink.shaded.curator.org.apache.curator.framework.state.ConnectionStateManager - State change: SUSPENDED
2020-10-16 17:05:07,611 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:05:07,612 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - JobManager for job 1bb3b7bdcfbc39cf760064ed9736ea80 with leader id bed26e07640e5e79197e468c85354534 lost leadership.
2020-10-16 17:05:07,613 WARN org.apache.flink.runtime.leaderretrieval.ZooKeeperLeaderRetrievalService - Connection to ZooKeeper suspended. Can no longer retrieve the leader from ZooKeeper.
2020-10-16 17:05:07,614 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Close JobManager connection for job 1bb3b7bdcfbc39cf760064ed9736ea80.
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Attempting to fail task externally Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762).
2020-10-16 17:05:07,615 INFO org.apache.flink.runtime.taskmanager.Task - Source: Custom Source -> Flat Map -> Timestamps/Watermarks (15/15) (052a84a37a0647ab485baa54f149b762) switched from RUNNING to FAILED.
org.apache.flink.util.FlinkException: JobManager responsible for 1bb3b7bdcfbc39cf760064ed9736ea80 lost the leadership.
at org.apache.flink.runtime.taskexecutor.TaskExecutor.closeJobManagerConnection(TaskExecutor.java:1274)
at org.apache.flink.runtime.taskexecutor.TaskExecutor.access$1200(TaskExecutor.java:155)
at org.apache.flink.runtime.taskexecutor.TaskExecutor$JobLeaderListenerImpl.lambda$jobManagerLostLeadership$1(TaskExecutor.java:1698)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:402)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:195)
at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:152)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:170)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
at akka.actor.Actor$class.aroundReceive(Actor.scala:517)
at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592)
at akka.actor.ActorCell.invoke(ActorCell.scala:561)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258)
at akka.dispatch.Mailbox.run(Mailbox.scala:225)
at akka.dispatch.Mailbox.exec(Mailbox.scala:235)
at akka.dispatch.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at akka.dispatch.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at akka.dispatch.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at akka.dispatch.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: java.lang.Exception: Job leader for job id 1bb3b7bdcfbc39cf760064ed9736ea80 lost leadership.
... 22 more

再經(jīng)過(guò)調(diào)查發(fā)現(xiàn),這個(gè)跟zk有關(guān)系,zk在切換leader或者遇到網(wǎng)絡(luò)波動(dòng)之類的,會(huì)觸發(fā)SUSPENDED狀態(tài),這個(gè)狀態(tài),會(huì)導(dǎo)致lost the leadership錯(cuò)誤,而遇到這個(gè)錯(cuò)誤,k8s直接就重啟程序。其實(shí)訪問(wèn)zk還是正常的。 再經(jīng)過(guò)一系列調(diào)查,這種問(wèn)題別人早就遇到,還改了代碼,就是flink官方?jīng)]合并代碼。調(diào)查的過(guò)程不表,有用的鏈接如下

  1. https://www.cnblogs.com/029zz010buct/p/10946244.html

這個(gè)有用的是升級(jí)curator包, flink用的是2.12.0,暫時(shí)沒(méi)去操作,里面提到的SessionConnectionStateErrorPolicy是在4.x版本的,應(yīng)該還是要去編譯部分代碼。

  1. https://github.com/apache/flink/pull/9066 https://issues.apache.org/jira/browse/FLINK-10052

    這個(gè)是其他人的解決方案,本人用的也是這個(gè)方法。 不把SUSPENDED狀態(tài)認(rèn)為是lost leadership,修改LeaderLatch的handleStateChange方法

            case RECONNECTED:
            {
                try
                {
                    if (!hasLeadership.get())
                    {
                        reset();
                    }
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("Could not reset leader latch", e);
                    setLeadership(false);
                }
                break;
            }

            case LOST:
            {
                setLeadership(false);
                break;
            }

編譯flink-shaded-hadoop-2-uber

找到這段代碼之后,自然是找到了flink-shaded-hadoop-2-uber-xxx.jar這個(gè)包,在flink1.10的版本,還支持hadoop的這個(gè)包,在1.11之后已經(jīng)不再主動(dòng)支持,需要的要自己去下載,因?yàn)檫@個(gè)包在打鏡像時(shí)會(huì)特意加上去,所以目標(biāo)鎖定這個(gè)包,重新編譯。簡(jiǎn)單說(shuō)下編譯過(guò)程

  1. https://github.com/apache/curator/tree/apache-curator-2.12.0 下載這個(gè)版本的源碼,修改curator-recipes下的src/main/java/org/apache/curator/framework/recipes/leader/LeaderLatch.java,修改內(nèi)容如上所示,打的包是2.12.0。

  2. https://github.com/apache/flink-shaded/tree/release-10.0 下載flink-shaded 1.10版本的源碼,修改flink-shaded-hadoop-2-parent的pom文件,增加exclusion,去掉curator-recipes的依賴,增加自己編譯的curator-recipes。觀察到不去掉依賴,默認(rèn)是2.7.1版本,應(yīng)該是這塊代碼好多年沒(méi)動(dòng)過(guò),版本一直停留在2.7.1。

		<dependency>
			<groupId>org.apache.hadoop</groupId>
			<artifactId>hadoop-common</artifactId>
			<version>${hadoop.version}</version>
			<exclusions>
				...省略若干exclusion
				<exclusion>
				<groupId>org.apache.curator</groupId>
					<artifactId>curator-recipes</artifactId>
				</exclusion>
			</exclusions>
		</dependency>
		
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>2.12.0</version>
		</dependency>
  1. 因?yàn)槲覀冇玫氖?.8.3-10.0版本的,源碼是2.4.1的,修改成<hadoop.version>2.8.3</hadoop.version>

  2. 看根目錄的readme.md,在flink-shaded-release-10.0/flink-shaded-hadoop-2-parent目錄運(yùn)行mvn package -Dshade-sources打包,打包完成之后,用工具反編譯觀察一下,SUSPENDED的代碼確實(shí)去掉了,重新打鏡像,跑程序。

到此,關(guān)于“flink中zk引起的重啟怎么解決”的學(xué)習(xí)就結(jié)束了,希望能夠解決大家的疑惑。理論與實(shí)踐的搭配能更好的幫助大家學(xué)習(xí),快去試試吧!若想繼續(xù)學(xué)習(xí)更多相關(guān)知識(shí),請(qǐng)繼續(xù)關(guān)注億速云網(wǎng)站,小編會(huì)繼續(xù)努力為大家?guī)?lái)更多實(shí)用的文章!

向AI問(wèn)一下細(xì)節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如果涉及侵權(quán)請(qǐng)聯(lián)系站長(zhǎng)郵箱:is@yisu.com進(jìn)行舉報(bào),并提供相關(guān)證據(jù),一經(jīng)查實(shí),將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI