黑马点评:

登录相关

  • 使用 JWT 实现无状态认证,登录成功后生成一个包含用户信息的 Token 返回给前端,由前端每次请求时携带在请求头中。后端通过拦截器统一拦截请求,解析并校验 Token 的合法性。

<用户信息只包括:userId,username,role等必要信息 => 安全>

  • 校验通过后,我们会将解析出来的用户信息保存到 ThreadLocal 中,构建一个线程级的用户上下文,在整个请求处理流程中都可以方便地获取当前登录用户,避免层层传参,提升代码整洁度和可维护性。

  • ThreadLocal原理:每个线程拥有独立的ThreadLocalMap对象。所有线程使用同一个 ThreadLocal 对象作为键,但值互不影响。通过线程级别的变量隔离,避免多线程竞争,实现线程安全。

超卖问题:

需要先判断是否有库存,有则扣减库存并返回成功;没有则返回失败。这包括两步操作

1.如何解决卖超问题

--在sql加上判断防止数据变为负数 =乐观锁
--redis预减库存减少数据库访问 内存标记(过滤掉无库存的访问)减少redis访问 请求先入队列缓冲,异步下单,增强用户体验

异步下单:
-- 请求先入队缓冲,异步下单,增强用户体验
-- 请求出队,生成订单,减少库存
-- "客户端"定时轮询检查是否秒杀成功 
  1. 数据库层面:
    • 悲观锁(FOR UPDATE-行锁) ❌ 别提,浪费时间
    • 乐观锁(版本号法 - SQL加上判断防止数据变成负数) ⭐⭐⭐
  2. 应用层面:
    • 分布式锁(互斥) ❌ 别提,浪费时间
    • 缓存+消息队列 ⭐⭐⭐
    • 限流+熔断

❌❌❌1.1 行锁

1BEGIN;
2SELECT stock FROM products WHERE id = 1 FOR UPDATE;
3-- 业务逻辑:检查库存是否足够,执行扣减操作
4UPDATE products SET stock = stock - 1 WHERE id = 1;
5COMMIT;

10000条(10线程循环1000次)请求,吞吐量:791.2/sec 互斥锁,串行访问

1Integer stock = baseMapper.selectSecKillStockForUpdate(voucherId); // Fordate加行锁,事务结束会释放
2if (stock <= 0){
3    throw new BusinessException(500, "fail");
4}
5
6boolean update = lambdaUpdate().set(SeckillVoucher::getStock, stock - 1)
7    .eq(SeckillVoucher::getVoucherId, voucherId)
8    .update();

多事务的并发逻辑

 1-- 事务1(加X锁)
 2BEGIN;
 3SELECT * FROM accounts WHERE id = 1 FOR UPDATE; -- 持有X锁
 4
 5-- 事务2(普通SELECT)
 6BEGIN;
 7SELECT * FROM accounts WHERE id = 1; -- ✅ 正常读取(MVCC快照)
 8                                  -- ❌ 如果事务2也执行 FOR UPDATE 则被阻塞
 9
10-- 事务3(尝试修改)
11UPDATE accounts SET balance = 100 WHERE id = 1; -- ❌ 被阻塞(X锁互斥)

⭐⭐⭐1.2 CAS方法

SQL加上防止为负数

1-- (X锁互斥)
2UPDATE products 
3SET stock = stock - 1
4WHERE id = 1 AND stock > 0;  

10000条(10线程循环1000次)请求,吞吐量:944.7/sec

1// 注意把Stock不满足条件的过滤!
2
3boolean update = lambdaUpdate()
4                .setSql("stock = stock - 1") // 这个很关键 不要使用 set(::getStock, stock-1) 错误原因:基于应用层计算的值,而不是数据库的当前值。
5                .eq(SeckillVoucher::getVoucherId, voucherId)
6                .gt(SeckillVoucher::getStock, 0) // eq(::getStock, stock) 
7                .update();

⭐⭐⭐ MVCC(版本链+ReadView) - 读操作而言

  • 事务 B 的更新操作
    • 更新操作会直接操作数据库的当前值,而不是基于事务 B 的 ReadView。
    • 事务 B 的更新操作会检查数据库的当前值,发现 stock = 0,因此更新失败。

🎉🎉🎉讲库存加载到Redis中,在Redis中进行扣减库存

1// seckill_key = "seckill:product:1"
2// =Lua脚本(判断+扣减)=> 再处理MySQL   // 原子化查询+修改 

然后,也不需要扣减MySQL中的库存,这样减少锁竞争,修改为直接插入订单(根据订单和Redis数据,进行核对)。

为了进一步加速响应,引入MQ,解耦下单的库存校验和写入MySQL操作。

2.3 🏷️缓存+消息队列 (服务解耦):

