flink

结构

  • API & Libraries 层

    主要提供了编程 API 和 顶层类库

    进行流处理的 DataStream API 和用于进行批处理的 DataSet API

    包括用于复杂事件处理的 CEP 库;用于结构化数据查询的 SQL & Table 库,以及基于批处理的机器学习库 FlinkML 和 图形处理库 Gelly

  • Runtime 核心层

    Flink 分布式计算框架的核心实现层,包括作业转换,任务调度,资源分配,任务执行等功能,基于这一层的实现,可以在流式引擎下同时运行流处理程序和批处理程序

  • 物理部署层

    用于支持在不同平台上部署运行 Flink 应用

img

组件

JobManager

JobManager 具有许多与协调 Flink 应用程序的分布式执行有关的职责:它决定何时调度下一个 task(或一组 task)、对完成的 task 或执行失败做出反应、协调 checkpoint、并且协调从失败中恢复等等。这个进程由三个不同的组件组成:

  • ResourceManager

    ResourceManager 负责 Flink 集群中的资源提供、回收、分配 - 它管理 task slots,这是 Flink 集群中资源调度的单位(请参考TaskManagers)。Flink 为不同的环境和资源提供者(例如 YARN、Mesos、Kubernetes 和 standalone 部署)实现了对应的 ResourceManager。在 standalone 设置中,ResourceManager 只能分配可用 TaskManager 的 slots,而不能自行启动新的 TaskManager。

  • Dispatcher

    Dispatcher 提供了一个 REST 接口,用来提交 Flink 应用程序执行,并为每个提交的作业启动一个新的 JobMaster。它还运行 Flink WebUI 用来提供作业执行信息。

  • JobMaster

    JobMaster 负责管理单个JobGraph的执行。Flink 集群中可以同时运行多个作业,每个作业都有自己的 JobMaster。

始终至少有一个 JobManager。高可用(HA)设置中可能有多个 JobManager,其中一个始终是 leader,其他的则是 standby(请参考 高可用(HA))。

TaskManagers

TaskManager(也称为 worker)执行作业流的 task,并且缓存和交换数据流。

必须始终至少有一个 TaskManager。在 TaskManager 中资源调度的最小单位是 task slot。TaskManager 中 task slot 的数量表示并发处理 task 的数量。请注意一个 task slot 中可以执行多个算子(请参考Tasks 和算子链)。

Tasks 和算子链

对于分布式执行,Flink 将算子的 subtasks 链接tasks。每个 task 由一个线程执行。将算子链接成 task 是个有用的优化:它减少线程间切换、缓冲的开销,并且减少延迟的同时增加整体吞吐量。链行为是可以配置的;请参考链文档以获取详细信息。

  • Task

    在执行分布式计算时,Flink 将可以链接的操作 (operators) 链接到一起,这就是 Task

    减少线程间切换和缓冲而导致的开销,在降低延迟的同时可以提高整体的吞吐量。

    但不是所有的 operator 都可以被链接

    一个 Task 就是一个可以链接的最小的操作链 (Operator Chains) 。如上图,source 和 map 算子被链接到一块,因此整个作业就只有三个 Task

  • SubTask

    A subtask is one parallel slice of a task,即一个 Task 可以按照其并行度拆分为多个 SubTask。

    source & map 具有两个并行度,KeyBy 具有两个并行度,Sink 具有一个并行度,因此整个虽然只有 3 个 Task,但是却有 5 个 SubTask。Jobmanager 负责定义和拆分这些 SubTask,并将其交给 Taskmanagers 来执行,每个 SubTask 都是一个单独的线程。

Task Slots 和资源

每个 worker(TaskManager)都是一个 JVM 进程,可以在单独的线程中执行一个或多个 subtask。为了控制一个 TaskManager 中接受多少个 task,就有了所谓的 task slots(至少一个)。

每个 task slot 代表 TaskManager 中资源的固定子集。TaskManager平均分配内存到用于每个 slot。

