一 Flink运行时的组件
1 2 3 4 5
| 1:JObManger 作业管理器 2:TaskManger 任务管理器 3:ResourceManger 资源管理器 4:Dispacher 分发器 5:JObClient 作业客户端
|
1 任务完整流程
Flink程序需要提交给Job Client。 然后,Job Client将作业提交给Job Manager。 Job Manager负责协调资源分配和作业执行。 它首先要做的是分配所需的资源。 资源分配完成后,任务将提交给相应的Task Manager。 在接收任务时,Task Manager启动一个线程以开始执行。 执行到位时,Task Manager会继续向Job Manager报告状态更改。 可以有各种状态,例如开始执行,正在进行或已完成。 作业执行完成后,结果将发送回客户端(Job Client)。
2 JobManger
1 2 3 4
| 1:控制一个应用程序执行的主进程,每个应用程序都会被一个不同的JM所控制执行 2:jm会先接受要执行的应用程序,这个程序包含:作业图(JobGraph),逻辑数据流图(Logical dataflow graph)和打包了所有类,库和其他资源的jar包 3:jm会把JobGraph转换为一个物理层面的数据流图,这个图被称为执行图(ExecutionGraph),包含了所有可以并发执行的任务 4:jm会向资源管理器(ResourceManger)请求执行任务必要的资源,也就是任务管理器(TaskManger)上的插槽(slot),一旦获取到了足够的资源,就会将执行图分发到真正运行他们的TaskManger上,而在运行过程中,Jobmanger会负责所有需要中央协调的操作,如检查点(checkpoint)的协调等
|
master 进程(也称为作业管理器)协调和管理程序的执行。 他们的主要职责包括调度任务,管理检查点,故障恢复等。可以有多个Masters 并行运行并分担职责。 这有助于实现高可用性。 其中一个master需要成为leader。 如果leader 节点发生故障,master 节点(备用节点)将被选为领导者。
作业管理器包含以下重要组件:
3 TaskManger
1 2 3
| 1: Flink中的工作进程,通常Flink中会有多个Taskmanger运行,每一个Tm都包含一定数量的插槽(slots),插槽的数量限制了tm能够执行的任务数量(tm的数量*slot的数量=所有的slot的数量 这个数量代表了任务的总并行执行能力(不是并行度)) 2:启动后,tm会向资源管理器注册他的插槽,收到资源管理器的指令后。tm就会将一个或多个插槽提供给Jobmanger调用,jm就可以向插槽分配任务(tasks)来执行,允许多个task共享slot 3:在执行过程中,一个tm可以跟其他运行的同一应用程序的tm交换数据
|
4 ResourceManger
1 2 3
| 1: 主要负责管理tm的插槽(slot),tm的插槽是flink中定义的处理资源的单位 2:Flink为不同的环境和资源管理工具提供了不同的资源管理器,如yarn,mesos,k8s和standalone部署 3:当jm申请插槽资源时,rm会将有空闲插槽的tm分配给jm,如果rm没有足够的插槽来满足jm的请求,他还可以向资源提供平台发起会话,以提供启动tm进程的容器
|
5 Dispatcher
1 2 3 4
| 1:可以跨作业运行,为应用提供rest的接口 2:当一个应用被提交,分发器会启动并将应用移交给一个jm 3:dis会启动一个webui,用来方便展示和监控作业执行的信息 4:dis在架构中并不是必需的,这取决于应用提交运行的方式(yarn无)
|
二 任务的提交流程
standalone的模式4和5 在集群启动的时候就已经完成
2 yarn模式(YARN-Cluster)(per_job)
图中的resourcemanger是yarn的resourcemanger,flink自己的resourcemanger在applicationMaster中。实际中flink先向自己的rm申请资源,flink的rm在向yarn的rm申请资源
三 任务调度原理
1
| 编写代码,代码经过编译打包,直接会生成一个dataFlow graph(数据流图),经过接口(client)发送期间会把能合并的数据操作进行合并,合并后得到job graph,提交给jobmanger,分析job流图,判断并行度,任务有几个子任务,需要的资源等,然后向resourcemanger申请资源,然后向worker分配任务
|
1 2 3 4
| 问题: 1:怎么实现并行计算? 2:并行任务,需要占用多少slot? 3:一个留任务到底包含多少个子任务?
|
3.1 并行度
1 2
| 一个特定算子的子任务(subtask)的个数被称之为并行度(parallelism)(一般一个子任务占用一个slot). 一般情况下,一个stream的并行度,可以认为是其所有算子中最大的并行度
|
3.2 Taskmanger和slots
1 2 3
| 1: flink中每一个taskmanger都是一个jvm进程,它可能会在独立的线程上执行一个或多个子任务 2:为了控制一个taskmanger能接受多少个task,tm通过task slot来进行控制,一个tm至少一个slot(每一个slot可以执行一个独立的任务),实际就是一个tm的内存划分为多少分(slot的个数),slot的个数代表了tm的并行静态资源,这静态资源能用多少取决于并行度设置多少 3:slot就是一块独立的资源,主要是内存的隔离
|
1 2
| 1:默认情况下,flink允许子任务共享slot,即使他们是不同任务的子任务,这样的结果是,一个slot可以保存作业的整个管道 2:tsak slot是静态的概念,是指tm具有的并发执行的能力
|
3.3 并行子任务的分配
1
| 左侧的jobgraph的最大并行度是4,所以至少需要4个slot,具体分配a的并行度是4,四个slot依次都有,b并行度4,则4个slot都有,c的并行度为2,....(至于c和e具体分配到哪,这个可以配置,可以分配到不同的tm上,flink默认是随机)
|
具体的实例
1
| 配置文件中的tm的slot的个数是3,例子1:类似于词频统计...不细说看图
|
3.4 程序与数据流
任务划分为几个子任务,有些情况会把任务合并成一个任务,有时不合,现在讨论这个问题
1 2
| 1:所有的flink程序是由三部分组成:Source,Transformation,sink 2:Source读取数据源,transformation利用算子进行处理加工,sink负责输出
|
1 2 3
| 1:在运行时,fkink上运行的程序会被映射成逻辑数据流,包含三部分(source,transformation,sink) 2:每个dataflow(df),以一个或多个source开始,以一个或多个sink结束,df类似于任意的有向无环图 3:大部分情况下,程序中的转换算子跟df中的算子(operator)是一一对应的关系,不划分stage
|
3.4.1 执行图
1 2 3 4 5
| 1:flink中的执行图可以划分为四层:StreamGraph->jobgraph->executionGraph->物理执行图 StreamGraph(sg):是根据用户通过streamapi编写的代码生成最初的图,用来表示程序的拓补结构 jobgraph(jg):sg经过优化后生成了jg,提交给jobmanger的数据结构,主要优化为将多个符合条件的节点chain在一起作为一个节点(webui上看到) executionGraph(eg):jobmanger根据jg生成eg,eg是jg的并行化版本,是调度最核心的数据结构 物理执行图:jobmanger根据eg对job进行的调度后,在各个taskmanger上部署task后形成的图,并不是一个具体的数据结构
|
3.4.2 数据传输格式
1 2 3 4
| 1:一个程序中,不同的算子可能具有不同的并行度 2:算子之间传输数据的形式可以是one-to-one(forwarding)的模式,也可以是redistributing的,模式。具体是哪一种,取决于算子的种类和并行度的设置 one-to-one:stream维护着分区以及元素的顺序(如source与map之间),这意味着map算子的子任务看到的元素的个数以及顺序跟source算子的子任务生产的元素个数,顺序相同,map,filter,flatmap等算子都是one-to-one的对应关系 Redistribution:stream的分区会发生改变,每一个算子的子任务依据所选择的转换发送数据到不同的目标任务,例如,keyby基于hashcode重分区,而broadcast和rebalance会随机重新分区,这些算子都会引起redistribute过程,而redistribution过程类似于spark中的shuffle过程
|
3.4.3 任务链
1 2 3
| 1: flink采用了一种称为任务链的优化技术,可以在特定条件下减少本地通信的开销。为了满足任务链的要求,必须将两个或多个算子设为相同的并行度,并通过 本地转发(local forward)的方式进行 链接 2:相同并行度的onetoone操作,flink这样相连的算子链接在一起形成一个task,原来的算子成为里面的subtask 3: 并行度相同,且是one-to-one的操作,两个条件缺一不可(子任务合并)
|
1 2 3 4 5 6
| 1:flink的算子有个方法disableChaining(),则这个算子不参与任务链合并,前后都断开: 2:chain的切断,在想切断的算子后方法 startNewChain() 开启一个新的链 拆开是拆开了,但是若是slot是共享的,这个拆开意义就不大了,也可以设置拆开后放在不同的slot执行,如flatmap(...).slotSharingGroup("a"), a组:是从当前算子开始,后面的算子没有划分新的组,则后面的所有算子共享这个slot组,就和之前的分开了,属于不同的组,(在同一个组内可以共享slot) 如flatmap(/...).disableChaining("a").map(....).startNewChain() ps:若是不指定,则默认所有的算子在一个共享组内为default,自己指定是不要命名为default
|