这张图分为两部分,黄色图标是开发者需要自行开发的部分,而其它的部分是 client-go 已经提供的,直接使用即可。
客户端组件
Reflector
定义这里,通过ListAndWatch,监听Kubernetes API里面特定的资源种类(kind),可以是内置的,也可以是自定义的。
当reflector通过watch api 收到新资源创建的通知时候,通过list api获取新创建的对象,使用watchHandler将对象放到 Delta Fifo queue
用于 Watch 指定的 Kubernetes 资源,当 watch 的资源发生变化时,触发变更的事件,比如 Added,Updated 和 Deleted 事件,并将资源对象存放到本地缓存 DeltaFIFO
DeltaFIFO
拆开理解,FIFO 就是一个队列,拥有队列基本方法(ADD,UPDATE,DELETE,LIST,POP,CLOSE 等),Delta 是一个资源对象存储,保存存储对象的消费类型,比如 Added,Updated,Deleted,Sync 等
Informer
定义,使用processLoop函数,获取从Delta Fifo 队列中弹出对象
- 保存对象供后面检索用
- 传递对象到controller
Indexer
一个典型的索引用例是基于对象标签创建索引。Indexer 可以基于多个索引功能维护索引。Indexer 使用线程安全的数据存储来存储对象及其键。使用MetaNamespaceKeyFunc定义key,<namespace>/<name>
通常是这样的一个key。
Client-go 用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储到 Indexer,Indexer 与 Etcd 集群中的数据完全保持一致。从而 client-go 可以本地读取,减少 Kubernetes API 和 Etcd 集群的压力。
自定义Controller组件
client-go 中的基本控制器提供了NewIndexerInformer函数来创建自定义的 Informer 和 Indexer。在您的代码中,您可以直接调用此函数或使用工厂方法来创建informer。
-
Resource Event Handlers
Informer 在要将对象传递给控制器时将调用的回调函数,获取分派对象的键并将该键加入work queue以供进一步处理。
-
Work queue
这是您在控制器代码中创建的队列,用于将对象的交付与其处理分离。编写资源事件处理函数以提取交付对象的密钥并将其添加到工作队列中。
-
Process Item
这是您在代码中创建的函数,用于处理工作队列中的项目。可以有一个或多个其他函数来进行实际处理。这些函数通常会使用索引器引用或列表包装器来检索与键对应的对象。
Informer 机制
Kubernetes 中使用 http 进行通信,如何不依赖中间件的情况下保证消息的实时性,可靠性和顺序性等呢?答案就是利用了 Informer 机制。Informer 的机制,降低了了 Kubernetes 各个组件跟 Etcd 与 Kubernetes API Server 的通信压力。
1 |
|
上面这个示例,当触发了 Add,Update 或者 Delete 事件,就通知 Client-go,告知 Kubernetes 资源事件发生变更并且需要进行相应的处理。
list and watch
获取资源数据列表
通过 Run
获取了所有 Pod 的资源数据,List 流程如下:
-
r.ListWatcher.List
获取资源数据,根据 ResourceVersion 获取资源下的所有对象数据。以 Example 为例,该方法调用的就是 Pod Informer 下的ListFunc
函数,通过 ClientSet 客户端与 Kubernetes API Server 交互并获取 Pod 资源列表数据; -
listMetaInterface.GetResourceVersion
获取资源版本号;获取 ResourceVersion,即资源版本号,注意这里的资源版本号并不是指前面各个客户端的不同 kind 的不同 Version,所有资源都拥有 ResourceVersion,标识当前资源对象的版本号。每次修改 etcd 集群中存储的对象时,Kubernetes API Server 都会更改 ResourceVersion,使得 client-go 执行 watch 时可以根据 ResourceVersion 判断当前资源对象是否发生变化 -
meta.ExtractList
将资源数据转换为资源对象列表;将 runtime.Object 对象转换为 []runtime.Object 对象。因为r.ListWatcher.List
获取的是资源下所有对象的数据,因此应当是一个列表 -
r.SyncWith
将资源对象列表的资源对象和资源版本号存储到 DeltaFIFO 中;将结果同步到 DeltaFIFO 中 -
r.setLastSyncResourceVersion
设置最新的资源版本号。
监控资源对象
Watch 通过 HTTP 协议与 Kubernetes API Server 建立长连接,接收 Kubernetes API Server 发来的资源变更事件。Watch 操作的实现机制使用 HTTP 协议的分块传输编码——当 client-go 调用 Kubernetes API Server 时,Kubernetes API Server 在 Response 的 HTTP Header 中设置 Transfer-Encoding 的值为 chunked,表示采用分块传输编码,客户端收到消息后,与服务端进行连接,并等待下一个数据块。
https://github.com/kubernetes/client-go/blob/master/tools/cache/reflector.go
1 |
|
r.listerWatcher.Watch
实际调用了 Pod Informer 下的 Watch 函数,通过 ClientSet 客户端与 Kubernetes API Server 建立长链接,监控指定资源的变更事件,如下
https://github.com/kubernetes/client-go/blob/master/informers/core/v1/pod.go
1 |
|
r.watchHandler
处理资源的变更事件,将对应资源更新到本地缓存 DeltaFIFO 并更新 ResourceVersion 资源版本号。
1 |
|
Shared Informer
每一个 k8s Resource 都实现了 Informer 机制,均有 Informer 和 Lister 方法,以 PodInformer 为例:
1 |
|
Shared Informer 共享机制
Informer 又称为 Shared Informer,表明是可以共享使用的,在使用 client-go 写代码时,若同一资源的 Informer 被实例化太多次,每个 Informer 使用一个 Reflector,会运行过多的相同 ListAndWatch,太多重复的序列化和反序列化会导致 k8s API Server 负载过重。
而 Shared Informer 通过对同一类资源 Informer 共享一个 Reflector 可以节约很多资源,这通过 map 数据结构即可实现这样一个共享 Informer 机制。
https://github.com/kubernetes/client-go/blob/master/informers/factory.go
1 |
|
DeltaFIFO
DeltaFIFO 拆开理解,FIFO 就是一个队列,拥有队列基本方法(ADD,UPDATE,DELETE,LIST,POP,CLOSE 等),Delta 是一个资源对象存储,保存存储对象的消费类型,比如 Added,Updated,Deleted,Sync 等。
https://github.com/kubernetes/client-go/blob/master/tools/cache/delta_fifo.go
1 |
|
DeltaFIFO 会保留所有关于资源对象(obj)的操作类型,队列中会存在拥有不同操作类型的同一资源对象,使得消费者在处理该资源对象时能够了解资源对象所发生的事情。queue 字段存储资源对象的 key,这个 key 通过 KeyOf 函数计算得到,items 字段通过 map 数据结构的方式存储,value 存储的是对象 Deltas 数组,一个结构示例图如下:
1 |
|
作为一个 FIFO 的队列,有数据的生产者和消费者,其中生产者是 Reflector 调用的 Add 方法,消费者是 Controller 调用的 Pop 方法。三个核心方法为生产者方法,消费者方法和 Resync 机制。
生产者方法
DeltaFIFO 队列中的资源对象在调用 Added,Updated,Deleted 等事件时都调用了 queueActionLocked 函数:
https://github.com/kubernetes/client-go/blob/master/tools/cache/delta_fifo.go
1 |
|
https://github.com/kubernetes/client-go/blob/8f44946f6cbe967fbe2e2548e76987680a89428e/tools/cache/delta_fifo.go#L409
1 |
|
消费者方法
Pop 函数作为消费者使用方法,从 DeltaFIFO 的头部取出最早进入队列中的资源对象数据。Pop 方法必须传入 process 函数,用于接收并处理对象的回调方法,如下
使用 f.lock.Lock()
确保了数据的同步,当队列不为空时,取出 f.queue
的头部数据,将对象传入 process 回调函数,由上层消费者进行处理,如果 process 回调方法处理出错,将该对象重新存入队列。
1 |
|
Controller 的 processLoop
方法负责从 DeltaFIFO 队列中取出数据传递给 process 回调函数,process 函数的类型如下:
1 |
|
一个 process 回调函数代码示例如下:
1 |
|
在这个例子中,HandleDeltas 作为 process 的一个回调函数,当资源对象操作类型为 Added,Updated 和 Delted 时,该资源对象存储至 Indexer(它是并发安全的),并通过 distribute 函数将资源对象分发到 SharedInformer,在之前 Informer 机制架构设计的示例代码中,通过 informer.AddEventHandler
函数添加了对资源事件进行处理的函数,distribute 函数将资源对象分发到该事件处理函数。
Resync 机制
Resync 机制会将 Indexer 本地存储中的资源对象同步到 DeltaFIFO 中,并将这些资源对象设置为 Sync 的操作类型,
1 |
|
为什么需要 Resync 机制呢?因为在处理 SharedInformer 事件回调时,可能存在处理失败的情况,定时的 Resync 让这些处理失败的事件有了重新 onUpdate 处理的机会。
那么经过 Resync 重新放入 Delta FIFO 队列的事件,和直接从 apiserver 中 watch 得到的事件处理起来有什么不一样呢?在消费者方法中有介绍过 HandleDeltas
,其中就有关于 Resync 的部分:
1 |
|
从上面对 Delta FIFO 的队列处理源码可看出,
当从 Resync 重新同步到 Delta FIFO 队列的事件,会分发到 updateNotification 中触发 onUpdate 的回调。Resync 机制的引入,定时将 Indexer 缓存事件重新同步到 Delta FIFO 队列中,在处理 SharedInformer 事件回调时,让处理失败的事件得到重新处理。并且通过入队前判断 FIFO 队列中是否已经有了更新版本的 event,来决定是否丢弃 Indexer 缓存不进行 Resync 入队。在处理 Delta FIFO 队列中的 Resync 的事件数据时,触发 onUpdate 回调来让事件重新处理。
Indexer
用来存储资源对象并自带索引功能的本地存储,Reflector 从 DeltaFIFO 中将消费出来的资源对象存储到 Indexer,Indexer 与 Etcd 集群中的数据完全保持一致。从而 client-go 可以本地读取,减少 Kubernetes API 和 Etcd 集群的压力。
了解 Indexer 之前,先了解 ThreadSafeMap,ThreadSafeMap 是实现并发安全存储,就像 Go 1.9 后推出 sync.Map
一样。Kubernetes 开始编写的时候还没有 sync.Map
。Indexer 在 ThreadSafeMap 的基础上进行了封装,继承了 ThreadSafeMap 的存储相关的增删改查相关操作方法,实现了 Indexer Func 等功能,例如 Index,IndexKeys,GetIndexers 等方法,这些方法为 ThreadSafeMap 提供了索引功能。如下图:
1 |
|
ThreadSafeStore
ThreadSafeStore 是一个内存中存储,数据不会写入本地磁盘,增删改查都会加锁,保证数据一致性。结构如下
1 |
|
items 字段存储资源对象数据,其中 items 的 key 通过 keyFunc 函数计算得到,计算默认使用 MetaNamespaceKeyFunc 函数,该函数根据资源对象计算出 <namespace>/<name>
格式的 key,value 用于存储资源对象。
而后面两个字段的定义类型如下:
1 |
|
Indexer 索引器
每次增删改 ThreadSafeStore 的数据时,都会通过 updateIndices 或 deleteFormIndices 函数变更 Indexer。Indexer 被设计为可以自定义索引函数,他有重要的四个数据结构,Indexers,IndexFunc,Indices 和 Index。
看下面这个例子的关键流程:
1 |
|
首先定义了一个索引器函数(IndexFunc),UsersIndexFunc。该函数定义查询所有 Pod 资源下 Annotations 字段的 key 为 users 的 Pod:
1 |
|
Main 函数中 cache.NewIndexer
实例化了一个 Indexer 对象:
1 |
|
第一个参数计算资源对象的 key,默认就是 MetaNamespaceKeyFunc,第二个参数是一个 Indexers 对象,如上一节展示的定义那样,key 为索引器(IndexFunc)的名称,value 为索引器函数。
通过 index.Add 添加了三个 Pod,再通过 index.ByIndex 函数查询使用 byUser 索引器下匹配 ernie 字段的 Pod 列表:
1 |
|
回看这四个类型:
1 |
|
- Indexers:存储索引器,key 为 索引器名称,value 为索引器实现函数;
- IndexFunc:索引器函数,定义为接收一个资源对象,返回检索结果列表;
- Indices:存储缓存器,key 为缓存器名称,value 为缓存数据;
- Index:存储缓存数据,结构为 K/V。
Indexer 索引器核心实现
1 |
|
ByIndex 接收两个参数:indexName(索引器名字)以及 indexKey(需要检索的 key),首先从 c.indexers 查找制定的索引器函数,然后从 c.indices 查找返回的缓存器函数,最后根据需要索引的 indexKey 从缓存数据中查到并返回。
K8s 将 map 结构类型的 key 作为 Set 数据结构,实现 Set 去重特性。