我在写这东西的时候,除开k8s给我带来的一堆麻烦事,我到现在才发现我大部分时间都在纠结于架构而非实现上,总而言之,我这个傻逼犯了一个极大的错误,所以目前,比起去继续折腾环境的问题,我想要先设计好它……至少是骨架……

在此之前……

在helm chart里直接指定数据库的实现并不是好的做法,这同样是我犯的另一个错误,所以helm chart/helm values也迎来了一次大更改,以及一些job trigger的指定,不过这都不是什么很重要的事情……

设计

需求

最原始的需求

我有一个列表,这个列表里记录了曲目信息;我还有一个数据库,这个数据库里记录了量产号的信息;我需要让每个Pod都唯一持有这个数据库的账号。此外,entry-pod从外部接受数据,并且让worker-pod协力处理发送完API请求后,将结果整合到某个地方供entry-pod读

接下来的需求

首先,如何保证一个Pod能够唯一获取到账号库中的一个数据且不冲突?账号库的结构应该如何设计?如何规划工作队列(或者使用什么其它的设计模式?)如何记录输出的数据?

实现

这里才是最头疼的地方……

账号数据库

首先,数据库内当然需要有账号密码以及临时储存的token,需要有一个维护当前账号条目的字段,并且需要有一个时间戳来保证一个账号太久没有使用之后可以正确回收,所以,目前的设计如下:

1
2
3
4
5
6
7
8
9
10
CREATE TABLE IF NOT EXISTS account_table (
id BIGSERIAL PRIMARY KEY,
account_email TEXT NOT NULL,
account_password TEXT NOT NULL,
user_id TEXT DEFAULT NULL,
temp_token TEXT DEFAULT NULL,
state TEXT NOT NULL DEFAULT 'Idle',
rate INTEGER NOT NULL DEFAULT 10,
lease_time TIMESTAMPTZ NOT NULL DEFAULT now()
);

同时,在entry-pod中,暴露几个接口,用来管理这个数据库,提供简单的CRUD

数据流

在设计工作队列之前,我们需要先捋清一下我们期望的数据流

我考虑过一些奇奇怪怪的“架构”:

  • “队列分发模式”(entry-pod通过将任务拆分为具体曲目信息,然后下发到工作队列来拆分大任务)
    但是这种方式API请求次数高,账号利用效率低,且数据库吞吐量大
  • “滑动窗口模式”(每个worker-pod持有一个曲目列表的滑动窗口,通过工作队列,多个worker-pod并行处理单个账号的数据)
    这种模式降低了API请求次数,同时降低了数据库吞吐量,但是使用这种方式的前提是“每个worker-pod”知道自己的滑动窗口,这就对集群内动态扩缩容的情况下保持多个worker-pod的一致性提出了不小的挑战,我也考虑通过滚动更新来解决这个问题,但是这种解决问题的方法过于粗放;同时,在高并发情况下,这种方法的效率和第三种“简单模式”是等同的,在数据量低的情况下,账号资源越多,查询越快;但是问题比较明显:即当worker-pod中出现“坏点”时,会直接引发单点故障,这时候需要引入额外的调度器来调整滑动窗口,同时还要考虑每个worker-pod的状态以及当前的处理情况;总之,这种方法的复杂性非常高 (如果我能做到这个,我大抵就可以直接去当架构师了,可惜我学农)
  • “简单模式”(每个worker-pod简单地持有曲目信息,并简单地使用API轮询)
    这是我最开始想到的方法,没想到它成为了我最后选择的方法。这种方法本质上就是“滑动窗口模式”,但是每个worker-pod的滑动窗口都默认无限大(直接不用考虑一致性问题,甚至调节状态更加方便),并且不用滚动更新,也不用考虑任务队列的分配,当账号发生故障后,这种方法的故障转移非常简单(只要将任务重新入队即可);在数据量低的情况下,效率不如前两种模式(账号利用效率较低),在数据量大的情况下,这种方法的效率趋近于和“滑动窗口模式”,理想情况下和“滑动窗口模式”(忽略“滑动窗口模式”在调度上的摩擦)一样快;同时,这种方法最资源的消耗是最小的

综上,我最后选择“简单模式”

