一 简介
流式计算分为无状态和有状态两种情况
- 无状态的计算观察每个独立事件,并根据最后一个事件输出结果。
- 有状态的计算则会基于多个事件输出结果。
1 | 无状态流处理分别接收每条数据记录,然后根据最新输入的数据生成输出数据 |
- 有一个任务维护,并且用来计算某个结果的所有数据,都属于这个任务的状态
- 可以认为状态就是一个本地变量,可以被任务的业务逻辑访问
- flink会进行状态管理,包括状态一致性,故障处理以及高效存储和访问,以便开发人员可以专注于应用程序的逻辑
二 分类
在flink中,状态始终与特定算子相关联,为了使运行的flink了解算子的状态,算子需要预先注册其状态
托管状态(Managed State)和原始状态(Raw State)Flink 的状态有两种:托管状态(Managed State)和原始状态(Raw State)。托管状态就是由 Flink 统一管理的,状态的存储访问、故障恢复和重组等一系列问题都由 Flink 实现,我们只要调接口就可以;而原始状态则是自定义的,相当于就是开辟了一块内存,需要我们自己管理,实现状态的序列化和故障恢复。具体来讲,托管状态是由 Flink 的运行时(Runtime)来托管的;在配置容错机制后,状态会自动持久化保存,并在发生故障时自动恢复。当应用发生横向扩展时,状态也会自动地重组分配到所有的子任务实例上。对于具体的状态内容,Flink 也提供了值状态(ValueState)、列表状态(ListState)、映射状态(MapState)、聚合状态(AggregateState)等多种结构,内部支持各种数据类型。聚合、窗口等算子中内置的状态,就都是托管状态;我们也可以在富函数类(RichFunction)中通过上下文来自定义状态,这些也都是托管状态。而对比之下,原始状态就全部需要自定义了。Flink 不会对状态进行任何自动操作,也不知道状态的具体数据类型,只会把它当作最原始的字节(Byte)数组来存储。我们需要花费大量的精力来处理状态的管理和维护。所以只有在遇到托管状态无法实现的特殊需求时,我们才会考虑使用原始状态;一般情况下不推荐使用。绝大多数应用场景,我们都可以用 Flink 提供的算子或者自定义托管状态来实现需求。
算子状态(Operator State)和按键分区状态(Keyed State)
我们知道在 Flink 中,一个算子任务会按照并行度分为多个并行子任务执行,而不同的子241任务会占据不同的任务槽(task slot)。由于不同的 slot 在计算资源上是物理隔离的,所以 Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。而很多有状态的操作(比如聚合、窗口)都是要先做 keyBy 进行按键分区的。按键分区之后,任务所进行的所有计算都应该只针对当前 key 有效,所以状态也应该按照 key 彼此隔离。在这种情况下,状态的访问方式又会有所不同。基于这样的想法,我们又可以将托管状态分为两类:算子状态和按键分区状态
1 | 算子状态 |
总的来说,有两种类型的状态:
1 | 算子状态(operator state) |
三 算子状态
算子状态(Operator State)就是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的 key 无关,所以不同 key 的数据只要被分发到同一个并行子任务,就会访问到同一个 Operator State。
算子状态的实际应用场景不如 Keyed State 多,一般用在 Source 或 Sink 等与外部系统连接的算子上,或者完全没有 key 定义的场景。比如 Flink 的 Kafka 连接器中,就用到了算子状态。在我们给 Source 算子设置并行度后,Kafka 消费者的每一个并行实例,都会为对应的主题261(topic)分区维护一个偏移量, 作为算子状态保存起来。这在保证 Flink 应用“精确一次”(exactly-once)状态一致性时非常有用
1 | 1:算子状态的作用范围限定为算子任务,由同一并行任务所处理的所有数据都可以访问到相同的状态 |
算子状态的数据结构
1 | ① 列表状态(List state),将状态表示为一组数据的列表;(会根据并行度的调整把之前的状态重新分组重新分配)当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当 |
1 代码实现
我们已经知道,状态从本质上来说就是算子并行子任务实例上的一个特殊本地变量。它的特殊之处就在于 Flink 会提供完整的管理机制,来保证它的持久化保存,以便发生故障时进行状态恢复;另外还可以针对不同的 key 保存独立的状态实例。按键分区状态(Keyed State)对这两个功能都要考虑;而算子状态(Operator State)并不考虑 key 的影响,所以主要任务就是要让 Flink 了解状态的信息、将状态数据持久化后保存到外部存储空间。看起来算子状态的使用应该更加简单才对。不过仔细思考又会发现一个问题:我们对状态进行持久化保存的目的是为了故障恢复;在发生故障、重启应用后,数据还会被发往之前分配的分区吗?显然不是,因为并行度可能发生了调整,不论是按键(key)的哈希值分区,还是直接轮询(round-robin)分区,数据分配到的分区都会发生变化。这很好理解,当打牌的人数从 3 个增加到 4 个时,即使牌的次序不变,轮流发到每个人手里的牌也会不同。数据分区发生变化,带来的问题就是,怎么保证原先的状态跟故障恢复后数据的对应关系呢?对于 Keyed State 这个问题很好解决:状态都是跟 key 相关的,而相同 key 的数据不管发往哪个分区,总是会全部进入一个分区的;于是只要将状态也按照 key 的哈希值计算出对应的分区,进行重组分配就可以了。恢复状态后继续处理数据,就总能按照 key 找到对应之前的状态,就保证了结果的一致性。所以 Flink 对 Keyed State 进行了非常完善的包装,我们不需实现任何接口就可以直接使用。而对于 Operator State 来说就会有所不同。因为不存在 key,所有数据发往哪个分区是不可预测的;也就是说,当发生故障重启之后,我们不能保证某个数据跟之前一样,进入到同一个并行子任务、访问同一个状态。所以 Flink 无法直接判断该怎样保存和恢复状态,而是提供了接口,让我们根据业务需求自行设计状态的快照保存(snapshot)和恢复(restore)逻辑。
自定义的 SinkFunction 会在CheckpointedFunction 中进行数据缓存,然后统一发送到下游
1 | public class BufferingSinkExample { |
当初始化好状态对象后,我们可以通过调用. isRestored()方法判断是否是从故障中恢复。在代码中 BufferingSink 初始化时,恢复出的 ListState 的所有元素会添加到一个局部变量bufferedElements 中,以后进行检查点快照时就可以直接使用了。在调用.snapshotState()时,直接清空 ListState,然后把当前局部变量中的所有元素写入到检查点中。对于不同类型的算子状态,需要调用不同的获取状态对象的接口,对应地也就会使用不同的状态分配重组算法。比如获取列表状态时,调用.getListState() 会使用最简单的 平均分割重组(even-split redistribution)算法;而获取联合列表状态时,调用的是.getUnionListState() ,对应就会使用联合重组(union redistribution) 算法
2 广播状态
算子状态中有一类很特殊,就是广播状态(Broadcast State)。从概念和原理上讲,广播状态非常容易理解:状态广播出去,所有并行子任务的状态都是相同的;并行度调整时只要直接复制就可以了。然而在应用上,广播状态却与其他算子状态大不相同。广播状态与其他算子状态的列表(list)结构不同,底层是以键值对(key-value)形式描述的,所以其实就是一个映射状态(MapState)
1 | 场景举例: |
在代码上,可以直接调用 DataStream 的.broadcast()方法,传入一个“映射状态描述器”(MapStateDescriptor)说明状态的名称和类型,就可以得到一个“广播流”(BroadcastStream);进而将要处理的数据流与这条广播流进行连接(connect),就会得到“广播连接流”(BroadcastConnectedStream)。注意广播状态只能用在广播连接流中
1 | API介绍: |
1: 如果DataStream是Keyed Stream ,则连接到Broadcasted Stream 后, 添加处理ProcessFunction 时需要使用KeyedBroadcastProcessFunction 来实现, 下面是KeyedBroadcastProcessFunction 的API
1 | public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> extends BaseBroadcastProcessFunction { |
上面泛型中的各个参数的含义,说明如下:
KS:表示Flink 程序从最上游的Source Operator 开始构建Stream,当调用keyBy 时所依赖的Key 的类型;
IN1:表示非Broadcast 的Data Stream 中的数据记录的类型;
IN2:表示Broadcast Stream 中的数据记录的类型;
OUT:表示经过KeyedBroadcastProcessFunction 的processElement()和processBroadcastElement()方法处理后输出结果数据记录的类型。
2 如果Data Stream 是Non-Keyed Stream,则连接到Broadcasted Stream 后,添加处理ProcessFunction 时需要使用BroadcastProcessFunction 来实现, 下面是BroadcastProcessFunction 的API,代码如下所示:
1 | public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction { |
注意事项:
Broadcast State 是Map 类型,即K-V 类型。
Broadcast State 只有在广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processBroadcastElement 方法中可以修改。在非广播的一侧, 即在BroadcastProcessFunction 或KeyedBroadcastProcessFunction 的processElement 方法中只读。
Broadcast State 中元素的顺序,在各Task 中可能不同。基于顺序的处理,需要注意。
Broadcast State 在Checkpoint 时,每个Task 都会Checkpoint 广播状态。
Broadcast State 在运行时保存在内存中,目前还不能保存在RocksDB State Backend 中。
1 实现配置动态更新
实时过滤出配置中的用户,并在事件流中补全这批用户的基础信息。
1事件流:表示用户在某个时刻浏览或点击了某个商品,格式如下
1 | {"userID": "user_3", "eventTime": "2019-08-17 12:19:47", "eventType": "browse", "productID": 1} |
2 配置数据: 表示用户的详细信息,在Mysql中,如下
1 | DROP TABLE IF EXISTS user_info; |
2 输出结果
1 | (user_3,2019-08-17 12:19:47,browse,1,王五,33) |
步骤:
1 | 1. env |
1 | import org.apache.flink.api.common.state.BroadcastState; |
四 键控状态
1 | 1:键控状态是根据输入数据流中定义的键来维护和访问的 |
键控状态的数据结构
1 | ① 值状态(ValueState<T>),将状态表示为单个值;(直接.value获取,Set操作是.update) |
4.1 键控状态的使用
因为声明一个键控状态需要获取上下文对象,所以必须在富函数中声明,在富函数中,我们可以调用.getRuntimeContext()获取当前的运行时上下文(RuntimeContext),进而获取到访问状态的句柄;这种富函数中自定义的状态也是 Keyed State
声明一个键控状态
1 | class MyReduce extends ReduceFunction[SalePrice]{ |
读取状态
1 | class MyReduce extends ReduceFunction[SalePrice]{ |
对状态赋值
1 | class MyReduce extends ReduceFunction[SalePrice]{ |
1 值状态
我们这里会使用用户 id 来进行分流,然后分别统计每个用户的 pv 数据,由于我们并不想每次 pv 加一,就将统计结果发送到下游去,所以这里我们注册了一个定时器,用来隔一段时间发送 pv 的统计结果,这样对下游算子的压力不至于太大。具体实现方式是定义一个用来保存定时器时间戳的值状态变量。当定时器触发并向下游发送数据以后,便清空储存定时器时间戳的状态变量,这样当新的数据到来时,发现并没有定时器存在,就可以注册新的定时器了,注册完定时器之后将定时器的时间戳继续保存在状态变量中。
1 | // 统计每个用户的 pv,隔一段时间(10s)输出一次结果 |
2 映射状态
映射状态的用法和 Java 中的 HashMap 很相似。在这里我们可以通过 MapState 的使用来探索一下窗口的底层实现,也就是我们要用映射状态来完整模拟窗口的功能。这里我们模拟一个滚动窗口。我们要计算的是每一个 url 在每一个窗口中的 pv 数据。我们之前使用增量聚合和全窗口聚合结合的方式实现过这个需求。这里我们用 MapState 再来实现一下
1 | // 统计每 10s 窗口内,每个 url 的 pv |
3 聚合状态
我们举一个简单的例子,对用户点击事件流每 5 个数据统计一次平均时间戳。这是一个类似计数窗口(CountWindow)求平均值的计算,这里我们可以使用一个有聚合状态的RichFlatMapFunction 来实现
1 | // 统计每个用户的点击频次,到达 5 次就输出统计结果 |
4.2 状态生存时间(TTL)
在实际应用中,很多状态会随着时间的推移逐渐增长,如果不加以限制,最终就会导致存储空间的耗尽。一个优化的思路是直接在代码中调用.clear()方法去清除状态,但是有时候我们的逻辑要求不能直接清除。这时就需要配置一个状态的“生存时间”(time-to-live,TTL),当状态在内存中存在的时间超出这个值时,就将它清除。具体实现上,如果用一个进程不停地扫描所有状态看是否过期,显然会占用大量资源做无用功。状态的失效其实不需要立即删除,所以我们可以给状态附加一个属性,也就是状态的“失效时间”。状态创建的时候,设置 失效时间 = 当前时间 + TTL;之后如果有对状态的访问和修改,我们可以再对失效时间进行更新;当设置的清除条件被触发时(比如,状态被访问的时候,或者每隔一段时间扫描一次失效状态),就可以判断状态是否失效、从而进行清除了。配置状态的 TTL 时,需要创建一个 StateTtlConfig 配置对象,然后调用状态描述器的.enableTimeToLive()方法启动 TTL 功能。
1 | StateTtlConfig ttlConfig = StateTtlConfig .newBuilder(Time.seconds(10)) .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build();ValueStateDescriptor stateDescriptor = new ValueStateDescriptor<>("mystate", String.class);260stateDescriptor.enableTimeToLive(ttlConfig) |
;这里用到了几个配置项:
⚫ .newBuilder()状态 TTL 配置的构造器方法,必须调用,返回一个 Builder 之后再调用.build()方法就可以得到 StateTtlConfig 了。方法需要传入一个 Time 作为参数,这就是设定的状态生存时间。
⚫ .setUpdateType()设置更新类型。更新类型指定了什么时候更新状态失效时间,这里的 OnCreateAndWrite表示只有创建状态和更改状态(写操作)时更新失效时间。另一种类型 OnReadAndWrite 则表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间。这个配置默认为 OnCreateAndWrite。
⚫ .setStateVisibility()设置状态的可见性。所谓的“状态可见性”,是指因为清除操作并不是实时的,所以当状态过期之后还有可能基于存在,这时如果对它进行访问,能否正常读取到就是一个问题了。这里设置的 NeverReturnExpired 是默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除了,应用不能继续读取;这在处理会话或者隐私数据时比较重要。对应的另一种配置是 ReturnExpireDefNotCleanedUp,就是如果过期状态还存在,就返回它的值。除此之外,TTL 配置还可以设置在保存检查点(checkpoint)时触发清除操作,或者配置增量的清理(incremental cleanup),还可以针对 RocksDB 状态后端使用压缩过滤器(compactionfilter)进行后台清理。关于检查点和状态后端的内容,我们会在后续章节继续讲解。这里需要注意,目前的 TTL 设置只支持处理时间。另外,所有集合类型的状态(例如ListState、MapState)在设置 TTL 时,都是针对每一项(per-entry)元素的。也就是说,一个列表状态中的每一个元素,都会以自己的失效时间来进行清理,而不是整个列表一起清理。
4.3 状态编程例子
利用Keyed State,实现这样一个需求:检测传感器的温度值,如果连续的两个温度差值超过10度,就输出报警。
1 | //方法一: |
1 | import akka.pattern.BackoffSupervisor.RestartCount |
五 状态后端
状态管理(存储、访问、维护和检查点)
1.每传入一条数据,有状态的算子任务都会读取和更新状态;
2.由于有效的状态访问对于处理数据的低延迟至关重要,因此每个并行任务都会在本地内存维护其状态,以确保快速的状态访问。
3.状态的存储、访问以及维护,由一个可插入的组件决定,这个组件就叫做状态后端(State Backend).
4.状态后端主要负责两件事:本地的状态管理,以及将检查点(checkpoint)状态写入远程存储。
状态后端分类
1 | ① MemoryStateBackend: 一般用于开发和测试 |
设置状态后端为FsStateBackend,并配置检查点和重启策略:
1 | <dependency> |
1 | StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); |