黑马点评:
登录相关
- 使用 JWT 实现无状态认证,登录成功后生成一个包含用户信息的 Token 返回给前端,由前端每次请求时携带在请求头中。后端通过拦截器统一拦截请求,解析并校验 Token 的合法性。
<用户信息只包括:userId,username,role等必要信息 => 安全>
-
校验通过后,我们会将解析出来的用户信息保存到
ThreadLocal中,构建一个线程级的用户上下文,在整个请求处理流程中都可以方便地获取当前登录用户,避免层层传参,提升代码整洁度和可维护性。 -
ThreadLocal原理:每个线程拥有独立的ThreadLocalMap对象。所有线程使用同一个
ThreadLocal对象作为键,但值互不影响。通过线程级别的变量隔离,避免多线程竞争,实现线程安全。
超卖问题:
需要先判断是否有库存,有则扣减库存并返回成功;没有则返回失败。这包括两步操作
1.如何解决卖超问题
--在sql加上判断防止数据变为负数 =乐观锁
--redis预减库存减少数据库访问 内存标记(过滤掉无库存的访问)减少redis访问 请求先入队列缓冲,异步下单,增强用户体验
异步下单:
-- 请求先入队缓冲,异步下单,增强用户体验
-- 请求出队,生成订单,减少库存
-- "客户端"定时轮询检查是否秒杀成功
- 数据库层面:
- 悲观锁(FOR UPDATE-行锁) ❌ 别提,浪费时间
- 乐观锁(版本号法 - SQL加上判断防止数据变成负数) ⭐⭐⭐
- 应用层面:
- 分布式锁(互斥) ❌ 别提,浪费时间
- 缓存+消息队列 ⭐⭐⭐
- 限流+熔断
❌❌❌1.1 行锁
1BEGIN;
2SELECT stock FROM products WHERE id = 1 FOR UPDATE;
3-- 业务逻辑:检查库存是否足够,执行扣减操作
4UPDATE products SET stock = stock - 1 WHERE id = 1;
5COMMIT;
10000条(10线程循环1000次)请求,吞吐量:791.2/sec 互斥锁,串行访问
1Integer stock = baseMapper.selectSecKillStockForUpdate(voucherId); // Fordate加行锁,事务结束会释放
2if (stock <= 0){
3 throw new BusinessException(500, "fail");
4}
5
6boolean update = lambdaUpdate().set(SeckillVoucher::getStock, stock - 1)
7 .eq(SeckillVoucher::getVoucherId, voucherId)
8 .update();
多事务的并发逻辑
1-- 事务1(加X锁)
2BEGIN;
3SELECT * FROM accounts WHERE id = 1 FOR UPDATE; -- 持有X锁
4
5-- 事务2(普通SELECT)
6BEGIN;
7SELECT * FROM accounts WHERE id = 1; -- ✅ 正常读取(MVCC快照)
8 -- ❌ 如果事务2也执行 FOR UPDATE 则被阻塞
9
10-- 事务3(尝试修改)
11UPDATE accounts SET balance = 100 WHERE id = 1; -- ❌ 被阻塞(X锁互斥)
⭐⭐⭐1.2 CAS方法
SQL加上防止为负数
1-- (X锁互斥)
2UPDATE products
3SET stock = stock - 1
4WHERE id = 1 AND stock > 0;
10000条(10线程循环1000次)请求,吞吐量:944.7/sec
1// 注意把Stock不满足条件的过滤!
2
3boolean update = lambdaUpdate()
4 .setSql("stock = stock - 1") // 这个很关键 不要使用 set(::getStock, stock-1) 错误原因:基于应用层计算的值,而不是数据库的当前值。
5 .eq(SeckillVoucher::getVoucherId, voucherId)
6 .gt(SeckillVoucher::getStock, 0) // eq(::getStock, stock)
7 .update();
⭐⭐⭐ MVCC(版本链+ReadView) - 读操作而言
- 事务 B 的更新操作:
- 更新操作会直接操作数据库的当前值,而不是基于事务 B 的 ReadView。
- 事务 B 的更新操作会检查数据库的当前值,发现
stock = 0,因此更新失败。
🎉🎉🎉讲库存加载到Redis中,在Redis中进行扣减库存
1// seckill_key = "seckill:product:1"
2// =Lua脚本(判断+扣减)=> 再处理MySQL // 原子化查询+修改
然后,也不需要扣减MySQL中的库存,这样减少锁竞争,修改为直接插入订单(根据订单和Redis数据,进行核对)。
为了进一步加速响应,引入MQ,解耦下单的库存校验和写入MySQL操作。
2.3 🏷️缓存+消息队列 (服务解耦):
1// 库存初始化:将MySQL商品库存存入Redis中,提升查询和扣减效率;
2
3// 库存预扣减(Redis):用户请求时,先在 Redis 中扣减库存; == Lua脚本
4Long ok = redisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), "1");
5rabbitTemplate.convertAndSend(exchangerName, routingKey, message);
6
7// 订单异步处理(MQ):扣减成功 => MQ => 订单处理接口 => 插入新订单。
10000条(10线程循环1000次)请求,吞吐量:1485.7/sec
🏷️限流+熔断: (仅理论)
1// 1. Nginx限流(最基础,拦截恶意流量) - 只针对IP
2http {
3 limit_req_zone $binary_remote_addr zone=mylimit:10m rate 100r/s;
4 server {
5 location /seckill {
6 limit_req zone=mylimit burst=50 nodelay;
7 proxy_pass http://backend;
8 }
9 }
10}
11
12// 2. Redis令牌桶(应用层用户级限流,精准控制)
13// def try_acquire_token(user_id, rate=5, capacity=10):
14bucket_key = f"token_bucket:{user_id}"
15last_time_key = f"last_time:{user_id}"
16
17// 下面需要为lua脚本 ====
18now = time.time()
19last_time = float(r.get(last_time_key) or now)
20elapsed = now - last_time
21
22// 计算补充的令牌
23tokens_to_add = int(elapsed * rate)
24current_tokens = min(capacity, int(r.get(bucket_key) or capacity) + tokens_to_add)
25
26if current_tokens > 0:
27 r.set(bucket_key, current_tokens - 1) # 消耗 1 个令牌
28 r.set(last_time_key, now) # 更新时间
29 return True
30return False
31
32// ====
33
34
35// 3. 前端排队(减少无效请求,避免一窝蜂涌入)
一人一单问题:
(首先不能重复下单,还要扣减库存,并且新增订单)
- **单体架构 **
- synchronized加锁 + 插入时的唯一索引
- 集群架构
- *** Redis-Lua脚本原子化查询库存+重复下单***
1.1 🏷️Redis预扣减+检查: 使用set结构存储购买过的用户,避免重复下单
1-- 检查库存
2-- 检查重复下单
3if (redis.call('exists', orderKey) == 0) then
4 redis.call('sadd', orderKey, '') -- !!!初始化需要从MySQL中加载已经买过的用户
5 return 2
6end
7if (redis.call('sismember', orderKey, userId) == 1) then
8 return 2
9end
10
11redis.call('sadd', orderKey, userId)
12-- 在Redis中,使用Set集合存储买过的用户
1.2 🏷️JVM锁+MySQL唯一索引实现:
唯一索引减少查询是否重复下单的时间,直接融入到插入操作成功or失败中
1ALTER TABLE orders ADD UNIQUE INDEX idx_uid_item (user_id, item_id);
实际场景中,我们需要的是 “同一用户对同一商品只能购买一次”,而非单纯限制用户或商品ID的唯一性。 唯一索引为商品ID+用户ID
1@Transactional
2public Result createOrder(Long userId, Long itemId) {
3 boolean stockReduced = false; // 标记是否已扣减库存
4 try {
5 // 1. 检查库存(加锁)
6 Item item = itemMapper.selectForUpdate(itemId);
7 if (item.getStock() <= 0) {
8 return Result.fail("库存不足");
9 }
10
11 // 2. 扣减库存(捕获可能异常)
12 int affectedRows = itemMapper.reduceStock(itemId);
13 if (affectedRows == 0) { // 扣减失败(如库存不足)
14 throw new BusinessException("库存不足");
15 }
16 stockReduced = true; // 标记已扣减
17
18 // 3. 创建订单
19 orderMapper.insert(new Order(userId, itemId));
20 return Result.success();
21
22 } catch (DuplicateKeyException e) {
23 // 仅当库存已扣减时才回滚
24 if (stockReduced) {
25 itemMapper.recoverStock(itemId);
26 }
27 throw new BusinessException("请勿重复购买");
28
29 } catch (Exception e) {
30 // 仅当库存已扣减时才回滚
31 if (stockReduced) {
32 itemMapper.recoverStock(itemId);
33 }
34 throw new BusinessException("下单失败,请重试");
35 }
36}
2.1 🏷️分布式锁现实
解决单机JVM锁,在集群下失效的问题。
1-- 上锁,分布式的
2不存在:创建并返回True
业务 => 扣库存 + 创建订单
1-- 解锁,分布式,只能解锁自己上的锁
2// 如果可以释放别人的锁,会出现以下情况
3// 事务A的锁过期释放了(这里没有续时间or暂时卡了) => 事务B加锁 => 事务A活了过来*解锁* => 事务C加锁 ...
4事务B和C拿了同一把锁,寄了 => 只允许删自己加的锁
5事务A和B拿了同一把锁,寄了 => 看门狗续时间
lua脚本
1Boolean lock = redisTemplate.opsForValue().setIfAbsent(lock_key, lock_value, 3, TimeUnit.SECONDS);
2
3try {
4 if (lock) {
5 ...
6 }
7} finally {
8 redisTemplate.execute(SECKILL_UNLOCK_SCRIPT, Collections.singletonList(lock_key), lock_value); // 删除锁和判断锁是否当前线程拥有 原子化
9}
Redission
1RLock lock = redissonClient.getLock(lock_key);
2try {
3 boolean ok = lock.tryLock(1, 3, TimeUnit.SECONDS); // 不阻塞
4 if(ok) {...}
5 finally {
6 if (lock.isHeldByCurrentThread()) lock.unlock();
7 }
8}
9
10try {
11 boolean ok = lock.lock(); // 阻塞,且未指定过期时间 => 触发看门狗
12 if(ok) {...}
13 finally {
14 if (lock.isHeldByCurrentThread()) lock.unlock();
15 }
16}
超卖问题:
-
一开始我们是使用synchronzied在扣减库存的代码块中加锁,进行互斥,讲查询MySQL库存和扣减库存原子化。但是这样并发的性能不高。然后我们调整为CAS乐观锁的形式,我们不加锁,修改sql语句,给where条件中添加上库存大于0的条件,当库存不足时,update操作(会加行锁)就执行失败,表示扣减失败
-
之后,为了提高并发效率,我们使用redis,预扣减库存提高秒杀商品的并发,我们先讲秒杀商品的库存缓存到redis中,在redis中进行 库存的检查和扣减(原子化),因为是基于内存的,效率会比mysql快的多。
一人一单问题:
- 在redis预扣减缓存中进行了修改,我们使用redis中的set集合,存储购买过的用户Id,判断用户是否重复购买。 因为redis是单线程的,因此我们使用lua脚本,讲这一个操作,打包成一个脚本文件,给redis执行,保证操作的原子性。
- 然后我们发现当库存不足的时候,用户会一直点击下单,给redis带来了不必要的查询,因此我们在服务中引入一个布尔变量,标识是否有库存,当没有时,直接返回,降低redis的请求量。
- 另一种不使用redis的解决方法:
后面我们在单体系统下遇到了一人多单超卖问题,我们通过乐观锁(update sql添加库存不能为负)解决;我们对业务进行了变更,将一人多单变成了一人一单,结果在高并发场景下同一用户发送相同请求仍然出现了超卖问题,我们通过悲观锁解决了;由于用户量的激增,我们将单体系统升级成了集群,结果由于锁只能在一个JVM中可见导致又出现了,在高并发场景下同一用户发送下单请求出现超卖问题,我们通过实现分布式锁成功解决集群下的超卖问题;释放锁时,判断锁是否是当前线程 和 删除锁两个操作不是原子性的,可能导致超卖问题,我们通过将两个操作封装到一个Lua脚本成功解决了;
为了解决锁的不可重入性,我们通过将锁以hash结构的形式存储,锁的值为线程ID拼接锁重入次数,每次释放锁都value-1,获取锁value+1,从而实现锁的可重入性,并且将释放锁和获取锁的操作封装到Lua脚本中以确保原子性。
最最后,我们发现可以直接使用现有比较成熟的方案Redisson来解决上诉出现的所有问题🤣,什么不可重试、不可重入、超市释放、原子性等问题Redisson都提供相对应的解决方法(。^▽^)
优惠卷秒杀
异步下单:
1// 主线程
21. Lua脚本
32. 成功=>放入队列;失败=>返回异常
4
51.1. Lua脚本 => 库存 =扣库存=> 是否下单 =记录用户下单 => 返回0
6 返回1 返回2
7
8// 子线程
9队列中取出订单信息 => 插入订单 => 是否付款 => 更新成功
10 => End
缓存三兄弟:
💡缓存穿透: 恶意请求查询不存在的数据,绕过缓存直接访问数据库

