阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码)https://github.com/zq2599/blog_demos
系列文章链接
- client-go实战之一准备工作
- client-go实战之二:RESTClient
- client-go实战之三Clientset
- client-go实战之四dynamicClient
- client-go实战之五DiscoveryClient
- client-go实战之六:时隔两年刷新版本继续实战
- client-go实战之七准备一个工程管理后续实战的代码
- client-go实战之八:更新资源时的冲突错误处理
本篇概览
- 本文是《client-go实战》系列的第九篇前面咱们已经了解了client-go的基本功能现在要来一次经典的综合实战了接下来咱们会手写一个kubernetes的controller其功能是监听某种资源的变化一旦资源发生变化例如增加或者删除apiserver就会有广播发出controller使用client-go可以订阅这个广播然后在收到广播后进行各种业务操作
- 本次实战代码量略大但如果随本文一步步先设计再开发并不会觉得有太多总的来说由以下内容构成
- 代码整体架构一览
- 对着架构细说流程
- 全局重点的小结
- 编码实战
代码整体架构一览
- 首先再次明确本次实战的目标开发出类似kubernetes的controller那样的功能实时监听pod资源的变化针对每个变化做出响应
- 今天的实战源自client-go的官方demo其主要架构如下
- 可能您会觉得上图有些复杂没关系接下来咱们细说此图为后面的编码打好理论基础
对着架构细说流程
- 最左侧的Kubernetes API Server+etcd是第一部分它们都是kubernetes的内部组件
- 第二部分是整个informerinformer是client-go库的核心模块
- 第三部分是WorkQueue和Conrol Loop它们都是controller的业务逻辑代码
- 上面三部分合作就能做到监听资源变化并做出响应
- 另外informer内部很复杂也很精巧后面会有专门的文章去细说本篇只会提到与controller有关系的informer细节其余的能不提就不提不然内容太多这篇文章写不完了
- 分类完毕后再来聊流程
- controller会通过client-go的list&watch机制与API Server建立长连接http2的stream只要pod资源发生变化API Server就会通过长连接推送到controller
- API Server推的数据到达Reflector它将数据写入Delta FIFO Queue
- Delta FIFO Queue是个先入先出的队列除了pod信息还保存了操作类型增加、修改、删除informer内部不断从这个队列获取数据再执行AddFunc、UpdateFunc、DeleteFunc等方法
- 完整的pod数据被存放在Local Store中外部通过Indexer随时可以获取到
- controller中准备一个或多个工作队列在执行AddFunc、UpdateFunc、DeleteFunc等方法时可以将定制化的数据放入工作队列中
- controller中启动一个或多个协程持续从工作队列中取数据执行业务逻辑执行过程中如果需要pod的详细数据可以通过indexder获取
- 差不多了我有种胸有成竹的感觉迫不及待想写代码但还是忍忍吧先规划再动手
编码规划
- 所谓规划就是把步骤捋清楚先写啥再写啥如下图所示
- 捋顺了开始写代码吧
编码之一定义Controller数据结构(controller.go)
type Controller struct {
indexer cache.Indexer
queue workqueue.RateLimitingInterface
informer cache.Controller
}
- 从上述代码可见Controller结构体有三个成员indexer是informer内负责存取完整资源信息的对象queue是用于业务逻辑的工作队列
编码之二编写业务逻辑代码(controller.go)
- 把资源变化信息存入工作队列这里可能按实际需求定制例如有的数据不关注就丢弃了
- 从工作队列中取出数据
- 取出数据后的处理逻辑这边是纯粹的业务需求了各人的实现都不一样
- 异常处理
- 步骤1存入工作队列的操作留待初始化informer的时候再做
- 步骤4异常处理稍后也有单独段落细说
- 这里只聚焦步骤2和3怎么取取出后怎么用
- 先写步骤2的代码从工作队列中取取数据用名为processNextItem的方法来实现对每一行代码进行中文注释着实不易支持的话请点个赞
func (c *Controller) processNextItem() bool {
key, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(key)
err := c.syncToStdout(key.(string))
c.handleErr(err, key)
return true
}
- 接下来写业务处理的代码就是上面调用的syncToStdout方法常规套路是检查spec和status的差距然后让status和spec保持一致例如spec中指定副本数为2而status中记录了真实的副本数是1所以业务处理就是增加一个副本数这里仅仅是为了展示业务处理代码在哪些所以就简(fu)化(yan)一些了只打印pod的名称
func (c *Controller) syncToStdout(key string) error {
obj, exists, err := c.indexer.GetByKey(key)
if err != nil {
klog.Errorf("Fetching object with key %s from store failed with %v", key, err)
return err
}
if !exists {
fmt.Printf("Pod %s does not exist anymore\n", key)
} else {
fmt.Printf("Sync/Add/Update for Pod %s\n", obj.(*v1.Pod).GetName())
}
return nil
}
编码之三编写错误处理代码(controller.go)
- 回顾前面的processNextItem方法内容在调用syncToStdout执行完业务逻辑后就立即调用handleErr方法了此方法的作用是检查syncToStdout的返回值是否有错误然后做针对性处理
func (c *Controller) handleErr(err error, key interface{}) {
if err == nil {
c.queue.Forget(key)
return
}
if c.queue.NumRequeues(key) < 5 {
klog.Infof("Error syncing pod %v: %v", key, err)
c.queue.AddRateLimited(key)
return
}
c.queue.Forget(key)
runtime.HandleError(err)
klog.Infof("Dropping pod %q out of the queue: %v", key, err)
}
- 好了和业务有关的代码已经完成接下来就是搭建controller框架把基本功能串起来
编码之四编写Controller主流程(controller.go)
- 编写一个完整的Controller最基本的是构造方法Controller的构造方法也很简单保存三个重要的成员变量即可
func NewController(queue workqueue.RateLimitingInterface, indexer cache.Indexer, informer cache.Controller) *Controller {
return &Controller{
informer: informer,
indexer: indexer,
queue: queue,
}
}
- 先定义个名为runWorker的简单方法里面是个无限循环只要消费消息的processNextItem方法返回true就无限循环下去
func (c *Controller) runWorker() {
for c.processNextItem() {
}
}
- 然后是Controller主流程代码简介清晰启动informer开始接受apiserver推送写入工作队列然后开启无限循环从工作队列取数据并处理
func (c *Controller) Run(workers int, stopCh chan struct{}) {
defer runtime.HandleCrash()
defer c.queue.ShutDown()
klog.Info("Starting Pod controller")
go c.informer.Run(stopCh)
if !cache.WaitForCacheSync(stopCh, c.informer.HasSynced) {
runtime.HandleError(fmt.Errorf("Timed out waiting for caches to sync"))
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
}
<-stopCh
klog.Info("Stopping Pod controller")
}
- 现在一个完整的Controller已经完成了接下来编写调用Controller的代码将其所需的三个对象传入再调用它的Run方法
编码之五编写调用Controller的代码(controller_demo.go)
- 为了能让整个工程的main方法调用Controller这里新增controller_demo.go方法里面新增名为ControllerDemo的数据结构创建Controller对象以及为其准备成员变量的操作都在ControllerDemo.DoAction方法中
package action
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
)
type ControllerDemo struct{}
func (controllerDemo ControllerDemo) DoAction(clientset *kubernetes.Clientset) error {
podListWatcher := cache.NewListWatchFromClient(clientset.CoreV1().RESTClient(), "pods", v1.NamespaceDefault, fields.Everything())
queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
key, err := cache.MetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
UpdateFunc: func(old interface{}, new interface{}) {
key, err := cache.MetaNamespaceKeyFunc(new)
if err == nil {
queue.Add(key)
}
},
DeleteFunc: func(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
if err == nil {
queue.Add(key)
}
},
}, cache.Indexers{})
controller := NewController(queue, indexer, informer)
stop := make(chan struct{})
defer close(stop)
go controller.Run(1, stop)
select {}
return nil
}
编码之六main方法中支持(main.go)
- 然后是整个工程的main方法里面增加一段代码支持新增的ControllerDemo如下图黄框所示
- 最后如果您使用的是vscode记得修改launch.json如下图黄色箭头这样main方法运行的时候就会执行Controller的代码了
运行和验证
go get k8s.io/apimachinery/pkg/util/diff@v0.25.4
- 确保kubernetes环境正常.kube/config配置也能正常使用然后运行main.go
- 使用kubectl edit xxx修改kubernetes环境中的pod例如我这里改的是下图黄色箭头的值
- 修改完毕保存退出后运行mian.go的控制台立即有内容输出如下图黄色箭头是咱们前面的syncToStdout方法的输入符合预期
- 至此整个Controller已经开发完成了相信您已经熟悉了informer和kubernetes的controller的基本套路加上前面的文章打下的基础再去做kubernetes二次开发或者operator开发等都能轻松驾驭了
本篇涉及知识点串讲
- 前几篇的风格都是抓住一个问题深入研究和实践但是到了本篇似乎多个知识点同时涌出并且还要紧密配合完成业务目标可能年轻的您一下子略有不适应我这里再次将本次开发中的重点进行总结经历过一番实战再来看这些总结相信您很容易就融会贯通了
- 先给出数据流视图结合前面的实战您应该能一眼看懂
- 接下来开始梳理重点
- 创建一个名为podListWatcher的ListWatch对象用于对指定资源类型建立监听本例中监听的资源是pod
- 创建一个名为queue的工作队列就是个先进先出的内存对象没啥特别之处
- 通过podListWatcher创建一个informer这个informer的功能对podListWatcher监听的事件作相应
- 在创建informer的时候还会返回一个名为indexer的本地缓存这里面保存了所有pod信息由于pod的变动全部都会被informer收到因此indexer中保存了最新的pod信息
- 在新协程中启动informer这里面对应两件事情第一创建Reflector对象这个Reflector对象会把podListWatcher监听到的数据放入一个DeltaFIFO队列注意不是步骤2中的工作队列第二是循环地取出fifo队列中的数据再调用AddFunc、UpdateFunc、DeleteFunc等方法
- 步骤5中提到的AddFunc、UpdateFunc、DeleteFunc可以在创建informer的时候由业务开发者自定义一般会再次将key放入工作队列中
- 在新协程消费工作队列queue的数据这里可以根据业务需求写入也任务逻辑代码
- 基于以上详细描述再来个精简版介绍重点对象如果您对详细描述不感兴趣可以只看精简版掌握其中关键即可
- podListWatcher用于监听指定类型资源的变化
- queue工作队列从里面取出的key其资源都有事件发生
- informer接受监听到的事件再调用指定的回调方法
- Reflectorinformer内部三大对象之一用于接受事件再写入一个内部fifo队列
- DeltaFIFOinformer内部三大对象之二先入先出队列还保存了操作类型
- indexerinformer内部三大对象之三这里面保存的是指定资源的完整数据和apiserver侧保持同步
- 接受消息的协程informer在这个协程中启动也在这个协程中将数据写入工作队列
- 处理工作队列的协程负责从工作队列中取出数据处理
- 工作队列queue和informer内部的fifo是不同的队列是两回事为了满足业务需求我们可以在一个controller中创建多个工作队列也可以不要工作队列在informer的三个回调方法中完成业务逻辑
以下是官方参考信息
- https://github.com/kubernetes/community/blob/master/contributors/devel/sig-api-machinery/controllers.md
源码下载
- 上述完整源码可在GitHub下载到地址和链接信息如下表所示(https://github.com/zq2599/blog_demos)
名称 | 链接 | 备注 |
---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址ssh协议 |
- 这个git项目中有多个文件夹本篇的源码在tutorials/client-go-tutorials文件夹下如下图红框所示
- 写到这里client-go基本功的学习已经完成了接下来咱们还要继续深入研究让这个优秀的库在手中发挥更大的威力欣宸原创敬请期待
你不孤单欣宸原创一路相伴
- Java系列
- Spring系列
- Docker系列
- kubernetes系列
- 数据库+中间件系列
- DevOps系列