client-go实战之八:更新资源时的冲突错误处理
阿里云国内75折 回扣 微信号:monov8 |
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6 |
欢迎访问我的GitHub
这里分类和汇总了欣宸的全部原创(含配套源码)https://github.com/zq2599/blog_demos
系列文章链接
- client-go实战之一准备工作
- client-go实战之二:RESTClient
- client-go实战之三Clientset
- client-go实战之四dynamicClient
- client-go实战之五DiscoveryClient
- client-go实战之六:时隔两年刷新版本继续实战
- client-go实战之七准备一个工程管理后续实战的代码
本篇概览
- 本文是《client-go实战》系列的第七篇来了解一个常见的错误版本冲突以及client-go官方推荐的处理方式
- 本篇由以下部分组成
- 什么是版本冲突from kubernetes官方
- 编码复现版本冲突
- 版本冲突的解决思路from kubernetes官方
- 版本冲突的实际解决手段from client-go官方
- 编码演示如何解决版本冲突
- 自定义入参对抗更高的并发
什么是版本冲突from kubernetes官方
- 简单的说就是同时出现多个修改请求针对同一个kubernetes资源的时候会出现一个请求成功其余请求都失败的情况
- 这里有kubernetes官方对版本冲突的描述https://github.com/kubernetes/community/blob/master/contributors/devel/sig-architecture/api-conventions.md#concurrency-control-and-consistency
- 以下是个人的理解
- 首先在逻辑上来说提交冲突是肯定存在的多人同时获取到同一个资源的信息例如同一个pod然后各自在本地修改后提交就有可能出现A的提交把B的提交覆盖的情况这一个点就不展开了数据库的乐观锁和悲观锁都可以用来处理并发冲突
- kubernetes应对提交冲突的方式是资源版本号属于乐观锁类型Kubernetes leverages the concept of resource versions to achieve optimistic concurrency
- 基于版本实现并发控制是常见套路放在kubernetes也是一样基本原理如下图所示按照序号看一遍即可理解左右两人从后台拿到的资源都是1.0版本然而右侧提交的1.1的时候服务器上已经被左侧更新到1.1了于是服务器不接受右侧提交
编码复现版本冲突
- 接下来咱们将上述冲突用代码复现出来具体的功能如下
- 创建一个deployment资源该资源带有一个label名为biz-version值为101
- 启动5个协程每个协程都做同样的事情读取deployment得到label的值后加一再提交保存
- 正常情况下label的值被累加了5次那么最终的值应该等于101+5=106
- 等5个协程都执行完毕后再读读取一次deployment看label值是都等于106
- 接下来就写代码实现上述功能
- 为了后续文章的实战代码能统一管理这里继续使用前文《client-go实战之七准备一个工程管理后续实战的代码
》创建的client-go-tutorials工程将代码写在这个工程中 - 在client-go-tutorials工程中新增名为的conflict.go的文件整个工程结构如下图所示
$ tree client-go-tutorials
client-go-tutorials
├── action
│ ├── action.go
│ ├── conflict.go
│ └── list_pod.go
├── client-go-tutorials
├── go.mod
├── go.sum
└── main.go
- 接下来的代码都写在conflict.go中
- 首先是新增两个常量
const (
// deployment的名称
DP_NAME string = "demo-deployment"
// 用于更新的标签的名字
LABEL_CUSTOMIZE string = "biz-version"
)
- 然后是辅助方法返回32位整型的指针后面会用到
func int32Ptr(i int32) *int32 { return &i }
- 创建deployment的方法要注意的是增加了一个label名为LABEL_CUSTOMIZE其值为101
// 创建deployment
func create(clientset *kubernetes.Clientset) error {
deploymentsClient := clientset.AppsV1().Deployments(apiv1.NamespaceDefault)
deployment := &appsv1.Deployment{
ObjectMeta: metav1.ObjectMeta{
Name: DP_NAME,
Labels: map[string]string{LABEL_CUSTOMIZE: "101"},
},
Spec: appsv1.DeploymentSpec{
Replicas: int32Ptr(1),
Selector: &metav1.LabelSelector{
MatchLabels: map[string]string{
"app": "demo",
},
},
Template: apiv1.PodTemplateSpec{
ObjectMeta: metav1.ObjectMeta{
Labels: map[string]string{
"app": "demo",
},
},
Spec: apiv1.PodSpec{
Containers: []apiv1.Container{
{
Name: "web",
Image: "nginx:1.12",
Ports: []apiv1.ContainerPort{
{
Name: "http",
Protocol: apiv1.ProtocolTCP,
ContainerPort: 80,
},
},
},
},
},
},
},
}
// Create Deployment
fmt.Println("Creating deployment...")
result, err := deploymentsClient.Create(context.TODO(), deployment, metav1.CreateOptions{})
if err != nil {
return err
}
fmt.Printf("Created deployment %q.\n", result.GetObjectMeta().GetName())
return nil
}
- 按照名称删除deployment的方法实战的最后会调用将deployment清理掉
// 按照名称删除
func delete(clientset *kubernetes.Clientset, name string) error {
deletePolicy := metav1.DeletePropagationBackground
err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Delete(context.TODO(), name, metav1.DeleteOptions{PropagationPolicy: &deletePolicy})
if err != nil {
return err
}
return nil
}
- 再封装一个get方法用于所有更新操作完成后获取最新的deployment检查其label值是否符合预期
// 按照名称查找deployment
func get(clientset *kubernetes.Clientset, name string) (*v1.Deployment, error) {
deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return nil, err
}
return deployment, nil
}
- 接下来是最重要的更新方法这里用的是常见的先查询再更新的方式查询deployment取得标签值之后加一再提交保存
// 查询指定名称的deployment对象得到其名为biz-version的label加一后保存
func updateByGetAndUpdate(clientset *kubernetes.Clientset, name string) error {
deployment, err := clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return err
}
// 取出当前值
currentVal, ok := deployment.Labels[LABEL_CUSTOMIZE]
if !ok {
return errors.New("未取得自定义标签")
}
// 将字符串类型转为int型
val, err := strconv.Atoi(currentVal)
if err != nil {
fmt.Println("取得了无效的标签重新赋初值")
currentVal = "101"
}
// 将int型的label加一再转为字符串
deployment.Labels[LABEL_CUSTOMIZE] = strconv.Itoa(val + 1)
_, err = clientset.AppsV1().Deployments(apiv1.NamespaceDefault).Update(context.TODO(), deployment, metav1.UpdateOptions{})
return err
}
- 最后是主流程代码为了能在现有工程框架下运行这里新增一个struct并实现了action接口的DoAction方法这个DoAction方法中就是主流程
type Confilct struct{}
func (conflict Confilct) DoAction(clientset *kubernetes.Clientset) error {
fmt.Println("开始创建deployment")
// 开始创建deployment
err := create(clientset)
if err != nil {
return err
}
// 如果不延时就会导致下面的更新过早会报错
<-time.NewTimer(1 * time.Second).C
// 一旦创建成功就一定到删除再返回
defer delete(clientset, DP_NAME)
testNum := 5
waitGroup := sync.WaitGroup{}
waitGroup.Add(testNum)
fmt.Println("在协程中并发更新自定义标签")
startTime := time.Now().UnixMilli()
for i := 0; i < testNum; i++ {
go func(clientsetA *kubernetes.Clientset, index int) {
// 避免进程卡死
defer waitGroup.Done()
err := updateByGetAndUpdate(clientsetA, DP_NAME)
// var retryParam = wait.Backoff{
// Steps: 5,
// Duration: 10 * time.Millisecond,
// Factor: 1.0,
// Jitter: 0.1,
// }
// err := retry.RetryOnConflict(retryParam, func() error {
// return updateByGetAndUpdate(clientset, DP_NAME)
// })
if err != nil {
fmt.Printf("err: %v\n", err)
}
}(clientset, i)
}
// 等待协程完成全部操作
waitGroup.Wait()
// 再查一下自定义标签的最终值
deployment, err := get(clientset, DP_NAME)
if err != nil {
fmt.Printf("查询deployment发生异常: %v\n", err)
return err
}
fmt.Printf("自定义标签的最终值为: %v耗时%v毫秒\n", deployment.Labels[LABEL_CUSTOMIZE], time.Now().UnixMilli()-startTime)
return nil
}
- 最后还要修改main.go增加一个action的处理新增的内容如下
- 这里给出完整main.go
package main
import (
"client-go-tutorials/action"
"flag"
"fmt"
"path/filepath"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
)
func main() {
var kubeconfig *string
var actionFlag *string
// 试图取到当前账号的家目录
if home := homedir.HomeDir(); home != "" {
// 如果能取到就把家目录下的.kube/config作为默认配置文件
kubeconfig = flag.String("kubeconfig", filepath.Join(home, ".kube", "config"), "(optional) absolute path to the kubeconfig file")
} else {
// 如果取不到就没有默认配置文件必须通过kubeconfig参数来指定
kubeconfig = flag.String("kubeconfig", "", "absolute path to the kubeconfig file")
}
actionFlag = flag.String("action", "list-pod", "指定实际操作功能")
flag.Parse()
fmt.Println("解析命令完毕开始加载配置文件")
// 加载配置文件
config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig)
if err != nil {
panic(err.Error())
}
// 用clientset类来执行后续的查询操作
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
fmt.Printf("加载配置文件完毕即将执行业务 [%v]\n", *actionFlag)
var actionInterface action.Action
// 注意如果有新的功能类实现就在这里添加对应的处理
switch *actionFlag {
case "list-pod":
listPod := action.ListPod{}
actionInterface = &listPod
case "conflict":
conflict := action.Confilct{}
actionInterface = &conflict
}
err = actionInterface.DoAction(clientset)
if err != nil {
fmt.Printf("err: %v\n", err)
} else {
fmt.Println("执行完成")
}
}
- 最后如果您用的是vscode可以修改launch.json调整输入参数
{
"version": "0.2.0",
"configurations": [
{
"name": "Launch Package",
"type": "go",
"request": "launch",
"mode": "auto",
"program": "${workspaceFolder}",
"args": ["-action=conflict"]
}
]
}
-
回顾上面的代码您会发现是5个协程并行执行先查询再修改提交的逻辑理论上会出现前面提到的冲突问题5个协程并发更新会出现并发冲突因此最终标签的值是小于101+5=106的咱们来运行代码试试
-
果然经过更新后lable的最终值等于102也就是说过5个协程同时提交只成功了一个
-
至此咱们通过代码证明了资源版本冲突问题确实存在接下来就要想办法解决此问题了
版本冲突的解决思路from kubernetes官方
- 来看看kubernetes的官方对于处理此问题是如何建议的下面是官方原话
In the case of a conflict, the correct client action at this point is to GET the resource again, apply the changes afresh, and try submitting again
- 很明显在更新因为版本冲突而失败的时候官方建议重新获取最新版本的资源然后再次修改并提交
- 听起来很像CAS
- 在前面复现失败的场景如果是5个协程并发提交总有一个会失败多次那岂不是要反复重试把代码变得更复杂
- 还好client-go帮我们解决了这个问题按照kubernetes官方的指导方向将重试逻辑进行了封装让使用者可以很方便的实现完成失败重试
版本冲突的实际解决手段from client-go官方
- client-go提供的是方法下面是该方法的源码
func RetryOnConflict(backoff wait.Backoff, fn func() error) error {
return OnError(backoff, errors.IsConflict, fn)
}
- 从上述方法有两个入参backoff用于控制重试相关的细节如重试次数、间隔时间等fn则是常规的先查询再更新的自定义方法由调用方根据自己的业务自行实现总之只要fn返回错误并且该错误是可以通过重试来解决的RetryOnConflict方法就会按照backoff的配置进行等待和重试
- 可见经过client-go的封装对应普通开发者来说已经无需关注重试的实现了只要调用RetryOnConflict即可确保版本冲突问题会被解决
- 接下来咱们改造前面有问题的代码看看能否解决并发冲突的问题
编码演示如何解决版本冲突
- 改成client-go提供的自动重试代码整体改动很小如下图所示原来是直接调用updateByGetAndUpdate方法现在注释掉改为调用RetryOnConflict并且将updateByGetAndUpdate作为入参使用
- 再次运行代码如下图这次五个协程都更新成功了不过耗时也更长毕竟是靠着重试来实现最终提交成功的
自定义入参对抗更高的并发
- 前面的验证过程中并发数被设置为5现在加大一些试试改成10如下图红色箭头位置
- 执行结果如下图所示10个并发请求只成功了5个其余5个就算重试也还是失败了
- 出现这样的问题原因很明显下面是咱们调用方法时的入参每个并发请求最多重试5次显然即便是重试5次也只能确保每一次有个协程提交成功所以5次过后没有重试机会导致只成功了5个
var retryParam = wait.Backoff{
Steps: 5,
Duration: 10 * time.Millisecond,
Factor: 1.0,
Jitter: 0.1,
}
- 找到了原因就好处理了把上面的Steps参数调大改为10再试试
- 如下图这一次结果符合预期不过耗时更长了
- 最后留下一个问题Steps参数到底该设置成多少呢这个当然没有固定值了5是client-go官方推荐的值结果在并发为10的时候依然不够用所以具体该设置成多少还是要依照您的实际情况来决定需要大于最大的瞬间并发数才能保证所有并发冲突都能通过重试解决当然了实际场景中大量并发同时修改同一个资源对象的情况并不多见所以大多数时候可以直接使用client-go官方的推荐值
- 至此kubernetes资源更新时的版本冲突问题经过实战咱们都已经了解了并且掌握了解决方法基本的增删改查算是没问题了接下来的文章咱们要聚焦的是client-go另一个极其重要的能力List&Watch
- 敬请期待欣宸原创必不会辜负您
源码下载
- 上述完整源码可在GitHub下载到地址和链接信息如下表所示(https://github.com/zq2599/blog_demos)
名称 | 链接 | 备注 |
---|---|---|
项目主页 | https://github.com/zq2599/blog_demos | 该项目在GitHub上的主页 |
git仓库地址(https) | https://github.com/zq2599/blog_demos.git | 该项目源码的仓库地址https协议 |
git仓库地址(ssh) | git@github.com:zq2599/blog_demos.git | 该项目源码的仓库地址ssh协议 |
- 这个git项目中有多个文件夹本篇的源码在tutorials/client-go-tutorials文件夹下如下图红框所示