- 布隆过滤器
- 商品ID,通过多个哈希函数,将元素映射到bitMap中的多个位置,只有每个位置都为1才表明这个对象存在。
- guava库有实现好的,直接用。
1# 伪代码,仅供学习
2class BloomFilter:
3 def __init__(self, size, hash_count):
4 """
5 初始化布隆过滤器
6 :param size: 位数组大小(m)
7 :param hash_count: 哈希函数数量(k)
8 """
9 self.size = size
10 self.hash_count = hash_count
11 self.bit_array = [0] * size # 初始化位数组(全0)
12
13 def add(self, item):
14 """
15 添加元素到布隆过滤器
16 """
17 for seed in range(self.hash_count):
18 # 通过不同的种子模拟多个哈希函数
19 index = self._hash(item, seed) % self.size
20 self.bit_array[index] = 1 # 设置对应位为1
21
22 def contains(self, item):
23 """
24 检查元素是否可能存在
25 :return:
26 - True: 可能存在(可能有误判)
27 - False: 一定不存在
28 """
29 for seed in range(self.hash_count):
30 index = self._hash(item, seed) % self.size
31 if self.bit_array[index] == 0:
32 return False # 只要有一位为0,则一定不存在
33 return True # 所有位均为1,可能存在
34
35 def _hash(self, item, seed):
36 """
37 哈希函数(伪代码,实际可用MurmurHash等)
38 :param item: 输入元素
39 :param seed: 哈希种子(模拟不同哈希函数)
40 :return: 哈希值
41 """
42 hash_value = 0
43 for char in str(item):
44 hash_value = (hash_value * seed + ord(char)) & 0xFFFFFFFF
45 return hash_value
- 缓存空对象
🏷️ 缓存空对象实现方案

