项目代码

https://gitlab.gz.cvte.cn/LTC/mq-demo

启动工程

  1. 本地apollo环境/opt/settings/server.properties配置为 env=dev
  2. Idea配置环境变量 spring.rabbitmq.listener.auto-start-up=true
  3. 启动项目
    启动时会自动注册生产者、消费者信息

测试消息

代码说明

  • 发送端
    @Autowired
    private EventService eventService;

    @GlobalEvent
    public void send(){
        TestMsgDTO testMsgDTO = new TestMsgDTO();
        testMsgDTO.setCode("so-header-code-001");
        testMsgDTO.setName("测试消息");

        eventService.send("test.exchange.topic", "testKey", testMsgDTO);

//        eventService.batchSend();

        log.info("msg[{}] report to event-center", testMsgDTO);
    }
  • 消费端
    为测试方便,同一个exchange定义不同的队列监听,模拟成功和失败的场景

      @RabbitListener(containerFactory = "biz", bindings = @QueueBinding(
              value = @Queue(value = "test.queue.success"),
              exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC),
              key = "testKey"
      ))
      public void consumeSuccess(Message message, Channel channel) throws IOException {
          // convert to Object
          TestMsgDTO testMsgDTO = MqMessageUtils.parseObject(message, TestMsgDTO.class);
    
          // convert to List<T>
    //        List<SoHeaderDTO> soHeaderDTOList = MqMessageUtils.parseList(message, SoHeaderDTO.class);
    
          // do your business
    
          // log something you like
          XLog.info(testMsgDTO.getCode(), "consume success, code:{}, name:{}", testMsgDTO.getCode(), testMsgDTO.getName());
      }
    
      @RabbitListener(containerFactory = "biz", bindings = @QueueBinding(
              value = @Queue(value = "test.queue.failed"),
              exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC),
              key = "testKey"
      ))
      public void consumeError(Message message, Channel channel) {
          log.info(new String(message.getBody()));
          if(true) {
              throw new ParamsIncorrectException("参数传错啦,你走吧");
          }
      }

发送、消费消息

浏览器打开:
http://localhost:65002/api/v1/mq/send
控制台可以看到对应日志

复制链路ID,打开链路追踪,可以看到整个发送消费链的信息
https://tracetest.gz.cvte.cn/zipkin/traces/7f36d57511a07006

作者:李世荣  创建时间:2021-11-03 21:55
最后编辑:李世荣  更新时间:2024-04-28 09:23