1// 库存初始化:将MySQL商品库存存入Redis中,提升查询和扣减效率;
2
3// 库存预扣减(Redis):用户请求时,先在 Redis 中扣减库存;  == Lua脚本
4Long ok = redisTemplate.execute(SECKILL_SCRIPT, Collections.emptyList(), "1");
5rabbitTemplate.convertAndSend(exchangerName, routingKey, message);
6
7// 订单异步处理(MQ):扣减成功 => MQ => 订单处理接口 => 插入新订单。

10000条(10线程循环1000次)请求,吞吐量:1485.7/sec

🏷️限流+熔断: (仅理论)

 1// 1. Nginx限流(最基础,拦截恶意流量)  - 只针对IP
 2http {
 3    limit_req_zone $binary_remote_addr zone=mylimit:10m rate 100r/s;
 4    server {
 5        location /seckill {
 6            limit_req zone=mylimit burst=50 nodelay;
 7            proxy_pass http://backend;
 8        }
 9    }
10}
11
12// 2. Redis令牌桶(应用层用户级限流,精准控制)
13// def try_acquire_token(user_id, rate=5, capacity=10):
14bucket_key = f"token_bucket:{user_id}"
15last_time_key = f"last_time:{user_id}"
16
17// 下面需要为lua脚本  ====
18now = time.time()
19last_time = float(r.get(last_time_key) or now)
20elapsed = now - last_time    
21    
22// 计算补充的令牌
23tokens_to_add = int(elapsed * rate)
24current_tokens = min(capacity, int(r.get(bucket_key) or capacity) + tokens_to_add)
25    
26if current_tokens > 0:
27    r.set(bucket_key, current_tokens - 1)  # 消耗 1 个令牌
28    r.set(last_time_key, now)  # 更新时间
29    return True
30return False    
31    
32//  ====
33        
34        
35// 3. 前端排队(减少无效请求,避免一窝蜂涌入)

一人一单问题:

(首先不能重复下单,还要扣减库存,并且新增订单)

  • **单体架构 **
    • synchronized加锁 + 插入时的唯一索引
  • 集群架构
    • *** Redis-Lua脚本原子化查询库存+重复下单***

1.1 🏷️Redis预扣减+检查: 使用set结构存储购买过的用户,避免重复下单

 1-- 检查库存
 2-- 检查重复下单
 3if (redis.call('exists', orderKey) == 0) then
 4    redis.call('sadd', orderKey, '')  -- !!!初始化需要从MySQL中加载已经买过的用户
 5    return 2
 6end
 7if (redis.call('sismember', orderKey, userId) == 1) then
 8    return 2
 9end
10
11redis.call('sadd', orderKey, userId)
12-- 在Redis中,使用Set集合存储买过的用户

1.2 🏷️JVM锁+MySQL唯一索引实现:

唯一索引减少查询是否重复下单的时间,直接融入到插入操作成功or失败中

1ALTER TABLE orders ADD UNIQUE INDEX idx_uid_item (user_id, item_id);

实际场景中,我们需要的是 “同一用户对同一商品只能购买一次”,而非单纯限制用户或商品ID的唯一性。 唯一索引为商品ID+用户ID

 1@Transactional
 2public Result createOrder(Long userId, Long itemId) {
 3    boolean stockReduced = false; // 标记是否已扣减库存
 4    try {
 5        // 1. 检查库存(加锁)
 6        Item item = itemMapper.selectForUpdate(itemId);
 7        if (item.getStock() <= 0) {
 8            return Result.fail("库存不足");
 9        }
10
11        // 2. 扣减库存(捕获可能异常)
12        int affectedRows = itemMapper.reduceStock(itemId);
13        if (affectedRows == 0) { // 扣减失败(如库存不足)
14            throw new BusinessException("库存不足");
15        }
16        stockReduced = true; // 标记已扣减
17
18        // 3. 创建订单
19        orderMapper.insert(new Order(userId, itemId));
20        return Result.success();
21
22    } catch (DuplicateKeyException e) {
23        // 仅当库存已扣减时才回滚
24        if (stockReduced) {
25            itemMapper.recoverStock(itemId);
26        }
27        throw new BusinessException("请勿重复购买");
28
29    } catch (Exception e) {
30        // 仅当库存已扣减时才回滚
31        if (stockReduced) {
32            itemMapper.recoverStock(itemId);
33        }
34        throw new BusinessException("下单失败,请重试");
35    }
36}

2.1 🏷️分布式锁现实

解决单机JVM锁,在集群下失效的问题。

1-- 上锁,分布式的
2不存在:创建并返回True

业务 => 扣库存 + 创建订单

1-- 解锁,分布式,只能解锁自己上的锁
2// 如果可以释放别人的锁,会出现以下情况
3// 事务A的锁过期释放了(这里没有续时间or暂时卡了) => 事务B加锁 => 事务A活了过来*解锁* => 事务C加锁 ... 
4事务B和C拿了同一把锁,寄了  => 只允许删自己加的锁
5事务A和B拿了同一把锁,寄了  => 看门狗续时间