1// 缓存命中
2if (StrUtil.isNotBlank(s)) { // 不为null,不为""
3 SeckillVoucher seckillVoucher = JSONUtil.toBean(s, SeckillVoucher.class);
4 return Result.ok(seckillVoucher);
5}
6
7// 未命中,是否是之前缓存的空对象
8if (Objects.nonNull(s)) { // "" 不为null
9 throw new BusinessException(500, "查询缓存空对象!");
10}
11
12SeckillVoucher seckillVoucher = lambdaQuery()
13 .eq(SeckillVoucher::getVoucherId, id)
14 .one();
15// 缓存空对象 一个线程重建缓存就好
16synchronized (CacheTestService.class) {
17 String t = redisTemplate.opsForValue().get(key);
18 if (Objects.nonNull(t))
19 throw new BusinessException(500, "查询缓存空对象!");
20
21 if (Objects.isNull(seckillVoucher)) {
22 redisTemplate.opsForValue().set(key, "", 3, TimeUnit.SECONDS);
23 throw new BusinessException(500, "缓存空对象!");
24 }
25}
26// 缓存重建
27redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(seckillVoucher), 3, TimeUnit.SECONDS);
28
29return Result.ok(seckillVoucher);
💡缓存雪崩:量缓存同时失效,导致请求全部压到数据库

