千家信息网

从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的

发表于:2025-12-01 作者:千家信息网编辑
千家信息网最后更新 2025年12月01日,从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收
千家信息网最后更新 2025年12月01日从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的

从Flink client提交源码看第三方jar包的动态加载的解决方案是怎样的,很多新手对此不是很清楚,为了帮助大家解决这个难题,下面小编将为大家详细讲解,有这方面需求的人可以来学习下,希望你能有所收获。

1. flink run 提交流程源码分析

查看flink脚本找到执行run命令的入口类,如下:

exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@

入口类为:org.apache.flink.client.cli.CliFrontend。 最终会调用 parseParameters(String[] args) 方法来执行命令解析,run 命令会调用 run(params) 方法,如下:

switch (action) {        case ACTION_RUN:                run(params);                return 0;        case ACTION_RUN_APPLICATION:                runApplication(params);                return 0;        case ACTION_LIST:                list(params);                return 0;        case ACTION_INFO:                info(params);                return 0;        case ACTION_CANCEL:                cancel(params);                return 0;        case ACTION_STOP:                stop(params);                return 0;        case ACTION_SAVEPOINT:                savepoint(params);                return 0;}

run 方法代码如下

protected void run(String[] args) throws Exception {                LOG.info("Running 'run' command.");                final Options commandOptions = CliFrontendParser.getRunCommandOptions();                final CommandLine commandLine = getCommandLine(commandOptions, args, true);                // evaluate help flag                if (commandLine.hasOption(HELP_OPTION.getOpt())) {                        CliFrontendParser.printHelpForRun(customCommandLines);                        return;                }                final CustomCommandLine activeCommandLine =                                validateAndGetActiveCommandLine(checkNotNull(commandLine));                final ProgramOptions programOptions = ProgramOptions.create(commandLine);        # 创建 PackagedProgram 对象                final PackagedProgram program =                                getPackagedProgram(programOptions);        #解析获取相关依赖jar                final List jobJars = program.getJobJarAndDependencies();                # 生成最终提交配置        final Configuration effectiveConfiguration = getEffectiveConfiguration(                                activeCommandLine, commandLine, programOptions, jobJars);                LOG.debug("Effective executor configuration: {}", effectiveConfiguration);                try {                        executeProgram(effectiveConfiguration, program);                } finally {                        program.deleteExtractedLibraries();                }        }

run方法根据用户传入的参数如 main函数,jar包等信息创建出 PackagedProgram 对象,这个对象封装了用户提交的信息。从 getPackagedProgram()方法里可以看出。

return PackagedProgram.newBuilder()                        .setJarFile(jarFile)                        .setUserClassPaths(classpaths)                        .setEntryPointClassName(entryPointClass)                        .setConfiguration(configuration)                        .setSavepointRestoreSettings(runOptions.getSavepointRestoreSettings())                        .setArguments(programArgs)                        .build();

查看PackagedProgram构造方法,里面会创建几个关键成员变量:

  • classpaths:用户-C 参数传入的信息

  • jarFile : 用户的主函数的jar

  • extractedTempLibraries :提取出上面主jar包里 lib/ 文件夹下的所有jar包信息,供后面classloader使用

  • userCodeClassLoader : 用户code的classloader,这个classloader会把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。该userCodeClassLoader默认采用child_first优先策略

  • mainClass :用户main函数方法 该构造方法如下:

private PackagedProgram(                        @Nullable File jarFile,                        List classpaths,                        @Nullable String entryPointClassName,                        Configuration configuration,                        SavepointRestoreSettings savepointRestoreSettings,                        String... args) throws ProgramInvocationException {                this.classpaths = checkNotNull(classpaths);                this.savepointSettings = checkNotNull(savepointRestoreSettings);                this.args = checkNotNull(args);                checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");                // whether the job is a Python job.                this.isPython = isPython(entryPointClassName);                // load the jar file if exists                this.jarFile = loadJarFile(jarFile);                assert this.jarFile != null || entryPointClassName != null;                // now that we have an entry point, we can extract the nested jar files (if any)                this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);                this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(                        getJobJarAndDependencies(),                        classpaths,                        getClass().getClassLoader(),                        configuration);                // load the entry point class                this.mainClass = loadMainClass(                        // if no entryPointClassName name was given, we try and look one up through the manifest                        entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),                        userCodeClassLoader);                if (!hasMainMethod(mainClass)) {                        throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");                }        }

