延时队列带重试的几个架构比较

Table of Contents

  1. 延时队列带重试的几个架构比较
    1. 业务流程描述
    2. 方案1 使用kafka
      1. 难点
    3. 方案2 使用数据库+java延时队列DelayQueue
      1. failover
      2. 优点
      3. 缺点
    4. 方案3 使用Redis的zset和list
      1. 优点
      2. 缺点

现在很多系统因为消息队列的引入,会延时处理请求,导致结果无法同步获取,需要异步延时去单独查询。 而去查询的时候,一般是需要间隔延时多少秒或者多少分钟后去查询,举例:比如调用政府平台服务,但是因为它处理能力有限, 需要在请求处理之后一分钟后去查询处理结果,这个时候我们的程序不可能同步阻塞在这里,然后一分钟后去查询处理结果,更新数据库。

常用的解决方案是将查询请求放到延时队列中去,然后消费发送请求,再根据处理结果更新数据库。 但大多数对延时处理支持并不是很好,rabbitMQ是支持的,kafka需要自己实现时间轮,所以还是比较复杂的。

业务流程描述

目前这边是有几个业务系统,都是调用的第三方的服务上传更新一些信息,无法同步得到结果,需要延迟去查询处理结果后更新数据库。 其他业务系统不可能每个都写自己的一套延时处理的东西出来,因此需要一个公共的服务去延迟查询结果,然后回调通知各个业务系统。 造成这样的局面很大的一个原因是:微服务架构的乱用,本来一个项目实例能够解决的问题,非要搞7,8个出来。

方案1 使用kafka

之所以先考虑kafka是因为这个很火,现在很多公司都集成了有,使用它成本会很小。 业务系统请求第三方服务,得到一个唯一id,然后业务系统将唯一id,自己的回调地址或者微服务的服务名 发送到延时处理系统DS,DS存kafka后,直接返回。

kafka实现时间轮,然后一分钟后去查询,但是kafka是批量消费的,如果只成功处理了一部分数据,但是另一部分失败了,offeset如何处理? kafka新的版本的确是支持offset的自定义提交的,也就是只提交指定的topic的partition的offset,使用 commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets)方法搞定, 但是假设第三方服务不可用了,可能超时,可能挂了,这时如何做?sleep一定时间。

查询第三方服务得到处理结果,有3个状态:成功,失败,处理中。 成功的,失败的直接通知给业务系统,处理中的怎么做?当然是一分钟后重试,那将请求重新塞回到kafka。 那这地方的语义就不是很好,consumer中又有了重新塞回请求的producer,但是加入到mysql中重试就成本大了,复杂了。

回调通知失败了,如何做??这个又是问题,还是得延时重试,比如1分钟后重试第一次,3分钟后重试第二次,或者简单粗暴的 每隔一分钟重试一次,3次重试完了告警发邮件。 这里的关键点是:重试的次数这个状态信息报错在哪里?

难点

所以kafka来实现的难点2个:1. 时间轮的实现;2.消息语义不明。3.重试次数状态信息保存

方案2 使用数据库+java延时队列DelayQueue

  1. 业务系统请求第三方服务,将返回的id,回调地址或者微服务服务名发送到延时处理系统DS, DS存DB,计算出逾期时间,存延时队列DelayQueue
  2. 元素逾期后从DelayQueue中出队消费,向第三方服务查询处理结果, 请求失败(超时或者第三方服务不可用),重新计算逾期时间,塞回DelayQueue
  3. 请求得到结果,成功或者失败,更新Db,回调通知业务系统;处理中,重新计算逾期时间,塞回队列
  4. 回调通知失败,计算下一次的重试时间,塞入到另一条DelayQueue;回调通知成功,更新db,得到最终状态。

failover

这是这个系统的最大问题,延时处理系统DS肯定是微服务,多实例部署的,但是mysql实例只有一个, 如何在DS系统实例挂掉之后,或者启动的时候合理分配数据成了最大问题,想到的解决方案是 在添加一张表,作为全局记录的一个排斥锁,记录上一个实例load的数据最后id,然后顺序load。

优点

  1. 依赖服务少,实现简单
  2. 持久化已经支持

缺点

这个架构的缺点有以下几个:

  1. 每个服务实例都是消费一个DelayQueue,会成为性能瓶颈
  2. db删除修改成本太大,最终状态的数据会不断堆积需清理
  3. 代码实现要写的太多,难免出问题
  4. 数据库是单点的,分布式多服务节点如何协同合作分配数据?
  5. db适合多读的情况,也即是数据状态应该尽量不变,但是这里变化很多

方案3 使用Redis的zset和list

参考的有赞商城的架构设计,但是实现较简单,都是一个DelayQueue,一个ReadyQueue。 但是由于从DelayQueue中取数据是批量的,取出来后的操作都是多线程异步的,因此问题应该不大。 具体流程就是:

  1. 业务系统请求第三方服务,成功后,将id,回调地址或服务名发送给延时处理系统DS,
  2. DS以返回id为唯一key(如果非全局唯一就要用SnowFlake之类的生成),对象转map存到redis hash结构中, 同时计算出逾期时间作为score,id作为value存到zset结构的延时队列DelayQueue中。
  3. 逾期时间到了,延时队列出队消费,立刻放到list结构的ReadyQueue中
  4. 饥饿消费ReadyQueue队列,然后查询第三方服务处理结果,查询得到结果(成功或失败)就从hash结构中删除对象, 失败就将重试次数加一,重试计算逾期时间,塞入到延时队列中。
  5. 然后将查询结果回调通知到业务系统,回调成功就整个完结,回调失败(超时或者被回调对象服务不可用)塞入到另外的 延时重试队列之中,回调超过3次,发邮件告警。

优点

  1. 因为所以的数据都是保存到Redis中的,所以数据的全局分配等等都不成问题。
  2. 回调通知和具体的延时时重试业务是分离开的,耦合度大大降低
  3. 易扩展,后期可根据不同的业务放到不同的队列中

缺点

  1. 持久化成问题,最好是用Redis的RDB和日志结合
  2. 需要部署单独的Redis集群,和其他业务分开

PROGRAMDESIGN
DelayQueue,延时队列,重试,kafka,redis