我不行了……

话说每次写正文之前都要这样发一下牢骚吗……
(不过最近的考试真的超多……)

架构

在此之前

曾经我们在草稿1上讨论过相关的架构问题;当然最终,我们选择并实现了第一种架构,在mock测试后,我们将其目标地址设置为真实服务器再进行测试时,结果是成功的

但是……

速度太慢了,完全遍历一个用户所消耗的时间是9分钟,有点不可忍受,对吧;更倒霉的是,之间的架构在多worker情况下,把worker的数量全叠到最大并行数上去了,每个任务所消耗的时间不会因为worker数量的增多而减少,这是一个致命的问题——因为我们其实并不需要这么多的并行,我们要做到的是让单个用户的查询速度尽可能快

虽然我认为为了先做出一个能跑的prototype而牺牲一点性能无可厚非,但是目前看来我的决定是愚蠢的

看看之前的东西……

所以我现在选择了第二个极端(暂时),将worker数量全部叠到对单个任务的处理速度上……

经过 上完遗传学课到晚饭的那 一段时间,我重新复盘了一下在草稿1中提到的第二种方案:

“滑动窗口模式”(每个worker-pod持有一个曲目列表的滑动窗口,通过工作队列,多个worker-pod并行处理单个账号的数据)
这种模式降低了API请求次数,同时降低了数据库吞吐量,但是使用这种方式的前提是“每个worker-pod”知道自己的滑动窗口,这就对集群内动态扩缩容的情况下保持多个worker-pod的一致性提出了不小的挑战,我也考虑通过滚动更新来解决这个问题,但是这种解决问题的方法过于粗放;同时,在高并发情况下,这种方法的效率和第三种“简单模式”是等同的,在数据量低的情况下,账号资源越多,查询越快;但是问题比较明显:即当worker-pod中出现“坏点”时,会直接引发单点故障,这时候需要引入额外的调度器来调整滑动窗口,同时还要考虑每个worker-pod的状态以及当前的处理情况;总之,这种方法的复杂性非常高 (如果我能做到这个,我大抵就可以直接去当架构师了,可惜我学农)

——草稿1

我发现我似乎高估了这种方案的复杂性,我当时的想法有可能是让worker拥有额外的状态来获取自己的滑动窗口,并且,在worker具有这种额外状态的情况下,我之前的顾虑还是有点道理的(x);但是,我想这个状态可以不由worker持有和管理,这么一来,问题可能会变得简单一些……

所以

我目前更加倾向使用曾经的第二种方法,不过要稍加改造……

TL;DR:这种方法的工作方式相似于第一种(也就是目前的实现),但是不同的地方在于,这个方法将song_list拆分为多个“窗口”,并让worker们分别负责其中的一个窗口

对于worker的滑动窗口,肯定是要有一个地方定义的,既然worker变成stateful并不是一件好事,那我们就把这个state转嫁给其他东西……是的,就是我们的account_table

PostgreSQL数据库方面,我们在account_table表加一列新的东西,就叫它slide_window,这个列记录了这个worker的滑动窗口;worker在占领账号时,也不再直接将Idle置换为Using,而是将Idle写为Requested,然后等待下一步指示;同样,worker在释放账号时,也不再直接将Using写为Idle,而是写为Finishing

我们给entry一项新任务,让entry定时遍历这个数据表(反正账号数据不多,worker的上下线也不频繁💦),并执行以下流程(伪代码):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
fn scheduled_task(){
// 为了方便理解,我们先假设以下所有数据库操作是一整个原子操作(实际情况下,拉到的数据库在更改时可能已经变化);并且将所有async函数视作同步函数(粗犷)
let table_entries = db::fetch_account_table();
let has_request = table_entries.iter().any(|it|it.state=="Requested"||it.state=="Finishing");
if !has_request {
return;
}
let available_workers = table_entries.iter().filter(|it|it.state=="Using"||it.state=="Requested").len();
let song_list = data::SONG_LIST.脑补一下从OnceLock<RwLock<Option<SongData>>>里取东西的函数();
let slide_windows = 脑补一下将song_list均匀切成n份的函数(song_list.clone(), available_workers);
for (entry, window) in table_entries.iter().zip(slide_windows.iter()) {
match entry.state {
"Using" => {
db::sql(
r#"
UPDATE account_table
SET slide_window = $1
WHERE id = $2
"#,
)
.bind(window.序列化())
.bing(entry.id)
.execute();
},
"Requested" => {
db::sql(
r#"
UPDATE account_table
SET state = 'Using', slide_window = $1
WHERE id = $2
"#,
)
.bind(window.序列化())
.bing(entry.id)
.execute();
},
"Finishing" => {
db::sql(
r#"
UPDATE account_table
SET state = 'Idle'
WHERE id = $1
"#,
)
.bing(entry.id)
.execute();
},
_ => {}
}
}
}