lua脚本

1Boolean lock = redisTemplate.opsForValue().setIfAbsent(lock_key, lock_value, 3, TimeUnit.SECONDS);
2
3try {
4    if (lock) {
5        ...
6    }
7} finally {
8	redisTemplate.execute(SECKILL_UNLOCK_SCRIPT, Collections.singletonList(lock_key), lock_value); // 删除锁和判断锁是否当前线程拥有 原子化
9}    

Redission

 1RLock lock = redissonClient.getLock(lock_key);
 2try {
 3    boolean ok = lock.tryLock(1, 3, TimeUnit.SECONDS);  // 不阻塞
 4    if(ok) {...}
 5    finally {
 6        if (lock.isHeldByCurrentThread())  lock.unlock();
 7    }
 8}
 9
10try {
11    boolean ok = lock.lock();  // 阻塞,且未指定过期时间 => 触发看门狗
12    if(ok) {...}
13    finally {
14        if (lock.isHeldByCurrentThread())  lock.unlock();
15    }
16}

超卖问题:

  • 一开始我们是使用synchronzied在扣减库存的代码块中加锁,进行互斥,讲查询MySQL库存和扣减库存原子化。但是这样并发的性能不高。然后我们调整为CAS乐观锁的形式,我们不加锁,修改sql语句,给where条件中添加上库存大于0的条件,当库存不足时,update操作(会加行锁)就执行失败,表示扣减失败

  • 之后,为了提高并发效率,我们使用redis,预扣减库存提高秒杀商品的并发,我们先讲秒杀商品的库存缓存到redis中,在redis中进行 库存的检查和扣减(原子化),因为是基于内存的,效率会比mysql快的多。

一人一单问题:

  • 在redis预扣减缓存中进行了修改,我们使用redis中的set集合,存储购买过的用户Id,判断用户是否重复购买。 因为redis是单线程的,因此我们使用lua脚本,讲这一个操作,打包成一个脚本文件,给redis执行,保证操作的原子性。
  • 然后我们发现当库存不足的时候,用户会一直点击下单,给redis带来了不必要的查询,因此我们在服务中引入一个布尔变量,标识是否有库存,当没有时,直接返回,降低redis的请求量。
  • 另一种不使用redis的解决方法:

后面我们在单体系统下遇到了一人多单超卖问题,我们通过乐观锁(update sql添加库存不能为负)解决;我们对业务进行了变更,将一人多单变成了一人一单,结果在高并发场景下同一用户发送相同请求仍然出现了超卖问题,我们通过悲观锁解决了;由于用户量的激增,我们将单体系统升级成了集群,结果由于锁只能在一个JVM中可见导致又出现了,在高并发场景下同一用户发送下单请求出现超卖问题,我们通过实现分布式锁成功解决集群下的超卖问题;释放锁时,判断锁是否是当前线程 和 删除锁两个操作不是原子性的,可能导致超卖问题,我们通过将两个操作封装到一个Lua脚本成功解决了;

为了解决锁的不可重入性,我们通过将锁以hash结构的形式存储,锁的值为线程ID拼接锁重入次数,每次释放锁都value-1,获取锁value+1,从而实现锁的可重入性,并且将释放锁和获取锁的操作封装到Lua脚本中以确保原子性。

最最后,我们发现可以直接使用现有比较成熟的方案Redisson来解决上诉出现的所有问题🤣,什么不可重试、不可重入、超市释放、原子性等问题Redisson都提供相对应的解决方法(。^▽^)

优惠卷秒杀

异步下单:

 1// 主线程
 21. Lua脚本
 32. 成功=>放入队列失败=>返回异常    
 4    
 51.1. Lua脚本 => 库存 =扣库存=> 是否下单 =记录用户下单 => 返回0
 6               返回1            返回2
 7
 8// 子线程
 9队列中取出订单信息 => 插入订单 => 是否付款 => 更新成功
10                            => End
image-20250316232011370

缓存三兄弟:

💡缓存穿透: 恶意请求查询不存在的数据,绕过缓存直接访问数据库

image-20241121195359815

  • 布隆过滤器
  1. 商品ID,通过多个哈希函数,将元素映射到bitMap中的多个位置,只有每个位置都为1才表明这个对象存在。
  2. guava库有实现好的,直接用。
 1# 伪代码,仅供学习
 2class BloomFilter:
 3    def __init__(self, size, hash_count):
 4        """
 5        初始化布隆过滤器
 6        :param size: 位数组大小(m)
 7        :param hash_count: 哈希函数数量(k)
 8        """
 9        self.size = size
