问题及原因
线上日志报了个错:
Query error: Duplicate entry 'XXXXXXXXXXXXXXXX' for key 'uniq_order_id' - Invalid query: INSERT INTO `xxx_result` (`sign`, `expire_at`, `order_id`) VALUES ('VNEZvnetZ0cib84s1Io206rp9vuF1keXGsTKoDeEtPw6oLpmnSfi/KNlYkq+EfpV', '2025-05-23 08:12:29', 'XXXXXXXXXXXXXXXX')
出错代码如下:「先查询一下,如果有则更新,否则插入」,报错应该是第5行,同时有多个进程插入同样的数据,后插入的那个报错了。
$info = $this->model->get_row($where);
if ($info) {
$this->model->update($data, $where);
} else {
$this->model->insert($data);
}
那么为什么会有同样的进程在执行呢?
往上层调用查看,发现是消费队列常驻消费者进程代码调用的。那么消费者进程为什么会有多个同时在执行呢?
再看日志,搜索订单号,查看相关日志,发现有一个远程API回调几乎是同一时刻回调了两次,两次回调的内容是一样的,回调的内容会被添加到消息队列中。也就是说消息队列中有两个一样的消息。
但我记得在 supersivor 配置文件中 numproc=1 应该只有一个进程啊,按顺序消费也不至于报错。为什么会有多个进程?
后来问了一下运维,消费者进程部署在了不同的机器(4个)上面,每个机器的限制是一个,但同时可能会有多个。
终于原因找到了。
解决
那么如何解决呢?
想到两个方案,一个是在代码中加锁,另一个是使用 rabbitmq 的「单一活跃者队列」。
代码加锁
想到两种加锁方式,一种是不等待,一种是等待。
不等待加锁代码如下:
$info = $this->model->get_row($where);
if ($info) {
$this->model->update($data, $where);
} else {
if (lock::get($uni_key, 5)) {
$this->model->insert($data);
} else {
log();
exit();
}
}
获取一个维持5秒的锁,如果获取不到,说明这时已经有进程在处理同样的数据,直接记日志并退出处理。如果获取到了锁,则执行插入。
等待的加锁(阻塞)
$info = $this->model->get_row($where);
if ($info) {
$this->model->update($data, $where);
} else {
lock::wait_get($uni_key, 60))
$info = $this->model->get_row($where);
if (!$info) {
$this->model->insert($data);
}
lock::release_lock($uni_key)
}
获取一个超时时间为20秒的锁,如果获取不到,则内部报错记日志。20秒内,一直等待直到获取锁,获取到锁之后,再查一遍,如果查不到,再插入,结束之后释放锁。
单一消费者队列
rabbitmq 有一个配置是 'x-single-active-consumer' => true
,设置之后,一个队列如果有多个消费者的,那么同时只能有一个消费者活跃,即消费消息。^1 在代码的消费者中修改声明队列的方法,添加上述配置。
线上现有的队列已经生成,无法添加队列属性,有两个做法,
一个是删掉线上队列,上线新代码之后,重新启动消费者进程,重新创建队列。但这样可能会导致丢消息。
另一个是修改一下队列配置中的队列名,上线新代码之后,重新启动消费者进程,创建新队列。这种比较可行。
总结
代码修改方式比较灵活,但是容易出 bug,并且每个业务涉及的地方都需要改,出错概率也很大。
单一消费者队列配置比较简单,但是也限制了队列的吞吐量,对于需要多个消费者的队列并不适用。