RabbitMQ学习

This commit is contained in:
Kirk Lin 2021-07-11 21:13:46 +08:00
parent 8985215608
commit e226535ed3
6 changed files with 402 additions and 3 deletions

View file

@ -24,6 +24,12 @@
<artifactId>kkmall-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--引入消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- 整合Spring-seesion -->
<dependency>
<groupId>org.springframework.boot</groupId>

View file

@ -0,0 +1,97 @@
package name.lkk.kkmall.order.config;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
/**
* RabbitMQ配置类
*
* @author kirklin
*/
@Configuration
public class MyRabbitConfig {
private RabbitTemplate rabbitTemplate;
/**
* 当一个接口有2个不同实现时,使用@Autowired注解时会报org.springframework.beans.factory.NoUniqueBeanDefinitionException异常信息
* 1使用Qualifier注解选择一个对象的名称,通常比较常用
* <p>
* 2Primary可以理解为默认优先选择,不可以同时设置多个,内部实质是设置BeanDefinition的primary属性
*
* @param connectionFactory
* @return
*/
@Primary
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
this.rabbitTemplate = rabbitTemplate;
rabbitTemplate.setMessageConverter(messageConverter());
initRabbitTemplate();
return rabbitTemplate;
}
/**
* 配置消息序列化方式
*
* @return
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* 定制RabbitTemplate
* 1服务收到消息就会回调
* 1spring.rabbitmq.publisher-confirms: true
* 2设置确认回调
* 2消息正确抵达队列就会进行回调
* 1spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2设置确认回调ReturnCallback
* <p>
* 3消费端确认(保证每个消息都被正确消费此时才可以broker删除这个消息)
*/
// @PostConstruct //MyRabbitConfig对象创建完成以后执行这个方法
public void initRabbitTemplate() {
/**
* 1只要消息抵达Broker就ack=true
* correlationData当前消息的唯一关联数据(这个是消息的唯一id)
* ack消息是否成功收到
* cause失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
System.out.println("confirm...correlationData[" + correlationData + "]==>ack:[" + ack + "]==>cause:[" + cause + "]");
});
/**
* 只要消息没有投递给指定的队列就触发这个失败回调
* message投递失败的消息详细信息
* replyCode回复的状态码
* replyText回复的文本内容
* exchange当时这个消息发给哪个交换机
* routingKey当时这个消息用哪个路由键
*/
// rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
// System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
// "==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
// });
//新版本需要开启强制委托模式
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(returned -> System.out.println("消息e-> q失败,详细信息:" + returned));
}
}

View file

@ -0,0 +1,62 @@
package name.lkk.kkmall.order.controller;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.entity.OrderItemEntity;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Date;
import java.util.UUID;
/**
* 测试rabbitMQ
*/
@RestController
public class RabbitController {
@Autowired
private RabbitTemplate rabbitTemplate;
private String exchange = "hello-java-exchange";
private String routeKey = "hello.java";
@GetMapping("/sendMQ")
public String sendMQ(@RequestParam(value = "num", required = false, defaultValue = "10") Integer num) {
OrderEntity entity = new OrderEntity();
entity.setId(1L);
entity.setCommentTime(new Date());
entity.setCreateTime(new Date());
entity.setConfirmStatus(0);
entity.setAutoConfirmDay(1);
entity.setGrowth(1);
entity.setMemberId(12L);
OrderItemEntity orderEntity = new OrderItemEntity();
orderEntity.setCategoryId(225L);
orderEntity.setId(1L);
orderEntity.setOrderSn("mall");
orderEntity.setSpuName("华为");
for (int i = 0; i < num; i++) {
if (i % 2 == 0) {
entity.setReceiverName("OrderEntity ==> " + i);
rabbitTemplate.convertAndSend(this.exchange, this.routeKey, entity, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
} else {
orderEntity.setOrderSn("OrderItemEntity ==> " + i);
rabbitTemplate.convertAndSend(this.exchange, this.routeKey, orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
// 测试消息发送失败
//rabbitTemplate.convertAndSend(this.exchange, "fail", orderEntity);
}
}
return "ok";
}
}

View file

@ -1,18 +1,26 @@
package name.lkk.kkmall.order.service.impl;
import org.springframework.stereotype.Service;
import java.util.Map;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.rabbitmq.client.Channel;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.Query;
import name.lkk.kkmall.order.dao.OrderItemDao;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.entity.OrderItemEntity;
import name.lkk.kkmall.order.entity.OrderReturnReasonEntity;
import name.lkk.kkmall.order.service.OrderItemService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
import java.util.Map;
@RabbitListener(queues = {"hello-java-queue"})
@Service("orderItemService")
public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEntity> implements OrderItemService {
@ -26,4 +34,78 @@ public class OrderItemServiceImpl extends ServiceImpl<OrderItemDao, OrderItemEnt
return new PageUtils(page);
}
// /**
// * queues声明需要监听的队列
// * channel当前传输数据的通道
// */
@RabbitHandler
public void revieveMessage(Message message,
OrderReturnReasonEntity content
, Channel channel) {
//拿到主体内容
byte[] body = message.getBody();
//拿到的消息头属性信息
MessageProperties messageProperties = message.getMessageProperties();
System.out.println("接受到的消息...内容" + message + "===内容:" + content);
long deliveryTag = message.getMessageProperties().getDeliveryTag();
// 只签收当前货物 不批量签收
try {
channel.basicAck(deliveryTag, false);
System.out.println("revieveMessage==》已签收消息" + deliveryTag);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 1.Message message: 原生消息类型 详细信息
* 2.T<发送消息的类型> OrderEntity orderEntity [Spring自动帮我们转换]
* 3.Channel channel: 当前传输数据的通道
* <p>
* // 同一个消息只能被一个人收到
*
* @RabbitListener 只能标注在类方法上 配合 @RabbitHandler
* @RabbitHandler: 只能标注在方法上 [重载区分不同的消息]
*/
@RabbitHandler
public void receiveMessageA(Message message, OrderEntity orderEntity, Channel channel) {
System.out.println("接受到消息: " + message + "\n内容" + orderEntity);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
}
// 这个是一个数字 通道内自增
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
// 只签收当前货物 不批量签收
channel.basicAck(deliveryTag, false);
// deliveryTag: 货物的标签 multiple: 是否批量拒收 requeue: 是否重新入队
// channel.basicNack(deliveryTag, false,true);
// 批量拒绝
// channel.basicReject();
} catch (IOException e) {
System.out.println("receiveMessageA网络中断");
}
System.out.println(orderEntity.getReceiverName() + " 消息处理完成");
}
@RabbitHandler
public void receiveMessageB(Message message, OrderItemEntity orderEntity, Channel channel) {
System.out.println("接受到消息: " + message + "\n内容" + orderEntity);
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
}
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
channel.basicAck(deliveryTag, false);
} catch (IOException e) {
System.out.println("receiveMessageB网络中断");
}
System.out.println(orderEntity.getOrderSn() + " 消息处理完成");
}
}