10        self.hash_count = hash_count
11        self.bit_array = [0] * size  # 初始化位数组(全0)
12
13    def add(self, item):
14        """
15        添加元素到布隆过滤器
16        """
17        for seed in range(self.hash_count):
18            # 通过不同的种子模拟多个哈希函数
19            index = self._hash(item, seed) % self.size
20            self.bit_array[index] = 1  # 设置对应位为1
21
22    def contains(self, item):
23        """
24        检查元素是否可能存在
25        :return: 
26            - True: 可能存在(可能有误判)
27            - False: 一定不存在
28        """
29        for seed in range(self.hash_count):
30            index = self._hash(item, seed) % self.size
31            if self.bit_array[index] == 0:
32                return False  # 只要有一位为0,则一定不存在
33        return True  # 所有位均为1,可能存在
34
35    def _hash(self, item, seed):
36        """
37        哈希函数(伪代码,实际可用MurmurHash等)
38        :param item: 输入元素
39        :param seed: 哈希种子(模拟不同哈希函数)
40        :return: 哈希值
41        """
42        hash_value = 0
43        for char in str(item):
44            hash_value = (hash_value * seed + ord(char)) & 0xFFFFFFFF
45        return hash_value
  • 缓存空对象

🏷️ 缓存空对象实现方案

image-20241121195847292

 1// 缓存命中
 2if (StrUtil.isNotBlank(s)) { // 不为null,不为""
 3    SeckillVoucher seckillVoucher = JSONUtil.toBean(s, SeckillVoucher.class);
 4    return Result.ok(seckillVoucher);
 5}
 6
 7// 未命中,是否是之前缓存的空对象
 8if (Objects.nonNull(s)) { // "" 不为null
 9    throw new BusinessException(500, "查询缓存空对象!");
10}
11
12SeckillVoucher seckillVoucher = lambdaQuery()
13    .eq(SeckillVoucher::getVoucherId, id)
14    .one();
15// 缓存空对象 一个线程重建缓存就好
16synchronized (CacheTestService.class) {
17    String t = redisTemplate.opsForValue().get(key);
18    if (Objects.nonNull(t))
19        throw new BusinessException(500, "查询缓存空对象!");
20
21    if (Objects.isNull(seckillVoucher)) {
22        redisTemplate.opsForValue().set(key, "", 3, TimeUnit.SECONDS);
23        throw new BusinessException(500, "缓存空对象!");
24    }
25}
26// 缓存重建
27redisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(seckillVoucher), 3, TimeUnit.SECONDS);
28
29return Result.ok(seckillVoucher);

💡缓存雪崩:量缓存同时失效,导致请求全部压到数据库

image-20241121201618172

  • 过期时间随机化 ⭐⭐⭐ baseline + random_offset
  • Hot数据定时刷新缓存 @Scheduled
  • 多级缓存(本地-Redis-数据库)

💡缓存击穿:热点 Key 突然失效,大量并发请求直接打到数据库

  • 互斥重建:第一个重建(MySQL=>Redis),后面的等一下再访问缓存
  • 逻辑过期:返回过期数据,但是互斥(第一个访问的)重建缓存 ❗数据一致性不高

image-20241121210053886

 1// 缓存击穿(热点Key,瞬时重建问题)
 2public Shop queryWithMutex(Long id) {
 3    // 1. 先Redis
 4    String key = "cache:shop:" + id;
 5    String shopJson = stringRedisTemplate.opsForValue().get(key);
 6    // Cache命中  shopJson 不为null,"", "\t"(不全是空白字符)
 7    if (StrUtil.isNotBlank(shopJson)) {
 8        Shop shop = JSONUtil.toBean(shopJson, Shop.class);
 9        return shop;
10    }
11    // 空对象
12    if (shopJson != null) {   // 为""
13        return null;
14    }
15
16    // 1. 实现缓存重建
17    String lockKey = "lock:shop:" + id;
18    Shop shop = null;
19    try {
20        // 1.1 获取互斥锁
21        boolean isLock = tryLock(lockKey);
22        // 1.2 判断是否获取成功
23        if (!isLock) {
24            // 1.3 失败则休眠且轮询
25            Thread.sleep(50);
26            return queryWithMutex(id); ⭐⭐⭐
27        }
28        // 未命中 => MySQL 缓存重建
29        shop = lambdaQuery().eq(Shop::getId, id).one();
30        Thread.sleep(500); // 模拟复杂的重建任务
31        if (shop == null) {
32            // 将空对象缓存
33            stringRedisTemplate.opsForValue().set(key, "", RedisConstants.CACHE_NULL_TTL, TimeUnit.MINUTES);
34            return null;
35        }
36        // log.debug(shop.toString());
37        stringRedisTemplate.opsForValue().set(key, JSONUtil.toJsonStr(shop), RedisConstants.CACHE_SHOP_TTL, TimeUnit.MINUTES);
38    } catch (InterruptedException e) {
39        e.printStackTrace();
40    } finally {
41        unlock(lockKey);
42    }
43    return shop;
44}
45
46public Shop queryWithLogicExpire(Long id) {
47    // 1. 先Redis
48    String key = "cache:shop:" + id;
49    // log.debug("Key:" + key);
50    String shopJson = stringRedisTemplate.opsForValue().get(key);
51
52    if (StrUtil.isBlank(shopJson)) {
53        // 缓存不存在,查询数据库并回填
54        return saveShop2Redis(id, 20L);
55    }
56    // RedisData:  private LocalDateTime expireTime;
57    //             private Object data;
58    RedisData redisData = JSONUtil.toBean(shopJson, RedisData.class);
59    Shop shop = JSONUtil.toBean((JSONObject) redisData.getData(), Shop.class);
60
61    if (redisData.getExpireTime().isAfter(LocalDateTime.now())) {
62        // 未过期
63        return shop;
64    }
65    String lockKey = "lock:shop:" + id;
66    // 过期进行更新
67    boolean isLock = tryLock(lockKey);
68    if (isLock) {
69        // 独立线程进行重建缓存
70        CACHE_REBUILD_EXECUTOR.submit(() -> {
71            try {
72                saveShop2Redis(id, 20L);
73                Thread.sleep(500);
74            } catch (Exception e) {
75                e.printStackTrace();
76            } finally {
77                unlock(lockKey);
78            }
79        });
80    }
81    return shop;  // 可能会返回过期数据。 因为没有进行等待
82}