PackagedProgram 里 getJobJarAndDependencies 方法,该方法收集了job所有依赖的jar包,这些jar包后续会提交到集群并加入到classpath路径中。

PackagedProgram对象构造完成之后,便是创建最终的Configuration对象了,如下方法

final Configuration effectiveConfiguration = getEffectiveConfiguration(                                activeCommandLine, commandLine, programOptions, jobJars);

这个方法会设置两个参数:

  • pipeline.classpaths: 值为getJobJarAndDependencies()和classpaths里的url

  • pipeline.jars: 值为getJobJarAndDependencies()返回的jar和lib文件夹下的依赖,后续提交集群的时候会根据这个把jar一起提交到集群

准备好 PackagedProgram和Configuration后,就开始执行用户程序了,

executeProgram(effectiveConfiguration, program);

详细代码如下:

public static void executeProgram(                        PipelineExecutorServiceLoader executorServiceLoader,                        Configuration configuration,                        PackagedProgram program,                        boolean enforceSingleJobExecution,                        boolean suppressSysout) throws ProgramInvocationException {                checkNotNull(executorServiceLoader);                final ClassLoader userCodeClassLoader = program.getUserCodeClassLoader();                final ClassLoader contextClassLoader = Thread.currentThread().getContextClassLoader();                try {# 设置用户上下文用户类加载器Thread.currentThread().setContextClassLoader(userCodeClassLoader);                        LOG.info("Starting program (detached: {})", !configuration.getBoolean(DeploymentOptions.ATTACHED));                        ContextEnvironment.setAsContext(                                executorServiceLoader,                                configuration,                                userCodeClassLoader,                                enforceSingleJobExecution,                                suppressSysout);                        StreamContextEnvironment.setAsContext(                                executorServiceLoader,                                configuration,                                userCodeClassLoader,                                enforceSingleJobExecution,                                suppressSysout);                        try {                # 反射调用户的 main 函数执行job提交                                program.invokeInteractiveModeForExecution();                        } finally {                                ContextEnvironment.unsetAsContext();                                StreamContextEnvironment.unsetAsContext();                        }                } finally {                        Thread.currentThread().setContextClassLoader(contextClassLoader);                }        }

最后总结一下整个流程:

  1. 执行flink run 命名传入相关参数

  2. 创建PackagedProgram对象,准备相关jar,用户类加载器,Configuration对象

  3. 通过反射调用用户Main方法

  4. 构建Pipeline StreamGraph,提交job到集群

2. 提交job时,动态加载第三方jar(如udf等)

通过分析流程我们可以发现可以有两种方式来实现动态jar的添加

  1. 动态的 把三方jar 放入 主函数jar包的lib目录下(可以通过jar uf 命名搞定) 因为在PackagedProgram构造方法里会通过extractContainedLibraries()方法获取jar lib目录里的所有jar,并且这些jar会一并上传到集群

  2. 在用户任务main函数里,通过反射动态设置 Configuration 对象的 pipeline.classpaths , pipeline.jars 这两个属性 。并且还需要把第三方jar加载到Thread.contextClassLoader里。(可参见:https://zhuanlan.zhihu.com/p/278482766)

本人在项目中直接采用的是第一种方案,不会添加更多代码。

看完上述内容是否对您有帮助呢?如果还想对相关知识有进一步的了解或阅读更多相关文章,请关注行业资讯频道,感谢您对的支持。

0