一、RabbitMQ

见我自己总结的两篇博客

二、安装 RabbitMQ

1
2
3
4
docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

#修改为自动重启:
docker update rabbitmq --restart=always
  • 4369,25672(Erlang 发现&集群端口)
  • 5672,5671(AMQP 端口)
  • 15672(web 管理后台端口)
  • 61613,61614(STOMP 协议端口)
  • 1883,8883(MQTT 协议端口)

启动

配置文件:

1598619251787

三、整合 SpringBoot

image-20211227225010432

1、在订单服务中引入依赖

1
2
3
4
5
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2、配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27

spring:
rabbitmq:
host: 192.168.56.10
port: 5672
# 虚拟主机
virtual-host: /
# 开启发送端抵达队列确认【发送端确认机制+本地事务表】
publisher-returns: true
# 开启发送确认【发送端确认机制+本地事务表】
publisher-confirm-type: correlated
# 只要抵达队列,优先回调return confirm
template:
mandatory: true
# 使用手动确认模式,关闭自动确认【消息丢失】
listener:
simple:
acknowledge-mode: manual

3@EnableRabbit 加在启动类上【发送消息可以不需要这个注解,监听消息必须使用这个注解】

4、RabbitAutoConfiguration 生效,给容器自动配置了很多类
RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate

5、接收消息注解:
@RabbitListener(queues={"hello-java-queue"})
@RabbitHandler

3、基本使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
1、如何创建Exchanger、Queue、Binding
1)使用AmqpAdmin
@Test
void createExchange() {
// 创建交换机
amqpAdmin.declareExchange(new DirectExchange("hello-java-exchange", true, false));
log.info("Exchange创建[{}]成功", "hello-java-exchange");
}

@Test
void createQueue() {
amqpAdmin.declareQueue(new Queue("hello-java-queue", true, false, false));
log.info("Queue创建[{}]成功", "hello-java-queue");
}

@Test
void createBinding() {
// 创建绑定
// String destination【目的地,队列name或 交换机name(如果作为路由的话)】
// Binding.DestinationType destinationType【目的地类型 queue还是exchange(路由)】
// String exchange【交换机】
// String routingKey【路由键】
// @Nullable Map<String, Object> arguments【自定义参数】
amqpAdmin.declareBinding(new Binding("hello-java-queue", Binding.DestinationType.QUEUE,"hello-java-exchange",
"hello.java", null));
log.info("Binding创建[{}]成功", "hello-java-binding");
}

2、如何发送消息【1、交换机;2、路由键;3、消息】
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void sendMsg() {
rabbitTemplate.convertAndSend("hello-java-exchange", "hello.java", "hello world");
}

3、使用json格式的序列化器
否则使用jdk的序列化器
@Configuration
public class MyRabbitConfig {

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}
}

4、接收消息【1、精确交换机:一条消息只会被消费一次;2、绑定队列就可以了】
1)必须使用@EnableRabbit
2)监听方法必须放在@Component
3@RabbitListener(queues={"hello-java-queue"})放在类上
@RabbitHandler:标在方法上【作用:重载处理不同类型的数据】

1598623702380

接受消息代码:

1598624514669

1598627460409

RabbitMQ 消息确认机制

RabbitMQ 消息确认机制-可靠抵达【手动确认+拒绝(拒绝的进入死信路由)】

1598625968345

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
事务消息:发送返回【信息才到达】

保证消息不丢失,可靠抵达,可以使用事务消息,性能下降250倍,为此引入 确认机制
发送端确认机制: 两个:
publisher confirmCallback 确认模式【如果投递到Broker了,回调confirmCallback方法】
publisher returnCallback未投递到queue 退回模式【如果没有投递到queue,调用returnCallback】

消费端确认机制:【消费者收到消息,给服务器发送确认,服务器删除该消息】
consumer ack机制(消息确认机制)【让Broker知道哪些消息被消费者正确消费了(如果没有则重新投递)】
1、默认是自动确认的,只要消息接收到,客户端会自动确认,服务端会移除这个消息
BUG:消息丢失,消费者监听队列【所有消息会一次性发送到通道,所以自动确认宕机会导致消息丢失】
2、手动确认:处理一个确认一个【否则是未签收状态,服务器宕机则会重新进入ready状态不会丢失】
参数1:消息下标,参数2:是否批量签收
签收:channel.basicAck(message.getMessageProperties().getDeliverTag(), false);

最终解决方案:确认机制+本地事务表
1、发送消息的时候生成消息ID,然后在回调方法里面修改数据库里消息的状态
2、定时扫描数据库消息的状态,没有成功的重新投递一次
3、消费消息时使用手动签收机制【不要使用自动签收】

配置:
spring:
rabbitmq:
host: 192.168.56.10
port: 5672
# 虚拟主机
virtual-host: /
# 开启发送端抵达队列确认【发送端确认机制+本地事务表】
publisher-returns: true
# 开启发送确认【发送端确认机制+本地事务表】
publisher-confirm-type: correlated
# 只要抵达队列,优先回调return confirm
template:
mandatory: true
# 使用手动确认模式,关闭自动确认【消息丢失】
listener:
simple:
acknowledge-mode: manual


@Configuration
public class MyRabbitConfig {
@Autowired
private RabbitTemplate rabbitTemplate;

@Bean
public MessageConverter messageConverter() {
return new Jackson2JsonMessageConverter();
}

/**
* 定制RabbitTemplate
* 1、服务收到消息就会回调
* 1、spring.rabbitmq.publisher-confirms: true
* 2、设置确认回调
* 2、消息正确抵达队列就会进行回调
* 1、spring.rabbitmq.publisher-returns: true
* spring.rabbitmq.template.mandatory: true
* 2、设置确认回调ReturnCallback
*
* 3、消费端确认(保证每个消息都被正确消费,此时才可以broker删除这个消息)
*
*/
@PostConstruct //MyRabbitConfig对象创建完成以后,执行这个方法
public void initRabbitTemplate() {

/**
* 1、只要消息抵达Broker就ack=true
* correlationData:当前消息的唯一关联数据(这个是消息的唯一id)
* ack:消息是否成功收到
* cause:失败的原因
*/
//设置确认回调
rabbitTemplate.setConfirmCallback((correlationData,ack,cause) -> {
System.out.println("confirm...correlationData["+correlationData+"]==>ack:["+ack+"]==>cause:["+cause+"]");
});


/**
* 只要消息没有投递给指定的队列,就触发这个失败回调
* message:投递失败的消息详细信息
* replyCode:回复的状态码
* replyText:回复的文本内容
* exchange:当时这个消息发给哪个交换机
* routingKey:当时这个消息用哪个路邮键
*/
rabbitTemplate.setReturnCallback((message,replyCode,replyText,exchange,routingKey) -> {
System.out.println("Fail Message["+message+"]==>replyCode["+replyCode+"]" +
"==>replyText["+replyText+"]==>exchange["+exchange+"]==>routingKey["+routingKey+"]");
});
}
}

1
2
3
使用方法:
监听队列的方法参数上加上通道Channel
然后channel 签收或拒绝 ack/reject(成为死信)

15986262981291598626838638

1598627865733

消费者消费:

签收+拒收【并返回服务器入队】multiple:批量签收,requeue:是否重新入队

image-20211227231131739

