1. flink run 提交流程源码分析
查看flink脚本找到执行run命令的入口类,如下:
1 | exec $JAVA_RUN $JVM_ARGS $FLINK_ENV_JAVA_OPTS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@ |
入口类为:org.apache.flink.client.cli.CliFrontend。 最终会调用 parseParameters(String[] args) 方法来执行命令解析,run 命令会调用 run(params) 方法,如下:
1 | switch (action) { |
run 方法代码如下
1 | protected void run(String[] args) throws Exception { |
run方法根据用户传入的参数如 main函数,jar包等信息创建出 PackagedProgram 对象,这个对象封装了用户提交的信息。从 getPackagedProgram()方法里可以看出。
1 | return PackagedProgram.newBuilder() |
查看PackagedProgram构造方法,里面会创建几个关键成员变量:
- classpaths:用户-C 参数传入的信息
- jarFile : 用户的主函数的jar
- extractedTempLibraries :提取出上面主jar包里 lib/ 文件夹下的所有jar包信息,供后面classloader使用
- userCodeClassLoader : 用户code的classloader,这个classloader会把classpaths,jarFile,extractedTempLibraries 都加入到classpath里。该userCodeClassLoader默认采用child_first优先策略
- mainClass :用户main函数方法
该构造方法如下:PackagedProgram 里 getJobJarAndDependencies 方法,该方法收集了job所有依赖的jar包,这些jar包后续会提交到集群并加入到classpath路径中。1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39private PackagedProgram(
@Nullable File jarFile,
List<URL> classpaths,
@Nullable String entryPointClassName,
Configuration configuration,
SavepointRestoreSettings savepointRestoreSettings,
String... args) throws ProgramInvocationException {
this.classpaths = checkNotNull(classpaths);
this.savepointSettings = checkNotNull(savepointRestoreSettings);
this.args = checkNotNull(args);
checkArgument(jarFile != null || entryPointClassName != null, "Either the jarFile or the entryPointClassName needs to be non-null.");
// whether the job is a Python job.
this.isPython = isPython(entryPointClassName);
// load the jar file if exists
this.jarFile = loadJarFile(jarFile);
assert this.jarFile != null || entryPointClassName != null;
// now that we have an entry point, we can extract the nested jar files (if any)
this.extractedTempLibraries = this.jarFile == null ? Collections.emptyList() : extractContainedLibraries(this.jarFile);
this.userCodeClassLoader = ClientUtils.buildUserCodeClassLoader(
getJobJarAndDependencies(),
classpaths,
getClass().getClassLoader(),
configuration);
// load the entry point class
this.mainClass = loadMainClass(
// if no entryPointClassName name was given, we try and look one up through the manifest
entryPointClassName != null ? entryPointClassName : getEntryPointClassNameFromJar(this.jarFile),
userCodeClassLoader);
if (!hasMainMethod(mainClass)) {
throw new ProgramInvocationException("The given program class does not have a main(String[]) method.");
}
}
PackagedProgram对象构造完成之后,便是创建最终的Configuration对象了,如下方法
1 | final Configuration effectiveConfiguration = getEffectiveConfiguration( |
这个方法会设置两个参数:
- pipeline.classpaths: 值为getJobJarAndDependencies()和classpaths里的url
- pipeline.jars: 值为getJobJarAndDependencies()返回的jar和lib文件夹下的依赖,后续提交集群的时候会根据这个把jar一起提交到集群
准备好 PackagedProgram和Configuration后,就开始执行用户程序了,
1 | executeProgram(effectiveConfiguration, program); |
详细代码如下:
1 | public static void executeProgram( |
最后总结一下整个流程:
- 执行flink run 命名传入相关参数
- 创建PackagedProgram对象,准备相关jar,用户类加载器,Configuration对象
- 通过反射调用用户Main方法
- 构建Pipeline StreamGraph,提交job到集群
2. 提交job时,动态加载第三方jar(如udf等)
通过分析流程我们可以发现可以有两种方式来实现动态jar的添加
- 动态的 把三方jar 放入 主函数jar包的lib目录下(可以通过jar uf 命名搞定)
因为在PackagedProgram构造方法里会通过extractContainedLibraries()方法获取jar lib目录里的所有jar,并且这些jar会一并上传到集群 - 在用户任务main函数里,通过反射动态设置 Configuration 对象的 pipeline.classpaths , pipeline.jars 这两个属性 。并且还需要把第三方jar加载到Thread.contextClassLoader里。(可参见:https://zhuanlan.zhihu.com/p/278482766)
本人在项目中直接采用的是第一种方案,不会添加更多代码。