溫馨提示×

溫馨提示×

您好,登錄后才能下訂單哦!

密碼登錄×
登錄注冊×
其他方式登錄
點擊 登錄注冊 即表示同意《億速云用戶服務(wù)條款》

從Flink client提交源碼看第三方j(luò)ar包的動態(tài)加載的解決方案是怎樣的

發(fā)布時間:2021-12-07 10:38:21 來源:億速云 閱讀:658 作者:柒染 欄目:大數(shù)據(jù)

從Flink client提交源碼看第三方j(luò)ar包的動態(tài)加載的解決方案是怎樣的,很多新手對此不是很清楚,為了幫助大家解決這個難題,下面小編將為大家詳細講解,有這方面需求的人可以來學(xué)習(xí)下,希望你能有所收獲。

1. flink run 提交流程源碼分析

查看flink腳本找到執(zhí)行run命令的入口類,如下:

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@

入口類為:org.apache.flink.client.cli.CliFrontend。 最終會調(diào)用 parseParameters(String[] args) 方法來執(zhí)行命令解析,run 命令會調(diào)用 run(params) 方法,如下:

switch (action) {
	case ACTION_RUN:
		run(params);
		return 0;
	case ACTION_RUN_APPLICATION:
		runApplication(params);
		return 0;
	case ACTION_LIST:
		list(params);
		return 0;
	case ACTION_INFO:
		info(params);
		return 0;
	case ACTION_CANCEL:
		cancel(params);
		return 0;
	case ACTION_STOP:
		stop(params);
		return 0;
	case ACTION_SAVEPOINT:
		savepoint(params);
		return 0;
}

run 方法代碼如下

protected void run(String[] args) throws Exception {
		LOG.info("Running 'run' command.");

		final Options commandOptions = CliFrontendParser.getRunCommandOptions();
		final CommandLine commandLine = getCommandLine(commandOptions, args, true);

		// evaluate help flag
		if (commandLine.hasOption(HELP_OPTION.getOpt())) {
			CliFrontendParser.printHelpForRun(customCommandLines);
			return;
		}

		final CustomCommandLine activeCommandLine =
				validateAndGetActiveCommandLine(checkNotNull(commandLine));

		final ProgramOptions programOptions = ProgramOptions.create(commandLine);
        # 創(chuàng)建 PackagedProgram 對象
		final PackagedProgram program =
				getPackagedProgram(programOptions);
        #解析獲取相關(guān)依賴jar
		final List<URL> jobJars = program.getJobJarAndDependencies();
		# 生成最終提交配置
        final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, jobJars);

		LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

		try {
			executeProgram(effectiveConfiguration, program);
		} finally {
			program.deleteExtractedLibraries();
		}
	}

run方法根據(jù)用戶傳入的參數(shù)如 main函數(shù),jar包等信息創(chuàng)建出 PackagedProgram 對象,這個對象封裝了用戶提交的信息。從 getPackagedProgram()方法里可以看出。

return PackagedProgram.newBuilder()
			.setJarFile(jarFile)
			.setUserClassPaths(classpaths)
			.setEntryPointClassName(entryPointClass)
			.setConfiguration(configuration)
			.setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
			.setArguments(programArgs)
			.build();

查看PackagedProgram構(gòu)造方法,里面會創(chuàng)建幾個關(guān)鍵成員變量:

  • classpaths:用戶-C 參數(shù)傳入的信息

  • jarFile : 用戶的主函數(shù)的jar

  • extractedTempLibraries :提取出上面主jar包里 lib/ 文件夾下的所有jar包信息,供后面classloader使用

  • userCodeClassLoader : 用戶code的classloader,這個classloader會把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。該userCodeClassLoader默認采用child_first優(yōu)先策略

  • mainClass :用戶main函數(shù)方法 該構(gòu)造方法如下:

private PackagedProgram(
			@Nullable File jarFile,
			List<URL> classpaths,
			@Nullable String entryPointClassName,
			Configuration configuration,
			SavepointRestoreSettings savepointRestoreSettings,
			String... args) throws ProgramInvocationException {
		this.classpaths = checkNotNull(classpaths);
		this.savepointSettings = checkNotNull(savepointRestoreSettings);
		this.args = checkNotNull(args);

		checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");

		// whether the job is a Python job.
		this.isPython = isPython(entryPointClassName);

		// load the jar file if exists
		this.jarFile = loadJarFile(jarFile);

		assert this.jarFile != null || entryPointClassName != null;

		// now that we have an entry point, we can extract the nested jar files (if any)
		this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);
		this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
			getJobJarAndDependencies(),
			classpaths,
			getClass().getClassLoader(),
			configuration);

		// load the entry point class
		this.mainClass = loadMainClass(
			// if no entryPointClassName name was given, we try and look one up through the manifest
			entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
			userCodeClassLoader);

		if (!hasMainMethod(mainClass)) {
			throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");
		}
	}