Redis&MySQL数据一致性

  • 先更新Redis,再更新MySQL ❌不推荐

image-20250308135639431

  • 先更新MySQL,再更新Redis ❌不推荐

image-20250308140251136

  • 先删除Redis,再更新MySQL,最后写回Redis ❌不推荐

image-20250308140614242

  • 先更新MySQL,再删除Redis,等请求重新缓存(惰性) ✔️推荐
  • 缓存双删除策略。更新MySQL之前,删除一次Redis;更新完MySQL后,再进行一次延迟删除 ✔️推荐

image-20250308140849778

数据库没问题,但是缓存有问题,等待一段实践

  • 使用Binlog异步更新缓存,监听数据库的binlog变化,通过异步方式更新Redis缓存 ✔️推荐

WebSocket

 1@Component
 2@ServerEndpoint("/websocket")
 3public class MyWebSocketEndpoint {
 4
 5    // 会话对象  server:client == 1:n
 6    private static Map<String, Session> sessionMap = new HashMap<>();
 7
 8
 9    @OnOpen
10    public void onOpen(Session session) {
11        System.out.println("onOpen: " + session.getId());
12        sessionMap.put(session.getId(), session);
13        sendMsg("Hi, can I help you?", session.getId());
14    }
15
16    @OnMessage
17    public void onMessage(String message, Session session) {
18        System.out.println("receive Message: " + message);
19        sendMsg("Yse! I am thinking ... this question!", session.getId());
20    }
21
22    @OnClose
23    public void OnClose(Session session) {
24        sendMsg("Bye!", session.getId());
25        System.out.println("OnClose: " + session.getId());
26        sessionMap.remove(session.getId());
27    }
28
29    public void sendMsg(String msg, String sid) {
30        try {
31            sessionMap.get(sid).getBasicRemote().sendText(msg);  // 向session对象sendTest
32        } catch (IOException e) {
33            e.printStackTrace();
34        }
35    }
36
37}

前端:

 1var ws = new WebSocket("ws://localhost:8080/chat");
 2// 初始化连接
 3ws.onopen = function (e) {
 4   
 5}
 6//接受消息
 7ws.onmessage = function (ev) {
 8    
 9}
10// 关闭连接
11ws.onclose = function (ev) {
12    
13}

亮点模块:

RabbitMQ

  • 异步处理
  • 削峰填谷(流量削峰)

    => 削峰(缓冲流量):流量高峰时,MQ 充当“缓冲区”,将请求先存储到消息队列中,避免系统直接崩溃。

    填谷(平滑流量):消费者按稳定速率消费消息,保证系统负载均衡。

    • 消息可靠投递
      • 发布确认机制 - 是否重发
      • RabbitMQ持久化
    • 消息可靠消费
      • 手动ACK
      • 限流机制 - 一次只取一个消息
      • 死信队列(❗兜底)
    • 消息唯一消费
      • Redis存储消费过的ID (多个消费者接收到相同的消息时候) - 发布/订阅(Fanout 交换机-广播),消息会被复制到多个队列。
      • ⭐RabbitMQ 中,默认情况下,消息只会被一个消费者消费队列是点对点模型,一条消息只能被消费一次。多个消费者订阅同一个队列时,RabbitMQ 采用 “轮询分发”(Round Robin),每条消息只会被其中一个消费者处理。
1				 RabbitMQ
2publisher => [exchanger => queue] => consumer
3error     1.     4.      2.   5.  3.    

1.2. 投递失败 => 重新投递

4.5. MQ宕机 => 持久化

3.. ACK确认 => 重新消费

