k8s-informer 机制

Posted by Wang Gang on 2019-09-01

首先上一张官方架构图

image.png

简介

  • Informer 是 Client-go 中的一个核心工具包。在 Kubernetes 源码中,如果 Kubernetes 的某个组件,需要 List/Get Kubernetes 中的 Object,在绝大多 数情况下,会直接使用 Informer 实例中的 Lister()方法(该方法包含 了 Get 和 List 方法),而很少直接请求 Kubernetes API。Informer 最基本 的功能就是 List/Get Kubernetes 中的 Object。由于kubernetes中复杂的逻辑,由此Informer提供更加复杂的功能,监听事件,触发预设的回调函数以实现复杂的逻辑
  • 使用 Informer 实例的 Lister() 方法, List/Get Kubernetes 中的 Object 时,Informer 不会去请求 Kubernetes API,而是直接查找缓存在本地内存中的数据(这份数据由 Informer 自己维护)。通过这种方式,Informer 既可以更快地返回结果,又能减少对 Kubernetes API 的直接调用。

设计逻辑

主要几个对象:

  • reflctor:
    Reflector 类型定义在 cache 包中(tools/cache/reflector.go:47),它的作用是向 apiserver watch 特定的资源类型。这个功能通过其绑定的 ListAndWatch 方法实现。Watch 的资源可以是 in-build 的资源也可以是 custom 的资源。当 Reflector 通过 watch API 接收到存在新的资源对象实例的通知后,它使用相应的 list API 获取新创建的资源对象,然后 put 进 Delta Fifo 队列。
    • store(delta FIFO queue): 对list-watch 到的apiserver的事件进行存储
    • listWatch[filed]这里的listWatcher对象,对apiser的时间监听
    • Run()[func]ListAndWatch[func]对apiserver数据list-watch
      image.png
  • indexer
    Indexer 提供的是 objects 之上的检索能力。Indexer 也定义在 cache 包中(tools/cache/index.go:27). 一个典型的检索使用方式是基于一个对象的 labels 创建索引。Indexer 可以基于各种索引函数维护索引。Indexer 使用一个线程安全的 store 来存储对象和其对应的 key. 还有一个默认函数 MetaNamespaceKeyFunc(tools/cache/store.go:76) 可以生成对象的 key,类似 / 格式来关联对应的对象。
  • informer
    一个controller(client-go/tools/cache/controller.go),通过创建reflector,来实现list-watch kubernetes-apiserver中的object变化,然后将数据写入到indexer中,并触发其中的回调(回调函数可自定义)
    image.png
    image.png

业务流程

  1. Informer 在初始化时,Reflector 会先 List API 获得所有的 Pod
  2. Reflect 拿到全部 Pod 后,会将全部 Pod 放到 Store 中
  3. 如果有人调用 Lister 的 List/Get 方法获取 Pod, 那么 Lister 会直接从 Store 中拿数据
  4. Informer 初始化完成之后,Reflector 开始 Watch Pod,监听 Pod 相关 的所有事件;如果此时 pod_1 被删除,那么 Reflector 会监听到这个事件
  5. Reflector 将 pod_1 被删除 的这个事件发送到 DeltaFIFO
  6. DeltaFIFO 首先会将这个事件存储在自己的数据结构中(实际上是一个 queue),然后会直接操作 Store 中的数据,删除 Store 中的 pod_1
  7. DeltaFIFO 再 Pop 这个事件到 Controller 中
  8. Controller 收到这个事件,会触发 Processor 的回调函数
  9. LocalStore 会周期性地把所有的 Pod 信息重新放到 DeltaFIFO 中

一个简单的例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package main

import (
"fmt"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog"
"time"
)

func main() {
// 初始化日志信息
klog.InitFlags(nil)

// 配置信息
cfg, err := clientcmd.BuildConfigFromFlags("", "/Users/lives/.kube/config")
if err != nil {
klog.Fatalf("Error building kubeconfig: %s", err.Error())
}

// 生成client
clientSet, err := kubernetes.NewForConfig(cfg)
if err != nil {
panic(err)
}

//获取所有版本的shared informer
sharedInformer := informers.NewSharedInformerFactory(clientSet, time.Second*10)

stopCh := make(chan struct{})

// 启动监听
go sharedInformer.Start(stopCh)

// 对pod监听
podInformer := sharedInformer.Core().V1().Pods()

podLister := podInformer.Lister()
informer := podInformer.Informer()

// 添加增删的回调
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: nil,
UpdateFunc: nil,
DeleteFunc: nil,
})

// 等待list完成
if !cache.WaitForCacheSync(stopCh, informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}

// 从cache中获取kube-system中的pod列表
pods, err := podLister.Pods("kube-system").List(labels.Everything())
if err != nil {
panic(err)
}
for key, value := range pods {
klog.V(2).Info(key, value)
}
}