商城业务 --> 订单提交-->柔性事物【可靠消息+最终一致性方案】

This commit is contained in:
Kirk Lin 2021-07-14 17:05:21 +08:00
parent c2a1a8d604
commit 3ac43525c9
23 changed files with 812 additions and 35 deletions

View file

@ -0,0 +1,177 @@
package name.lkk.common.to.mq;
import lombok.Data;
import java.math.BigDecimal;
import java.util.Date;
@Data
public class OrderTo {
private Long id;
/**
* member_id
*/
private Long memberId;
/**
* 订单号
*/
private String orderSn;
/**
* 使用的优惠券
*/
private Long couponId;
/**
* create_time
*/
private Date createTime;
/**
* 用户名
*/
private String memberUsername;
/**
* 订单总额
*/
private BigDecimal totalAmount;
/**
* 应付总额
*/
private BigDecimal payAmount;
/**
* 运费金额
*/
private BigDecimal freightAmount;
/**
* 促销优化金额促销价满减阶梯价
*/
private BigDecimal promotionAmount;
/**
* 积分抵扣金额
*/
private BigDecimal integrationAmount;
/**
* 优惠券抵扣金额
*/
private BigDecimal couponAmount;
/**
* 后台调整订单使用的折扣金额
*/
private BigDecimal discountAmount;
/**
* 支付方式1->支付宝2->微信3->银联 4->货到付款
*/
private Integer payType;
/**
* 订单来源[0->PC订单1->app订单]
*/
private Integer sourceType;
/**
* 订单状态0->待付款1->待发货2->已发货3->已完成4->已关闭5->无效订单
*/
private Integer status;
/**
* 物流公司(配送方式)
*/
private String deliveryCompany;
/**
* 物流单号
*/
private String deliverySn;
/**
* 自动确认时间
*/
private Integer autoConfirmDay;
/**
* 可以获得的积分
*/
private Integer integration;
/**
* 可以获得的成长值
*/
private Integer growth;
/**
* 发票类型[0->不开发票1->电子发票2->纸质发票]
*/
private Integer billType;
/**
* 发票抬头
*/
private String billHeader;
/**
* 发票内容
*/
private String billContent;
/**
* 收票人电话
*/
private String billReceiverPhone;
/**
* 收票人邮箱
*/
private String billReceiverEmail;
/**
* 收货人姓名
*/
private String receiverName;
/**
* 收货人电话
*/
private String receiverPhone;
/**
* 收货人邮编
*/
private String receiverPostCode;
/**
* 省份/直辖市
*/
private String receiverProvince;
/**
* 城市
*/
private String receiverCity;
/**
*
*/
private String receiverRegion;
/**
* 详细地址
*/
private String receiverDetailAddress;
/**
* 订单备注
*/
private String note;
/**
* 确认收货状态[0->未确认1->已确认]
*/
private Integer confirmStatus;
/**
* 删除状态0->未删除1->已删除
*/
private Integer deleteStatus;
/**
* 下单时使用的积分
*/
private Integer useIntegration;
/**
* 支付时间
*/
private Date paymentTime;
/**
* 发货时间
*/
private Date deliveryTime;
/**
* 确认收货时间
*/
private Date receiveTime;
/**
* 评价时间
*/
private Date commentTime;
/**
* 修改时间
*/
private Date modifyTime;
}

View file

@ -0,0 +1,33 @@
package name.lkk.common.to.mq;
import lombok.Data;
@Data
public class StockDetailTo {
private Long id;
/**
* sku_id
*/
private Long skuId;
/**
* sku_name
*/
private String skuName;
/**
* 购买个数
*/
private Integer skuNum;
/**
* 工作单id
*/
private Long taskId;
/**
* 仓库id
*/
private Long wareId;
/**
* 1-已锁定 2-已解锁 3-扣减
*/
private Integer lockStatus;
}

View file

@ -0,0 +1,18 @@
package name.lkk.common.to.mq;
import lombok.Data;
@Data
public class StockLockedTo {
/**
* 库存工作单id
*/
private Long id;
/**
* 工作详情id
*/
private StockDetailTo detailTo;
}

