追源索骥:透过源码看懂Flink核心框架的执行流
PDF
程
flink
追源索骥:透过源码看懂Flink核心框架的执行流程
前言
1.从 Hello,World WordCount开始
1.1 flink执行环境
1.2 算子(Operator)的注册(声明)
1.3 程序的执行
1.3.1 本地模式下的execute方法
1.3.2 远程模式(RemoteEnvironment)的execute方法
1.3.3 程序启动过程
2.理解flink的图结构
2.1 flink的三层图结构
2.2 StreamGraph的生成
2.2.1 StreamTransformation类代表了流的转换
2.2.2 StreamGraph生成函数分析
2.2.3 WordCount函数的StreamGraph
2.3 JobGraph的生成
2.3.1 JobGraph生成源码
2.3.2 operator chain的逻辑
2.3.3 JobGraph的提交
2.4 ExecutionGraph的生成
3. 任务的调度与执行
3.1 计算资源的调度
3.2 JobManager执行job
3.2.1 JobManager的组件
3.2.2 JobManager的启动过程
3.2.3 JobManager启动Task
3.3 TaskManager执行task
3.3.1 TaskManager的基本组件
3.3.2 TaskManager执行Task
3.3.2.1 生成Task对象
3.3.2.2 运行Task对象
3.3.2.3 StreamTask的执行逻辑
3.4 StreamTask与StreamOperator
4. StreamOperator的抽象与实现
4.1 数据源的逻辑——StreamSource与时间模型
4.2 从数据输入到数据处理——OneInputStreamOperator &
AbstractUdfStreamOperator
4.3 StreamSink
4.4 其他算子
5. 为执行保驾护航——Fault Tolerant与保证Exactly-Once语义
5.1 Fault Tolerant演进之路
5.1.1 Storm的Record acknowledgement模式
5.1.2 Spark streaming的micro batch模式
5.1.3 Google Cloud Dataflow的事务式模型
5.1.4 Flink的分布式快照机制
5.2 checkpoint的生命周期
5.2.1 触发checkpoint
5.2.2 Task层面checkpoint的准备工作
5.2.3 操作符的状态保存及barrier传递
5.3 承载checkpoint数据的抽象:State & StateBackend
6.数据流转——Flink的数据抽象及数据交换过程
6.1 flink的数据抽象
6.1.1 MemorySegment
6.1.2 ByteBuffer与NetworkBufferPool
6.1.3 RecordWriter与Record
6.2 数据流转过程
6.2.1 整体过程
6.2.2 数据跨task传递
6.3 Credit漫谈
6.3.1 背压问题
6.3.2 使用Credit实现ATM网络流控
7.其他核心概念
7.1 EventTime时间模型
7.2 FLIP-6 部署及处理模型演进
7.2.1 现有模型不足
7.2.2 核心变更
7.2.3 Cluster Manager的架构
7.2.4 组件设计及细节
8.后记
前言
Flink是大数据处理领域最近很火的一个开源的分布式、高性能的流式处理框架,其对数据的处
理可以达到毫秒级别。本文以一个来自官网的WordCount例子为引,全面阐述flink的核心架
构及执行流程,希望读者可以借此更加深入的理解Flink逻辑。
本文跳过了一些基本概念,如果对相关概念感到迷惑,请参考官网文档。另外在本文写
作过程中,Flink正式发布了其1.5 RELEASE版本,在其发布之后完成的内容将按照1.5的
实现来组织。
1.从 Hello,World WordCount开始
首先,我们把WordCount的例子再放一遍:
public class SocketTextStreamWordCount {
public static void main(String[] args) throws Exception {
if (args.length != 2){
System.err.println("USAGE:\nSocketTextStreamWordCount
");
return;
}
String hostName = args[0];
Integer port = Integer.parseInt(args[1]);
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment
.getExecutionEnvironment();
// get input data
DataStream text = env.socketTextStream(hostName, port);
text.flatMap(new LineSplitter()).setParallelism(1)
// group by the tuple field "0" and sum up tuple field "1"
.keyBy(0)
.sum(1).setParallelism(1)
.print();
// execute program
env.execute("Java WordCount from SocketTextStream Example");
}
/**
* Implements the string tokenizer that splits sentences into words as
a user-defined
* FlatMapFunction. The function takes a line (String) and splits it in
to
* multiple pairs in the form of "(word,1)" (Tuple2<String, Integer&
gt;).
*/
public static final class LineSplitter implements
FlatMapFunction> {
@Override
public void flatMap(String value, Collector
> out) {
// normalize and split the line
String[] tokens = value.toLowerCase().split("\\W+");
// emit the pairs
for (String token : tokens) {
if (token.length() > 0) {
out.collect(new Tuple2(token, 1));
}
}
}
}
}
首先从命令行中获取socket对端的ip和端口,然后启动一个执行环境,从socket中读取数
据,split成单个单词的流,并按单词进行总和的计数,最后打印出来。这个例子相信接触过大
数据计算或者函数式编程的人都能看懂,就不过多解释了。
1.1 flink执行环境
程序的启动,从这句开始。
1.
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
这行代码会返回一个可用的执行环境。执行环境是整个flink程序执行的上下文,记录了相关配
置(如并行度等),并提供了一系列方法,如读取输入流的方法,以及真正开始运行整个代码
的execute方法等。对于分布式流处理程序来说,我们在代码中定义的flatMap,keyBy等等操
作,事实上可以理解为一种声明,告诉整个程序我们采用了什么样的算子,而真正开启计算的
代码不在此处。由于我们是在本地运行flink程序,因此这行代码会返回一个
LocalStreamEnvironment,最后我们要调用它的execute方法来开启真正的任务。我们先接
着往下看。
1.2 算子(Operator)的注册(声明)
我们以flatMap为例, text.flatMap(new LineSplitter()) 这一句话跟踪进去是这样的:
1.
public SingleOutputStreamOperator flatMap(FlatMapFunction
flatMapper) {
2.
3.
4.
5.
6.
7.
8.
TypeInformation outType =
TypeExtractor.getFlatMapReturnTypes(clean(flatMapper),
getType(), Utils.getCallLocationName(), true);
return transform("Flat Map", outType, new StreamFlatMap<>(clean
(flatMapper)));
}
里面完成了两件事,一是用反射拿到了flatMap算子的输出类型,二是生成了一个Operator。
flink流式计算的核心概念,就是将数据从输入流一个个传递给Operator进行链式处理,最后
交给输出流的过程。对数据的每一次处理在逻辑上成为一个operator,并且为了本地化处理的
效率起见,operator之间也可以串成一个chain一起处理(可以参考责任链模式帮助理解)。
下面这张图表明了flink是如何看待用户的处理流程的:抽象化为一系列operator,以source
开始,以sink结尾,中间的operator做的操作叫做transform,并且可以把几个操作串在一起
执行。
我们也可以更改flink的设置,要求它不要对某个操作进行chain处理,或者从某个操作开启一
个新chain等。
上面代码中的最后一行transform方法的作用是返回一个SingleOutputStreamOperator,它
继承了Datastream类并且定义了一些辅助方法,方便对流的操作。在返回之前,transform方
法还把它注册到了执行环境中(后面生成执行图的时候还会用到它)。其他的操作,包括
keyBy,sum和print,都只是不同的算子,在这里出现都是一样的效果,即生成一个operator
并注册给执行环境用于生成DAG。
1.3 程序的执行
程序执行即 env.execute("Java WordCount from SocketTextStream Example") 这行代
码。
1.3.1 本地模式下的execute方法
这行代码主要做了以下事情:
生成StreamGraph。代表程序的拓扑结构,是从用户代码直接生成的图。
生成JobGraph。这个图是要交给flink去生成task的图。
生成一系列配置
将JobGraph和配置交给flink集群去运行。如果不是本地运行的话,还会把jar文件通过网
络发给其他节点。
以本地模式运行的话,可以看到启动过程,如启动性能度量、web模块、JobManager、
ResourceManager、taskManager等等
启动任务。值得一提的是在启动任务之前,先启动了一个用户类加载器,这个类加载器可
以用来做一些在运行时动态加载类的工作。
1.3.2 远程模式(RemoteEnvironment)的execute方法
远程模式的程序执行更加有趣一点。第一步仍然是获取StreamGraph,然后调用
executeRemotely方法进行远程执行。
该方法首先创建一个用户代码加载器
1.
ClassLoader usercodeClassLoader = JobWithJars.buildUserCodeClassLoader
(jarFiles, globalClasspaths, getClass().getClassLoader());
然后创建一系列配置,交给Client对象。Client这个词有意思,看见它就知道这里绝对是跟远
程集群打交道的客户端。
1.
2.
3.
4.
5.
6.
7.
8.
9.
ClusterClient client;
try {
client = new StandaloneClusterClient(configuration);
client.setPrintStatusDuringExecution(getConfig().isSysoutLo
ggingEnabled());
}
}
try {
return client.run(streamGraph, jarFiles, globalClasspaths,
usercodeClassLoader).getJobExecutionResult();
10.
}
client的run方法首先生成一个JobGraph,然后将其传递给JobClient。关于Client、
JobClient、JobManager到底谁管谁,可以看这张图:
确切的说,JobClient负责以异步的方式和JobManager通信(Actor是scala的异步模块),
具体的通信任务由JobClientActor完成。相对应的,JobManager的通信任务也由一个Actor
完成。
1.
2.
3.
4.
JobListeningContext jobListeningContext = submitJob(
actorSystem,config,highAvailabilityServices,jobGraph,ti
meout,sysoutLogUpdates, classLoader);
return awaitJobResult(jobListeningContext);
可以看到,该方法阻塞在awaitJobResult方法上,并最终返回了一个JobListeningContext,
透过这个Context可以得到程序运行的状态和结果。
1.3.3 程序启动过程
上面提到,整个程序真正意义上开始执行,是这里:
1.
env.execute("Java WordCount from SocketTextStream Example");
远程模式和本地模式有一点不同,我们先按本地模式来调试。
我们跟进源码,(在本地调试模式下)会启动一个miniCluster,然后开始执行代码:
1.
2.
3.
4.
5.
6.
7.
8.
9.
// LocalStreamEnvironment.java
@Override
public JobExecutionResult execute(String jobName) throws Exception
{
//生成各种图结构
......
try {