四、项目搭建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
1、等待付款(详情页)/mydata/nginx/html/static/order/detail

2、订单页(订单列表页)/mydata/nginx/html/static/order/list

3、结算页(订单确认页)/mydata/nginx/html/static/order/confirm

4、收银页(支付页)/mydata/nginx/html/static/order/pay

# gulimall
192.168.56.10 gulimall.com
192.168.56.10 search.gulimall.com
192.168.56.10 item.gulimall.com
192.168.56.10 auth.gulimall.com
192.168.56.10 cart.gulimall.com
192.168.56.10 order.gulimall.com

127.0.0.1 ssoserver.com
127.0.0.1 client1.com
127.0.0.1 client2.com

#网关
- id: gulimall_order_route
uri: lb://gulimall-order
predicates:
- Host=order.gulimall.com

等待付款,详情页:

1598667020590

1598667039261


订单页:

1598667069559


结算页,订单确认页:

1598667130041


收银页:

1598667165660

五、订单服务

1、订单流程

订单生成 -> 支付订单 -> 卖家发货 -> 确认收货 -> 交易成功

2、登录拦截

因为订单系统必然涉及到用户信息,因此进入订单系统的请求必须是已经登录的,所以我们必须通过拦截器对为登录订单请求进行拦截

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
@Component
public class LoginInterceptor implements HandlerInterceptor {

public static ThreadLocal<MemberResponseVo> loginUser = new ThreadLocal<>();

@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) throws Exception {
HttpSession session = request.getSession();
MemberResponseVo memberResponseVo = (MemberResponseVo) session.getAttribute(AuthServerConstant.LOGIN_USER);
if(memberResponseVo != null){
loginUser.set(memberResponseVo);
return true;
}else{
session.setAttribute("msg","请先登录");
response.sendRedirect("http://auth.gulimall.com/login.html");
return false;
}
}
}
1
2
3
4
5
6
7
8
9
10
11
@Configuration
public class GulimallWebConfig implements WebMvcConfigurer {

@Resource
private LoginInterceptor loginInterceptor;

@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(loginInterceptor).addPathPatterns("/**");
}
}

3、订单确认页

3.1 模型抽取

跳转到确认页时需要携带的数据模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
public class OrderConfirmVo {

@Getter
@Setter
/** 会员收获地址列表 **/
private List<MemberAddressVo> memberAddressVos;

@Getter @Setter
/** 所有选中的购物项 **/
private List<OrderItemVo> items;

/** 发票记录 **/
@Getter @Setter
/** 优惠券(会员积分) **/
private Integer integration;

/** 防止重复提交的令牌 **/
@Getter @Setter
private String orderToken;

@Getter @Setter
Map<Long,Boolean> stocks;

public Integer getCount() {
Integer count = 0;
if (items != null && items.size() > 0) {
for (OrderItemVo item : items) {
count += item.getCount();
}
}
return count;
}


/** 订单总额 **/
//BigDecimal total;
//计算订单总额
public BigDecimal getTotal() {
BigDecimal totalNum = BigDecimal.ZERO;
if (items != null && items.size() > 0) {
for (OrderItemVo item : items) {
//计算当前商品的总价格
BigDecimal itemPrice = item.getPrice().multiply(new BigDecimal(item.getCount().toString()));
//再计算全部商品的总价格
totalNum = totalNum.add(itemPrice);
}
}
return totalNum;
}


/** 应付价格 **/
//BigDecimal payPrice;
public BigDecimal getPayPrice() {
return getTotal();
}
}

3.2 数据获取

  • 查询购物项、库存和收货地址都要远程调用远程服务,串行会浪费大量时间,因此我们使用 CompletableFuture 进行异步编排
  • 可能由于延迟,订单提交按钮可能被点击多次,为了防止重复提交的问题,我们在返回订单确认页时,在 redis 中生成一个随机的令牌,过期时间为 30min,提交的订单会携带这个令牌,我们将会在订单提交的处理页面核验令牌。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@RequestMapping("/toTrade")
public String toTrade(Model model) {
OrderConfirmVo confirmVo = orderService.confirmOrder();
model.addAttribute("confirmOrder", confirmVo);
return "confirm";
}

@Override
public OrderConfirmVo confirmOrder() throws ExecutionException, InterruptedException {
MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
OrderConfirmVo orderConfirmVo = new OrderConfirmVo();

//TODO :获取当前线程请求头信息(解决Feign异步调用丢失请求头问题)
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();

// 开启第一个异步任务
CompletableFuture<Void> addressFuture = CompletableFuture.runAsync(() -> {

// 每个线程都来共享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);

// 1、远程查询所有的收获地址列表
List<MemberAddressVo> address = memberFeignService.getAddress(memberResponseVo.getId());
orderConfirmVo.setMemberAddressVos(address);
}, executor);


CompletableFuture<Void> cartInfoFuture = CompletableFuture.runAsync(() -> {

// 每个线程都来功享之前的请求数据
RequestContextHolder.setRequestAttributes(requestAttributes);

// 2. 查出所有选中购物项
List<OrderItemVo> checkedItems = cartFeignService.getCurrentCartItems();
orderConfirmVo.setItems(checkedItems);
// fegin在远程调用之前要构造请求,调用很多的拦截器
}, executor);

// 3、查询用户积分
Integer integration = memberResponseVo.getIntegration();
orderConfirmVo.setIntegeration(integration);

// 4、价格数据自动计算

// TODO 5、防重令牌(防止表单重复提交)
// 为用户设置一个token,三十分钟过期时间(存在redis)
String token = UUID.randomUUID().toString().replace("-", "");
redisTemplate.opsForValue().set(OrderConstant.USER_ORDER_TOKEN_PREFIX+memberResponseVo.getId(),token,30, TimeUnit.MINUTES);
orderConfirmVo.setOrderToken(token);

CompletableFuture.allOf(addressFuture,cartInfoFuture).get();

return orderConfirmVo;
}

3.3 Feign 远程调用丢失请求头问题

feign远程调用的请求头中没有含有JSESSIONIDcookie,所以也就不能得到服务端的session数据,cart 认为没登录,获取不了用户信息

1
2
3
4
5
6
Request targetRequest(RequestTemplate template) {
for (RequestInterceptor interceptor : requestInterceptors) {
interceptor.apply(template);
}
return target.apply(template);
}

但是在feign的调用过程中,会使用容器中的RequestInterceptorRequestTemplate进行处理,因此我们可以通过向容器中导入定制的RequestInterceptor为请求加上cookie

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class GuliFeignConfig {
@Bean
public RequestInterceptor requestInterceptor() {
return new RequestInterceptor() {
@Override
public void apply(RequestTemplate template) {
//1. 使用RequestContextHolder拿到老请求的请求数据
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
if (requestAttributes != null) {
HttpServletRequest request = requestAttributes.getRequest();
if (request != null) {
//2. 将老请求得到cookie信息放到feign请求上
String cookie = request.getHeader("Cookie");
template.header("Cookie", cookie);
}
}
}
};
}
}
  • RequestContextHolder为 SpingMVC 中共享request数据的上下文,底层由ThreadLocal实现

经过RequestInterceptor处理后的请求如下,已经加上了请求头的Cookie信息

3.4 Feign 异步情况丢失上下文问题

  • 由于RequestContextHolder使用ThreadLocal共享数据,所以在开启异步时获取不到老请求的信息,自然也就无法共享cookie

