Architecture

Event-SDK

Feature

  • 消息统一收集、发布或者取消
  • 客户端消费统一处理
  • 客户端发送消费异常上报
  • 重复消费幂等校验
  • 自动注册生产者、消费者
  • 消息打通zipkin链路追踪(可插拔),串联生产者和消费者链路

工作原理

消息发布:通过@GlobalEvent注解实现分布式消息的发布,参考SEATA工作原理
消息消费:通过拦截@RabbitListener注解监控客户端消费情况

灵魂注解

@GlobalEvent

注解下所有EventService.send()的消息,只是统一收集到事件中心

  • 注解的方法成功返回时,则统一做消息的发布
  • 注解的方法抛出异常,则取消消息的发布
@MQEvent

发送到消息队列的对象[EventService.send()],必须添加此注解
如:eventService.send(exchange, null, soLineCancelledEventDTO),须在SoLineCancelledEventDTO类上添加此注解

/**
 * category: MQ消息分类,按产品线+系统+模块+[对象]的方式命名
 * exchange: RabbitMq exchange
 * type: RabbitMq exchange type
 * routingKey: RabbitMq routingKey
 * desc: 消息体描述
 * object: 对象名称
 */
@Data
@MQEvent(category = "SD:OMS:SO:LINE", exchange = "oms.so.line.cancelled.fanout", routingKey = "",
        type = ExchangeTypes.FANOUT, desc = "订单行已取消", object = "SoLineCancelledEventDTO")
public class SoLineCancelledEventDTO {
    private String soLineCode;
    ...
}

如果一个DTO用于多个场景,可使用@MQEvents批量注解

  • 项目启动时,会扫描该注解注册生产者信息到事件中心。
    用途:用于做消息的闭环管理;关系拓扑图可视化
    @Override
    public void run(String... args) throws Exception {
        log.info("开始注册消息生产者信息");
        CompletableFuture.runAsync(()->clientService.registerProducer());
        log.info("开始注册消息消费者信息");
        CompletableFuture.runAsync(()->clientService.registerConsumer());
    }

Event-Center

Feature

  • 消息发布MQ
  • 消息缓存、持久化
  • 全局异常处理
  • 生产端、消费端注册
  • 发布失败重试
  • 消费失败,重新发布
  • 消息闭环管理(状态标识是否所有消费者已成功消费)

没啥特殊的,代码一看都懂

事件发布

  • 消息体数据保存在ES(非阻塞式),保存时同步写入redis缓存
  • 获取消息数据时,先从缓存获取数据,获取不到再从ES获取
  • 事件发布,mqContext.send(event)使用了多MQ数据源适配
  • 可根据mqCode自动切换并发送消息到对应的rabbitMQ上
  • 整个发布过程,发生任何异常,则标记事件发布失败,定时调度自动重试
  • 消息投递rabbitMQ成功,但不能成功消费的(如:无监听者,消息丢弃),则通过RabbitConfirmCallback回调标记事件状态

消息失败重试

不管是消息发布失败,还是客户端消费失败(SDK已做幂等消费),
都是统一由事件中心重新投递消息到MQ解决

目前提供如下方式:

  • 统一由定时调度任务处理,定时每5min执行一次,重试次数默认为5次
  • 使用postman指定参数手动调用api执行
  • 使用前端的可视化界面查看并在界面上点击重新发布按钮

更多请阅读源码,欢迎小伙伴参与贡献
EVENT-SDK
EVENT-CENTER

作者:李世荣  创建时间:2021-11-03 21:54
最后编辑:李世荣  更新时间:2024-04-28 09:23