View file

@ -0,0 +1,15 @@
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
#该配置已过时
#spring.rabbitmq.publisher-confirms=true
spring.rabbitmq.publisher-confirm-type=CORRELATED
# 开启发送端消息抵达Queue确认
spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual

View file

@ -1,11 +1,148 @@
package name.lkk.kkmall.order;
import lombok.extern.slf4j.Slf4j;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.entity.OrderItemEntity;
import name.lkk.kkmall.order.entity.OrderReturnReasonEntity;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import java.util.Date;
import java.util.HashMap;
import java.util.UUID;
@Slf4j
@SpringBootTest
class KkmallOrderApplicationTests {
@Autowired
private AmqpAdmin amqpAdmin;
@Autowired
private RabbitTemplate rabbitTemplate;
private String queue = "hello-java-queue";
private String exchange = "hello-java-exchange";
private String routeKey = "hello.java";
@Test
public void sendMessageTest2() {
OrderEntity entity = new OrderEntity();
entity.setId(1L);
entity.setCommentTime(new Date());
entity.setCreateTime(new Date());
entity.setConfirmStatus(0);
entity.setAutoConfirmDay(1);
entity.setGrowth(1);
entity.setMemberId(12L);
OrderItemEntity orderEntity = new OrderItemEntity();
orderEntity.setCategoryId(225L);
orderEntity.setId(1L);
orderEntity.setOrderSn("mall");
orderEntity.setSpuName("华为");
for (int i = 0; i < 10; i++) {
if (i % 2 == 0) {
entity.setReceiverName("OrderEntity-" + i);
rabbitTemplate.convertAndSend(this.exchange, this.routeKey, entity, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
} else {
orderEntity.setOrderSn("OrderItemEntity-" + i);
rabbitTemplate.convertAndSend(this.exchange, this.routeKey, orderEntity, new CorrelationData(UUID.randomUUID().toString().replace("-", "")));
}
log.info("\n路由键" + this.routeKey + "的消息发送成功");
}
}
@Test
public void sendMessageTest() {
OrderReturnReasonEntity reasonEntity = new OrderReturnReasonEntity();
reasonEntity.setId(1L);
reasonEntity.setCreateTime(new Date());
reasonEntity.setName("reason");
reasonEntity.setStatus(1);
reasonEntity.setSort(2);
String msg = "Hello World";
//1发送消息,如果发送的消息是个对象会使用序列化机制将对象写出去对象必须实现Serializable接口
//2发送的对象类型的消息可以是一个json
// rabbitTemplate.convertAndSend("hello-java-exchange","hello.java",
// reasonEntity,new CorrelationData(UUID.randomUUID().toString()));
for (int i = 0; i < 10; i++) {
if (i % 2 == 1) {
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java",
reasonEntity, new CorrelationData(UUID.randomUUID().toString()));
} else {
rabbitTemplate.convertAndSend("hello-java-exchange", "hello2.java",
reasonEntity, new CorrelationData(UUID.randomUUID().toString()));
}
}
log.info("消息发送完成:{}", reasonEntity);
}
/**
* 1如何创建ExchangeQueueBinding
* 1使用AmqpAdmin进行创建
* 2如何收发消息
*/
@Test
@DisplayName("创建一个交换器 ")
public void createExchange() {
Exchange directExchange = new DirectExchange("hello-java-exchange", true, false);
amqpAdmin.declareExchange(directExchange);
log.info("Exchange[{}]创建成功:", "hello-java-exchange");
}
@Test
@DisplayName("创建一个队列")
public void testCreateQueue() {
Queue queue = new Queue("hello-java-queue", true, false, false);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:", "hello-java-queue");
}
/**
* 目的地 目的地类型 交换机 路由键
* String destination, DestinationType destinationType, String exchange, String routingKey,
*
* @Nullable Map<String, Object> arguments
*/
@Test
@DisplayName("创建一个绑定")
public void createBinding() {
Binding binding = new Binding("hello-java-queue",
Binding.DestinationType.QUEUE,
"hello-java-exchange",
"hello.java",
null);
amqpAdmin.declareBinding(binding);
log.info("Binding[{}]创建成功:", "hello-java-binding");
}
@Test
public void create() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "order-event-exchange");
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
Queue queue = new Queue("order.delay.queue", true, false, false, arguments);
amqpAdmin.declareQueue(queue);
log.info("Queue[{}]创建成功:", "order.delay.queue");
}
@Test
void contextLoads() {
}