导入配置

Import pom.xml

<!-- 消息总线SDK -->
<dependency>
    <groupId>com.cvte.csb</groupId>
    <artifactId>event-sdk</artifactId>
    <version>${event.sdk.version}</version>
</dependency>
版本说明

跟CSB的版本对应关系:

  • csb-cloud< 3.x版本,2.0.9 <= event-sdk < 3.x
  • csb-cloud>= 3.x版本,event-sdk >= 3.0.1
    具体版本更新内容,请查看 版本更新

为了避免CSB版本冲突问题,建议exclude相关包

<!-- 消息总线SDK -->
<dependency>
    <groupId>com.cvte.csb</groupId>
    <artifactId>event-sdk</artifactId>
    <version>${event.sdk.version}</version>
    <exclusions>
        <exclusion>
            <groupId>com.cvte.csb</groupId>
            <artifactId>csb-base</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.cvte.csb</groupId>
            <artifactId>csb-core</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.cvte.csb</groupId>
            <artifactId>csb-config</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.cvte.csb</groupId>
            <artifactId>csb-web</artifactId>
        </exclusion>
        <exclusion>
            <groupId>com.cvte.csb</groupId>
            <artifactId>csb-cloud</artifactId>
        </exclusion>
    </exclusions>
</dependency>

Apollo Config

event.center.url = event-dev.gz.cvte.cn //事件中心服务域名
各环境配置如下:
    DEV,TEST: event-dev.gz.cvte.cn
    UAT: event-uat.gz.cvte.cn
    PROD: event.gz.cvte.cn

event.mq.mqCode = biz //事件中心MQ数据源配置编码
目前已配置的有:
商务域:biz
供应链:scm
研发域:cplm
财务域:fin
【如未在以上清单,请联系世荣配置】

event.source.prodLine = SD //产品线标识
event.source.system = OMS //系统标识
event.source.tenantId = 78076a48-bb75-4f55-be7f-d84e5e657bbe //产品线租户ID

application.properties

event.dto.package = xxxxx // 定义发送到Mq的消息体DTO扫描路径(如有多个逗号分隔),如:com.cvte.biz.core

Add GE_ID/BRANCH_ID To OkHttp Header

public class HeaderConfig implements OkHttpTokenInterceptor.IHeaderConfig {

    @Override
    public Map<String, String> getHeaders() {
        HashMap<String, String> header = new HashMap<>();
        header.put(GE_ID, StringUtils.isNotBlank(EventCurrentContext.getGeId()) ? EventCurrentContext.getGeId() : "");
        header.put(BRANCH_ID, StringUtils.isNotBlank(EventCurrentContext.getBranchId()) ? EventCurrentContext.getBranchId() : "");

        return header;
    }
}

注意

如果项目有开启MQ消费retry配置(spring默认关闭),建议关闭掉,event-center本身有消费失败的重试机制,客户端没必要开启。
客户端开启retry,可能会出现多次ack导致channel自动关闭的问题,建议关闭掉:

spring.rabbitmq.template.retry.enabled=false
spring.rabbitmq.listener.simple.retry.enabled=false
spring.rabbitmq.listener.direct.retry.enabled=false

开始使用

Send Message To Event-Center

注意事项

  • 有发送MQ事件的,一定要添加@GlobalEvent注解,建议放在最外层Service
  • @GlobalEvent注解代码里,任何地方抛出了异常(事务回滚),都会自动取消MQ消息的发布
@Autowired
private EventService eventService;

/**
* @GlobalEvent注解包裹下的所有MQ消息,将在请求结束后统一发布到MQ上,类似于seata的@GlobalTransactional
*/
@GlobalEvent
public void mqExample() {
    SoCreatedMsgDTO soCreatedMsgDTO = new SoCreatedMsgDTO();
    SoLineCancelledEventDTO soLineCancelledEventDTO = new SoLineCancelledEventDTO();

    /**
     * 注意:所有的exchange和routingKey的定义,必须使用统一的常量定义,不能写死字符串(如下例子,为了方便大家看命名规则)
     * 命名规则:系统 + 模块 + [子模块]+ 业务动作 + 投递方式
     * 如:oms.so.created.fanout
     * 具体命名可参考:https://kb.cvte.com/pages/viewpage.action?pageId=228600933
     */

    // 发送MQ信息到exchange,不指定routingKey
    eventService.send(OMS_SO_CREATED_FANOUNT_EXCHANGE, null, soCreatedMsgDTO);

    /**
     * 发送MQ信息到exchange,根据不同场景发送不同的routingKey
     */
    // 下采购
    eventService.send(OMS_SO_LINE_STAGE_CHANGE_TOPIC_EXCHANGE, OMS_SO_LINE_PURCHASE_PUSHING_ROUTING_KEY, soLineCancelledEventDTO);
}

