From e226535ed3bc81089d045013240c3bbd72d1bffe Mon Sep 17 00:00:00 2001 From: Kirk Lin Date: Sun, 11 Jul 2021 21:13:46 +0800 Subject: [PATCH] =?UTF-8?q?RabbitMQ=E5=AD=A6=E4=B9=A0?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- kkmall-order/pom.xml | 6 + .../kkmall/order/config/MyRabbitConfig.java | 97 +++++++++++++ .../order/controller/RabbitController.java | 62 ++++++++ .../service/impl/OrderItemServiceImpl.java | 88 ++++++++++- .../src/main/resources/application.properties | 15 ++ .../order/KkmallOrderApplicationTests.java | 137 ++++++++++++++++++ 6 files changed, 402 insertions(+), 3 deletions(-) create mode 100644 kkmall-order/src/main/java/name/lkk/kkmall/order/config/MyRabbitConfig.java create mode 100644 kkmall-order/src/main/java/name/lkk/kkmall/order/controller/RabbitController.java create mode 100644 kkmall-order/src/main/resources/application.properties diff --git a/kkmall-order/pom.xml b/kkmall-order/pom.xml index afea00e..805a913 100644 --- a/kkmall-order/pom.xml +++ b/kkmall-order/pom.xml @@ -24,6 +24,12 @@ kkmall-common 0.0.1-SNAPSHOT + + + org.springframework.boot + spring-boot-starter-amqp + + org.springframework.boot diff --git a/kkmall-order/src/main/java/name/lkk/kkmall/order/config/MyRabbitConfig.java b/kkmall-order/src/main/java/name/lkk/kkmall/order/config/MyRabbitConfig.java new file mode 100644 index 0000000..cb95ffb --- /dev/null +++ b/kkmall-order/src/main/java/name/lkk/kkmall/order/config/MyRabbitConfig.java @@ -0,0 +1,97 @@ +package name.lkk.kkmall.order.config; + +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; +import org.springframework.amqp.support.converter.MessageConverter; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.context.annotation.Primary; + +/** + * RabbitMQ配置类 + * + * @author kirklin + */ +@Configuration +public class MyRabbitConfig { + + private RabbitTemplate rabbitTemplate; + + /** + * 当一个接口有2个不同实现时,使用@Autowired注解时会报org.springframework.beans.factory.NoUniqueBeanDefinitionException异常信息 + * (1)使用Qualifier注解,选择一个对象的名称,通常比较常用 + *

+ * (2)Primary可以理解为默认优先选择,不可以同时设置多个,内部实质是设置BeanDefinition的primary属性 + * + * @param connectionFactory + * @return + */ + @Primary + @Bean + public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { + RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); + this.rabbitTemplate = rabbitTemplate; + rabbitTemplate.setMessageConverter(messageConverter()); + initRabbitTemplate(); + return rabbitTemplate; + } + + /** + * 配置消息序列化方式 + * + * @return + */ + @Bean + public MessageConverter messageConverter() { + return new Jackson2JsonMessageConverter(); + } + + /** + * 定制RabbitTemplate + * 1、服务收到消息就会回调 + * 1、spring.rabbitmq.publisher-confirms: true + * 2、设置确认回调 + * 2、消息正确抵达队列就会进行回调 + * 1、spring.rabbitmq.publisher-returns: true + * spring.rabbitmq.template.mandatory: true + * 2、设置确认回调ReturnCallback + *

+ * 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息) + */ + // @PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法 + public void initRabbitTemplate() { + + /** + * 1、只要消息抵达Broker就ack=true + * correlationData:当前消息的唯一关联数据(这个是消息的唯一id) + * ack:消息是否成功收到 + * cause:失败的原因 + */ + //设置确认回调 + rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { + System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]"); + }); + + + /** + * 只要消息没有投递给指定的队列,就触发这个失败回调 + * message:投递失败的消息详细信息 + * replyCode:回复的状态码 + * replyText:回复的文本内容 + * exchange:当时这个消息发给哪个交换机 + * routingKey:当时这个消息用哪个路由键 + */ +// rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> { +// System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" + +// "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]"); +// }); + + + //新版本需要开启强制委托模式 + rabbitTemplate.setMandatory(true); + rabbitTemplate.setReturnsCallback(returned -> System.out.println("消息e-> q失败,详细信息:" + returned)); + + } + +} diff --git a/kkmall-order/src/main/java/name/lkk/kkmall/order/controller/RabbitController.java b/kkmall-order/src/main/java/name/lkk/kkmall/order/controller/RabbitController.java new file mode 100644 index 0000000..f41d4d6 --- /dev/null +++ b/kkmall-order/src/main/java/name/lkk/kkmall/order/controller/RabbitController.java @@ -0,0 +1,62 @@ +package name.lkk.kkmall.order.controller; + + +import name.lkk.kkmall.order.entity.OrderEntity; +import name.lkk.kkmall.order.entity.OrderItemEntity; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestParam; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Date; +import java.util.UUID; + +/** + * 测试rabbitMQ + */ +@RestController +public class RabbitController { + + @Autowired + private RabbitTemplate rabbitTemplate; + + + private String exchange = "hello-java-exchange"; + + + private String routeKey = "hello.java"; + + + @GetMapping("/sendMQ") + public String sendMQ(@RequestParam(value = "num", required = false, defaultValue = "10") Integer num) { + + OrderEntity entity = new OrderEntity(); + entity.setId(1L); + entity.setCommentTime(new Date()); + entity.setCreateTime(new Date()); + entity.setConfirmStatus(0); + entity.setAutoConfirmDay(1); + entity.setGrowth(1); + entity.setMemberId(12L); + + OrderItemEntity orderEntity = new OrderItemEntity(); + orderEntity.setCategoryId(225L); + orderEntity.setId(1L); + orderEntity.setOrderSn("mall"); + orderEntity.setSpuName("华为"); + for (int i = 0; i < num; i++) { + if (i % 2 == 0) { + entity.setReceiverName("OrderEntity ==> " + i); + rabbitTemplate.convertAndSend(this.exchange, this.routeKey, entity, new CorrelationData(UUID.randomUUID().toString().replace("-", ""))); + } else { + orderEntity.setOrderSn("OrderItemEntity ==> " + i); + rabbitTemplate.convertAndSend(this.exchange, this.routeKey, orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-", ""))); + // 测试消息发送失败 + //rabbitTemplate.convertAndSend(this.exchange, "fail", orderEntity); + } + } + return "ok"; + } +} diff --git a/kkmall-order/src/main/java/name/lkk/kkmall/order/service/impl/OrderItemServiceImpl.java b/kkmall-order/src/main/java/name/lkk/kkmall/order/service/impl/OrderItemServiceImpl.java index dbbc2a6..493ac02 100644 --- a/kkmall-order/src/main/java/name/lkk/kkmall/order/service/impl/OrderItemServiceImpl.java +++ b/kkmall-order/src/main/java/name/lkk/kkmall/order/service/impl/OrderItemServiceImpl.java @@ -1,18 +1,26 @@ package name.lkk.kkmall.order.service.impl; -import org.springframework.stereotype.Service; -import java.util.Map; import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl; +import com.rabbitmq.client.Channel; import name.lkk.common.utils.PageUtils; import name.lkk.common.utils.Query; - import name.lkk.kkmall.order.dao.OrderItemDao; +import name.lkk.kkmall.order.entity.OrderEntity; import name.lkk.kkmall.order.entity.OrderItemEntity; +import name.lkk.kkmall.order.entity.OrderReturnReasonEntity; import name.lkk.kkmall.order.service.OrderItemService; +import org.springframework.amqp.core.Message; +import org.springframework.amqp.core.MessageProperties; +import org.springframework.amqp.rabbit.annotation.RabbitHandler; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Service; +import java.io.IOException; +import java.util.Map; +@RabbitListener(queues = {"hello-java-queue"}) @Service("orderItemService") public class OrderItemServiceImpl extends ServiceImpl implements OrderItemService { @@ -26,4 +34,78 @@ public class OrderItemServiceImpl extends ServiceImpl OrderEntity orderEntity [Spring自动帮我们转换] + * 3.Channel channel: 当前传输数据的通道 + *