这时每个 SubTask 线程运行在一个独立的 TaskSlot, 它们共享所属的 TaskManager 进程的TCP 连接(通过多路复用技术)和心跳信息 (heartbeat messages),从而可以降低整体的性能开销。此时看似是最好的情况,但是每个操作需要的资源都是不尽相同的,这里假设该作业 keyBy 操作所需资源的数量比 Sink 多很多 ,那么此时 Sink 所在 Slot 的资源就没有得到有效的利用。

Flink 允许多个 subtasks 共享 slots,即使它们是不同 tasks 的 subtasks,但只要它们来自同一个 Job 就可以,假设上面 souce & map 和 keyBy 的并行度调整为 6,而 Slot 的数量不变,此时情况如下

默认情况一个 Job 所需要的 Slot 的数量就等于其 Operation 操作的最高并行度

窗口概念

我们只需要对某个时间范围或者数量范围内的数据进行统计分析:如每隔五分钟统计一次过去一小时内所有商品的点击量;或者每发生1000次点击后,都去统计一下每个商品点击率的占比。在 Flink 中,我们使用窗口 (Window) 来实现这类功能。按照统计维度的不同,Flink 中的窗口可以分为 时间窗口 (Time Windows) 和 计数窗口 (Count Windows) 。

Time Windows

Time Windows 用于以时间为维度来进行数据聚合,具体分为以下四类

  • Tumbling Windows 滚动窗口

    彼此之间没有重叠的窗口

    每隔1小时统计过去1小时内的商品点击量,1 天就只能分为 24 个窗口,每个窗口彼此之间是不存在重叠的

  • Sliding Windows 滚动进行聚合分析

    那么统计窗口彼此之间就是存在重叠的

    每隔 6 分钟统计一次过去一小时内所有商品的点击量,,即 1天可以分为 240 个窗口

1
2
// 每隔3秒统计一次过去1分钟内的数据
timeWindow(Time.minutes(1),Time.seconds(3))
  • Session Windows

    在活动区间内,用户可能频繁的将某类商品加入和移除购物车,而你只想知道用户本次浏览最终的购物车情况,此时就可以在用户持有的会话结束后再进行统计

1
2
3
4
// 以处理时间为衡量标准,如果10秒内没有任何数据输入,就认为会话已经关闭,此时触发统计
window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
// 以事件时间为衡量标准    
window(EventTimeSessionWindows.withGap(Time.seconds(10)))

判断windows间隙时间

  • Global Windows

    全局窗口会将所有 key 相同的元素分配到同一个窗口中,其通常配合触发器 (trigger) 进行使用。如果没有相应触发器,则计算将不会被执行

    1
    2
    // 当单词累计出现的次数每达到10次时,则触发计算,计算整个窗口内该单词出现的总数
    window(GlobalWindows.create()).trigger(CountTrigger.of(10)).sum(1).print();
    

Count Windows

Count Windows 用于以数量为维度来进行数据聚合,同样也分为滚动窗口和滑动窗口,实现方式也和时间窗口完全一致,只是调用的 API 不同,具体如下:

1
2
3
4
// 滚动计数窗口,每1000次点击则计算一次
countWindow(1000)
// 滑动计数窗口,每10次点击发生后,则计算过去1000次点击的情况
countWindow(1000,10)

状态

算子状态Operator State

each operator state is bound to one parallel operator instance一个算子状态是与一个并发的算子实例所绑定的,即假设算子的并行度是 2,那么其应有两个对应的算子状态

键控状态 Keyed State

状态是根据 key 值进行区分的,Flink 会为每类键值维护一个状态实例。

CheckPoint 容错