在这种情况下,我们需要在开启异步的时候将老请求的RequestContextHolder的数据设置进去

3.5 运费收件信息获取

数据封装

1
2
3
4
5
@Data
public class FareVo {
private MemberAddressVo address;
private BigDecimal fare;
}

在页面将选中地址的 id 传给请求

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@RequestMapping("/fare/{addrId}")
public FareVo getFare(@PathVariable("addrId") Long addrId) {
return wareInfoService.getFare(addrId);
}

@Override
public FareVo getFare(Long addrId) {
FareVo fareVo = new FareVo();
R info = memberFeignService.info(addrId);
if (info.getCode() == 0) {
MemberAddressVo address = info.getData("memberReceiveAddress", new TypeReference<MemberAddressVo>() {
});
fareVo.setAddress(address);
String phone = address.getPhone();
//取电话号的最后两位作为邮费
String fare = phone.substring(phone.length() - 2, phone.length());
fareVo.setFare(new BigDecimal(fare));
}
return fareVo;
}

4、订单提交

4.1 模型抽取

页面提交数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
@Data
public class OrderSubmitVo {

/** 收获地址的id **/
private Long addrId;

/** 支付方式 **/
private Integer payType;
//无需提交要购买的商品,去购物车再获取一遍
//优惠、发票

/** 防重令牌 **/
private String orderToken;

/** 应付价格 **/
private BigDecimal payPrice;

/** 订单备注 **/
private String remarks;

//用户相关的信息,直接去session中取出即可
}

成功后转发至支付页面携带数据

1
2
3
4
5
6
7
8
@Data
public class SubmitOrderResponseVo {

private OrderEntity order;

/** 错误状态码 **/
private Integer code;
}

4.2 提交订单

  • 提交订单成功,则携带返回数据转发至支付页面
  • 提交订单失败,则携带错误信息重定向至确认页
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@RequestMapping("/submitOrder")
public String submitOrder(OrderSubmitVo submitVo, Model model, RedirectAttributes attributes) {
try{
SubmitOrderResponseVo responseVo=orderService.submitOrder(submitVo);
Integer code = responseVo.getCode();
if (code==0){
model.addAttribute("order", responseVo.getOrder());
return "pay";
}else {
String msg = "下单失败;";
switch (code) {
case 1:
msg += "防重令牌校验失败";
break;
case 2:
msg += "商品价格发生变化";
break;
}
attributes.addFlashAttribute("msg", msg);
return "redirect:http://order.gulimall.com/toTrade";
}
}catch (Exception e){
if (e instanceof NoStockException){
String msg = "下单失败,商品无库存";
attributes.addFlashAttribute("msg", msg);
}
return "redirect:http://order.gulimall.com/toTrade";
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
    @Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {
SubmitOrderResponseVo responseVo = new SubmitOrderResponseVo();
responseVo.setCode(0);
//1. 验证防重令牌
MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long execute = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), submitVo.getOrderToken());
if (execute == 0L) {
//1.1 防重令牌验证失败
responseVo.setCode(1);
return responseVo;
}else {
//2. 创建订单、订单项
OrderCreateTo order =createOrderTo(memberResponseVo,submitVo);

//3. 验价
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = submitVo.getPayPrice();
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
//4. 保存订单
saveOrder(order);
//5. 锁定库存
List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
OrderItemVo orderItemVo = new OrderItemVo();
orderItemVo.setSkuId(item.getSkuId());
orderItemVo.setCount(item.getSkuQuantity());
return orderItemVo;
}).collect(Collectors.toList());
R r = wareFeignService.orderLockStock(orderItemVos);
//5.1 锁定库存成功
if (r.getCode()==0){
// int i = 10 / 0;
responseVo.setOrder(order.getOrder());
responseVo.setCode(0);
return responseVo;
}else {
//5.1 锁定库存失败
String msg = (String) r.get("msg");
throw new NoStockException(msg);
}

}else {
//验价失败
responseVo.setCode(2);
return responseVo;
}
}
}
1.验证防重令牌

为防止在获取令牌、对比值和删除令牌之间发生错误导入令牌校验出错,我们必须使用脚本保证原子性操作