View file

@ -1,11 +1,13 @@
package name.lkk.kkmall.order;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableRabbit
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients("name.lkk.kkmall.order.feign")

View file

@ -0,0 +1,112 @@
package name.lkk.kkmall.order.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* Tips:容器中的所有bean都会自动创建到RabbitMQ中 [只在RabbitMQ没有这个队列交换机绑定才会自动创建反之亦然]
* 配置订单服务的延迟队列可靠消息+死信路由
*/
@Configuration
public class KkMallMQConfig {
@Value("${kkMallRabbitmq.MQConfig.queues}")
private String queues;
@Value("${kkMallRabbitmq.MQConfig.eventExchange}")
private String eventExchange;
@Value("${kkMallRabbitmq.MQConfig.routingKey}")
private String routingKey;
@Value("${kkMallRabbitmq.MQConfig.delayQueue}")
private String delayQueue;
@Value("${kkMallRabbitmq.MQConfig.createOrder}")
private String createOrder;
@Value("${kkMallRabbitmq.MQConfig.ReleaseOther}")
private String ReleaseOther;
@Value("${kkMallRabbitmq.MQConfig.ReleaseOtherKey}")
private String ReleaseOtherKey;
@Value("${kkMallRabbitmq.MQConfig.ttl}")
private Long ttl;
/**
* String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
*/
@Bean
public Queue orderDelayQueue() {
Map<String, Object> arguments = new HashMap<>(3);
arguments.put("x-dead-letter-exchange", eventExchange);
arguments.put("x-dead-letter-routing-key", routingKey);
arguments.put("x-message-ttl", ttl);
Queue queue = new Queue(delayQueue, true, false, false, arguments);
return queue;
}
@Bean
public Queue orderReleaseOrderQueue() {
Queue queue = new Queue(queues, true, false, false);
return queue;
}
/**
* String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
*
* @return
*/
@Bean
public Exchange orderEventExchange() {
return new TopicExchange(eventExchange, true, false);
}
/**
* String destination, DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
*/
@Bean
public Binding orderCreateOrderBinding() {
return new Binding(delayQueue, Binding.DestinationType.QUEUE, eventExchange, createOrder, null);
}
/**
* Tips得先保证Ware服务中的队列配置创建成功得先保证Ware服务中的队列配置创建成功否则会报错
*/
@Bean
public Binding orderReleaseOrderBinding() {
return new Binding(queues, Binding.DestinationType.QUEUE, eventExchange, routingKey, null);
}
/**
* Tips得先保证Ware服务中的队列配置创建成功得先保证Ware服务中的队列配置创建成功否则会报错
* 订单释放直接和库存释放进行绑定
*/
@Bean
public Binding orderReleaseOtherBinding() {
return new Binding(ReleaseOther, Binding.DestinationType.QUEUE, eventExchange, ReleaseOtherKey + ".#", null);
}
@Bean
public Queue orderSecKillQueue() {
return new Queue("order.seckill.order.queue", true, false, false);
}
@Bean
public Binding orderSecKillQueueBinding() {
return new Binding("order.seckill.order.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.seckill.order", null);
}
}

View file