PackagedProgram 里 getJobJarAndDependencies 方法,該方法收集了job所有依賴的jar包,這些jar包后續(xù)會提交到集群并加入到classpath路徑中。

PackagedProgram對象構(gòu)造完成之后,便是創(chuàng)建最終的Configuration對象了,如下方法

final Configuration effectiveConfiguration = getEffectiveConfiguration(
				activeCommandLine, commandLine, programOptions, jobJars);

這個方法會設(shè)置兩個參數(shù):

  • pipeline.classpaths: 值為getJobJarAndDependencies()和classpaths里的url

  • pipeline.jars: 值為getJobJarAndDependencies()返回的jar和lib文件夾下的依賴,后續(xù)提交集群的時候會根據(jù)這個把jar一起提交到集群

準備好 PackagedProgram和Configuration后,就開始執(zhí)行用戶程序了,

executeProgram(effectiveConfiguration, program);

詳細代碼如下:

public static void executeProgram(
			PipelineExecutorServiceLoader executorServiceLoader,
			Configuration configuration,
			PackagedProgram program,
			boolean enforceSingleJobExecution,
			boolean suppressSysout) throws ProgramInvocationException {
		checkNotNull(executorServiceLoader);
		final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
		final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
		try {
# 設(shè)置用戶上下文用戶類加載器
Thread.currentThread().setContextClassLoader(userCodeClassLoader);

			LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

			ContextEnvironment.setAsContext(
				executorServiceLoader,
				configuration,
				userCodeClassLoader,
				enforceSingleJobExecution,
				suppressSysout);

			StreamContextEnvironment.setAsContext(
				executorServiceLoader,
				configuration,
				userCodeClassLoader,
				enforceSingleJobExecution,
				suppressSysout);

			try {
                # 反射調(diào)用戶的 main 函數(shù)執(zhí)行job提交
				program.invokeInteractiveModeForExecution();
			} finally {
				ContextEnvironment.unsetAsContext();
				StreamContextEnvironment.unsetAsContext();
			}
		} finally {
			Thread.currentThread().setContextClassLoader(contextClassLoader);
		}
	}

最后總結(jié)一下整個流程:

  1. 執(zhí)行flink run 命名傳入相關(guān)參數(shù)

  2. 創(chuàng)建PackagedProgram對象,準備相關(guān)jar,用戶類加載器,Configuration對象

  3. 通過反射調(diào)用用戶Main方法

  4. 構(gòu)建Pipeline StreamGraph,提交job到集群

2. 提交job時,動態(tài)加載第三方j(luò)ar(如udf等)

通過分析流程我們可以發(fā)現(xiàn)可以有兩種方式來實現(xiàn)動態(tài)jar的添加

  1. 動態(tài)的 把三方j(luò)ar 放入 主函數(shù)jar包的lib目錄下(可以通過jar uf 命名搞定) 因為在PackagedProgram構(gòu)造方法里會通過extractContainedLibraries()方法獲取jar lib目錄里的所有jar,并且這些jar會一并上傳到集群

  2. 在用戶任務(wù)main函數(shù)里,通過反射動態(tài)設(shè)置 Configuration 對象的 pipeline.classpaths , pipeline.jars 這兩個屬性 。并且還需要把第三方j(luò)ar加載到Thread.contextClassLoader里。(可參見:https://zhuanlan.zhihu.com/p/278482766)

本人在項目中直接采用的是第一種方案,不會添加更多代碼。

看完上述內(nèi)容是否對您有幫助呢?如果還想對相關(guān)知識有進一步的了解或閱讀更多相關(guān)文章,請關(guān)注億速云行業(yè)資訊頻道,感謝您對億速云的支持。

向AI問一下細節(jié)

免責(zé)聲明:本站發(fā)布的內(nèi)容(圖片、視頻和文字)以原創(chuàng)、轉(zhuǎn)載和分享為主,文章觀點不代表本網(wǎng)站立場,如果涉及侵權(quán)請聯(lián)系站長郵箱:is@yisu.com進行舉報,并提供相關(guān)證據(jù),一經(jīng)查實,將立刻刪除涉嫌侵權(quán)內(nèi)容。

AI