1
2
3
4
5
6
7
MemberResponseVo memberResponseVo = LoginInterceptor.loginUser.get();
String script= "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
Long execute = redisTemplate.execute(new DefaultRedisScript<>(script,Long.class), Arrays.asList(OrderConstant.USER_ORDER_TOKEN_PREFIX + memberResponseVo.getId()), submitVo.getOrderToken());
if (execute == 0L) {
//1.1 防重令牌验证失败
responseVo.setCode(1);
return responseVo;
2.创建订单、订单项

抽取模型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Data
public class OrderCreateTo {

private OrderEntity order;

private List<OrderItemEntity> orderItems;

/** 订单计算的应付价格 **/
private BigDecimal payPrice;

/** 运费 **/
private BigDecimal fare;

}

创建订单、订单项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
//2. 创建订单、订单项
OrderCreateTo order =createOrderTo(memberResponseVo,submitVo);

private OrderCreateTo createOrderTo(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo) {
//2.1 用IdWorker生成订单号
String orderSn = IdWorker.getTimeId();
//2.2 构建订单
OrderEntity entity = buildOrder(memberResponseVo, submitVo,orderSn);
//2.3 构建订单项
List<OrderItemEntity> orderItemEntities = buildOrderItems(orderSn);
//2.4 计算价格
compute(entity, orderItemEntities);
OrderCreateTo createTo = new OrderCreateTo();
createTo.setOrder(entity);
createTo.setOrderItems(orderItemEntities);
return createTo;
}

构建订单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
private OrderEntity buildOrder(MemberResponseVo memberResponseVo, OrderSubmitVo submitVo, String orderSn) {

OrderEntity orderEntity =new OrderEntity();

orderEntity.setOrderSn(orderSn);

//1) 设置用户信息
orderEntity.setMemberId(memberResponseVo.getId());
orderEntity.setMemberUsername(memberResponseVo.getUsername());

//2) 获取邮费和收件人信息并设置
FareVo fareVo = wareFeignService.getFare(submitVo.getAddrId());
BigDecimal fare = fareVo.getFare();
orderEntity.setFreightAmount(fare);
MemberAddressVo address = fareVo.getAddress();
orderEntity.setReceiverName(address.getName());
orderEntity.setReceiverPhone(address.getPhone());
orderEntity.setReceiverPostCode(address.getPostCode());
orderEntity.setReceiverProvince(address.getProvince());
orderEntity.setReceiverCity(address.getCity());
orderEntity.setReceiverRegion(address.getRegion());
orderEntity.setReceiverDetailAddress(address.getDetailAddress());

//3) 设置订单相关的状态信息
orderEntity.setStatus(OrderStatusEnum.CREATE_NEW.getCode());
orderEntity.setConfirmStatus(0);
orderEntity.setAutoConfirmDay(7);

return orderEntity;
}

构建订单项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
private OrderItemEntity buildOrderItem(OrderItemVo item) {
OrderItemEntity orderItemEntity = new OrderItemEntity();
Long skuId = item.getSkuId();
//1) 设置sku相关属性
orderItemEntity.setSkuId(skuId);
orderItemEntity.setSkuName(item.getTitle());
orderItemEntity.setSkuAttrsVals(StringUtils.collectionToDelimitedString(item.getSkuAttrValues(), ";"));
orderItemEntity.setSkuPic(item.getImage());
orderItemEntity.setSkuPrice(item.getPrice());
orderItemEntity.setSkuQuantity(item.getCount());
//2) 通过skuId查询spu相关属性并设置
R r = productFeignService.getSpuBySkuId(skuId);
if (r.getCode() == 0) {
SpuInfoTo spuInfo = r.getData(new TypeReference<SpuInfoTo>() {
});
orderItemEntity.setSpuId(spuInfo.getId());
orderItemEntity.setSpuName(spuInfo.getSpuName());
orderItemEntity.setSpuBrand(spuInfo.getBrandName());
orderItemEntity.setCategoryId(spuInfo.getCatalogId());
}
//3) 商品的优惠信息(不做)

//4) 商品的积分成长,为价格x数量
orderItemEntity.setGiftGrowth(item.getPrice().multiply(new BigDecimal(item.getCount())).intValue());
orderItemEntity.setGiftIntegration(item.getPrice().multiply(new BigDecimal(item.getCount())).intValue());

//5) 订单项订单价格信息
orderItemEntity.setPromotionAmount(BigDecimal.ZERO);
orderItemEntity.setCouponAmount(BigDecimal.ZERO);
orderItemEntity.setIntegrationAmount(BigDecimal.ZERO);

//6) 实际价格
BigDecimal origin = orderItemEntity.getSkuPrice().multiply(new BigDecimal(orderItemEntity.getSkuQuantity()));
BigDecimal realPrice = origin.subtract(orderItemEntity.getPromotionAmount())
.subtract(orderItemEntity.getCouponAmount())
.subtract(orderItemEntity.getIntegrationAmount());
orderItemEntity.setRealAmount(realPrice);

return orderItemEntity;
}

计算订单价格

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private void compute(OrderEntity entity, List<OrderItemEntity> orderItemEntities) {
//总价
BigDecimal total = BigDecimal.ZERO;
//优惠价格
BigDecimal promotion=new BigDecimal("0.0");
BigDecimal integration=new BigDecimal("0.0");
BigDecimal coupon=new BigDecimal("0.0");
//积分
Integer integrationTotal = 0;
Integer growthTotal = 0;

for (OrderItemEntity orderItemEntity : orderItemEntities) {
total=total.add(orderItemEntity.getRealAmount());
promotion=promotion.add(orderItemEntity.getPromotionAmount());
integration=integration.add(orderItemEntity.getIntegrationAmount());
coupon=coupon.add(orderItemEntity.getCouponAmount());
integrationTotal += orderItemEntity.getGiftIntegration();
growthTotal += orderItemEntity.getGiftGrowth();
}

entity.setTotalAmount(total);
entity.setPromotionAmount(promotion);
entity.setIntegrationAmount(integration);
entity.setCouponAmount(coupon);
entity.setIntegration(integrationTotal);
entity.setGrowth(growthTotal);

//付款价格=商品价格+运费
entity.setPayAmount(entity.getFreightAmount().add(total));

//设置删除状态(0-未删除,1-已删除)
entity.setDeleteStatus(0);
}
3.验价

将页面提交的价格和后台计算的价格进行对比,若不同则提示用户商品价格发生变化

1
2
3
4
5
6
7
8
9
BigDecimal payAmount = order.getOrder().getPayAmount();
BigDecimal payPrice = submitVo.getPayPrice();
if (Math.abs(payAmount.subtract(payPrice).doubleValue()) < 0.01) {
/****************/
}else {
//验价失败
responseVo.setCode(2);
return responseVo;
}
4.保存订单
1
2
3
4
5
6
7
private void saveOrder(OrderCreateTo orderCreateTo) {
OrderEntity order = orderCreateTo.getOrder();
order.setCreateTime(new Date());
order.setModifyTime(new Date());
this.save(order);
orderItemService.saveBatch(orderCreateTo.getOrderItems());
}
5.锁定库存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
List<OrderItemVo> orderItemVos = order.getOrderItems().stream().map((item) -> {
OrderItemVo orderItemVo = new OrderItemVo();
orderItemVo.setSkuId(item.getSkuId());
orderItemVo.setCount(item.getSkuQuantity());
return orderItemVo;
}).collect(Collectors.toList());
R r = wareFeignService.orderLockStock(orderItemVos);
//5.1 锁定库存成功
if (r.getCode()==0){
responseVo.setOrder(order.getOrder());
responseVo.setCode(0);
return responseVo;
}else {
//5.2 锁定库存失败
String msg = (String) r.get("msg");
throw new NoStockException(msg);
}
  • 找出所有库存大于商品数的仓库
  • 遍历所有满足条件的仓库,逐个尝试锁库存,若锁库存成功则退出遍历
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@RequestMapping("/lock/order")
public R orderLockStock(@RequestBody List<OrderItemVo> itemVos) {
try {
Boolean lock = wareSkuService.orderLockStock(itemVos);
return R.ok();
} catch (NoStockException e) {
return R.error(BizCodeEnum.NO_STOCK_EXCEPTION.getCode(), BizCodeEnum.NO_STOCK_EXCEPTION.getMsg());
}
}

@Transactional
@Override
public Boolean orderLockStock(List<OrderItemVo> itemVos) {
List<SkuLockVo> lockVos = itemVos.stream().map((item) -> {
SkuLockVo skuLockVo = new SkuLockVo();
skuLockVo.setSkuId(item.getSkuId());
skuLockVo.setNum(item.getCount());
//找出所有库存大于商品数的仓库
List<Long> wareIds = baseMapper.listWareIdsHasStock(item.getSkuId(), item.getCount());
skuLockVo.setWareIds(wareIds);
return skuLockVo;
}).collect(Collectors.toList());

for (SkuLockVo lockVo : lockVos) {
boolean lock = true;
Long skuId = lockVo.getSkuId();
List<Long> wareIds = lockVo.getWareIds();
//如果没有满足条件的仓库,抛出异常
if (wareIds == null || wareIds.size() == 0) {
throw new NoStockException(skuId);
}else {
for (Long wareId : wareIds) {
Long count=baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);
if (count==0){
lock=false;
}else {
lock = true;
break;
}
}
}
if (!lock) throw new NoStockException(skuId);
}
return true;
}

这里通过异常机制控制事务回滚,如果在锁定库存失败则抛出NoStockExceptions,订单服务和库存服务都会回滚。

六、分布式事务

1、分布式事务

2、整合 Spring cloud alibaba Seata

在 common 添加依赖

seata-all 使用 0.9【所以启动 事务协调者 0.9 版本的】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
<!--seata 分布式事务-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
<exclusions>
<exclusion>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-all</artifactId>
<version>0.9.0</version>
</dependency>

2、使用一个 @GlobalTransactional 注解在业务方法上【TM 事务管理器上】
资源管理器上只需要标注@Transactional 就可以了【各远程方法】

1
2
3
4
5
@GlobalTransactional
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {
}

3、各数据库上创建表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
-- 注意此处0.3.0+ 增加唯一索引 ux_undo_log
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;

安装 事务协调器:seata-server-1.3.0.zip
https://github.com/seata/seata/releases,下载服务器软件包,将其解压缩。

5、修改事务协调器的配置 registry.conf