1rebbitmq:
2	# ...
3    publisher-confirm-type: correlated  # 开启发布确认机制
4    publisher-returns: true   # 投递消息抵达队列
5    listener:
6        simple:
7        acknowledge-mode: manual  # 消费者手动确认ACK
8            prefetch: 1   # 限制消费者一次处理一条数据

生产者:

 1// 确保交换机收到消息
 2rabbitTemplate.setConfirmCallback(((correlationData, ack, cause) -> {
 3    if (ack) {
 4        log.debug("[√] Product: Msg send success!");
 5    }else {
 6        log.error("[×] Product: Msg send fail!, 原因: {}", cause.toUpperCase());
 7        msgRetryCount.putIfAbsent(correlationData.getId(), 3);
 8        retrySendMsg(correlationData);
 9    }
10}));
11
12// 确保队列收到消息
13rabbitTemplate.setReturnsCallback(returnedMessage -> {
14    log.error("[×] 消息路由失败!, 原因: {}, Msg: {}", returnedMessage.getReplyText(), returnedMessage.getMessage());
15    // 这里Roting Key错误,配置错误,投递死信队列比较好
16});
17}
18
19private void retrySendMsg(CorrelationData correlationData) {
20    Integer count = msgRetryCount.get(correlationData.getId());
21    if (count > 0) {
22        String message = msgCacheMap.get(correlationData.getId());
23        rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY, message, correlationData);
24        msgRetryCount.computeIfPresent(correlationData.getId(), (s, integer) -> integer-1);
25    }else {
26        log.error("发送到死信交换机!");
27    }
28}
29
30// ⭐⭐⭐模拟错误情况 1. 2.
31public void sendMessage(String message) {
32    log.debug("[>] Send Msg: " + message);
33    for (int i = 1; i <= 3; i++) {
34        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
35        msgCacheMap.put(correlationData.getId(), message + ":" + i); // 缓存消息
36        if (i == 2) {
37            rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME + "error", ReliabilityMQConfig.ROUTING_KEY, message + ":" + i, correlationData);
38        }else if (i == 3) {
39            rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY + "error", message + ":" + i, correlationData);
40        }else {
41            rabbitTemplate.convertAndSend(ReliabilityMQConfig.EXCHANGE_NAME, ReliabilityMQConfig.ROUTING_KEY, message, correlationData);
42        }
43    }
44}