为了使 Flink 的状态具有良好的容错性,Flink 提供了检查点机制 (CheckPoints) 。通过检查点机制,Flink 定期在数据流上生成 checkpoint barrier ,当某个算子收到 barrier 时,即会基于当前状态生成一份快照,然后再将该 barrier 传递到下游算子,下游算子接收到该 barrier 后,也基于当前状态生成一份快照,依次传递直至到最后的 Sink 算子上。当出现异常后,Flink 就可以根据最近的一次的快照数据将所有算子恢复到先前的状态。

默认情况下,检查点机制是关闭的,需要在程序中进行开启。

保存点机制 (Savepoints) 是检查点机制的一种特殊的实现,它允许你通过手工的方式来触发 Checkpoint,并将结果持久化存储到指定路径中,主要用于避免 Flink 集群在重启或升级时导致状态丢失

状态后端

所有的状态都存储在 JVM 的堆内存中,在状态数据过多的情况下,这种方式很有可能导致内存溢出,因此 Flink 该提供了其它方式来存储状态数据,这些存储方式统一称为状态后端 (或状态管理器)

  • MemoryStateBackend

    默认的方式,即基于 JVM 的堆内存进行存储,主要适用于本地开发和调试。

  • FsStateBackend

    基于文件系统进行存储,可以是本地文件系统,也可以是 HDFS 等分布式文件系统。 需要注意而是虽然选择使用了 FsStateBackend ,但正在进行的数据仍然是存储在 TaskManager 的内存中的,只有在 checkpoint 时,才会将状态快照写入到指定文件系统上。

  • RocksDBStateBackend

    RocksDBStateBackend 是 Flink 内置的第三方状态管理器,采用嵌入式的 key-value 型数据库 RocksDB 来存储正在进行的数据。等到 checkpoint 时,再将其中的数据持久化到指定的文件系统中,所以采用 RocksDBStateBackend 时也需要配置持久化存储的文件系统。之所以这样做是因为 RocksDB 作为嵌入式数据库安全性比较低,但比起全文件系统的方式,其读取速率更快;比起全内存的方式,其存储空间更大,因此它是一种比较均衡的方案。

乱序问题

实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来

Apache Flink的时间类型

  • ProcessingTime

    数据流入到具体某个算子时候相应的系统时间,但在分布式计算环境中ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。

  • IngestionTime

    数据进入Apache Flink框架的时间,是在Source Operator中设置的。IngestionTime的时间戳比较稳定(在源处只记录一次),同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

  • EventTime

    是事件在设备上产生时候携带的。EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

Watermark

watermarks 的作用 — 它们定义何时停止等待较早的事件

Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。Apache Flink 框架保证Watermark单调递增,算子接收到一个Watermark时候,框架知道不会再有任何小于该Watermark的时间戳的数据元素到来了,所以Watermark可以看做是告诉Apache Flink框架数据流已经处理到什么位置(时间维度)的方式。

两种生产Watermark的方式

  • Punctuated

    每一个递增的EventTime都会产生一个Watermark(Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成)

  • Periodic

    周期性的(一定时间间隔或者达到一定的记录条数)产生一个Watermark。在实际的生产中必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

Watermark的产生是在Apache Flink的Source节点或实现的Watermark生成器计算产生, Apache Flink内部对单流或多流的场景有统一的Watermark处理。

  • Watermark=EventTime 迟来的数据会丢弃

  • Watermark=EventTime-5s 下游得到window的结果就延迟了5s,会处理迟来的数据

多流的Watermark处理

在实际的流计算中往往一个job中会处理多个Source的数据,对Source的数据进行GroupBy分组,那么来自不同Source的相同key值会shuffle到同一个处理节点,并携带各自的Watermark,Apache Flink内部要保证Watermark要保持单调递增,多个Source的Watermark汇聚到一起时候可能不是单调自增的

Apache Flink内部实现每一个边上只能有一个递增的Watermark, 当出现多流携带Eventtime汇聚到一起(GroupBy or Union)时候,Apache Flink会选择所有流入的Eventtime中最小的一个向下游流出。从而保证watermark的单调递增和保证数据的完整性