1
2
3
4
5
6
7
8
9
10
11

1)把自己注册到nacos,并且设置nacos的url
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"


2)配置放在file【也可以在nacos上】
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "file"

七、使用消息队列实现最终一致性

1、延迟队列的定义与实现

  • 定义:

    延迟队列存储的对象肯定是对应的延时消息,所谓”延时消息” 是指消息被发送以后,并不想让消费者立即拿到消息,而是等待指定时间后,消费者才拿到这个进行消费。

  • 实现:

    RabbitMQ 可以通过设置队列的 TTL 和 死信路由实现延迟队列

    • TTL:

    RabbitMQ 可以正确 Queue 设置 x-expires 或者 针对 Message 设置 x-message-ttl, 来控制消息的生存时间。如果超时(两者同时设置以最先到期的时间为准),则消息变为 dead letter(死信)

    • 死信路由 DLX

    RabbitMQ 的 Queue 可以配置 x-dead-letter-exchange 和 x-dead-letter-router-key(可选)两个参数,如果队列内出现了 dead letter, 则按照这两个参数重新路由转发到指定的队列。

    • x-dead-letter-exchange:出现 dead letter 之后将 dead letter 重新发送到指定 exchange
    • x-dead-letter-routing-key: 出现 dead letter 之后将 dead letter 重新按照指定的 routing-key 发送

针对订单创建以上消息队列,创建订单时消息会被发送至队列 order.delay.queue , 经过 TTL 的时间后消息会变成死信以 order.release.order 的路由键经交换机转发至队列 order.release.order.queue ,在通过监听该队列的消息来实现过期订单的处理。

2、延迟队列使用场景

为什么不能用定时任务完成?

如果恰好在一次扫描后完成业务逻辑,那么就会等待两个扫描周期才能扫到过期的订单,不能保证时效性

3、 定时关单与库存解锁主体逻辑

  • 订单超时未支付触发订单过期状态修改与库存解锁

创建订单时消息会被发送至队列 order.delay.queue , 经过 TTL 的时间后消息会变成死信以 order.release.order 的路由键交换机转发至队列 order.release.order.queue, 在通过监听该队列的消息来实现过期订单的处理

  • 如果该订单已支付,则无需处理
  • 否则说明该订单已过期,修改该订单的状态并通过并通过路由键 order.release.other 发送消息至队列 stock.release.stock.queue 进行库存解锁
  • 库存锁定后延迟检查是否需要解锁库存

在库存锁定后通过 路由键 stock.locked 发送至 延迟队列stock.delay.queue , 延迟时间到,死信通过 路由键stock.release 转发至 stock.release.stock.queue, 通过监听该队列进行判断当前订单状态,来确认库存是否需要解锁

  • 由于 关闭订单库存解锁 都有可能被执行多次,因此要保证业务逻辑的幂等性,在执行业务是重新查询当前的状态进行判断
  • 订单关闭和库存解锁都会进行库存解锁的操作,来确保业务异常或者订单过期时库存会被可靠解锁

4、创建业务交换机和队列

订单模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
@Configuration
public class MyRabbitmqConfig {
@Bean
public Exchange orderEventExchange() {
/**
* String name,
* boolean durable,
* boolean autoDelete,
* Map<String, Object> arguments
*/
return new TopicExchange("order-event-exchange", true, false);
}

/**
* 延迟队列
* @return
*/
@Bean
public Queue orderDelayQueue() {
/**
Queue(String name, 队列名字
boolean durable, 是否持久化
boolean exclusive, 是否排他
boolean autoDelete, 是否自动删除
Map<String, Object> arguments) 属性
*/
HashMap<String, Object> arguments = new HashMap<>();
//死信交换机
arguments.put("x-dead-letter-exchange", "order-event-exchange");
//死信路由键
arguments.put("x-dead-letter-routing-key", "order.release.order");
arguments.put("x-message-ttl", 60000); // 消息过期时间 1分钟
return new Queue("order.delay.queue",true,false,false,arguments);
}

/**
* 普通队列
*
* @return
*/
@Bean
public Queue orderReleaseQueue() {

Queue queue = new Queue("order.release.order.queue", true, false, false);

return queue;
}

/**
* 创建订单的binding
* @return
*/
@Bean
public Binding orderCreateBinding() {
/**
* String destination, 目的地(队列名或者交换机名字)
* DestinationType destinationType, 目的地类型(Queue、Exhcange)
* String exchange,
* String routingKey,
* Map<String, Object> arguments
* */
return new Binding("order.delay.queue", Binding.DestinationType.QUEUE, "order-event-exchange", "order.create.order", null);
}

@Bean
public Binding orderReleaseBinding() {
return new Binding("order.release.order.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.order",
null);
}

@Bean
public Binding orderReleaseOrderBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"order-event-exchange",
"order.release.other.#",
null);
}
}

库存模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
@Configuration
public class MyRabbitmqConfig {

@Bean
public Exchange stockEventExchange() {
return new TopicExchange("stock-event-exchange", true, false);
}

/**
* 延迟队列
* @return
*/
@Bean
public Queue stockDelayQueue() {
HashMap<String, Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange", "stock-event-exchange");
arguments.put("x-dead-letter-routing-key", "stock.release");
// 消息过期时间 2分钟
arguments.put("x-message-ttl", 120000);
return new Queue("stock.delay.queue", true, false, false, arguments);
}

/**
* 普通队列,用于解锁库存
* @return
*/
@Bean
public Queue stockReleaseStockQueue() {
return new Queue("stock.release.stock.queue", true, false, false, null);
}


/**
* 交换机和延迟队列绑定
* @return
*/
@Bean
public Binding stockLockedBinding() {
return new Binding("stock.delay.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.locked",
null);
}

/**
* 交换机和普通队列绑定
* @return
*/
@Bean
public Binding stockReleaseBinding() {
return new Binding("stock.release.stock.queue",
Binding.DestinationType.QUEUE,
"stock-event-exchange",
"stock.release.#",
null);
}
}

5、库存自动解锁

5.1 库存锁定

