项目代码
https://gitlab.gz.cvte.cn/LTC/mq-demo
启动工程
- 本地apollo环境/opt/settings/server.properties配置为
env=dev
- Idea配置环境变量
spring.rabbitmq.listener.auto-start-up=true
- 启动项目
启动时会自动注册生产者、消费者信息
测试消息
代码说明
- 发送端
@Autowired
private EventService eventService;
@GlobalEvent
public void send(){
TestMsgDTO testMsgDTO = new TestMsgDTO();
testMsgDTO.setCode("so-header-code-001");
testMsgDTO.setName("测试消息");
eventService.send("test.exchange.topic", "testKey", testMsgDTO);
// eventService.batchSend();
log.info("msg[{}] report to event-center", testMsgDTO);
}
消费端
为测试方便,同一个exchange定义不同的队列监听,模拟成功和失败的场景@RabbitListener(containerFactory = "biz", bindings = @QueueBinding( value = @Queue(value = "test.queue.success"), exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC), key = "testKey" )) public void consumeSuccess(Message message, Channel channel) throws IOException { // convert to Object TestMsgDTO testMsgDTO = MqMessageUtils.parseObject(message, TestMsgDTO.class); // convert to List<T> // List<SoHeaderDTO> soHeaderDTOList = MqMessageUtils.parseList(message, SoHeaderDTO.class); // do your business // log something you like XLog.info(testMsgDTO.getCode(), "consume success, code:{}, name:{}", testMsgDTO.getCode(), testMsgDTO.getName()); } @RabbitListener(containerFactory = "biz", bindings = @QueueBinding( value = @Queue(value = "test.queue.failed"), exchange = @Exchange(value = "test.exchange.topic", type = ExchangeTypes.TOPIC), key = "testKey" )) public void consumeError(Message message, Channel channel) { log.info(new String(message.getBody())); if(true) { throw new ParamsIncorrectException("参数传错啦,你走吧"); } }
发送、消费消息
浏览器打开:
http://localhost:65002/api/v1/mq/send
控制台可以看到对应日志
复制链路ID,打开链路追踪,可以看到整个发送消费链的信息
https://tracetest.gz.cvte.cn/zipkin/traces/7f36d57511a07006
作者:李世荣 创建时间:2021-11-03 21:55
最后编辑:李世荣 更新时间:2024-04-28 09:23
最后编辑:李世荣 更新时间:2024-04-28 09:23