+ * // 同一个消息只能被一个人收到 + * + * @RabbitListener: 只能标注在类、方法上 配合 @RabbitHandler + * @RabbitHandler: 只能标注在方法上 [重载区分不同的消息] + */ + + @RabbitHandler + public void receiveMessageA(Message message, OrderEntity orderEntity, Channel channel) { + System.out.println("接受到消息: " + message + "\n内容:" + orderEntity); + try { + Thread.sleep(200); + } catch (InterruptedException e) { + } + // 这个是一个数字 通道内自增 + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + // 只签收当前货物 不批量签收 + channel.basicAck(deliveryTag, false); + + // deliveryTag: 货物的标签 multiple: 是否批量拒收 requeue: 是否重新入队 +// channel.basicNack(deliveryTag, false,true); +// 批量拒绝 +// channel.basicReject(); + } catch (IOException e) { + System.out.println("receiveMessageA网络中断"); + } + System.out.println(orderEntity.getReceiverName() + " 消息处理完成"); + } + + @RabbitHandler + public void receiveMessageB(Message message, OrderItemEntity orderEntity, Channel channel) { + System.out.println("接受到消息: " + message + "\n内容:" + orderEntity); + try { + Thread.sleep(2000); + } catch (InterruptedException e) { + } + long deliveryTag = message.getMessageProperties().getDeliveryTag(); + try { + channel.basicAck(deliveryTag, false); + } catch (IOException e) { + System.out.println("receiveMessageB网络中断"); + } + System.out.println(orderEntity.getOrderSn() + " 消息处理完成"); + } + } \ No newline at end of file diff --git a/kkmall-order/src/main/resources/application.properties b/kkmall-order/src/main/resources/application.properties new file mode 100644 index 0000000..4556951 --- /dev/null +++ b/kkmall-order/src/main/resources/application.properties @@ -0,0 +1,15 @@ +# RabbitMQ配置 +spring.rabbitmq.host=localhost +spring.rabbitmq.port=5672 +# 虚拟主机配置 +spring.rabbitmq.virtual-host=/ +# 开启发送端消息抵达Broker确认 +#该配置已过时 +#spring.rabbitmq.publisher-confirms=true +spring.rabbitmq.publisher-confirm-type=CORRELATED +# 开启发送端消息抵达Queue确认 +spring.rabbitmq.publisher-returns=true +# 只要消息抵达Queue,就会异步发送优先回调returnfirm +spring.rabbitmq.template.mandatory=true +# 手动ack消息,不使用默认的消费端确认 +spring.rabbitmq.listener.simple.acknowledge-mode=manual diff --git a/kkmall-order/src/test/java/name/lkk/kkmall/order/KkmallOrderApplicationTests.java b/kkmall-order/src/test/java/name/lkk/kkmall/order/KkmallOrderApplicationTests.java index d7c2001..b754961 100644 --- a/kkmall-order/src/test/java/name/lkk/kkmall/order/KkmallOrderApplicationTests.java +++ b/kkmall-order/src/test/java/name/lkk/kkmall/order/KkmallOrderApplicationTests.java @@ -1,11 +1,148 @@ package name.lkk.kkmall.order; +import lombok.extern.slf4j.Slf4j; +import name.lkk.kkmall.order.entity.OrderEntity; +import name.lkk.kkmall.order.entity.OrderItemEntity; +import name.lkk.kkmall.order.entity.OrderReturnReasonEntity; +import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.springframework.amqp.core.*; +import org.springframework.amqp.rabbit.connection.CorrelationData; +import org.springframework.amqp.rabbit.core.RabbitTemplate; +import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; +import java.util.Date; +import java.util.HashMap; +import java.util.UUID; + +@Slf4j @SpringBootTest class KkmallOrderApplicationTests { + @Autowired + private AmqpAdmin amqpAdmin; + + @Autowired + private RabbitTemplate rabbitTemplate; + + + private String queue = "hello-java-queue"; + + + private String exchange = "hello-java-exchange"; + + + private String routeKey = "hello.java"; + + @Test + public void sendMessageTest2() { + OrderEntity entity = new OrderEntity(); + entity.setId(1L); + entity.setCommentTime(new Date()); + entity.setCreateTime(new Date()); + entity.setConfirmStatus(0); + entity.setAutoConfirmDay(1); + entity.setGrowth(1); + entity.setMemberId(12L); + + OrderItemEntity orderEntity = new OrderItemEntity(); + orderEntity.setCategoryId(225L); + orderEntity.setId(1L); + orderEntity.setOrderSn("mall"); + orderEntity.setSpuName("华为"); + for (int i = 0; i < 10; i++) { + if (i % 2 == 0) { + entity.setReceiverName("OrderEntity-" + i); + rabbitTemplate.convertAndSend(this.exchange, this.routeKey, entity, new CorrelationData(UUID.randomUUID().toString().replace("-", ""))); + } else { + orderEntity.setOrderSn("OrderItemEntity-" + i); + rabbitTemplate.convertAndSend(this.exchange, this.routeKey, orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-", ""))); + } + log.info("\n路由键:" + this.routeKey + "的消息发送成功"); + } + } + + @Test + public void sendMessageTest() { + OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity(); + reasonEntity.setId(1L); + reasonEntity.setCreateTime(new Date()); + reasonEntity.setName("reason"); + reasonEntity.setStatus(1); + reasonEntity.setSort(2); + String msg = "Hello World"; + //1、发送消息,如果发送的消息是个对象,会使用序列化机制,将对象写出去,对象必须实现Serializable接口 + + //2、发送的对象类型的消息,可以是一个json +// rabbitTemplate.convertAndSend("hello-java-exchange","hello.java", +// reasonEntity,new CorrelationData(UUID.randomUUID().toString())); + for (int i = 0; i < 10; i++) { + if (i % 2 == 1) { + rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", + reasonEntity, new CorrelationData(UUID.randomUUID().toString())); + } else { + rabbitTemplate.convertAndSend("hello-java-exchange", "hello2.java", + reasonEntity, new CorrelationData(UUID.randomUUID().toString())); + } + } + log.info("消息发送完成:{}", reasonEntity); + } + + /** + * 1、如何创建Exchange、Queue、Binding + * 1)、使用AmqpAdmin进行创建 + * 2、如何收发消息 + */ + @Test + @DisplayName("创建一个交换器 ") + public void createExchange() { + + Exchange directExchange = new DirectExchange("hello-java-exchange", true, false); + amqpAdmin.declareExchange(directExchange); + log.info("Exchange[{}]创建成功:", "hello-java-exchange"); + } + + + @Test + @DisplayName("创建一个队列") + public void testCreateQueue() { + Queue queue = new Queue("hello-java-queue", true, false, false); + amqpAdmin.declareQueue(queue); + log.info("Queue[{}]创建成功:", "hello-java-queue"); + } + + /** + * 目的地 目的地类型 交换机 路由键 + * String destination, DestinationType destinationType, String exchange, String routingKey, + * + * @Nullable Map arguments + */ + @Test + @DisplayName("创建一个绑定") + public void createBinding() { + + Binding binding = new Binding("hello-java-queue", + Binding.DestinationType.QUEUE, + "hello-java-exchange", + "hello.java", + null); + amqpAdmin.declareBinding(binding); + log.info("Binding[{}]创建成功:", "hello-java-binding"); + + } + + @Test + public void create() { + HashMap arguments = new HashMap<>(); + arguments.put("x-dead-letter-exchange", "order-event-exchange"); + arguments.put("x-dead-letter-routing-key", "order.release.order"); + arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟 + Queue queue = new Queue("order.delay.queue", true, false, false, arguments); + amqpAdmin.declareQueue(queue); + log.info("Queue[{}]创建成功:", "order.delay.queue"); + } + @Test void contextLoads() { }