在库存锁定是添加一下逻辑

  • 由于可能订单回滚的情况,所以为了能够得到库存锁定的信息,在锁定需要记录库存工作单,其中包括订单信息和锁定库存时的信息(仓库 id, 商品 id, 锁了几件….)
  • 在锁定成功后,向延迟对队列发消息,带上库存锁定的相关信息
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
@Transactional
@Override
public Boolean orderLockStock(WareSkuLockVo wareSkuLockVo) {
//因为可能出现订单回滚后,库存锁定不回滚的情况,但订单已经回滚,得不到库存锁定信息,因此要有库存工作单
WareOrderTaskEntity taskEntity = new WareOrderTaskEntity();
taskEntity.setOrderSn(wareSkuLockVo.getOrderSn());
taskEntity.setCreateTime(new Date());
wareOrderTaskService.save(taskEntity);

List<OrderItemVo> itemVos = wareSkuLockVo.getLocks();
List<SkuLockVo> lockVos = itemVos.stream().map((item) -> {
SkuLockVo skuLockVo = new SkuLockVo();
skuLockVo.setSkuId(item.getSkuId());
skuLockVo.setNum(item.getCount());
List<Long> wareIds = baseMapper.listWareIdsHasStock(item.getSkuId(), item.getCount());
skuLockVo.setWareIds(wareIds);
return skuLockVo;
}).collect(Collectors.toList());

for (SkuLockVo lockVo : lockVos) {
boolean lock = true;
Long skuId = lockVo.getSkuId();
List<Long> wareIds = lockVo.getWareIds();
if (wareIds == null || wareIds.size() == 0) {
throw new NoStockException(skuId);
}else {
for (Long wareId : wareIds) {
Long count=baseMapper.lockWareSku(skuId, lockVo.getNum(), wareId);
if (count==0){
lock=false;
}else {
//锁定成功,保存工作单详情
WareOrderTaskDetailEntity detailEntity = WareOrderTaskDetailEntity.builder()
.skuId(skuId)
.skuName("")
.skuNum(lockVo.getNum())
.taskId(taskEntity.getId())
.wareId(wareId)
.lockStatus(1).build();
wareOrderTaskDetailService.save(detailEntity);
//发送库存锁定消息至延迟队列
StockLockedTo lockedTo = new StockLockedTo();
lockedTo.setId(taskEntity.getId());
StockDetailTo detailTo = new StockDetailTo();
BeanUtils.copyProperties(detailEntity,detailTo);
lockedTo.setDetailTo(detailTo);
rabbitTemplate.convertAndSend("stock-event-exchange","stock.locked",lockedTo);

lock = true;
break;
}
}
}
if (!lock) throw new NoStockException(skuId);
}
return true;
}

