导入配置
Import pom.xml
<!-- 消息总线SDK -->
<dependency>
<groupId>com.cvte.csb</groupId>
<artifactId>event-sdk</artifactId>
<version>${event.sdk.version}</version>
</dependency>
版本说明
跟CSB的版本对应关系:
- csb-cloud< 3.x版本,2.0.9 <= event-sdk < 3.x
- csb-cloud>= 3.x版本,event-sdk >= 3.0.1
具体版本更新内容,请查看 版本更新
为了避免CSB版本冲突问题,建议exclude相关包
<!-- 消息总线SDK -->
<dependency>
<groupId>com.cvte.csb</groupId>
<artifactId>event-sdk</artifactId>
<version>${event.sdk.version}</version>
<exclusions>
<exclusion>
<groupId>com.cvte.csb</groupId>
<artifactId>csb-base</artifactId>
</exclusion>
<exclusion>
<groupId>com.cvte.csb</groupId>
<artifactId>csb-core</artifactId>
</exclusion>
<exclusion>
<groupId>com.cvte.csb</groupId>
<artifactId>csb-config</artifactId>
</exclusion>
<exclusion>
<groupId>com.cvte.csb</groupId>
<artifactId>csb-web</artifactId>
</exclusion>
<exclusion>
<groupId>com.cvte.csb</groupId>
<artifactId>csb-cloud</artifactId>
</exclusion>
</exclusions>
</dependency>
Apollo Config
event.center.url = event-dev.gz.cvte.cn //事件中心服务域名
各环境配置如下:
DEV,TEST: event-dev.gz.cvte.cn
UAT: event-uat.gz.cvte.cn
PROD: event.gz.cvte.cn
event.mq.mqCode = biz //事件中心MQ数据源配置编码
目前已配置的有:
商务域:biz
供应链:scm
研发域:cplm
财务域:fin
【如未在以上清单,请联系世荣配置】
event.source.prodLine = SD //产品线标识
event.source.system = OMS //系统标识
event.source.tenantId = 78076a48-bb75-4f55-be7f-d84e5e657bbe //产品线租户ID
application.properties
event.dto.package = xxxxx // 定义发送到Mq的消息体DTO扫描路径(如有多个逗号分隔),如:com.cvte.biz.core
Add GE_ID/BRANCH_ID To OkHttp Header
public class HeaderConfig implements OkHttpTokenInterceptor.IHeaderConfig {
@Override
public Map<String, String> getHeaders() {
HashMap<String, String> header = new HashMap<>();
header.put(GE_ID, StringUtils.isNotBlank(EventCurrentContext.getGeId()) ? EventCurrentContext.getGeId() : "");
header.put(BRANCH_ID, StringUtils.isNotBlank(EventCurrentContext.getBranchId()) ? EventCurrentContext.getBranchId() : "");
return header;
}
}
注意
如果项目有开启MQ消费retry配置(spring默认关闭),建议关闭掉,event-center本身有消费失败的重试机制,客户端没必要开启。
客户端开启retry,可能会出现多次ack导致channel自动关闭的问题,建议关闭掉:
spring.rabbitmq.template.retry.enabled=false
spring.rabbitmq.listener.simple.retry.enabled=false
spring.rabbitmq.listener.direct.retry.enabled=false
开始使用
Send Message To Event-Center
注意事项
- 有发送MQ事件的,一定要添加@GlobalEvent注解,建议放在最外层Service
- @GlobalEvent注解代码里,任何地方抛出了异常(事务回滚),都会自动取消MQ消息的发布
@Autowired
private EventService eventService;
/**
* @GlobalEvent注解包裹下的所有MQ消息,将在请求结束后统一发布到MQ上,类似于seata的@GlobalTransactional
*/
@GlobalEvent
public void mqExample() {
SoCreatedMsgDTO soCreatedMsgDTO = new SoCreatedMsgDTO();
SoLineCancelledEventDTO soLineCancelledEventDTO = new SoLineCancelledEventDTO();
/**
* 注意:所有的exchange和routingKey的定义,必须使用统一的常量定义,不能写死字符串(如下例子,为了方便大家看命名规则)
* 命名规则:系统 + 模块 + [子模块]+ 业务动作 + 投递方式
* 如:oms.so.created.fanout
* 具体命名可参考:https://kb.cvte.com/pages/viewpage.action?pageId=228600933
*/
// 发送MQ信息到exchange,不指定routingKey
eventService.send(OMS_SO_CREATED_FANOUNT_EXCHANGE, null, soCreatedMsgDTO);
/**
* 发送MQ信息到exchange,根据不同场景发送不同的routingKey
*/
// 下采购
eventService.send(OMS_SO_LINE_STAGE_CHANGE_TOPIC_EXCHANGE, OMS_SO_LINE_PURCHASE_PUSHING_ROUTING_KEY, soLineCancelledEventDTO);
}
=== 其中消息体定义说明如下,需要使用@MQEvent注解说明,用于注册MQ生产端消息
/**
* category: MQ消息分类,按产品线+系统+模块+[对象]的方式命名
* exchange: RabbitMq exchange
* type: RabbitMq exchange type
* routingKey: RabbitMq routingKey
* desc: 消息体描述
* object: 对象名称
*/
@Data
@MQEvent(category = "SD:OMS:SO:LINE", exchange = "oms.so.line.cancelled.fanout", routingKey = "",
type = ExchangeTypes.FANOUT, desc = "订单行已取消", object = "SoLineCancelledEventDTO")
public class SoLineCancelledEventDTO {
private String soLineCode;
...
}
Consume Message
注意:
监听方法上的Message message, Channel channel两个参数是必须的且顺序保持一致,不然没法做到自动应答和幂等的处理.
消费发生异常时需要把异常往外抛出,不能try catch吃掉,不然组件是会认为该消息是消费成功的,不会再重试
@Component @Slf4j public class MqConsume { @RabbitListener(containerFactory = "biz", bindings = @QueueBinding( value = @Queue(value = "test.queue.success"), exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC), key = "testKey" )) public void consumeSuccess(Message message, Channel channel) throws IOException { // convert to Object TestMsgDTO testMsgDTO = MqMessageUtils.parseObject(message, TestMsgDTO.class); // convert to List<T> // List<SoHeaderDTO> soHeaderDTOList = MqMessageUtils.parseList(message, SoHeaderDTO.class); // do your business // log something you like XLog.info(testMsgDTO.getCode(), "consume success, code:{}, name:{}", testMsgDTO.getCode(), testMsgDTO.getName()); } @RabbitListener(containerFactory = "biz", bindings = @QueueBinding( value = @Queue(value = "test.queue.failed"), exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC), key = "testKey" )) public void consumeError(Message message, Channel channel) { if(true) { throw new ParamsIncorrectException("参数传错啦,你走吧"); } } }
增强组件[可选]
具体的组件引入使用demo可参考demo项目
spring-multirabbit
消费使用原生的@RabbitListener即可,如有需要可使用spring-multirabbit的包,支持多数据源配置 开箱即用,注意:目前只支持springboot2.x的版本
<dependency>
<groupId>com.cvte.multirabbit</groupId>
<artifactId>spring-multirabbit</artifactId>
<version>3.2</version>
</dependency>
多数据源配置请参考:spring-multirabbit配置
Zipkin Trace
微服务盛下,链路追踪必不可少
目前基于zipkin已增强的功能有:
- 通用的异常拦截上报,记录堆栈信息
- 自定义header属性记录
- http请求的parameter和body的记录
- 作为消息组件的扩展,已打通生产消费链,可以清晰追踪生产端、消费端的情况
springboot为1.x的使用
<!-- Zipkin链路追踪 -->
<dependency>
<groupId>com.cvte.csb.trace</groupId>
<artifactId>csb-zipkin</artifactId>
<version>2.0.1-SNAPSHOT</version>
</dependency>
springboot为2.x的使用
<!-- Zipkin链路追踪 -->
<dependency>
<groupId>com.cvte.csb.trace</groupId>
<artifactId>csb-zipkin</artifactId>
<version>3.0.1-SNAPSHOT</version>
</dependency>
添加Apollo配置,请参考:https://kb.cvte.com/pages/viewpage.action?pageId=266454435
强大的日志组件
功能太多,晚点补充
<dependency>
<groupId>com.cvte.biz</groupId>
<artifactId>business-log</artifactId>
<version>${business.log.version}</version>
</dependency>
相关文档请参考:日志组件
作者:李世荣 创建时间:2021-11-03 21:53
最后编辑:李世荣 更新时间:2024-04-28 09:24
最后编辑:李世荣 更新时间:2024-04-28 09:24