=== 其中消息体定义说明如下,需要使用@MQEvent注解说明,用于注册MQ生产端消息
/**
 * category: MQ消息分类,按产品线+系统+模块+[对象]的方式命名
 * exchange: RabbitMq exchange
 * type: RabbitMq exchange type
 * routingKey: RabbitMq routingKey
 * desc: 消息体描述
 * object: 对象名称
 */
@Data
@MQEvent(category = "SD:OMS:SO:LINE", exchange = "oms.so.line.cancelled.fanout", routingKey = "",
        type = ExchangeTypes.FANOUT, desc = "订单行已取消", object = "SoLineCancelledEventDTO")
public class SoLineCancelledEventDTO {
    private String soLineCode;
    ...
}

Consume Message

注意:

  1. 监听方法上的Message message, Channel channel两个参数是必须的且顺序保持一致,不然没法做到自动应答和幂等的处理.

  2. 消费发生异常时需要把异常往外抛出,不能try catch吃掉,不然组件是会认为该消息是消费成功的,不会再重试

    @Component
    @Slf4j
    public class MqConsume {
    
     @RabbitListener(containerFactory = "biz", bindings = @QueueBinding(
             value = @Queue(value = "test.queue.success"),
             exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC),
             key = "testKey"
     ))
     public void consumeSuccess(Message message, Channel channel) throws IOException {
         // convert to Object
         TestMsgDTO testMsgDTO = MqMessageUtils.parseObject(message, TestMsgDTO.class);
    
         // convert to List<T>
    //        List<SoHeaderDTO> soHeaderDTOList = MqMessageUtils.parseList(message, SoHeaderDTO.class);
    
         // do your business
    
         // log something you like
         XLog.info(testMsgDTO.getCode(), "consume success, code:{}, name:{}", testMsgDTO.getCode(), testMsgDTO.getName());
     }
    
     @RabbitListener(containerFactory = "biz", bindings = @QueueBinding(
             value = @Queue(value = "test.queue.failed"),
             exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC),
             key = "testKey"
     ))
     public void consumeError(Message message, Channel channel) {
         if(true) {
             throw new ParamsIncorrectException("参数传错啦,你走吧");
         }
     }
    }

增强组件[可选]

具体的组件引入使用demo可参考demo项目

spring-multirabbit

消费使用原生的@RabbitListener即可,如有需要可使用spring-multirabbit的包,支持多数据源配置 开箱即用,注意:目前只支持springboot2.x的版本

<dependency>
    <groupId>com.cvte.multirabbit</groupId>
    <artifactId>spring-multirabbit</artifactId>
    <version>3.2</version>
</dependency>

多数据源配置请参考:spring-multirabbit配置

Zipkin Trace

微服务盛下,链路追踪必不可少
目前基于zipkin已增强的功能有:

  1. 通用的异常拦截上报,记录堆栈信息
  2. 自定义header属性记录
  3. http请求的parameter和body的记录
  4. 作为消息组件的扩展,已打通生产消费链,可以清晰追踪生产端、消费端的情况

springboot为1.x的使用

<!-- Zipkin链路追踪 -->
<dependency>
    <groupId>com.cvte.csb.trace</groupId>
    <artifactId>csb-zipkin</artifactId>
    <version>2.0.1-SNAPSHOT</version>
</dependency>

springboot为2.x的使用

<!-- Zipkin链路追踪 -->
<dependency>
    <groupId>com.cvte.csb.trace</groupId>
    <artifactId>csb-zipkin</artifactId>
    <version>3.0.1-SNAPSHOT</version>
</dependency>

添加Apollo配置,请参考:https://kb.cvte.com/pages/viewpage.action?pageId=266454435

强大的日志组件

功能太多,晚点补充

<dependency>
    <groupId>com.cvte.biz</groupId>
    <artifactId>business-log</artifactId>
    <version>${business.log.version}</version>
</dependency>

相关文档请参考:日志组件

作者:李世荣  创建时间:2021-11-03 21:53
最后编辑:李世荣  更新时间:2024-04-28 09:24