阶段 时机
初始化任务 在Pod启动时,仅执行一次
定时检查任务 在Pod运行时,以固定周期进行检查,一个“步骤”为一个独立的检查操作
循环任务N 在Pod运行时,循环执行的任务
其它 字面意思
worker-pod
阶段 步骤 .measure(…).map_err(…)? if let Err(e) = measures() {…}
初始化任务 拉取账号信息 worker从账号数据库内原子地取出一个账号,试图将其state设置为pending,并重置leaseTime,根据leaseTime的调整来设置定时任务,并定期续租,不选择rating过低的账号 如果连接不上数据库,则重试,并让Pod not ready;如果无符合条件的条目,自杀,毕竟这些Pod的命和我这种人的命一样低贱
拉取API配置,连接数据库 废话 如果失败,自杀
定时检查任务 拉取曲目列表 虽然这很傻,但是每个worker-pod都持有一个完整的曲目列表,而不是等entry-pod一个一个分发,在定时例行检查中,worker-pod检测曲目列表的版本(hash),如果没有或者过旧(或不一致),就更新 在worker-pod停机前,无论遇到何种错误,警报,一直重试
账号心跳 定期续租,防止极端情况 在worker-pod停机前,无论遇到何种错误,警报,一直重试
重新入队 定时尝试有限次重新入队,如果入队失败次数超过阈值,直接在worker-pod内存内放弃任务,等待controller-pod收尸 在worker-pod停机前,无论遇到何种错误,警报,一直重试
循环任务1 拉取任务 从Redis工作队列内拉取一个任务(工作队列中,所有worker-pod在同一个消费者组内,互相竞争任务),并且处理加好友操作,当worker-pod的任务数量超过设定的最大阈值或收到优雅停机信号时,跳过这个步骤 当拉取任务失败时,警告错误并跳过这个步骤;当加好友失败时,重试设定的有限次,每次失败都触发账号的错误处理,直到rating归零,任务失败,这时将所有任务加入重新入队列表
循环 循环任务有一个worker-pod内的游标,来轮询曲目列表;将轮询出来的结果记入数据库,就这么简单;当收到优雅停机信号时,将任务加入重新入队列表,并放弃任务,以这种途径放弃的任务不增加重试次数 当轮询触发风控时,仅在本次轮询记录重试次数,进入半线性退避等待(函数后面给),重试超过阈值时,触发账号错误处理;当任务重新入队失败时,当轮询触发账号错误时,正常触发账号错误处理
post循环 如果有任务完成了,则在Redis创建一个新List来存放任务结果,并将任务ID写入Redis里的“完成任务”列表,这两个操作需要原子化,当前两个操作成功完成后,使用XACK彻底判定任务为完成,回复当前账号一定量的rating,具体值根据配置决定;如果有任务没有完成,就在Redis队列内延长超时时间 当Redis的操作出现问题时,除了重试几遍以外,我真没招了,尝试将任务加入重新入队列表, 不过这时候重新入队应该是没希望了
一般错误处理 账号 如果账号出现认证失败,则尝试重新登录并将token记录到数据库内,若登录失败、引发其它的错误,或是账号发生了并非认证失败的错误(例如在API出现风控返回之外的其它错误),则降低账号的rating,并记录到账号数据库内,若账号rating过低,则触发worker-pod优雅停机(当触发worker-pod停机时,这个错误处理函数返回true) 如果在写入数据库的过程中失败了,打日志报警,然后接着启动优雅停机
停机 停机要优雅。在停机前,我们需要干些事情:首先取消定时检查任务,其次尝试将账号归还(设置stateIdle),然后等待重新入队列表清空,最长等待配置指定的时间,最后尝试删除消费者 在这个阶段,忽略所有错误
entry-pod
阶段 步骤 .measure(…).map_err(…)? if let Err(e) = measures() {…}
初始化任务 拉取配置,连接数据库,连接工作队列 废话 自杀
业务端点 账号数据库的CRUD 废话 错误送回客户端
工作队列的读取 废话 错误送回客户端
任务入队 几乎是最重要的端点,但是干的事情却很简单,即创建ID,并将任务推入任务队列,然后给客户端返回202以及有关任务查询的URL 如果创建任务失败,则将错误返回给客户端
任务查询 第二重要的端点,先查询结果List是否存在,若存在,则读取内部内容,提取内部信息与错误信息,然后返回给客户端;若不存在,则返回206,让客户端接着等去吧 错误返回客户端
controller-pod
阶段 步骤 .measure(…).map_err(…)? if let Err(e) = measures() {…}
初始化任务 拉取配置,连接数据库 废话 自杀
定时检查任务 任务队列 在Redis队列内XCLAIM超时的任务,加入自己的Pending队列,并尝试重新入队,成功后XACK自己Pending队列的条目;注意,controller-pod的消费者名为固定名,这样在controller-pod崩溃时,在Redis数据库的内容也不会丢失 忽略错误
账号列表 在账号列表内搜索过期的账号,并将其状态设置为Idle 忽略错误

好了,我感觉设计到这个程度,已经足够了

工作队列和结果

在整理完上面那一大坨东西之后,最后来解决一下比较简单的工作队列和结果结构这些在Redis侧的东西

首先,工作队列这一块,我们需要的东西十分简单:

Field Value
job:friend_id 字面意思
job:timestamp 加入任务的时间戳
job:retry_count 字面意思,任务的重试次数
job:cursor_start(Experimental) 资源搜索的idx的起始游标,用来负责“断点续传”
job:cursor_length(Experimental) 资源搜索的目前处理长度,用来和job:cursor_start计算当前点与终点
job:resource_hash(Experimental) 资源文件的Hash,当此值与worker-pod记录的值不一致时,需重置job:cursor

(To be continued)