背景
由于我在公司内部负责维护Pulsar,需要时不时的升级Pulsar版本从而和社区保持一致。
而每次升级过程都需要做相同的步骤:
命令行工具
以上的流程步骤最好是全部一键完成,我们只需要人工检测下监控是否正常即可。
于是我便写了一个命令行工具,执行流程如下:
pulsar-upgrade-cli -hok | at 10:33:18A cli app for upgrading PulsarUsage:pulsar-upgrade-cli [command]Available Commands:completionGenerate the autocompletion script for the specified shellhelpHelp about any commandinstallinstall a target versionscalescale statefulSet of the clusterFlags:--burst-limit intclient-side default throttling limit (default 100)--debugenable verbose output-h, --helphelp for pulsar-upgrade-cli--kube-apiserver stringthe address and the port for the Kubernetes API server--kube-as-group stringArraygroup to impersonate for the operation, this flag can be repeated to specify multiple groups.--kube-as-user stringusername to impersonate for the operation
真实使用的example如下:
pulsar-upgrade-cli install \--values ./charts/pulsar/values.yaml \--set namespace=pulsar-test \--set initialize=true \--debug \--test-case-schema=http \--test-case-host=127.0.0.1 \--test-case-port=9999 \pulsar-test ./charts/pulsar -n pulsar-test
它的安装命令非常类似于helm,也是直接使用 helm 的value.yaml进行安装;只是在安装成功后(等待所有的 Pod 都处于 Running 状态)会再触发 test-case 测试,也就是请求一个 endpoint。
同时还提供了一个 scale(扩、缩容) 命令,可以用修改集群规模:
# 缩容集群规模为0./pulsar-upgrade-cli scale --replicase 0 -n pulsar-test# 缩容为最小集群./pulsar-upgrade-cli scale --replicase 1 -n pulsar-test# 恢复为最满集群./pulsar-upgrade-cli scale --replicase 2 -n pulsar-test
这个需求是因为我们的Pulsar测试集群部署在了一个servless的kubernetes集群里,它是按照使用量收费的,所以在我不需要的使用的时候可以通过这个命令将所有的副本数量修改为 0,从而减少使用成本。
当只需要做简单的功能测试时便回将集群修改为最小集群,将副本数修改为只可以提供服务即可。
而当需要做性能测试时就需要将集群修改为最高配置。
这样可以避免每次都安装新集群,同时也可以有效的减少测试成本。
实现原理
require (github.com/spf13/cobra v1.6.1github.com/spf13/pflag v1.0.5helm.sh/helm/v3 v3.10.2)
这个命令行工具本质上是参考了 helm 的命令行实现的,所有主要也是依赖了helm和cobra。
下面以最主要的安装命令为例,核心的是以下的步骤:
func (e *installEvent) FinishInstall(cfg *action.Configuration, name string) error {bar.Increment()bar.Finish()clientSet, err := cfg.KubernetesClientSet()if err != nil {return err}ctx := context.Background()ip, err := GetServiceExternalIp(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-proxy", name))if err != nil {return err}token, err := GetPulsarProxyToken(ctx, clientSet, settings.Namespace(), fmt.Sprintf("%s-token-proxy-admin", name))if err != nil {return err}// trigger testcaseerr = e.client.Trigger(context.Background(), ip, token)return err}
这里的FinishInstall需要获取到新安装的 Pulsar 集群的 proxy IP 地址和鉴权所使用的token(GetServiceExternalIp()/GetPulsarProxyToken())。
将这两个参数传递给test-case才可以构建出pulsar-client.
这个命令的核心功能就是安装集群和触发测试,以及一些集群的基本运维能力。
测试框架
而关于这里的测试用例也有一些小伙伴咨询过,如何对 Pulsar 进行功能测试。
其实 Pulsar 源码中已经包含了几乎所有我们会使用到的测试代码,理论上只要新版本的官方镜像已经推送了那就是跑了所有的单测,质量是可以保证的。
那为什么还需要做功能测试呢?
其实很很简单,Pulsar这类基础组件官方都有提供基准测试,但我们想要用于生产环境依然需要自己做压测得出一份属于自己环境下的性能测试报告。
根本目的是要看在自己的业务场景下是否可以满足(包括公司的软硬件,不同的业务代码)。
所以这里的功能测试代码有一个很重要的前提就是:需要使用真实的业务代码进行测试。
也就是业务在线上使用与 Pulsar 相关的代码需要参考功能测试里的代码实现,不然有些问题就无法在测试环节覆盖到。
实现原理
以上是一个集群的功能测试报告,这里我只有 8 个测试场景(结合实际业务使用),考虑到未来可能会有新的测试用例,所以在设计这个测试框架时就得考虑到扩展性。
AbstractJobDefine job5 =new FailoverConsumerTest(event, "故障转移消费测试", pulsarClient, 20, admin);CompletableFuture<Void> c5 = CompletableFuture.runAsync(job5::start, EXECUTOR);AbstractJobDefine job6 = new SchemaTest(event,"schema测试",pulsarClient,20,prestoService);CompletableFuture<Void> c6 = CompletableFuture.runAsync(job6::start, EXECUTOR);AbstractJobDefine job7 = new VlogsTest(event,"vlogs test",pulsarClient,20, vlogsUrl);CompletableFuture<Void> c7 = CompletableFuture.runAsync(job7::start, EXECUTOR);CompletableFuture<Void> all = CompletableFuture.allOf(c1, c2, c3, c4, c5, c6, c7);all.whenComplete((___, __) -> {event.finishAll();pulsarClient.closeAsync();admin.close();}).get();
对外提供的 trigger 接口就不贴代码了,重点就是在这里构建测试任务,然后等待他们全部执行完毕。
@Datapublic abstract class AbstractJobDefine {private Event event;private String jobName;private PulsarClient pulsarClient;private int timeout;private PulsarAdmin admin;public AbstractJobDefine(Event event, String jobName, PulsarClient pulsarClient, int timeout, PulsarAdmin admin) {this.event = event;this.jobName = jobName;this.pulsarClient = pulsarClient;this.timeout = timeout;this.admin = admin;}public void start() {event.addJob();try {CompletableFuture.runAsync(() -> {StopWatch watch = new StopWatch();try {watch.start(jobName);run(pulsarClient, admin);} catch (Exception e) {event.oneException(this, e);} finally {watch.stop();event.finishOne(jobName, StrUtil.format("cost: {}s", watch.getTotalTimeSeconds()));}}, TestCase.EXECUTOR).get(timeout, TimeUnit.SECONDS);} catch (Exception e) {event.oneException(this, e);}}/** run busy code* @param pulsarClient pulsar client* @param admin pulsar admin client* @throws Exception e*/public abstract void run(PulsarClient pulsarClient, PulsarAdmin admin) throws Exception;}
核心代码就是这个抽象的任务定义类,其中的 start 函数用于定义任务执行的模版:
下面来看一个普通用例的实现情况:
就是重写了run()函数,然后在其中实现具体的测试用例,断言测试结果。
这样当我们需要再添加用例的时候只需要再新增一个子类实现即可。
同时还需要定义一个事件接口,用于处理一些关键的节点:
public interface Event {/*** 新增一个任务*/void addJob();/** 获取运行中的任务数量* @return 获取运行中的任务数量*/TestCaseRuntimeResponse getRuntime();/*** 单个任务执行完毕** @param jobName任务名称* @param finishCost 任务完成耗时*/void finishOne(String jobName, String finishCost);/**单个任务执行异常* @param jobDefine 任务* @param e 异常*/void oneException(AbstractJobDefine jobDefine, Exception e);/*** 所有任务执行完毕*/void finishAll();}
其中getRuntime接口是用于在 cli 那边查询任务是否执行完毕的接口,只有任务执行完毕之后才能退出cli。
监控指标
当这些任务运行完毕后我们需要重点查看应用客户端和 Pulsar broker 端是否有异常日志。
同时还需要观察一些关键的监控面板:
包含但不限于:
当然还有zookeeper的运行情况也需要监控,限于篇幅就不一一粘贴了。
以上就是测试整个 Pulsar 集群的流程,当然还有一些需要优化的地方。
比如使用命令行还是有些不便,后续可能会切换到网页上就可以操作。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://jmbhsh.com/keji/36524.html