Architecture
Event-SDK
Feature
- 消息统一收集、发布或者取消
- 客户端消费统一处理
- 客户端发送消费异常上报
- 重复消费幂等校验
- 自动注册生产者、消费者
- 消息打通zipkin链路追踪(可插拔),串联生产者和消费者链路
工作原理
消息发布:通过@GlobalEvent
注解实现分布式消息的发布,参考SEATA工作原理
消息消费:通过拦截@RabbitListener
注解监控客户端消费情况
灵魂注解
@GlobalEvent
注解下所有EventService.send()的消息,只是统一收集到事件中心
- 注解的方法成功返回时,则统一做消息的发布
- 注解的方法抛出异常,则取消消息的发布
@MQEvent
发送到消息队列的对象[EventService.send()],必须添加此注解
如:eventService.send(exchange, null, soLineCancelledEventDTO),须在SoLineCancelledEventDTO类上添加此注解
/**
* category: MQ消息分类,按产品线+系统+模块+[对象]的方式命名
* exchange: RabbitMq exchange
* type: RabbitMq exchange type
* routingKey: RabbitMq routingKey
* desc: 消息体描述
* object: 对象名称
*/
@Data
@MQEvent(category = "SD:OMS:SO:LINE", exchange = "oms.so.line.cancelled.fanout", routingKey = "",
type = ExchangeTypes.FANOUT, desc = "订单行已取消", object = "SoLineCancelledEventDTO")
public class SoLineCancelledEventDTO {
private String soLineCode;
...
}
如果一个DTO用于多个场景,可使用@MQEvents
批量注解
- 项目启动时,会扫描该注解注册生产者信息到事件中心。
用途:用于做消息的闭环管理;关系拓扑图可视化
@Override
public void run(String... args) throws Exception {
log.info("开始注册消息生产者信息");
CompletableFuture.runAsync(()->clientService.registerProducer());
log.info("开始注册消息消费者信息");
CompletableFuture.runAsync(()->clientService.registerConsumer());
}
Event-Center
Feature
- 消息发布MQ
- 消息缓存、持久化
- 全局异常处理
- 生产端、消费端注册
- 发布失败重试
- 消费失败,重新发布
- 消息闭环管理(状态标识是否所有消费者已成功消费)
没啥特殊的,代码一看都懂
事件发布
- 消息体数据保存在ES(非阻塞式),保存时同步写入redis缓存
- 获取消息数据时,先从缓存获取数据,获取不到再从ES获取
- 事件发布,
mqContext.send(event)
使用了多MQ数据源适配 - 可根据
mqCode
自动切换并发送消息到对应的rabbitMQ上 - 整个发布过程,发生任何异常,则标记事件发布失败,定时调度自动重试
- 消息投递rabbitMQ成功,但不能成功消费的(如:无监听者,消息丢弃),则通过
RabbitConfirmCallback
回调标记事件状态
消息失败重试
不管是消息发布失败,还是客户端消费失败(SDK已做幂等消费),
都是统一由事件中心重新投递消息到MQ解决
目前提供如下方式:
- 统一由定时调度任务处理,定时每5min执行一次,重试次数默认为5次
- 使用postman指定参数手动调用api执行
- 使用前端的可视化界面查看并在界面上点击重新发布按钮
更多请阅读源码,欢迎小伙伴参与贡献
EVENT-SDK
EVENT-CENTER
作者:李世荣 创建时间:2021-11-03 21:54
最后编辑:李世荣 更新时间:2024-04-28 09:23
最后编辑:李世荣 更新时间:2024-04-28 09:23