5.2 监听队列

  • 延迟队列会将过期的消息路由至"stock.release.stock.queue",通过监听该队列实现库存的解锁
  • 为保证消息的可靠到达,我们使用手动确认消息的模式,在解锁成功后确认消息,若出现异常则重新归队
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Component
@RabbitListener(queues = {"stock.release.stock.queue"})
public class StockReleaseListener {

@Autowired
private WareSkuService wareSkuService;

@RabbitHandler
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
log.info("************************收到库存解锁的消息********************************");
try {
wareSkuService.unlock(stockLockedTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}

5.3 库存解锁

  • 如果工作单详情不为空,说明该库存锁定成功
    • 查询最新的订单状态,如果订单不存在,说明订单提交出现异常回滚,或者订单处于已取消的状态,我们都对已锁定的库存进行解锁
  • 如果工作单详情为空,说明库存未锁定,自然无需解锁
  • 为保证幂等性,我们分别对订单的状态和工作单的状态都进行了判断,只有当订单过期且工作单显示当前库存处于锁定的状态时,才进行库存的解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
@Override
public void unlock(StockLockedTo stockLockedTo) {
StockDetailTo detailTo = stockLockedTo.getDetailTo();
WareOrderTaskDetailEntity detailEntity = wareOrderTaskDetailService.getById(detailTo.getId());
//1.如果工作单详情不为空,说明该库存锁定成功
if (detailEntity != null) {
WareOrderTaskEntity taskEntity = wareOrderTaskService.getById(stockLockedTo.getId());
R r = orderFeignService.infoByOrderSn(taskEntity.getOrderSn());
if (r.getCode() == 0) {
OrderTo order = r.getData("order", new TypeReference<OrderTo>() {
});
//没有这个订单||订单状态已经取消 解锁库存
if (order == null||order.getStatus()== OrderStatusEnum.CANCLED.getCode()) {
//为保证幂等性,只有当工作单详情处于被锁定的情况下才进行解锁
if (detailEntity.getLockStatus()== WareTaskStatusEnum.Locked.getCode()){
unlockStock(detailTo.getSkuId(), detailTo.getSkuNum(), detailTo.getWareId(), detailEntity.getId());
}
}
}else {
throw new RuntimeException("远程调用订单服务失败");
}
}else {
//无需解锁
}
}

6、定时关单

6.1 提交订单

1
2
3
4
5
6
7
8
9
10
11
@Transactional
@Override
public SubmitOrderResponseVo submitOrder(OrderSubmitVo submitVo) {

//提交订单的业务处理。。。

//发送消息到订单延迟队列,判断过期订单
rabbitTemplate.convertAndSend("order-event-exchange","order.create.order",order.getOrder());


}

6.2 监听队列

创建订单的消息会进入延迟队列,最终发送至队列order.release.order.queue,因此我们对该队列进行监听,进行订单的关闭

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Component
@RabbitListener(queues = {"order.release.order.queue"})
public class OrderCloseListener {

@Autowired
private OrderService orderService;

@RabbitHandler
public void listener(OrderEntity orderEntity, Message message, Channel channel) throws IOException {
System.out.println("收到过期的订单信息,准备关闭订单" + orderEntity.getOrderSn());
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try {
orderService.closeOrder(orderEntity);
channel.basicAck(deliveryTag,false);
} catch (Exception e){
channel.basicReject(deliveryTag,true);
}

}
}

6.3 关闭订单

  • 由于要保证幂等性,因此要查询最新的订单状态判断是否需要关单
  • 关闭订单后也需要解锁库存,因此发送消息进行库存、会员服务对应的解锁
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@Override
public void closeOrder(OrderEntity orderEntity) {
//因为消息发送过来的订单已经是很久前的了,中间可能被改动,因此要查询最新的订单
OrderEntity newOrderEntity = this.getById(orderEntity.getId());
//如果订单还处于新创建的状态,说明超时未支付,进行关单
if (newOrderEntity.getStatus() == OrderStatusEnum.CREATE_NEW.getCode()) {
OrderEntity updateOrder = new OrderEntity();
updateOrder.setId(newOrderEntity.getId());
updateOrder.setStatus(OrderStatusEnum.CANCLED.getCode());
this.updateById(updateOrder);

//关单后发送消息通知其他服务进行关单相关的操作,如解锁库存
OrderTo orderTo = new OrderTo();
BeanUtils.copyProperties(newOrderEntity,orderTo);
rabbitTemplate.convertAndSend("order-event-exchange", "order.release.other",orderTo);
}
}

6.4 解锁库存

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
@Slf4j
@Component
@RabbitListener(queues = {"stock.release.stock.queue"})
public class StockReleaseListener {

@Autowired
private WareSkuService wareSkuService;

@RabbitHandler
public void handleStockLockedRelease(StockLockedTo stockLockedTo, Message message, Channel channel) throws IOException {
log.info("************************收到库存解锁的消息********************************");
try {
wareSkuService.unlock(stockLockedTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}

@RabbitHandler
public void handleStockLockedRelease(OrderTo orderTo, Message message, Channel channel) throws IOException {
log.info("************************从订单模块收到库存解锁的消息********************************");
try {
wareSkuService.unlock(orderTo);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
}
}
}
1
2
3
4
5
6
7
8
9
10
11
@Override
public void unlock(OrderTo orderTo) {
//为防止重复解锁,需要重新查询工作单
String orderSn = orderTo.getOrderSn();
WareOrderTaskEntity taskEntity = wareOrderTaskService.getBaseMapper().selectOne((new QueryWrapper<WareOrderTaskEntity>().eq("order_sn", orderSn)));
//查询出当前订单相关的且处于锁定状态的工作单详情
List<WareOrderTaskDetailEntity> lockDetails = wareOrderTaskDetailService.list(new QueryWrapper<WareOrderTaskDetailEntity>().eq("task_id", taskEntity.getId()).eq("lock_status", WareTaskStatusEnum.Locked.getCode()));
for (WareOrderTaskDetailEntity lockDetail : lockDetails) {
unlockStock(lockDetail.getSkuId(),lockDetail.getSkuNum(),lockDetail.getWareId(),lockDetail.getId());
}
}

八、支付

1、支付宝加密原理

  • 支付宝加密采用 RSA 非对称加密,分别在客户端和支付端有两个公钥和私钥
  • 在发送订单数据时,直接使用明文但会使用 商户私钥 加一个对应的签名,支付宝端会使用 商户公钥 对签名进行验签,只有数据明文和签名对应的时候才能说明传输正确。
  • 支付宝成功后,支付宝发送支付成功数据之外,还会使用 支付宝私钥 加一个对应的签名,商户端收到支付成功数据之后会使用 支付宝公钥 验签,成功后才能确认。

image-20220329233453861

2、配置支付宝沙箱环境

image-20220329234110995

3、环境搭建

导入支付宝 sdk

1
2
3
4
5
<dependency>
<groupId>com.alipay.sdk</groupId>
<artifactId>alipay-sdk-java</artifactId>
<version>4.9.28.ALL</version>
</dependency>

抽取支付工具类并进行配置

成功调用该接口后,返回的数据就是支付页面的 html,因此后续会使用@ResponseBody

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
@ConfigurationProperties(prefix = "alipay")
@Component
@Data
public class AlipayTemplate {

//在支付宝创建的应用的id
private String app_id = "2016102600763190";

// 商户私钥,您的PKCS8格式RSA2私钥
private String merchant_private_key = "MjXN6Hnj8k2GAriRFt0BS9gjihbl9Rt38VMNbBi3Vt3Cy6TOwANLLJ/DfnYjRqwCG81fkyKlDqdsamdfCiTysCa0gQKBgQDYQ45LSRxAOTyM5NliBmtev0lbpDa7FqXL0UFgBel5VgA1Ysp0+6ex2n73NBHbaVPEXgNMnTdzU3WF9uHF4Gj0mfUzbVMbj/YkkHDOZHBggAjEHCB87IKowq/uAH/++Qes2GipHHCTJlG6yejdxhOsMZXdCRnidNx5yv9+2JI37QKBgQCw0xn7ZeRBIOXxW7xFJw1WecUV7yaL9OWqKRHat3lFtf1Qo/87cLl+KeObvQjjXuUe07UkrS05h6ijWyCFlBo2V7Cdb3qjq4atUwScKfTJONnrF+fwTX0L5QgyQeDX5a4yYp4pLmt6HKh34sI5S/RSWxDm7kpj+/MjCZgp6Xc51g==";

// 支付宝公钥,查看地址:https://openhome.alipay.com/platform/keyManage.htm 对应APPID下的支付宝公钥。
private String alipay_public_key = "MIIBIjA74UKxt2F8VMIRKrRAAAuIMuawIsl4Ye+G12LK8P1ZLYy7ZJpgZ+Wv5nOs3DdoEazgCERj/ON8lM1KBHZOAV+TkrIcyi7cD1gfv4a1usikrUqm8/qhFvoiUfyHJFv1ymT7C4BI6aHzQ2zcUlSQPGoPl4C11tgnSkm3DlH2JZKgaIMcCOnNH+qctjNh9yIV9zat2qUiXbxmrCTtxAmiI3I+eVsUNwvwIDAQAB";

// 服务器[异步通知]页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
// 支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
private String notify_url="http://**.natappfree.cc/payed/notify";

// 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
//同步通知,支付成功,一般跳转到成功页
private String return_url="http://order.gulimall.com/memberOrder.html";

// 签名方式
private String sign_type = "RSA2";

// 字符编码格式
private String charset = "utf-8";

// 支付宝网关; https://openapi.alipaydev.com/gateway.do
private String gatewayUrl = "https://openapi.alipaydev.com/gateway.do";

public String pay(PayVo vo) throws AlipayApiException {

//AlipayClient alipayClient = new DefaultAlipayClient(AlipayTemplate.gatewayUrl, AlipayTemplate.app_id, AlipayTemplate.merchant_private_key, "json", AlipayTemplate.charset, AlipayTemplate.alipay_public_key, AlipayTemplate.sign_type);
//1、根据支付宝的配置生成一个支付客户端
AlipayClient alipayClient = new DefaultAlipayClient(gatewayUrl,
app_id, merchant_private_key, "json",
charset, alipay_public_key, sign_type);

//2、创建一个支付请求 //设置请求参数
AlipayTradePagePayRequest alipayRequest = new AlipayTradePagePayRequest();
alipayRequest.setReturnUrl(return_url);
alipayRequest.setNotifyUrl(notify_url);

//商户订单号,商户网站订单系统中唯一订单号,必填
String out_trade_no = vo.getOut_trade_no();
//付款金额,必填
String total_amount = vo.getTotal_amount();
//订单名称,必填
String subject = vo.getSubject();
//商品描述,可空
String body = vo.getBody();

alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");

String result = alipayClient.pageExecute(alipayRequest).getBody();

//会收到支付宝的响应,响应的是一个页面,只要浏览器显示这个页面,就会自动来到支付宝的收银台页面
System.out.println("支付宝的响应:"+result);

return result;

}

4、订单支付与同步通知

点击支付跳转到支付接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
@ResponseBody
@GetMapping(value = "aliPayOrder", produces = "text/html")
public String aliPayOrder(@RequestParam("orderSn") String orderSn) throws AlipayApiException {
System.out.println("接收到订单信息orderSn:" + orderSn);
// 获取当前订单并设置支付订单相关信息
PayVo payVo = orderService.getOrderPay(orderSn);
String pay = alipayTemplate.pay(payVo);
return pay;
}

@Override
public PayVo getOrderPay(String orderSn) {
OrderEntity orderEntity = this.getOne(new QueryWrapper<OrderEntity>().eq("order_sn", orderSn));
PayVo payVo = new PayVo();
//交易号
payVo.setOut_trade_no(orderSn);
//支付金额设置为两位小数,否则会报错
BigDecimal payAmount = orderEntity.getPayAmount().setScale(2, BigDecimal.ROUND_UP);
payVo.setTotal_amount(payAmount.toString());

List<OrderItemEntity> orderItemEntities = orderItemService.list(new QueryWrapper<OrderItemEntity>().eq("order_sn", orderSn));
OrderItemEntity orderItemEntity = orderItemEntities.get(0);
//订单名称
payVo.setSubject(orderItemEntity.getSkuName());
//商品描述
payVo.setBody(orderItemEntity.getSkuAttrsVals());
return payVo;
}

设置成功回调地址为订单详情页

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
	// 页面跳转同步通知页面路径 需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
//同步通知,支付成功,一般跳转到成功页
private String return_url="http://order.gulimall.com/memberOrder.html";

/**
* 获取当前用户的所有订单
* @return
*/
@GetMapping("/memberOrder.html")
public String memberOrderPage(@RequestParam(value = "pageNum",required = false,defaultValue = "0") Integer pageNum, Model model){
Map<String, Object> params = new HashMap<>();
params.put("page",pageNum.toString());
// 分页查询当前用户的所有订单及对应的订单项
R orderInfo = orderFeignService.listWithItem(params);
model.addAttribute("orders",orderInfo);
return "orderList";
}

注意:需要给 gulimall-member 项目基本配置

  1. spring-session 依赖
  2. spring-session 配置
  3. LoginInterceptor 拦截器

image-20220406232015966

5、异步通知

  • 订单支付成功后支付宝会回调商户接口,这个时候需要修改订单状态
  • 由于同步跳转可能由于网络问题失败,所以使用异步通知
  • 支付宝使用的是最大努力通知方案,保障数据一致性,隔一段时间会通知商户支付成功,直到返回success

5.1 内网穿透设置异步通知地址

  • 将外网映射到本地的order.gulimall.com:80

  • 由于回调的请求头不是order.gulimall.com,因此 nginx 转发到网关后找不到对应的服务,所以需要对 nginx 进行设置

/payed/notify异步通知转发至订单服务

设置异步通知的地址

1
2
3
// 服务器[异步通知]页面路径  需http://格式的完整路径,不能加?id=123这类自定义参数,必须外网可以正常访问
// 支付宝会悄悄的给我们发送一个请求,告诉我们支付成功的信息
private String notify_url="http://****.natappfree.cc/payed/notify";

5.2 支付包支付异步通知

异步通知的参数

1
2
3
4
5
6
7
8
9
10
@PostMapping("/payed/notify")
public String handlerAlipay(HttpServletRequest request) {
System.out.println("收到支付宝异步通知******************");
Map<String, String[]> parameterMap = request.getParameterMap();
for (String key : parameterMap.keySet()) {
String value = request.getParameter(key);
System.out.println("key:"+key+"===========>value:"+value);
}
return "success";
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
收到支付宝异步通知******************
key:gmt_create===========>value:2020-10-18 09:13:26
key:charset===========>value:utf-8
key:gmt_payment===========>value:2020-10-18 09:13:34
key:notify_time===========>value:2020-10-18 09:13:35
key:subject===========>value:华为
key:sign===========>value:aqhKWzgzTLE84Scy5d8i3f+t9f7t7IE5tK/s5iHf3SdFQXPnTt6MEVtbr15ZXmITEo015nCbSXaUFJvLiAhWpvkNEd6ysraa+2dMgotuHPIHnIUFwvdk+U4Ez+2A4DBTJgmwtc5Ay8mYLpHLNR9ASuEmkxxK2F3Ov6MO0d+1DOjw9c/CCRRBWR8NHSJePAy/UxMzULLtpMELQ1KUVHLgZC5yym5TYSuRmltYpLHOuoJhJw8vGkh2+4FngvjtS7SBhEhR1GvJCYm1iXRFTNgP9Fmflw+EjxrDafCIA+r69ZqoJJ2Sk1hb4cBsXgNrFXR2Uj4+rQ1Ec74bIjT98f1KpA==
key:buyer_id===========>value:2088622954825223
key:body===========>value:上市年份:2020;内存:64G
key:invoice_amount===========>value:6300.00
key:version===========>value:1.0
key:notify_id===========>value:2020101800222091334025220507700182
key:fund_bill_list===========>value:[{"amount":"6300.00","fundChannel":"ALIPAYACCOUNT"}]
key:notify_type===========>value:trade_status_sync
key:out_trade_no===========>value:12345523123
key:total_amount===========>value:6300.00
key:trade_status===========>value:TRADE_SUCCESS
key:trade_no===========>value:2020101822001425220501264292
key:auth_app_id===========>value:2016102600763190
key:receipt_amount===========>value:6300.00
key:point_amount===========>value:0.00
key:app_id===========>value:2016102600763190
key:buyer_pay_amount===========>value:6300.00
key:sign_type===========>value:RSA2
key:seller_id===========>value:2088102181115314

各参数详细意义见支付宝开放平台异步通知

验证签名

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@PostMapping("/payed/notify")
public String handlerAlipay(HttpServletRequest request, PayAsyncVo payAsyncVo) throws AlipayApiException {
System.out.println("收到支付宝异步通知******************");
// 只要收到支付宝的异步通知,返回 success 支付宝便不再通知
// 获取支付宝POST过来反馈信息
//TODO 需要验签
Map<String, String> params = new HashMap<>();
Map<String, String[]> requestParams = request.getParameterMap();
for (String name : requestParams.keySet()) {
String[] values = requestParams.get(name);
String valueStr = "";
for (int i = 0; i < values.length; i++) {
valueStr = (i == values.length - 1) ? valueStr + values[i]
: valueStr + values[i] + ",";
}
//乱码解决,这段代码在出现乱码时使用
// valueStr = new String(valueStr.getBytes("ISO-8859-1"), "utf-8");
params.put(name, valueStr);
}

boolean signVerified = AlipaySignature.rsaCheckV1(params, alipayTemplate.getAlipay_public_key(),
alipayTemplate.getCharset(), alipayTemplate.getSign_type()); //调用SDK验证签名

if (signVerified){
System.out.println("支付宝异步通知验签成功");
//修改订单状态
orderService.handlerPayResult(payAsyncVo);
return "success";
}else {
System.out.println("支付宝异步通知验签失败");
return "error";
}
}

修改订单状态与保存交易流水

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Transactional(rollbackFor = Exception.class)
@Override
public String handlePayResult(PayAsyncVo asyncVo) {

//保存交易流水信息
PaymentInfoEntity paymentInfo = new PaymentInfoEntity();
paymentInfo.setOrderSn(asyncVo.getOut_trade_no());
paymentInfo.setAlipayTradeNo(asyncVo.getTrade_no());
paymentInfo.setTotalAmount(new BigDecimal(asyncVo.getBuyer_pay_amount()));
paymentInfo.setSubject(asyncVo.getBody());
paymentInfo.setPaymentStatus(asyncVo.getTrade_status());
paymentInfo.setCreateTime(new Date());
paymentInfo.setCallbackTime(asyncVo.getNotify_time());
//添加到数据库中
this.paymentInfoService.save(paymentInfo);

//修改订单状态
//获取当前状态
String tradeStatus = asyncVo.getTrade_status();

if (tradeStatus.equals("TRADE_SUCCESS") || tradeStatus.equals("TRADE_FINISHED")) {
//支付成功状态
String orderSn = asyncVo.getOut_trade_no(); //获取订单号
this.updateOrderStatus(orderSn,OrderStatusEnum.PAYED.getCode(),PayConstant.ALIPAY);
}

return "success";
}

/**
* 修改订单状态
* @param orderSn
* @param code
*/
private void updateOrderStatus(String orderSn, Integer code,Integer payType) {
this.baseMapper.updateOrderStatus(orderSn,code,payType);
}

public class PayConstant {

public static final Integer ALIPAY = 1;

public static final Integer WXPAY = 2;

}

6、收单

由于可能出现订单已经过期后,库存已经解锁,但支付成功后再修改订单状态的情况,需要设置支付有效时间,只有在有效期内才能进行支付

1
2
3
4
5
6
7
alipayRequest.setBizContent("{\"out_trade_no\":\""+ out_trade_no +"\","
+ "\"total_amount\":\""+ total_amount +"\","
+ "\"subject\":\""+ subject +"\","
+ "\"body\":\""+ body +"\","
//设置过期时间为1m
+"\"timeout_express\":\"1m\","
+ "\"product_code\":\"FAST_INSTANT_TRADE_PAY\"}");

超时后订单显示

image-20220407002035239

image-20220407002006990