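How can third-party jars be loaded dynamically when a Flink job is submitted? Many newcomers are unsure about this, so this article traces the Flink client's submission source code to find the answer.
Looking at the flink script, the entry class that executes the run command can be found in this line: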
exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
The entry class is org.apache.flink.client.cli.CliFrontend. It eventually calls parseParameters(String[] args) to parse the command, and the run command is dispatched to run(params), as follows:
switch (action) {
    case ACTION_RUN:
        run(params);
        return 0;
    case ACTION_RUN_APPLICATION:
        runApplication(params);
        return 0;
    case ACTION_LIST:
        list(params);
        return 0;
    case ACTION_INFO:
        info(params);
        return 0;
    case ACTION_CANCEL:
        cancel(params);
        return 0;
    case ACTION_STOP:
        stop(params);
        return 0;
    case ACTION_SAVEPOINT:
        savepoint(params);
        return 0;
}
The run method looks like this:
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    final Options commandOptions = CliFrontendParser.getRunCommandOptions();
    final CommandLine commandLine = getCommandLine(commandOptions, args, true);

    // evaluate help flag
    if (commandLine.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCommandLine =
            validateAndGetActiveCommandLine(checkNotNull(commandLine));

    final ProgramOptions programOptions = ProgramOptions.create(commandLine);

    // create the PackagedProgram object
    final PackagedProgram program = getPackagedProgram(programOptions);

    // resolve the job jar and the jars it depends on
    final List<URL> jobJars = program.getJobJarAndDependencies();

    // build the final configuration used for submission
    final Configuration effectiveConfiguration = getEffectiveConfiguration(
            activeCommandLine, commandLine, programOptions, jobJars);

    LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

    try {
        executeProgram(effectiveConfiguration, program);
    } finally {
        program.deleteExtractedLibraries();
    }
}
Based on the arguments the user passes in (the main class, the jar, and so on), run creates a PackagedProgram object that encapsulates everything the user submitted. This can be seen in getPackagedProgram():
return PackagedProgram.newBuilder()
        .setJarFile(jarFile)
        .setUserClassPaths(classpaths)
        .setEntryPointClassName(entryPointClass)
        .setConfiguration(configuration)
        .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())
        .setArguments(programArgs)
        .build();
The PackagedProgram constructor creates several key member variables:
classpaths: the URLs the user passes with the -C option
jarFile: the jar that contains the user's main class
extractedTempLibraries: the jars extracted from the lib/ folder inside the main jar above, for later use by the class loader
userCodeClassLoader: the class loader for user code; it adds classpaths, jarFile, and extractedTempLibraries to its classpath. By default it uses the child-first resolution order
mainClass: the class that contains the user's main method
The constructor is as follows:
private PackagedProgram(
        @Nullable File jarFile,
        List<URL> classpaths,
        @Nullable String entryPointClassName,
        Configuration configuration,
        SavepointRestoreSettings savepointRestoreSettings,
        String... args) throws ProgramInvocationException {
    this.classpaths = checkNotNull(classpaths);
    this.savepointSettings = checkNotNull(savepointRestoreSettings);
    this.args = checkNotNull(args);

    checkArgument(jarFile != null || entryPointClassName != null,
            "Either the jarFile or the entryPointClassName needs to be non-null.");

    // whether the job is a Python job.
    this.isPython = isPython(entryPointClassName);

    // load the jar file if exists
    this.jarFile = loadJarFile(jarFile);

    assert this.jarFile != null || entryPointClassName != null;

    // now that we have an entry point, we can extract the nested jar files (if any)
    this.extractedTempLibraries = this.jarFile == null
            ? Collections.emptyList()
            : extractContainedLibraries(this.jarFile);

    this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
            getJobJarAndDependencies(),
            classpaths,
            getClass().getClassLoader(),
            configuration);

    // load the entry point class
    this.mainClass = loadMainClass(
            // if no entryPointClassName name was given, we try and look one up through the manifest
            entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
            userCodeClassLoader);

    if (!hasMainMethod(mainClass)) {
        throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");
    }
}
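To make the child-first resolution order concrete, here is a minimal sketch of such a class loader that uses only the JDK. It is not Flink's implementation: the class name ChildFirstSketchLoader and the helper loadOrNull are made up for illustration, and the hard-coded java. and javax. prefixes are a simplifying assumption standing in for the configurable parent-first patterns that Flink's own loader applies.

```java
import java.net.URL;
import java.net.URLClassLoader;

/** Illustrative child-first loader: looks in its own URLs before asking the parent. */
final class ChildFirstSketchLoader extends URLClassLoader {

    ChildFirstSketchLoader(URL[] urls, ClassLoader parent) {
        super(urls, parent);
    }

    @Override
    protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
        synchronized (getClassLoadingLock(name)) {
            Class<?> c = findLoadedClass(name);
            if (c == null) {
                if (name.startsWith("java.") || name.startsWith("javax.")) {
                    // assumption: JDK classes always use the normal parent-first path
                    c = super.loadClass(name, false);
                } else {
                    try {
                        // child first: search this loader's own URLs
                        c = findClass(name);
                    } catch (ClassNotFoundException e) {
                        // not in the user jars, so fall back to the parent
                        c = super.loadClass(name, false);
                    }
                }
            }
            if (resolve) {
                resolveClass(c);
            }
            return c;
        }
    }

    /** Convenience helper for demos: returns null instead of throwing. */
    static Class<?> loadOrNull(ClassLoader loader, String name) {
        try {
            return loader.loadClass(name);
        } catch (ClassNotFoundException e) {
            return null;
        }
    }
}
```

With this order, a class that exists both in the user's jars and on the framework classpath resolves to the user's copy, which is why a third-party jar added to the user-code class loader can shadow a different version shipped with the cluster.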
The getJobJarAndDependencies method of PackagedProgram collects every jar the job depends on; these jars are later shipped to the cluster and added to the classpath.
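The collection step can be sketched with the JDK alone. The example below assumes that nested dependencies sit under lib/ and end in .jar, as described above. NestedLibSketch, extractLibJars, and jobJarAndDependencies are hypothetical names, and the flattened file naming in the temp directory is an assumption made for the sketch, not a description of what Flink does.

```java
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.net.MalformedURLException;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
import java.util.jar.JarEntry;
import java.util.jar.JarFile;

final class NestedLibSketch {

    /** Copies every lib/*.jar entry of jobJar into tempDir and returns the extracted files. */
    static List<File> extractLibJars(File jobJar, Path tempDir) {
        List<File> extracted = new ArrayList<>();
        try (JarFile jar = new JarFile(jobJar)) {
            Enumeration<JarEntry> entries = jar.entries();
            while (entries.hasMoreElements()) {
                JarEntry entry = entries.nextElement();
                String name = entry.getName();
                if (entry.isDirectory() || !name.startsWith("lib/") || !name.endsWith(".jar")) {
                    continue;
                }
                // flatten the entry name so nothing can be written outside tempDir
                Path target = tempDir.resolve(name.substring("lib/".length()).replace('/', '_'));
                try (InputStream in = jar.getInputStream(entry)) {
                    Files.copy(in, target, StandardCopyOption.REPLACE_EXISTING);
                }
                extracted.add(target.toFile());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return extracted;
    }

    /** Returns the job jar first, followed by the extracted libraries, as URLs. */
    static List<URL> jobJarAndDependencies(File jobJar, List<File> libs) {
        List<URL> urls = new ArrayList<>(libs.size() + 1);
        try {
            urls.add(jobJar.getAbsoluteFile().toURI().toURL());
            for (File lib : libs) {
                urls.add(lib.getAbsoluteFile().toURI().toURL());
            }
        } catch (MalformedURLException e) {
            throw new UncheckedIOException(e);
        }
        return urls;
    }
}
```

The resulting URL list is the kind of input a user-code class loader and a jar-shipping step would both consume, which is why anything placed under lib/ ends up on the job's classpath.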
Once the PackagedProgram object is constructed, the final Configuration object is created with the following call:
final Configuration effectiveConfiguration = getEffectiveConfiguration(
        activeCommandLine, commandLine, programOptions, jobJars);
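This method sets two options:
pipeline.classpaths: the URLs from getJobJarAndDependencies() and classpaths
pipeline.jars: the jar returned by getJobJarAndDependencies() together with the dependencies from its lib folder; when the job is submitted, these jars are shipped to the cluster along with it
With the PackagedProgram and the Configuration ready, the user program is executed: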
executeProgram(effectiveConfiguration, program);
The detailed code is as follows:
public static void executeProgram(
        PipelineExecutorServiceLoader executorServiceLoader,
        Configuration configuration,
        PackagedProgram program,
        boolean enforceSingleJobExecution,
        boolean suppressSysout) throws ProgramInvocationException {
    checkNotNull(executorServiceLoader);
    final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();
    final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();
    try {
        // make the user-code class loader the thread's context class loader
        Thread.currentThread().setContextClassLoader(userCodeClassLoader);

        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));

        ContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        StreamContextEnvironment.setAsContext(
                executorServiceLoader,
                configuration,
                userCodeClassLoader,
                enforceSingleJobExecution,
                suppressSysout);

        try {
            // invoke the user's main method via reflection to submit the job
            program.invokeInteractiveModeForExecution();
        } finally {
            ContextEnvironment.unsetAsContext();
            StreamContextEnvironment.unsetAsContext();
        }
    } finally {
        Thread.currentThread().setContextClassLoader(contextClassLoader);
    }
}
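The core pattern in executeProgram, swapping the thread's context class loader, calling main reflectively, and restoring the previous loader in a finally block, can be sketched without any Flink classes. InvokeMainSketch, invokeMain, and SampleJob are hypothetical names used only for this illustration; SampleJob simply records which context class loader it observed.

```java
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;

final class InvokeMainSketch {

    /** Hypothetical stand-in for a user's job entry point. */
    public static final class SampleJob {
        static ClassLoader seenLoader;

        public static void main(String[] args) {
            // a real job would build and submit its pipeline here
            seenLoader = Thread.currentThread().getContextClassLoader();
        }
    }

    /** Runs mainClass.main(args) with userCodeClassLoader as the context class loader. */
    static void invokeMain(Class<?> mainClass, ClassLoader userCodeClassLoader, String... args) {
        Thread current = Thread.currentThread();
        ClassLoader previous = current.getContextClassLoader();
        try {
            current.setContextClassLoader(userCodeClassLoader);
            Method main = mainClass.getMethod("main", String[].class);
            if (!Modifier.isStatic(main.getModifiers())) {
                throw new IllegalArgumentException("main must be static");
            }
            // cast to Object so the array is passed as one argument, not as varargs
            main.invoke(null, (Object) args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Could not invoke main", e);
        } finally {
            // always restore the caller's loader, even if main throws
            current.setContextClassLoader(previous);
        }
    }
}
```

Because the user's main runs while the user-code class loader is the context class loader, libraries that look up classes through Thread.currentThread().getContextClassLoader() can see the job jar and its dependencies.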
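To summarize the whole flow:
1. Run the flink run command with the relevant arguments
2. Create the PackagedProgram object and prepare the jars, the user-code class loader, and the Configuration object
3. Invoke the user's main method via reflection
4. Build the Pipeline (StreamGraph) and submit the job to the cluster
This analysis shows two ways to add jars dynamically:
1. Dynamically put the third-party jar into the lib directory of the main jar (the jar uf command can do this). This works because the PackagedProgram constructor calls extractContainedLibraries() to collect every jar in the jar's lib directory, and those jars are uploaded to the cluster as well.
2. In the main function of the user job, use reflection to set the pipeline.classpaths and pipeline.jars properties of the Configuration object. The third-party jar must also be loaded into Thread.contextClassLoader. (See: https://zhuanlan.zhihu.com/p/278482766)
I use the first approach in my project, since it requires no extra code in the job.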
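When the jar command-line tool is not convenient, for example inside a submission service, the first approach can also be done programmatically. The following is a minimal sketch that assumes a JDK with the built-in zip file system provider; AddLibToJarSketch and addToLib are hypothetical names, and the sketch does not validate that the added file is really a jar.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.URI;
import java.nio.file.FileSystem;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;

final class AddLibToJarSketch {

    /** Copies thirdPartyJar into the lib/ directory inside the existing jobJar. */
    static void addToLib(Path jobJar, Path thirdPartyJar) {
        // "jar:file:..." URIs are handled by the JDK's zip file system provider
        URI jarUri = URI.create("jar:" + jobJar.toUri());
        try (FileSystem jarFs =
                FileSystems.newFileSystem(jarUri, Collections.<String, Object>emptyMap())) {
            Path libDir = jarFs.getPath("/lib");
            Files.createDirectories(libDir);
            Path target = libDir.resolve(thirdPartyJar.getFileName().toString());
            Files.copy(thirdPartyJar, target, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // closing the file system writes the updated archive back to disk
    }
}
```

A call such as AddLibToJarSketch.addToLib(Paths.get("job.jar"), Paths.get("dep.jar")) would leave a lib/dep.jar entry inside job.jar, which is the layout extractContainedLibraries() looks for.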