TL;DR:这个任务将song_list根据活动的worker数量重新拆分为窗口,并更新条目状态

接下来,worker要改的东西就很简单了,即在初始化时定时查询自己对应的条目是否为Using来判断自己是否获取到了窗口;在每个主循环内,也查询一次窗口的状态,当worker缓存的窗口与数据库中记录的不符,就停止拉取任务,直到自己的有效任务数为0后,将自己的缓存窗口设置为数据库中的记录,然后继续开始拉取任务

在任务发布时,entry一如往常,在Redis队列中发布一项Job,格式与之前基本相同,不同的是,这次所有worker在不同的消费者组内消费这条消息,也就是说所有worker都同时收到这一条Job,然后开始在自己的窗口执行工作

每个worker会向辅助结果列表写入一个条目,它记录了这个worker所完成的查询量,entry可以通过查询完成的查询量来判断任务是否完成

解决的问题……以及目前能考虑到的缺陷

当worker挂掉时,优雅停机会将worker绑定的数据库条目的state设为Finishing,并尝试向辅助结果队列写入失败类型的查询量(包括了窗口位置以及结果列表和辅助结果列表的名称),进而触发滑动窗口更新,其余的worker也会停止接活,在完成自己手头的所有任务后更新自己的滑动窗口,这让这个设计支持动态扩容

但是目前有一个问题,即worker挂掉之后会出现“坏段”,目前能考虑到的做法是,将这个”坏段“加入一个专门的重试队列,让所有worker以相同的消费者组争夺重试队列中的内容,然后优先处理这些任务,但是,这个做法也有一定的问题,它让worker“分心”出来处理这个单独的任务,而脱离了它原有的窗口,这可能拖慢查询进度——当然,坏段越小越少(也就是现役worker越多),这个现象就会越不明显

666还有第二关

随着我拙劣的设计被qianmo佬打至趴下,我们共同完成了第三个设计(目前处于静等瑞平状态)

以下内容从当时的文档中摘录而来

对于一些背景知识,相关文章已经说的挺清楚的,下面是我关于如何解决这个问题的一些想法。

关于状态维护在大体上和nofyso老师的想法差不多,但我认为「滑动窗口」还是太复杂了。

我们不说「滑动窗口」,我们说「任务分块」。

将任务划分成一定大小的块放进任务队列,worker竞争性领取分块任务执行提交即可。

worker能够自发的接取任务很重要,一旦entry做了某种形式的分配,它就要不断维护任务的从属关系,同时还要处理各种一致性问题。

总之worker不应该是「拥有任务的节点」,而只能是「执行任务的计算资源」。

如果单单从理想情况下的任务执行时间来看,二者实际上是接近的:滑动窗口由于减少了调度与状态维护的开销,理论上甚至可能比 chunk 方案效率更高一些。

但这个前提建立在一个非常理想化的假设上:

「所有worker都能以近似的速度稳定完成任务」

而实际情况往往并非如此。

worker之间天然会存在差异:

「网络波动」

「API RTT变化」

「retry与限流」

「坏段补偿」

「节点性能波动」

甚至单首曲目的处理异常,都有可能阻塞该worker后续的全部任务。

而滑动窗口方案的一个核心问题在于:整个任务必须等待最慢的worker完成才能进行最终汇总

这意味着,木桶效应在滑动窗口模型中会被放得非常明显:

「有的worker早已空闲」「有的worker却仍然卡在长尾任务」「其余worker无法有效接管其工作」

总之最终整体耗时被最慢节点决定。

相比之下,chunk方案的系统则天然趋向动态****负载均衡

快worker会自然领取更多chunk,慢worker则领取更少,因此整体耗时会更接近于**「总工作量 / 总算力」**

拥有有更强的容错能力,也不容易被某个异常worker拖垮整体任务。

从实现复杂度来看,chunk方案实际上也更加简单。

entry不需要提前为worker规划窗口,也不需要维护复杂的worker状态;worker之间更不需要互相同步。整个系统真正流动的只有待执行的chunk。

worker本身只负责:领取 -> 执行 -> 提交 即可。