- 过期时间随机化 ⭐⭐⭐ baseline + random_offset
- Hot数据定时刷新缓存 @Scheduled
- 多级缓存(本地-Redis-数据库)
💡缓存击穿:热点 Key 突然失效,大量并发请求直接打到数据库
- 互斥重建:第一个重建(MySQL=>Redis),后面的等一下再访问缓存
- 逻辑过期:返回过期数据,但是互斥(第一个访问的)重建缓存 ❗数据一致性不高

1// 缓存击穿(热点Key,瞬时重建问题)
2public Shop queryWithMutex(Long id) {
3 // 1. 先Redis
4 String key = "cache:shop:" + id;
5 String shopJson = stringRedisTemplate.opsForValue().get(key);
6 // Cache命中 shopJson 不为null,"", "\t"(不全是空白字符)
7 if (StrUtil.isNotBlank(shopJson)) {
8 Shop shop = JSONUtil.toBean(shopJson, Shop.class);
9 return shop;
10 }
11 // 空对象
12 if (shopJson != null) { // 为""
13 return null;
14 }
15
16 // 1. 实现缓存重建
17 String lockKey = "lock:shop:" + id;
18 Shop shop = null;
19 try {
20 // 1.1 获取互斥锁
21 boolean isLock = tryLock(lockKey);
22 // 1.2 判断是否获取成功
23 if (!isLock) {
24 // 1.3 失败则休眠且轮询
25 Thread.sleep(50);
26 return queryWithMutex(id); ⭐⭐⭐
27 }
28 // 未命中 => MySQL 缓存重建
29 shop = lambdaQuery().eq(Shop::getId, id).one();
30 Thread.sleep(500); // 模拟复杂的重建任务
31 if (shop == null) {
32 // 将空对象缓存
33 stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
34 return null;
35 }
36 // log.debug(shop.toString());
37 stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
38 } catch (InterruptedException e) {
39 e.printStackTrace();
40 } finally {
41 unlock(lockKey);
42 }
43 return shop;
44}
45
46public Shop queryWithLogicExpire(Long id) {
47 // 1. 先Redis
48 String key = "cache:shop:" + id;
49 // log.debug("Key:" + key);
50 String shopJson = stringRedisTemplate.opsForValue().get(key);
51
52 if (StrUtil.isBlank(shopJson)) {
53 // 缓存不存在,查询数据库并回填
54 return saveShop2Redis(id, 20L);
55 }
56 // RedisData: private LocalDateTime expireTime;
57 // private Object data;
58 RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
59 Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
60
61 if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
62 // 未过期
63 return shop;
64 }
65 String lockKey = "lock:shop:" + id;
66 // 过期进行更新
67 boolean isLock = tryLock(lockKey);
68 if (isLock) {
69 // 独立线程进行重建缓存
70 CACHE_REBUILD_EXECUTOR.submit(() -> {
71 try {
72 saveShop2Redis(id, 20L);
73 Thread.sleep(500);
74 } catch (Exception e) {
75 e.printStackTrace();
76 } finally {
77 unlock(lockKey);
78 }
79 });
80 }
81 return shop; // 可能会返回过期数据。 因为没有进行等待
82}
Redis&MySQL数据一致性
- 先更新Redis,再更新MySQL ❌不推荐
- 先更新MySQL,再更新Redis ❌不推荐
- 先删除Redis,再更新MySQL,最后写回Redis ❌不推荐
- 先更新MySQL,再删除Redis,等请求重新缓存(惰性) ✔️推荐
- 缓存双删除策略。更新MySQL之前,删除一次Redis;更新完MySQL后,再进行一次延迟删除 ✔️推荐
数据库没问题,但是缓存有问题,等待一段实践
- 使用Binlog异步更新缓存,监听数据库的binlog变化,通过异步方式更新Redis缓存 ✔️推荐
WebSocket
1@Component
2@ServerEndpoint("/websocket")
3public class MyWebSocketEndpoint {
4
5 // 会话对象 server:client == 1:n
6 private static Map<String, Session> sessionMap = new HashMap<>();
7
8
9 @OnOpen
10 public void onOpen(Session session) {
11 System.out.println("onOpen: " + session.getId());
12 sessionMap.put(session.getId(), session);
13 sendMsg("Hi, can I help you?", session.getId());
14 }
15
16 @OnMessage
17 public void onMessage(String message, Session session) {
18 System.out.println("receive Message: " + message);
19 sendMsg("Yse! I am thinking ... this question!", session.getId());
20 }
21
22 @OnClose
23 public void OnClose(Session session) {
24 sendMsg("Bye!", session.getId());
25 System.out.println("OnClose: " + session.getId());
26 sessionMap.remove(session.getId());
27 }
28
29 public void sendMsg(String msg, String sid) {
30 try {
31 sessionMap.get(sid).getBasicRemote().sendText(msg); // 向session对象sendTest
32 } catch (IOException e) {
33 e.printStackTrace();
34 }
35 }
36
37}
前端:
1var ws = new WebSocket("ws://localhost:8080/chat");
2// 初始化连接
3ws.onopen = function (e) {
4
5}
6//接受消息
7ws.onmessage = function (ev) {
8
9}
10// 关闭连接
11ws.onclose = function (ev) {
12
13}
亮点模块:
RabbitMQ
-
异步处理
-
削峰填谷(流量削峰)
=> 削峰(缓冲流量):流量高峰时,MQ 充当“缓冲区”,将请求先存储到消息队列中,避免系统直接崩溃。
填谷(平滑流量):消费者按稳定速率消费消息,保证系统负载均衡。
- 消息可靠投递
- 发布确认机制 - 是否重发
- RabbitMQ持久化
- 消息可靠消费
- 手动ACK
- 限流机制 - 一次只取一个消息
- 死信队列(❗兜底)
- 消息唯一消费
- Redis存储消费过的ID (多个消费者接收到相同的消息时候) - 发布/订阅(Fanout 交换机-广播),消息会被复制到多个队列。
- ⭐RabbitMQ 中,默认情况下,消息只会被一个消费者消费,队列是点对点模型,一条消息只能被消费一次。多个消费者订阅同一个队列时,RabbitMQ 采用 “轮询分发”(Round Robin),每条消息只会被其中一个消费者处理。
- 消息可靠投递
1 RabbitMQ
2publisher => [exchanger => queue] => consumer
3error 1. 4. 2. 5. 3.
1.2. 投递失败 => 重新投递
4.5. MQ宕机 => 持久化
3.. ACK确认 => 重新消费
1rebbitmq:
2 # ...
3 publisher-confirm-type: correlated # 开启发布确认机制
4 publisher-returns: true # 投递消息抵达队列
5 listener:
6 simple:
7 acknowledge-mode: manual # 消费者手动确认ACK
8 prefetch: 1 # 限制消费者一次处理一条数据
生产者:
1// 确保交换机收到消息
2rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
3 if (ack) {
4 log.debug("[√] Product: Msg send success!");
5 }else {
6 log.error("[×] Product: Msg send fail!, 原因: {}", cause.toUpperCase());
7 msgRetryCount.putIfAbsent(correlationData.getId(), 3);
8 retrySendMsg(correlationData);
9 }
10}));
11
12// 确保队列收到消息
13rabbitTemplate.setReturnsCallback(returnedMessage -> {
14 log.error("[×] 消息路由失败!, 原因: {}, Msg: {}", returnedMessage.getReplyText(), returnedMessage.getMessage());
15 // 这里Roting Key错误,配置错误,投递死信队列比较好
16});
17}
18
19private void retrySendMsg(CorrelationData correlationData) {
20 Integer count = msgRetryCount.get(correlationData.getId());
21 if (count > 0) {
22 String message = msgCacheMap.get(correlationData.getId());
23 rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY, message, correlationData);
24 msgRetryCount.computeIfPresent(correlationData.getId(), (s, integer) -> integer-1);
25 }else {
26 log.error("发送到死信交换机!");
27 }
28}
29
30// ⭐⭐⭐模拟错误情况 1. 2.
31public void sendMessage(String message) {
32 log.debug("[>] Send Msg: " + message);
33 for (int i = 1; i <= 3; i++) {
34 CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
35 msgCacheMap.put(correlationData.getId(), message + ":" + i); // 缓存消息
36 if (i == 2) {
37 rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME + "error", ReliabilityMQConfig.ROUTING_KEY, message + ":" + i, correlationData);
38 }else if (i == 3) {
39 rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY + "error", message + ":" + i, correlationData);
40 }else {
41 rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY, message, correlationData);
42 }
43 }
44}
消费者:
1@RabbitListener(queues = ReliabilityMQConfig.QUEUE_NAME)
2public void receiveMessage(String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws InterruptedException {
3 try {
4 log.debug("[×] 开始处理消息: " + message);
5 Thread.sleep(2000);
6 double p = RandomUtil.randomDouble(0, 1);
7 if (p >= 0.5) {
8 throw new IOException("手动造成异常");
9 }
10 // 1. process success
11 channel.basicAck(deliveryTag, false); // deliveryTag:msgId; multiple=false:ackCurrentMsg else ackPreAllMsg
12 log.debug("[√] process Msg success: {}", message);
13 } catch (IOException e) {
14 // 2. process fail
15 log.debug("[×] process Msg fail: " + message);
16 try {
17 // 退回消息
18 channel.basicNack(deliveryTag, false, true); // msgId, 是否批量拒绝:false:仅拒绝当前Msg, true:拒绝所有deliveryTag小于等于当前的deliveryTag消息 ⭐⭐⭐ 消费失败时重新入队,确保消息不丢失。
19 } catch (IOException ioException) {
20 log.debug("[<] Msg notAck fail");
21 }
22 }
23}
- 微服务解耦
- 订单超时取消
拼团交易
抽象责任链模板
定义抽象任务接口
1public interface StrategyHandler<T, D, R> {
2
3 StrategyHandler DEFAULT = (T, D) -> null;
4
5 R apply(T requestParameter, D dynamicContext) throws Exception;
6}
7
8public interface StrategyMapper<T, D, R> {
9
10 StrategyHandler<T, D, R> get(T requestParameter, D dynamicContext) throws Exception;
11
12}
定义抽象节点接口
1public abstract class AbstractStrategyRouter<T, D, R> implements StrategyMapper<T, D, R>, StrategyHandler<T, D, R> {
2
3 protected StrategyHandler<T, D, R> defaultHandler = StrategyHandler.DEFAULT;
4
5 public R router(T requestParameter, D dynamicContext) throws Exception {
6 StrategyHandler<T, D, R> handler = get(requestParameter, dynamicContext);
7 if (handler != null) return handler.apply(requestParameter, dynamicContext);
8 return defaultHandler.apply(requestParameter, dynamicContext);
9 }
10
11}
大致就是 [apply + next],责任链模式。
1// 基本使用
2class XXX {
3 ObjectXXX nextNode;
4
5 apply{... => next()}
6
7 next{nextNode.apply()}
8}
多实例链路模式
双向链表的实现思想
定义链表:
1public class Node<E> {
2 E item;
3
4 Node<E> next;
5 Node<E> prev;
6
7 public Node(Node<E> prev, E item, Node<E> next) {
8 this.item = item;
9 this.next = next;
10 this.prev = prev;
11 }
12}
定义节点处理对象
1public interface ILogicHandler<T, D, R> {
2
3 // 子类可以不实现
4 default R next(T requestParameter, D dynamicContext) {
5 return null;
6 }
7
8 R apply(T requestParameter, D dynamicContext) throws Exception;
9
10}
实现具体的业务节点
1public class RuleLogic201 implements ILogicHandler<String, Rule02TradeRuleFactory.DynamicContext, String> {
2
3 public String apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception{
4
5 // 具体业务逻辑;
6
7 return next(requestParameter, dynamicContext);
8 }
9
10}
装配业务责任链
1public class LinkArmory<T, D, R> {
2 private final BusinessLinkedList<T, D, R> logicLink;
3
4
5 @SafeVarargs
6 public LinkArmory(String linkName, ILogicHandler<T, D, R>... logicHandlers) {
7 logicLink = new BusinessLinkedList<>(linkName);
8 for (ILogicHandler<T, D, R> logicHandler: logicHandlers){
9 logicLink.add(logicHandler);
10 }
11 }
12 public BusinessLinkedList<T, D, R> getLogicLink() {
13 return logicLink;
14 }
15}
责任链实现业务管道
商品折扣价格试算
1// 从工厂拿到对应的责任链出发节点
2=> RootNode
3 => multiThread[前置处理|检查信息] => doApply[业务+路由]
4=> SwitchNode
5 => multiThread[前置处理] => doApply[业务+路由]
6=> TagNode
7 => doApply[业务+路由]
8=> MarketNode
9 => multiThread[⭐前置处理⭐拼单试算] => doApply[业务+路由]
10=> EndNode
11 => multiThread[前置处理] => doApply[业务|构建结果]
12
13// SwitchNode:根据条件判断业务是否降级or限流
14// TagNode:根据用户信息判断拼团活动能否参与与可见
15// MarketNode:根据拼团活动,计算折扣价格
16// 策略模式:根据优惠Id,计算不同的折扣
拼团锁单校验
1// 实现具体的业务节点
2
3// 1. ActivityUsabilityRuleFilter:校验活动状态和活动时间的有效性
4
5// 2. UserTakeLimitRuleFilter:判断用户参与次数与活动限制次数关系
6
7// 装配业务责任链
拼团结算校验
1// 1. SCRuleFilter: 校验拼团的渠道来源是否合法,是否在黑名单中
2// 2. SettableRuleFilter:拼团活动的用户支付时间是否在有效时间内
3// 3. OutTradeNoRuleFilter:外部交易单号是否正确
4// 4. EndRuleFilter:打包结果
AOP+消息订阅实现热更新
通过AOP编程和Redis发布/订阅功能实现业务规则(限流阈值、降级策略)的热更新
自定义注解
1@Retention(RetentionPolicy.RUNTIME)
2@Target(ElementType.FIELD)
3@Documented
4public @interface DCCValue {
5 String value() default "";
6}
定义热更新服务
1@Service
2public class DCCService {
3
4 /**
5 * 降级开关 0关闭 1开启
6 */
7 @DCCValue("downgradeSwitch:0")
8 private String downgradeSwitch;
9
10 @DCCValue("cutRange:100")
11 private String cutRange;
12
13 public boolean isCutRange(String userId) {
14 // 计算哈希码的绝对值
15 int hashCode = Math.abs(userId.hashCode());
16
17 // 获取最后两位
18 int lastTwoDigits = hashCode % 100;
19
20 // 判断是否在切量范围内
21 if (lastTwoDigits <= Integer.parseInt(cutRange)) {
22 log.error(cutRange);
23 return true;
24 }
25
26 return false;
27 }
28
29 public boolean isDowngradeSwitch() {
30 return this.downgradeSwitch.equals("1");
31 }
32}
⭐注意:在SwitchNode中,调用这些方法进行判断是否放行
基于Redis的发布订阅进行参数调整
1/ DCCValueBeanFactory implements BeanPostProcessor
2private static final String BASE_CONFIG_PATH = "group_buy_market_dcc_";
3
4private final RedissonClient redissonClient;
5
6private final Map<String, Object> dccObjGroup = new HashMap<>();
7
8public DCCValueBeanFactory(RedissonClient redissonClient) {
9 this.redissonClient = redissonClient;
10}
11
12@Bean("dccTopic")
13public RTopic dccRedisTopicListener(RedissonClient redissonClient) {
14 log.info("初始化Redis Topic监听事件 测试连通{}", redissonClient.getBucket("test").get());
15 // 发布订阅模式的Topic实现
16 RTopic topic = redissonClient.getTopic("group_buy_market_dcc");
17 // 消息类型,回调接口:Topic名称,接收的消息
18 topic.addListener(String.class, ((charSequence, s) -> {
19 log.info("监听到Redis Topic: {}", charSequence.toString());
20 String[] split = s.split(Constants.SPLIT);
21
22 // 获取值
23 String attribute = split[0];
24 String key = BASE_CONFIG_PATH + attribute;
25 String value = split[1];
26
27 // 设置值 => 可以存储任意类型的值(如 String、Integer、自定义对象等),Redisson 会自动进行序列化和反序列化。
28 RBucket<String> bucket = redissonClient.getBucket(key);
29 boolean exists = bucket.isExists();
30 if (!exists) return;
31 bucket.set(value);
32
33 Object objBean = dccObjGroup.get(key); // 获取内存中的对象
34 if (objBean == null) return;
35
36 Class<?> objBeanClass = objBean.getClass();
37 // 检查objBean是否为代理对象
38 if (AopUtils.isAopProxy(objBean)) {
39 objBeanClass = AopUtils.getTargetClass(objBean);
40 }
41
42 try {
43 Field field = objBeanClass.getDeclaredField(attribute);
44 field.setAccessible(true);
45 field.set(objBean, value);
46 field.setAccessible(false);
47
48 log.info("DCC 节点监听,动态设置值 {} {}", key,value);
49 } catch (Exception e) {
50 throw new RuntimeException(e);
51 }
52
53 }));
54 return topic;
55}
56
57
58// implements BeanPostProcessor 重写方法 -- 并在每个 bean 初始化完成后自动调用该方法。
59@Override
60public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
61 // 注意:增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生了变化,这样不能获得自己的自定义注解了
62 Class<?> targetBeanClass = bean.getClass();
63 Object targetBeanObject = bean;
64 if (AopUtils.isAopProxy(targetBeanObject)) {
65 targetBeanClass = AopUtils.getTargetClass(targetBeanObject);
66 targetBeanObject = AopProxyUtils.getSingletonTarget(bean);
67 }
68
69 // 这里判断该类是否有对应的自定义注解
70 Field[] fields = targetBeanClass.getDeclaredFields();
71 for (Field field : fields) {
72 if (!field.isAnnotationPresent(DCCValue.class)) {
73 continue;
74 }
75
76 DCCValue dccValue = field.getAnnotation(DCCValue.class);
77 String value = dccValue.value();
78 if (StringUtils.isBlank(value)) {
79 throw new RuntimeException(field.getName() + "@DCCValue is not config value config case「isSwitch/isSwitch:1」");
80 }
81
82 String[] split = value.split(":");
83 String key = BASE_CONFIG_PATH.concat(split[0]);
84 String defaultValue = split[1];
85
86 String setValue = defaultValue;
87
88 try {
89 if (StringUtils.isBlank(defaultValue)) {
90 throw new RuntimeException("dcc config error " + key + " is not null - 请配置默认值!");
91 }
92
93 RBucket<String> bucket = redissonClient.getBucket(key);
94 boolean exists = bucket.isExists();
95 if (!exists) {
96 bucket.set(defaultValue);
97 } else {
98 setValue = bucket.get();
99 }
100
101 field.setAccessible(true);
102 field.set(targetBeanObject, setValue);
103 field.setAccessible(false);
104 log.info("初始化值 key:{} value:{}", key, setValue);
105 } catch (IllegalAccessException e) {
106 throw new RuntimeException(e);
107 }
108
109 dccObjGroup.put(key, targetBeanObject);
110 }
111 return bean;
112}
实现Api接口更新Redis中的值
1@Override
2@GetMapping("update_config")
3public Response<Boolean> updateConfig(@RequestParam String key, @RequestParam String value) {
4 try {
5 log.info("DCC 动态配置值变更 key:{} value:{}", key, value);
6 dccTopic.publish(key + "," + value);
7 return Response.<Boolean>builder()
8 .code(ResponseCode.SUCCESS.getCode())
9 .info(ResponseCode.SUCCESS.getInfo())
10 .build();
11 } catch (Exception e) {
12 log.error("DCC 动态配置值变更失败! key:{} value:{} {}", key, value, e.getMessage());
13 return Response.<Boolean>builder()
14 .code(ResponseCode.UN_ERROR.getCode())
15 .info(ResponseCode.UN_ERROR.getInfo())
16 .build();
17 }
18}
更新请求 => Redis => 订阅服务 => 修改参数