消费者:

 1@RabbitListener(queues = ReliabilityMQConfig.QUEUE_NAME)
 2public void receiveMessage(String message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws InterruptedException {
 3    try {
 4        log.debug("[×] 开始处理消息: " + message);
 5        Thread.sleep(2000);
 6        double p = RandomUtil.randomDouble(0, 1);
 7        if (p >= 0.5) {
 8            throw new IOException("手动造成异常");
 9        }
10        // 1. process success
11        channel.basicAck(deliveryTag, false);  // deliveryTag:msgId; multiple=false:ackCurrentMsg else ackPreAllMsg
12        log.debug("[√] process Msg success: {}", message);
13    } catch (IOException e) {
14        // 2. process fail
15        log.debug("[×] process Msg fail: " + message);
16        try {
17            // 退回消息
18            channel.basicNack(deliveryTag, false, true);  // msgId, 是否批量拒绝:false:仅拒绝当前Msg, true:拒绝所有deliveryTag小于等于当前的deliveryTag消息  ⭐⭐⭐ 消费失败时重新入队,确保消息不丢失。
19        } catch (IOException ioException) {
20            log.debug("[<] Msg notAck fail");
21        }
22    }
23}
  • 微服务解耦
  • 订单超时取消

拼团交易

抽象责任链模板

定义抽象任务接口

 1public interface StrategyHandler<T, D, R> {
 2
 3    StrategyHandler DEFAULT = (T, D) -> null;
 4
 5    R apply(T requestParameter, D dynamicContext) throws Exception;
 6}
 7
 8public interface StrategyMapper<T, D, R> {
 9
10    StrategyHandler<T, D, R> get(T requestParameter, D dynamicContext) throws Exception;
11
12}

定义抽象节点接口

 1public abstract class AbstractStrategyRouter<T, D, R> implements StrategyMapper<T, D, R>, StrategyHandler<T, D, R> {
 2
 3    protected StrategyHandler<T, D, R> defaultHandler = StrategyHandler.DEFAULT;
 4
 5    public R router(T requestParameter, D dynamicContext) throws Exception {
 6        StrategyHandler<T, D, R> handler = get(requestParameter, dynamicContext);
 7        if (handler != null) return handler.apply(requestParameter, dynamicContext);
 8        return defaultHandler.apply(requestParameter, dynamicContext);
 9    }
10
11}

大致就是 [apply + next],责任链模式。

1// 基本使用
2class XXX {
3    ObjectXXX nextNode;
4    
5    apply{... => next()}
6    
7    next{nextNode.apply()}
8}

多实例链路模式

双向链表的实现思想

定义链表:

 1public class Node<E> {
 2    E item;
 3
 4    Node<E> next;
 5    Node<E> prev;
 6
 7    public Node(Node<E> prev, E item, Node<E> next) {
 8        this.item = item;
 9        this.next = next;
10        this.prev = prev;
11    }
12}

定义节点处理对象

 1public interface ILogicHandler<T, D, R> {
 2
 3    // 子类可以不实现
 4    default R next(T requestParameter, D dynamicContext) {
 5        return null;
 6    }
 7
 8    R apply(T requestParameter, D dynamicContext) throws Exception;
 9
10}

实现具体的业务节点

 1public class RuleLogic201 implements ILogicHandler<String, Rule02TradeRuleFactory.DynamicContext, String> {
 2
 3    public String apply(String requestParameter, Rule02TradeRuleFactory.DynamicContext dynamicContext) throws Exception{
 4
 5        // 具体业务逻辑;
 6
 7        return next(requestParameter, dynamicContext);
 8    }
 9
10}

装配业务责任链

 1public class LinkArmory<T, D, R> {
 2    private final BusinessLinkedList<T, D, R> logicLink;
 3
 4
 5    @SafeVarargs
 6    public LinkArmory(String linkName, ILogicHandler<T, D, R>... logicHandlers) {
 7        logicLink = new BusinessLinkedList<>(linkName);
 8        for (ILogicHandler<T, D, R> logicHandler: logicHandlers){
 9            logicLink.add(logicHandler);
10        }
11    }
12    public BusinessLinkedList<T, D, R> getLogicLink() {
13        return logicLink;
14    }
15}

责任链实现业务管道

商品折扣价格试算

 1// 从工厂拿到对应的责任链出发节点
 2=> RootNode 
 3	=> multiThread[前置处理|检查信息] => doApply[业务+路由] 
 4=> SwitchNode 
 5	=> multiThread[前置处理] => doApply[业务+路由] 
 6=> TagNode
 7    => doApply[业务+路由]
 8=> MarketNode 
 9	=> multiThread[前置处理拼单试算] => doApply[业务+路由] 
10=> EndNode 
11	=> multiThread[前置处理] => doApply[业务|构建结果]    
12    
13// SwitchNode:根据条件判断业务是否降级or限流
14// TagNode:根据用户信息判断拼团活动能否参与与可见
15// MarketNode:根据拼团活动,计算折扣价格
16// 	      策略模式:根据优惠Id,计算不同的折扣

拼团锁单校验

1// 实现具体的业务节点
2
3// 1. ActivityUsabilityRuleFilter:校验活动状态和活动时间的有效性
4
5// 2. UserTakeLimitRuleFilter:判断用户参与次数与活动限制次数关系
6
7// 装配业务责任链

拼团结算校验

1// 1. SCRuleFilter: 校验拼团的渠道来源是否合法,是否在黑名单中
2// 2. SettableRuleFilter:拼团活动的用户支付时间是否在有效时间内
3// 3. OutTradeNoRuleFilter:外部交易单号是否正确
4// 4. EndRuleFilter:打包结果

AOP+消息订阅实现热更新

通过AOP编程和Redis发布/订阅功能实现业务规则(限流阈值、降级策略)的热更新

自定义注解

1@Retention(RetentionPolicy.RUNTIME)
2@Target(ElementType.FIELD)
3@Documented
4public @interface DCCValue {
5    String value() default "";
6}

定义热更新服务

 1@Service
 2public class DCCService {
 3
 4    /**
 5     * 降级开关 0关闭 1开启
 6     */
 7    @DCCValue("downgradeSwitch:0")
 8    private String downgradeSwitch;
 9
10    @DCCValue("cutRange:100")
11    private String cutRange;
12
13    public boolean isCutRange(String userId) {
14        // 计算哈希码的绝对值
15        int hashCode = Math.abs(userId.hashCode());
16
17        // 获取最后两位
18        int lastTwoDigits = hashCode % 100;
19
20        // 判断是否在切量范围内
21        if (lastTwoDigits <= Integer.parseInt(cutRange)) {
22            log.error(cutRange);
23            return true;
24        }
25
26        return false;
27    }
28
29    public boolean isDowngradeSwitch() {
30        return this.downgradeSwitch.equals("1");
31    }
32}

⭐注意:在SwitchNode中,调用这些方法进行判断是否放行

基于Redis的发布订阅进行参数调整

  1/ DCCValueBeanFactory implements BeanPostProcessor 
  2private static final String BASE_CONFIG_PATH = "group_buy_market_dcc_";
  3
  4private final RedissonClient redissonClient;
  5
  6private final Map<String, Object> dccObjGroup = new HashMap<>();
  7
  8public DCCValueBeanFactory(RedissonClient redissonClient) {
  9    this.redissonClient = redissonClient;
 10}
 11
 12@Bean("dccTopic")
 13public RTopic dccRedisTopicListener(RedissonClient redissonClient) {
 14    log.info("初始化Redis Topic监听事件 测试连通{}", redissonClient.getBucket("test").get());
 15    // 发布订阅模式的Topic实现
 16    RTopic topic = redissonClient.getTopic("group_buy_market_dcc");
 17    // 消息类型,回调接口:Topic名称,接收的消息
 18    topic.addListener(String.class, ((charSequence, s) -> {
 19        log.info("监听到Redis Topic: {}", charSequence.toString());
 20        String[] split = s.split(Constants.SPLIT);
 21
 22        // 获取值
 23        String attribute = split[0];
 24        String key = BASE_CONFIG_PATH + attribute;
 25        String value = split[1];
 26
 27        // 设置值 => 可以存储任意类型的值(如 String、Integer、自定义对象等),Redisson 会自动进行序列化和反序列化。
 28        RBucket<String> bucket = redissonClient.getBucket(key);
 29        boolean exists = bucket.isExists();
 30        if (!exists) return;
 31        bucket.set(value);
 32
 33        Object objBean = dccObjGroup.get(key); // 获取内存中的对象
 34        if (objBean == null) return;
 35
 36        Class<?> objBeanClass = objBean.getClass();
 37        // 检查objBean是否为代理对象
 38        if (AopUtils.isAopProxy(objBean)) {
 39            objBeanClass = AopUtils.getTargetClass(objBean);
 40        }
 41
 42        try {
 43            Field field = objBeanClass.getDeclaredField(attribute);
 44            field.setAccessible(true);
 45            field.set(objBean, value);
 46            field.setAccessible(false);
 47
 48            log.info("DCC 节点监听,动态设置值 {} {}", key,value);
 49        } catch (Exception e) {
 50            throw new RuntimeException(e);
 51        }
 52
 53    }));
 54    return topic;
 55}
 56
 57
 58// implements BeanPostProcessor 重写方法  -- 并在每个 bean 初始化完成后自动调用该方法。
 59@Override
 60public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
 61    // 注意:增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生了变化,这样不能获得自己的自定义注解了
 62    Class<?> targetBeanClass = bean.getClass();
 63    Object targetBeanObject = bean;
 64    if (AopUtils.isAopProxy(targetBeanObject)) {
 65        targetBeanClass = AopUtils.getTargetClass(targetBeanObject);
 66        targetBeanObject = AopProxyUtils.getSingletonTarget(bean);
 67    }
 68	
 69    // 这里判断该类是否有对应的自定义注解
 70    Field[] fields = targetBeanClass.getDeclaredFields();
 71    for (Field field : fields) {
 72        if (!field.isAnnotationPresent(DCCValue.class)) {
 73            continue;
 74        }
 75
 76        DCCValue dccValue = field.getAnnotation(DCCValue.class);
 77        String value = dccValue.value();
 78        if (StringUtils.isBlank(value)) {
 79            throw new RuntimeException(field.getName() + "@DCCValue is not config value config case「isSwitch/isSwitch:1」");
 80        }
 81
 82        String[] split = value.split(":");
 83        String key = BASE_CONFIG_PATH.concat(split[0]);
 84        String defaultValue = split[1];
 85
 86        String setValue = defaultValue;
 87
 88        try {
 89            if (StringUtils.isBlank(defaultValue)) {
 90                throw new RuntimeException("dcc config error " + key + " is not null - 请配置默认值!");
 91            }
 92
 93            RBucket<String> bucket = redissonClient.getBucket(key);
 94            boolean exists = bucket.isExists();
 95            if (!exists) {
 96                bucket.set(defaultValue);
 97            } else {
 98                setValue = bucket.get();
 99            }
100
101            field.setAccessible(true);
102            field.set(targetBeanObject, setValue);
103            field.setAccessible(false);
104            log.info("初始化值 key:{} value:{}", key, setValue);
105        } catch (IllegalAccessException e) {
106            throw new RuntimeException(e);
107        }
108
109        dccObjGroup.put(key, targetBeanObject);
110    }
111    return bean;
112}

实现Api接口更新Redis中的值

 1@Override
 2@GetMapping("update_config")
 3public Response<Boolean> updateConfig(@RequestParam String key, @RequestParam String value) {
 4    try {
 5        log.info("DCC 动态配置值变更 key:{} value:{}", key, value);
 6        dccTopic.publish(key + "," + value);
 7        return Response.<Boolean>builder()
 8            .code(ResponseCode.SUCCESS.getCode())
 9            .info(ResponseCode.SUCCESS.getInfo())
10            .build();
11    } catch (Exception e) {
12        log.error("DCC 动态配置值变更失败! key:{} value:{} {}", key, value, e.getMessage());
13        return Response.<Boolean>builder()
14            .code(ResponseCode.UN_ERROR.getCode())
15            .info(ResponseCode.UN_ERROR.getInfo())
16            .build();
17    }
18}

更新请求 => Redis => 订阅服务 => 修改参数