而滑动窗口方案则会逐渐引入很多额外问题:

  • 某个worker执行过慢,要不要继续给它分配窗口?
  • 后续任务是否需要等待它?
  • 如果跳过它,是不是又要重新切分窗口?
  • 新窗口该如何同步?
  • 状态如何迁移?
  • 卡住的worker恢复后怎么办?
  • 其他worker已经开始下一个任务时,这个“空出来的位置”又如何处理?

这些问题本质上都来自于:worker持有了「任务所有权」

系统必须围绕worker维护状态一致性。

而chunk方案天然不存在这些顾虑。

因为worker从未真正「拥有」任务,它只是临时消费chunk的执行器。worker可以随时加入、退出、崩溃或恢复,而不会影响整个系统的调度结构。

——qianmo佬的瑞平

看完瑞平之后,我感到我之前就是个大傻X,并且发现曾经的某个“一看就不行的”方案(草稿1中的方案1)和上一个方案(草稿1中的方案2)何尝不能来一次结合;经过粗略的思考后,我想出了第三个方案:

宽松版滑动窗口(什)

虽然但是,worker各自轮询自己的段的工作策略不打算变,根据qianmo的建议,现在将重新从第一版开始往上改

更改

首先,修改Redis侧工作队列的结构,从原本的单 cuscuta:job ,变为 cuscuta:job:chunk_[timestamp]_[hash]_[from]_[to]

总的策略为,entry在加入任务时,提前动态划分chunk,然后按chunk来将任务划分为多个任务队列,每个worker使用在每个Redis Stream使用相同的消费者组中不同的消费者来保证同步

(然后,不再在PostgreSQL中加入静态的slide_window,也不再在entry添加定时的数据库更新)

worker首先使用 SCAN 0 "cuscuta:job:*" type stream 来搜索现有的chunk队列,然后从中按时间顺序选取一个队列(由于同一消费者组的同步,随机选择并不会导致两个worker选取到同一个队列),如果队列为空,则继续选取下一个队列,直到发现队列内有任务;当worker获取到任务后,worker会持续消费这个队列的内容,在worker的获取到的任务耗尽且无法在一段时间内在这个队列内拉取任务后,worker就重复这一段的步骤,直到找到下一个chunk队列

扩展、容错与缺陷

在新worker上线后,由于entry在入列任务时会自动划分chunk,并创建新chunk队列,此时只需等待原有chunk队列的内容全部消耗完毕,worker会自动寻找新划分的chunk队列,worker下线同理,不过此时完成自己队列任务的worker会自动转向旧的遗留队列

容错方面,当worker宕机,任务失败时,在worker能够优雅停机的情况下,只需将任务原样入列;但worker若因不可抗力因素停机,无法执行优雅停机过程(如OOM,kiil -9)时,咱们可能就需要一个额外的操作来回收旧任务……一个可能的方案是,在worker搜索chunk队列时,先尝试XCLAIM这个队列内的任务,再尝试XREADGROUP

这里还需要注意的一个细节是,现在XREADGROUP应该尝试一次性获取尽可能多的任务,防止多个worker抢占到同一个Stream(不过即使真的这样了,在worker消费完成后,它们仍然会去抢占下一个Stream,只不过效率会降低)

另一个问题是Redis stream的存活时间需要设计,不然随着动态扩缩容,cuscuta:job的子项目会无限增多,这不利于worker对它们的遍历(因为worker不知道队列内是否有任务,会尝试获取任务,这会增大延迟和开销),这个时间需要按块大小灵活决定

TL;DR

这个方案本质上只是上一个方案的宽松型改版(给qianmo佬桂霞了x2),主要改变以及原因如下

  1. 将定时更改的“滑动窗口”改为了在队列内分配的chunk,灵活性增加了,worker不需要等待数据库或者什么地方的目标段发生了变化,它们可以自己做决定
  2. 对工作队列结构进行更改的主要原因其实是原先的工作队列无法满足“worker主动选取自己‘想要’的chunk”的目的(原因很简单:如果不改变(原先为单一队列的)工作队列结构,2或多个worker极有可能选取到不同账号的相同段而直接丢弃掉不属于这个段的一些任务,若这些任务不巧被所有的worker丢弃(实际上,这是很有可能发生的),那它将在很长很长的时间内无法被消费,直到下一个worker进入——这是不可容忍的)

后日谈

果然一个人的能力还是有限的,专业的事情还得是专业的人来干啊……

哦对了,scirpophaga刚刚取得了一些突破性(?)进展,我会在下一张草稿纸中做总结……