@ -1,20 +1,15 @@
package name.lkk.kkmall.order.controller;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.R;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.*;
import java.util.Arrays;
import java.util.Map;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.service.OrderService;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.R;
/**
@ -30,12 +25,19 @@ public class OrderController {
@Autowired
private OrderService orderService;
@GetMapping("/status/{orderSn}")
public R getOrderStatus(@PathVariable("orderSn") String orderSn) {
OrderEntity orderEntity = orderService.getOrderByOrderSn(orderSn);
return R.ok().setData(orderEntity);
}
/**
* 列表
*/
@RequestMapping("/list")
//@RequiresPermissions("order:order:list")
public R list(@RequestParam Map<String, Object> params){
public R list(@RequestParam Map<String, Object> params) {
PageUtils page = orderService.queryPage(params);
return R.ok().put("page", page);

View file

@ -0,0 +1,37 @@
package name.lkk.kkmall.order.listener;
import com.rabbitmq.client.Channel;
import name.lkk.kkmall.order.entity.OrderEntity;
import name.lkk.kkmall.order.service.OrderService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
/**
* <p>Title: OrderCloseListener</p>
* Description
* date2020/7/4 1:36
*/
@Service
@RabbitListener(queues = "${kkMallRabbitmq.MQConfig.queues}")
public class OrderCloseListener {
@Autowired
private OrderService orderService;
@RabbitHandler
public void listener(OrderEntity entity, Channel channel, Message message) throws IOException {
try {
orderService.closeOrder(entity);
// 手动调用支付宝收单
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}

View file

@ -21,10 +21,26 @@ public interface OrderService extends IService<OrderEntity> {
PageUtils queryPage(Map<String, Object> params);
OrderEntity getOrderByOrderSn(String orderSn);
/**
* 确认订单
*
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException;
/**
* 提交订单
*
* @param submitVo
* @return
*/
SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo);
void closeOrder(OrderEntity entity);
}

View file

@ -8,6 +8,7 @@ import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import name.lkk.common.enume.OrderStatusEnum;
import name.lkk.common.exception.NotStockException;
import name.lkk.common.to.mq.OrderTo;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.Query;
import name.lkk.common.utils.R;
@ -25,7 +26,11 @@ import name.lkk.kkmall.order.service.OrderItemService;
import name.lkk.kkmall.order.service.OrderService;
import name.lkk.kkmall.order.to.OrderCreateTo;
import name.lkk.kkmall.order.vo.*;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.StringRedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;
@ -67,6 +72,18 @@ public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> impleme
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${kkMallRabbitmq.MQConfig.eventExchange}")
private String eventExchange;
@Value("${kkMallRabbitmq.MQConfig.createOrder}")
private String createOrder;
@Value("${kkMallRabbitmq.MQConfig.ReleaseOtherKey}")
private String ReleaseOtherKey;
@Autowired
private RabbitTemplate rabbitTemplate;
private ThreadLocal<OrderSubmitVo> confirmVoThreadLocal = new ThreadLocal<>();
@ -80,6 +97,11 @@ public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> impleme
return new PageUtils(page);
}
@Override
public OrderEntity getOrderByOrderSn(String orderSn) {
return this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
}
@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
MemberRsepVo memberRsepVo = LoginUserInterceptor.threadLocal.get();
@ -189,7 +211,7 @@ public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> impleme
if (r.getCode() == 0) {
// 库存足够 锁定成功 给MQ发送消息
submitVo.setOrderEntity(order.getOrder());
// rabbitTemplate.convertAndSend(this.eventExchange, this.createOrder, order.getOrder());
rabbitTemplate.convertAndSend(this.eventExchange, this.createOrder, order.getOrder());
// int i = 10/0;
} else {
// 锁定失败
@ -205,6 +227,30 @@ public class OrderServiceImpl extends ServiceImpl<OrderDao, OrderEntity> impleme
}
@Override
public void closeOrder(OrderEntity entity) {
log.info("收到过期的订单信息--准关闭订单:" + entity.getOrderSn());
// 查询这个订单的最新状态
OrderEntity orderEntity = this.getById(entity.getId());
if (orderEntity.getStatus().equals(OrderStatusEnum.CREATE_NEW.getCode())) {
OrderEntity update = new OrderEntity();
update.setId(entity.getId());
update.setStatus(OrderStatusEnum.CANCELED.getCode());
this.updateById(update);
// 发送给MQ告诉它有一个订单被自动关闭了
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(orderEntity, orderTo);
try {
// 保证消息 100% 发出去 每一个消息在数据库保存详细信息
// 定期扫描数据库 将失败的消息在发送一遍
rabbitTemplate.convertAndSend(eventExchange, ReleaseOtherKey, orderTo);
} catch (AmqpException e) {
// 将没发送成功的消息进行重试发送.
}
}
}
/**
* 保存订单所有数据
*/

View file

@ -1,14 +1,23 @@
package name.lkk.kkmall.order.web;
import name.lkk.kkmall.order.entity.OrderEntity;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.ResponseBody;
import java.util.Date;
import java.util.UUID;
@Controller
public class HelloController {
@Autowired
RabbitTemplate rabbitTemplate;
/**
* 用于测试各个页面是否能正常访问
@ -22,5 +31,15 @@ public class HelloController {
return page;
}
@ResponseBody
@GetMapping("/test/createOrder")
public String createOrderTest() {
OrderEntity orderEntity = new OrderEntity();
orderEntity.setOrderSn(UUID.randomUUID().toString().replace("-", ""));
orderEntity.setModifyTime(new Date());
rabbitTemplate.convertAndSend("order-event-exchange", "order.create.order", orderEntity);
return "下单成功";
}
}

View file

@ -12,4 +12,4 @@ spring.rabbitmq.publisher-returns=true
# 只要消息抵达Queue就会异步发送优先回调returnfirm
spring.rabbitmq.template.mandatory=true
# 手动ack消息不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL

View file

@ -42,3 +42,17 @@ kkmall:
core-size: 20
max-size: 200
keep-alive-time: 10
kkMallRabbitmq:
MQConfig:
# 订单队列
queues: order.release.order.queue
delayQueue: order.delay.queue
eventExchange: order-event-exchange
routingKey: order.release.order
createOrder: order.create.order
# 订单自动过期时间 单位:(毫秒)
ttl: 900000
# 库存解锁队列
ReleaseOther: stock.release.stock.queue
ReleaseOtherKey: order.release.other

View file

@ -24,7 +24,11 @@
<artifactId>kkmall-common</artifactId>
<version>0.0.1-SNAPSHOT</version>
</dependency>
<!--引入消息队列-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>

View file

@ -1,10 +1,12 @@
package name.lkk.kkmall.ware;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
import org.springframework.cloud.openfeign.EnableFeignClients;
@EnableRabbit
@SpringBootApplication
@EnableDiscoveryClient
@EnableFeignClients("name.lkk.kkmall.ware.feign")

View file

@ -0,0 +1,108 @@
package name.lkk.kkmall.ware.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.Exchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* <p>Title: MyRabbitConfig</p>
* Description
* date2020/7/3 19:41
*/
@Configuration
public class MyRabbitConfig {
@Value("${kkMallRabbitmq.MQConfig.queues}")
private String queues;
@Value("${kkMallRabbitmq.MQConfig.eventExchange}")
private String eventExchange;
@Value("${kkMallRabbitmq.MQConfig.routingKey}")
private String routingKey;
@Value("${kkMallRabbitmq.MQConfig.delayQueue}")
private String delayQueue;
@Value("${kkMallRabbitmq.MQConfig.letterRoutingKey}")
private String letterRoutingKey;
@Value("${kkMallRabbitmq.MQConfig.ttl}")
private Integer ttl;
/**
* 第一次执行时rabbitMQ中无队列可以开启这个方法
* 创建完成后得关掉否则影响正常业务
*/
// @RabbitListener(queues = "stock.release.stock.queue")
// public void handle(){
//
// }
/**
* 消息转换器
*/
@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
/**
* String name, boolean durable, boolean autoDelete, Map<String, Object> arguments
*/
@Bean
public Exchange stockEventExchange() {
return new TopicExchange(eventExchange, true, false);
}
/**
* String name, boolean durable, boolean exclusive, boolean autoDelete, @Nullable Map<String, Object> arguments
*/
@Bean
public Queue stockReleaseQueue() {
return new Queue(queues, true, false, false);
}
@Bean
public Queue stockDelayQueue() {
Map<String, Object> args = new HashMap<>();
// 信死了 交给 [stock-event-exchange] 交换机
args.put("x-dead-letter-exchange", eventExchange);
// 死信路由
args.put("x-dead-letter-routing-key", letterRoutingKey);
args.put("x-message-ttl", ttl);
return new Queue(delayQueue, true, false, false, args);
}
/**
* 普通队列的绑定关系
* String destination, DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
*/
@Bean
public Binding stockLockedReleaseBinding() {
return new Binding(queues, Binding.DestinationType.QUEUE, eventExchange, letterRoutingKey + ".#", null);
}
/**
* 延时队列的绑定关系
* String destination, DestinationType destinationType, String exchange, String routingKey, @Nullable Map<String, Object> arguments
*/
@Bean
public Binding stockLockedBinding() {
return new Binding(delayQueue, Binding.DestinationType.QUEUE, eventExchange, routingKey, null);
}
}

View file

@ -6,11 +6,7 @@ import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
/**
* <p>Title: OrderFeignService</p>
* Description
* date2020/7/3 22:15
*/
@FeignClient("kkmall-order")
public interface OrderFeignService {

View file

@ -0,0 +1,52 @@
package name.lkk.kkmall.ware.listener;
import com.rabbitmq.client.Channel;
import name.lkk.common.to.mq.OrderTo;
import name.lkk.common.to.mq.StockLockedTo;
import name.lkk.kkmall.ware.service.WareSkuService;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
@RabbitListener(queues = "${kkMallRabbitmq.MQConfig.queues}")
public class StockReleaseListener {
@Autowired
private WareSkuService wareSkuService;
/**
* 下单成功 库存解锁 接下来业务调用失败
* <p>
* 只要解锁库存消息失败 一定要告诉服务解锁失败
*/
@RabbitHandler
public void handleStockLockedRelease(StockLockedTo to, Message message, Channel channel) throws IOException {
try {
wareSkuService.unlockStock(to);
// 执行成功的 回复 [仅回复自己的消息]
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
/**
* 订单关闭后 发送的消息这里接收
*/
@RabbitHandler
public void handleOrderCloseRelease(OrderTo to, Message message, Channel channel) throws IOException {
try {
wareSkuService.unlockStock(to);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);
}
}
}

View file

@ -16,5 +16,7 @@ import java.util.Map;
public interface WareOrderTaskService extends IService<WareOrderTaskEntity> {
PageUtils queryPage(Map<String, Object> params);
WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn);
}

View file

@ -2,6 +2,8 @@ package name.lkk.kkmall.ware.service;
import com.baomidou.mybatisplus.extension.service.IService;
import name.lkk.common.to.es.SkuHasStockVo;
import name.lkk.common.to.mq.OrderTo;
import name.lkk.common.to.mq.StockLockedTo;
import name.lkk.common.utils.PageUtils;
import name.lkk.kkmall.ware.entity.WareSkuEntity;
import name.lkk.kkmall.ware.vo.WareSkuLockVo;
@ -36,5 +38,11 @@ public interface WareSkuService extends IService<WareSkuEntity> {
Boolean orderLockStock(WareSkuLockVo vo);
void unlockStock(StockLockedTo to);
/**
* 由于订单超时而自动释放订单之后来解锁库存
*/
void unlockStock(OrderTo to);
}

View file

@ -1,16 +1,16 @@
package name.lkk.kkmall.ware.service.impl;
import org.springframework.stereotype.Service;
import java.util.Map;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.Query;
import name.lkk.kkmall.ware.dao.WareOrderTaskDao;
import name.lkk.kkmall.ware.entity.WareOrderTaskEntity;
import name.lkk.kkmall.ware.service.WareOrderTaskService;
import org.springframework.stereotype.Service;
import java.util.Map;
@Service("wareOrderTaskService")
@ -26,4 +26,9 @@ public class WareOrderTaskServiceImpl extends ServiceImpl<WareOrderTaskDao, Ware
return new PageUtils(page);
}
@Override
public WareOrderTaskEntity getOrderTaskByOrderSn(String orderSn) {
return this.getOne(new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn));
}
}

View file

@ -1,10 +1,16 @@
package name.lkk.kkmall.ware.service.impl;
import com.alibaba.fastjson.TypeReference;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.metadata.IPage;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import lombok.extern.slf4j.Slf4j;
import name.lkk.common.enume.OrderStatusEnum;
import name.lkk.common.exception.NotStockException;
import name.lkk.common.to.es.SkuHasStockVo;
import name.lkk.common.to.mq.OrderTo;
import name.lkk.common.to.mq.StockDetailTo;
import name.lkk.common.to.mq.StockLockedTo;
import name.lkk.common.utils.PageUtils;
import name.lkk.common.utils.Query;
import name.lkk.common.utils.R;
@ -12,14 +18,19 @@ import name.lkk.kkmall.ware.dao.WareSkuDao;
import name.lkk.kkmall.ware.entity.WareOrderTaskDetailEntity;
import name.lkk.kkmall.ware.entity.WareOrderTaskEntity;
import name.lkk.kkmall.ware.entity.WareSkuEntity;
import name.lkk.kkmall.ware.feign.OrderFeignService;
import name.lkk.kkmall.ware.feign.ProductFeignService;
import name.lkk.kkmall.ware.service.WareOrderTaskDetailService;
import name.lkk.kkmall.ware.service.WareOrderTaskService;
import name.lkk.kkmall.ware.service.WareSkuService;
import name.lkk.kkmall.ware.vo.OrderItemVo;
import name.lkk.kkmall.ware.vo.OrderVo;
import name.lkk.kkmall.ware.vo.SkuWareHasStock;
import name.lkk.kkmall.ware.vo.WareSkuLockVo;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.util.ObjectUtils;
@ -30,6 +41,7 @@ import java.util.stream.Collectors;
@Service("wareSkuService")
@Slf4j
public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> implements WareSkuService {
@Autowired
@ -43,6 +55,18 @@ public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> i
@Autowired
private WareOrderTaskDetailService orderTaskDetailService;
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private OrderFeignService orderFeignService;
@Value("${kkMallRabbitmq.MQConfig.eventExchange}")
private String eventExchange;
@Value("${kkMallRabbitmq.MQConfig.routingKey}")
private String routingKey;
/**
* 商品库存的模糊查询
* skuId: 1
@ -52,7 +76,7 @@ public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> i
public PageUtils queryPage(Map<String, Object> params) {
QueryWrapper<WareSkuEntity> wrapper = new QueryWrapper<>();
String id = (String) params.get("skuId");
if(!ObjectUtils.isEmpty(id)){
if (!ObjectUtils.isEmpty(id)) {
wrapper.eq("sku_id", id);
}
id = (String) params.get("wareId");
@ -159,14 +183,14 @@ public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> i
// TODO 告诉MQ库存锁定成功 一个订单锁定成功 消息队列就会有一个消息
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity(null, skuId, "", hasStock.getNum(), taskEntity.getId(), wareId, 1);
orderTaskDetailService.save(detailEntity);
// StockLockedTo stockLockedTo = new StockLockedTo();
// stockLockedTo.setId(taskEntity.getId());
// StockDetailTo detailTo = new StockDetailTo();
// BeanUtils.copyProperties(detailEntity, detailTo);
// // 防止回滚以后找不到数据 把详细信息页
// stockLockedTo.setDetailTo(detailTo);
//
// rabbitTemplate.convertAndSend(eventExchange, routingKey ,stockLockedTo);
StockLockedTo stockLockedTo = new StockLockedTo();
stockLockedTo.setId(taskEntity.getId());
StockDetailTo detailTo = new StockDetailTo();
BeanUtils.copyProperties(detailEntity, detailTo);
// 防止回滚以后找不到数据
stockLockedTo.setDetailTo(detailTo);
rabbitTemplate.convertAndSend(eventExchange, routingKey, stockLockedTo);
skuStocked = false;
break;
}
@ -181,4 +205,79 @@ public class WareSkuServiceImpl extends ServiceImpl<WareSkuDao, WareSkuEntity> i
return true;
}
@Override
public void unlockStock(StockLockedTo to) {
log.info("收到解锁库存的消息" + to);
// 库存id
Long id = to.getId();
StockDetailTo detailTo = to.getDetailTo();
Long detailId = detailTo.getId();
/**
* 解锁库存
* 查询数据库关系这个订单的详情
* : 证明库存锁定成功
* 1.没有这个订单, 必须解锁
* 2.有这个订单 不是解锁库存
* 订单状态已取消,解锁库存
* 没取消不能解锁 ;
* 没有就是库存锁定失败 库存回滚了 这种情况无需回滚
*/
WareOrderTaskDetailEntity byId = orderTaskDetailService.getById(detailId);
if (byId != null) {
// 解锁
WareOrderTaskEntity taskEntity = orderTaskService.getById(id);
String orderSn = taskEntity.getOrderSn();
// 根据订单号 查询订单状态 已取消才解锁库存
R orderStatus = orderFeignService.getOrderStatus(orderSn);
if (orderStatus.getCode() == 0) {
// 订单数据返回成功
OrderVo orderVo = orderStatus.getData(new TypeReference<OrderVo>() {
});
// 订单不存在
if (orderVo == null || orderVo.getStatus().equals(OrderStatusEnum.CANCELED.getCode())) {
// 订单已取消 状态1 已锁定 这样才可以解锁
if (byId.getLockStatus() == 1) {
unLock(detailTo.getSkuId(), detailTo.getWareId(), detailTo.getSkuNum(), detailId);
}
}
} else {
// 消息拒绝 重新放回队列 让别人继续消费解锁
throw new RuntimeException("远程服务失败");
}
} else {
// 无需解锁
}
}
/**
* 防止订单服务卡顿 导致订单状态一直改不了 库存消息有限到期 最后导致卡顿的订单 永远无法解锁库存
*/
@Transactional
@Override
public void unlockStock(OrderTo to) {
log.info("\n订单超时自动关闭,准备解锁库存");
String orderSn = to.getOrderSn();
// 查一下最新的库存状态 防止重复解锁库存[Order服务可能会提前解锁]
WareOrderTaskEntity taskEntity = orderTaskService.getOrderTaskByOrderSn(orderSn);
Long taskEntityId = taskEntity.getId();
// 按照工作单找到所有 没有解锁的库存 进行解锁 状态为1等于已锁定
List<WareOrderTaskDetailEntity> entities = orderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", taskEntityId).eq("lock_status", 1));
for (WareOrderTaskDetailEntity entity : entities) {
unLock(entity.getSkuId(), entity.getWareId(), entity.getSkuNum(), entity.getId());
}
}
/**
* 解锁库存
*/
private void unLock(Long skuId, Long wareId, Integer num, Long taskDeailId) {
// 更新库存
wareSkuDao.unlockStock(skuId, wareId, num);
// 更新库存工作单的状态
WareOrderTaskDetailEntity detailEntity = new WareOrderTaskDetailEntity();
detailEntity.setId(taskDeailId);
detailEntity.setLockStatus(2);
orderTaskDetailService.updateById(detailEntity);
}
}

View file

@ -0,0 +1,15 @@
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
# 虚拟主机配置
spring.rabbitmq.virtual-host=/
# 开启发送端消息抵达Broker确认
#该配置已过时
#spring.rabbitmq.publisher-confirms=true
#spring.rabbitmq.publisher-confirm-type=CORRELATED
## 开启发送端消息抵达Queue确认
#spring.rabbitmq.publisher-returns=true
## 只要消息抵达Queue就会异步发送优先回调returnfirm
#spring.rabbitmq.template.mandatory=true
# 手动ack消息不使用默认的消费端确认
spring.rabbitmq.listener.simple.acknowledge-mode=MANUAL

View file

@ -22,3 +22,13 @@ mybatis-plus:
id-type: auto
server:
port: 11000
kkMallRabbitmq:
MQConfig:
queues: stock.release.stock.queue
delayQueue: stock.delay.queue
eventExchange: stock-event-exchange
routingKey: stock.locked
letterRoutingKey: stock.release
# 库存自动过期时间 单位:(毫秒)
ttl: 900000