拼团交易平台系统
**项目背景:**为了盘活沉睡用户,需要适当降低商品价格。但为了达到传播的效果,所以需要引入拼团方式,以客带客,靠用户自身传播的方式进行交易拉新。这样的处理方式对比于 KOL,会让利商品价值到用户自身。
第1-2节 拼团库表设计
业务流程:
- 运营角度: 1. 给哪些商品配置拼单;2. 拼团商品提供的规则信息:折扣、时间、人数等。
- 用户角度: 1. 参与拼团,首次发起or参与现有,拼单完成回调通知。
- 库表设计: 1. 人群设计,将所有符合某个条件的用户ID,全部写入特定Redis记录中。
- 拼团活动,折扣的多种迭代。
库表设计:
拼团配置表:
- 拼团活动表:设定了拼团的成团规则,人群标签的使用可以限定哪些人可见,哪些人可参与。
- 折扣配置表:拆分出拼团优惠到一个新的表进行多条配置。如果折扣还有更多的复杂规则,则可以配置新的折扣规则表进行处理。
- 人群标签表:专门来做人群设计记录的,这3张表就是为了把符合规则的人群ID,也就是用户ID,全部跑任务到一个记录下进行使用。 比如黑玫瑰人群、高净值人群、拼团履约率90%以上的人群等。
参与拼团表:
- 拼团账户表:记录用户的拼团参与数据,一个是为了限制用户的参与拼团次数,另外是为了人群标签任务统计数据。
- 用户拼单表:当有用户发起首次拼单的时候,产生拼单id,并记录所需成团的拼单记录,另外是写上拼团的状态、唯一索引、回调接口等。这样拼团完成就可以回调对接的平台,通知完成了。【微信支付也是这样的设计,回调支付结果,这样的设计可以方便平台化对接】当再有用户参与后,则写入用户拼单明细表。直至达成拼团
- 回调任务表:当拼团完成后,要做回调处理。但可能会有失败,所以加入任务的方式进行补偿。如果仍然失败,则需要对接的平台,自己查询拼团结果。
第1-3节 研发系统设计
进行研发系统设计。包括:库表设计、用例图、系统建模、工程模型、功能流程、UML时序图。
- 用例图:用户与系统交互最简表示形式;
- 流程图:功能节点的串联关系;
- 时序图:展示了整个拼团过程所涉及的系统模块和流转关系
系统架构:
MVC架构
DDD架构
第2-2节 试算模型抽象模板设计
引入设计模式进行解耦和实现,提高工程代码的扩展性
=> 设计模式抽象模板的通用结构定义,添加一个 tree规则树抽象模型,在引入到工程中进行使用。这样后续工程中就可以不断的定义通用的设计模式被不同的场景统一使用了。
模型设计
链式的多分支规则树模型结构,由功能节点自行决定后续流程的执行链路。它的设计比责任链的扩展性更好,自由度也更高
- 首先,定义抽象的通用规则树模型结构
- 涵盖:StrategyMapper, StrategyHandler /ˈstrætədʒi/、AbstractStrategyRouter<T, D, R>。 通过泛型设计允许使用方可以自定义出入参和动态上下文,让抽象模板模型具有通用性。
- 之后,由使用方自定义出工厂、功能抽象类和一个个流程流转的节点。
编码实现
项目工程的Types模块中,添加通用设计模式模板
1.1 策略映射器
1public interface StrategyMapper<T, D, R> {
2
3 /**
4 * 获取待执行策略
5 *
6 * @param requestParameter 入参
7 * @param dynamicContext 上下文
8 * @return 返参
9 * @throws Exception 异常
10 */
11 StrategyHandler<T, D, R> get(T requestParameter, D dynamicContext) throws Exception;
12
13}
- 用于获取每个执行的节点,责任链加强版
- T,D,R:入参,上下文,反参
1.2 策略受理器
1public interface StrategyHandler<T, D, R> {
2
3 StrategyHandler DEFAULT = (T, D) -> null;
4
5 R apply(T requestParameter, D dynamicContext) throws Exception;
6
7}
- 执行具体的业务流程,利用上下文传递信息给后面执行的节点。
1.3 策略路由器
1public abstract class AbstractStrategyRouter<T, D, R> implements StrategyMapper<T, D, R>, StrategyHandler<T, D, R> {
2
3 @Getter
4 @Setter
5 protected StrategyHandler<T, D, R> defaultStrategyHandler = StrategyHandler.DEFAULT;
6
7 // 伪代码
8 public R router(T, D) throws Exception {
9 StrategyHandler strategyHandler = get(T, D);
10 if(null != strategyHandler) return strategyHandler.apply(T, D);
11 return defaultStrategyHandler.apply(T, D);
12 }
13
14}
- 执行具体节点,转发给下一个节点或者传递到默认节点中
第2-3节 多线程异步数据加载
首页试算
当一个用户进入到购物首页查看商品的优惠信息,我们可以把这个过程定义为试算过程。试算;试试算一下,这个用户进入到首页看这个商品的时候,商品的营销优惠信息。包括:原始价格、折扣价格、拼团目标、拼团时效、是否可见优惠、是否可参与拼团等,这些东西都是试算拿到的结果。
1. Model定义对象; 2. 服务功能实现,trial试算模块;3. 业务流转的功能节点
模型链路
1IIndexGroupBuyMarketService
2=> RootNode
3 => multiThread[前置处理|检查信息] => doApply[业务+路由]
4=> SwitchNode
5 => multiThread[前置处理] => doApply[业务+路由]
6=> MarketNode
7 => multiThread[⭐前置处理⭐拼单试算] => doApply[业务+路由]
8=> EndNode
9 => multiThread[前置处理] => doApply[业务|构建结果]
路由策略模板[节点模板]
1public abstract class AbstractMultiThreadStrategyRouter<T, D, R> implements StrategyMapper<T, D, R>, StrategyHandler<T, D, R> {
2
3 public R router(T requestParameter, D dynamicContext) throws Exception {}
4 @Override
5 public R apply(T requestParameter, D dynamicContext) throws Exception {
6 multiThread() // 前置处理
7 return doApply() // 业务逻辑
8 }
9 protected abstract void multiThread(T requestParameter, D dynamicContext) throws;
10 protected abstract R doApply(T requestParameter, D dynamicContext) throws;
1public abstract class AbstractGroupBuyMarketSupport<T, D, R> extends AbstractMultiThreadStrategyRouter<T, D, R> {
2
3 protected long timeout = 500;
4 @Resource
5 protected IActivityRepository repository;
6
7 @Override
8 protected void multiThread(T, D) {
9 // 空方法,需要实现的话,让子类重写
10 }
11
12}
具体节点实例
工作流:前置处理 => 业务逻辑 => 路由
1public class RootNode extends AbstractGroupBuyMarketSupport<MarketProductEntity, DynamicContext, TrialBalanceEntity> {
2
3 @Resource
4 private SwitchNode nextNode;
5
6 @Override
7 protected TrialBalanceEntity doApply(T, D, R) throws Exception {
8 log.info("业务处理")
9 return router(requestParameter, dynamicContext); // 下一个节点
10 }
11
12 @Override
13 public StrategyHandler<T, D, R> get(T, D, R) {
14 return switchNode;
15 }
16}
⭐⭐⭐
异步数据加载
1// 异步查询活动配置
2XXXThreadTask taskA = new XXXThreadTask(...);
3FutureTask<XXX> xxxFutureTaskA = new FutureTask<>(XXXThreadTask);
4threadPoolExecutor.execute(xxxFutureTask);
5
6// 异步查询商品信息 -
7XXXThreadTask taskB = new XXXThreadTask(...);
8FutureTask<XXX> xxxFutureTaskB = new FutureTask<>(XXXThreadTask);
9threadPoolExecutor.execute(xxxFutureTask);
10
11// 写入上下文, 前置查询数据
12dynamicContext.setXXXA(xxxFutureTaskA.get(TIMEOUT, TimeUnit.MINUTES)); // 阻塞指定时间
13dynamicContext.setXXXB(xxxFutureTaskB.get(TIMEOUT, TimeUnit.MINUTES));
第2-4节 策略模式优惠折扣计算
不同优惠模式 => 策略模式实现
定义接口
1public interface IDiscountCalculateService {
2 BigDecimal calculate(...);
3}
抽象模板
1public abstract class AbstractDiscountCalculateService implements IDiscountCalculateService {
2
3 @Override
4 public BigDecimal calculate(...) {
5 // 1. 人群标签过滤
6 if (IsFilter){
7 boolean isCrowdRange = filterTagId(userId, groupBuyDiscount.getTagId());
8 if (!isCrowdRange) return originalPrice;
9 }
10 // 2. 折扣优惠计算
11 return doCalculate(originalPrice, groupBuyDiscount);
12 }
13
14 // 人群过滤 - 限定人群优惠
15 private boolean filterTagId(String userId, String tagId) {
16 // todo xiaofuge 后续开发这部分
17 return true;
18 }
19
20 // 具体计算函数
21 protected abstract BigDecimal doCalculate(...);
22
23}
折扣方法
1@Slf4j
2@Service("MJ") // 满减优惠
3public class MJCalculateService extends AbstractDiscountCalculateService {
4
5 @Override
6 public BigDecimal doCalculate(BigDecimal originalPrice, GroupBuyActivityDiscountVO.GroupBuyDiscount groupBuyDiscount) {
7 log.info("优惠策略折扣计算:{}", groupBuyDiscount.getDiscountType().getCode());
8
9 // 折扣表达式 - 100,10 满100减10元
10 String marketExpr = groupBuyDiscount.getMarketExpr();
11 String[] split = marketExpr.split(Constants.SPLIT);
12 BigDecimal x = new BigDecimal(split[0].trim());
13 BigDecimal y = new BigDecimal(split[1].trim());
14
15 // 不满足最低满减约束,则按照原价
16 if (originalPrice.compareTo(x) < 0) {
17 return originalPrice;
18 }
19
20 // 折扣价格
21 BigDecimal deductionPrice = originalPrice.subtract(y);
22
23 // 判断折扣后金额,最低支付1分钱
24 if (deductionPrice.compareTo(BigDecimal.ZERO) <= 0) {
25 return new BigDecimal("0.01");
26 }
27
28 return deductionPrice;
29 }
30
31}
营销调用
1// MarketNode
2
3public TrialBalanceEntity doApply(T, D) throws Exception {
4 log.info("拼团商品查询试算服务");
5
6 dynamicContext.getGroupBuyActivityDiscountVO();
7 groupBuyActivityDiscountVO.getGroupBuyDiscount();
8
9 dynamicContext.getSkuVO();
10
11 // 具体的策略,优惠模式
12 IDiscountCalculateService discountCalculateService = discountCalculateServiceMap.get(groupBuyDiscount.getMarketPlan());
13 if (null == discountCalculateService) {
14 log.info("...", JSON.toString(discountCalculateServiceMap.keys()))
15 throw ...
16 }
17
18 // 折扣价格
19 BigDecimal deductionPrice = discountCalculateService.calculate(requestParameter.getUserId(), skuVO.getOriginalPrice(), groupBuyDiscount);
20 dynamicContext.setDeductionPrice(deductionPrice);
21
22 return router(requestParameter, dynamicContext);
23}
学习点:
1@Service("Key")
2XXX implement IDiscountCalculateService {}
3
4@Resource // ⭐先按名称,找不到再按类型注入
5private Map<String, IDiscountCalculateService> discountCalculateServiceMap;
第2-5节 人群标签数据采集
精准的对这些用户做定向活动投放,比如;特定的券、特定的通知等。以此达到更加精准的运营效果
人群标签是根据用户业务行为采集的
数据库表:
crowd_tags :统计功能;
crowd_tags_detail:tag_id => user_id; 具体每个人群都有谁;
crowd_tags_job:tag_id => batch_id, tag_relu:消费规则;
流程:
- 根据tag_id, batch_id 查询crowd_tags_job得到对应批次任务
- 采集用户数据
- 数据写入记录 // 暂存
- 数据写入数据库 => crowd_tags_detail
- 更新人群标签统计 => crowd_tags
数据写入数据库
1@Override
2public void addCrowdTagsUserId(String tagId, String userId) {
3 CrowdTagsDetail crowdTagsDetailReq = new CrowdTagsDetail();
4 crowdTagsDetailReq.setTagId(tagId);
5 crowdTagsDetailReq.setUserId(userId);
6 try {
7 crowdTagsDetailDao.addCrowdTagsUserId(crowdTagsDetailReq);
8 // 获取BitSet
9 RBitSet bitSet = redisService.getBitSet(tagId); // tagId:{userId, ...}:BitSet
10 bitSet.set(redisService.getIndexFromUserId(userId), true); // Long类型啥的转换一下跟bitset长度一致,尽可能均匀
11 } catch (DuplicateKeyException ignore) {
12 // 忽略唯一索引冲突
13 }
14}
❗❗❗redis中 BitSet 就是bit类型的hashtable, 而 bitmap是字符串;
大致思路:
1// 数据写入数据库时,同时把bitset中位置置1,表示这个用户存在于这个集合
2// 查询时,根据用户Id,查bitset // 这里用户ID=>bitset:key, 使用一个转换函数,使得散列尽可能均匀
第2-6节 库表拆分
将商品SKU与活动表 解耦 => SKU&Activity关联表。使得多个商品可以绑定同一个活动ID(优惠拼团活动)
1# Market节点 判断 商品是否存在优惠活动
2=> ErrorNode => 不存在优惠活动
3=> EndNode => 返回优惠的试算价格
流程:
1// MarketNode => multi-thread 检查是否存在商品<=> 活动关联表
2@Override
3public GroupBuyActivityDiscountVO call() throws Exception {
4 SCSkuActivityVO scSkuActivityVO = activityRepository.querySCSkuActivityBySCGoodsId(goodsId);
5 // log.error(JSON.toJSONString(scSkuActivityVO));
6 if (scSkuActivityVO == null) return null;
7 return activityRepository.queryGroupBuyActivityDiscountVOById(scSkuActivityVO.getActivityId());
8}
1// marketNode 路由
2if (dynamicContext.getGroupBuyActivityDiscountVO() == null || dynamicContext.getSkuVO() == null) {
3 return errorNode;
4}
5return tagNode;
1// ErrorNode 判断上下文,抛出无拼团配置
2log.info("拼团商品查询试算服务-NoMarketNode userId:{} requestParameter:{}", requestParameter.getUserId(), JSON.toJSONString(requestParameter));
3
4if (dynamicContext.getGroupBuyActivityDiscountVO() == null || dynamicContext.getSkuVO() == null) {
5 log.info("商品无拼团营销配置 {}", requestParameter.getGoodsId());
6 throw new AppException(ResponseCode.ILLEGAL_PARAMETER.getCode(), ResponseCode.ILLEGAL_PARAMETER.getInfo());
7}
8
9return TrialBalanceEntity.builder().build();
第2-7节 人群标签节点过滤
限制拼团的人群
流程1:
1MarketNode => TagNode => EndNode
1// TagNode 检查请求的用户是否 能看见拼团,是否参与拼团 tag_scope = {1,2} 第一位限制是否可见,第二位是否限制参与
2// redis:bitset => tag_id:{...} 存放满足条件的用户
3
4// 获取拼团信息
5GroupBuyActivityDiscountVO activityDiscountVO = dynamicContext.getGroupBuyActivityDiscountVO();
6String tagId = activityDiscountVO.getTagId();
7boolean visible = activityDiscountVO.isVisible();
8boolean enable = activityDiscountVO.isEnable();
9
10// 无特定人群信息, 人人能操作
11if (StringUtils.isBlank(tagId)) {
12 dynamicContext.setVisible(true);
13 dynamicContext.setEnable(true);
14 return router(requestParameter, dynamicContext);
15}
16
17// 是否在人群范围内,特定人群能看
18boolean isWithin = repository.isTagCrowdRange(tagId, requestParameter.getUserId());
19dynamicContext.setVisible(visible || isWithin);
20dynamicContext.setEnable(enable || isWithin);
21
22return router(requestParameter, dynamicContext);
23
24
25// => isTagCrowdRange function
26// xiaofuge、liergou在里面
27RBitSet bitSet = redisService.getBitSet(tagId);
28if (!bitSet.isExists()) return true;
29// 判断用户是否存在人群中
30return bitSet.get(redisService.getIndexFromUserId(userId));
第2-8节 动态配置开关操作
目的:如何不停车就给汽车换个轮子?
=> 在程序运行过程中,直接动态变更某些属性配置。这些动态变更的配置包括降级和切量的开关,也包括一些功能程序的白名单用户测试。
使用Redis作为集中化储存系统的好处,方便一处修改变量,通知所有的实例进行变更!
流量降级或切量
1// 自定义注解
2@Retention(RetentionPolicy.RUNTIME)
3@Target(ElementType.FIELD)
4@Documented
5public @interface DCCValue {
6 String value() default "";
7}
8
9// 动态中心控制 -- 作用于降级
10@Service
11public class DCCService {
12
13 /**
14 * 降级开关 0关闭 1开启
15 */
16 @DCCValue("downgradeSwitch:0")
17 private String downgradeSwitch;
18
19 @DCCValue("cutRange:100")
20 private String cutRange;
21
22 public boolean isCutRange(String userId) {
23 // 计算哈希码的绝对值
24 int hashCode = Math.abs(userId.hashCode());
25
26 // 获取最后两位
27 int lastTwoDigits = hashCode % 100;
28
29 // 判断是否在切量范围内
30 if (lastTwoDigits <= Integer.parseInt(cutRange)) {
31 log.error(cutRange);
32 return true;
33 }
34
35 return false;
36 }
37
38 public boolean isDowngradeSwitch() {
39 return this.downgradeSwitch.equals("1");
40 }
41}
42
43// SwitchNode中 进行流量控制
44// 根据用户ID切量
45String userId = requestParameter.getUserId();
46
47// 判断是否降级
48if (activityRepository.downgradeSwitch()) {
49 log.info("拼团活动降级拦截 {}", userId);
50 throw new AppException(ResponseCode.E0004.getCode(), ResponseCode.E0004.getInfo());
51}
52
53// 判断是否切量
54if (activityRepository.cutRange(userId)) {
55 log.info("拼团活动切量拦截 {}", userId);
56 throw new AppException(ResponseCode.E0004.getCode(), ResponseCode.E0004.getInfo());
57}
利用Redis的Topic事件发布订阅 进行 修改
BeanPostProcessor 能够当SpringBean初始化完成后再执行
1// DCCValueBeanFactory implements BeanPostProcessor
2private static final String BASE_CONFIG_PATH = "group_buy_market_dcc_";
3
4private final RedissonClient redissonClient;
5
6private final Map<String, Object> dccObjGroup = new HashMap<>();
7
8public DCCValueBeanFactory(RedissonClient redissonClient) {
9 this.redissonClient = redissonClient;
10}
11
12@Bean("dccTopic")
13public RTopic dccRedisTopicListener(RedissonClient redissonClient) {
14 log.info("初始化Redis Topic监听事件 测试连通{}", redissonClient.getBucket("test").get());
15 // 发布订阅模式的Topic实现
16 RTopic topic = redissonClient.getTopic("group_buy_market_dcc");
17 // 消息类型,回调接口:Topic名称,接收的消息
18 topic.addListener(String.class, ((charSequence, s) -> {
19 log.info("监听到Redis Topic: {}", charSequence.toString());
20 String[] split = s.split(Constants.SPLIT);
21
22 // 获取值
23 String attribute = split[0];
24 String key = BASE_CONFIG_PATH + attribute;
25 String value = split[1];
26
27 // 设置值 => 可以存储任意类型的值(如 String、Integer、自定义对象等),Redisson 会自动进行序列化和反序列化。
28 RBucket<String> bucket = redissonClient.getBucket(key);
29 boolean exists = bucket.isExists();
30 if (!exists) return;
31 bucket.set(value);
32
33 Object objBean = dccObjGroup.get(key); // 获取内存中的对象
34 if (objBean == null) return;
35
36 Class<?> objBeanClass = objBean.getClass();
37 // 检查objBean是否为代理对象
38 if (AopUtils.isAopProxy(objBean)) {
39 objBeanClass = AopUtils.getTargetClass(objBean);
40 }
41
42 try {
43 Field field = objBeanClass.getDeclaredField(attribute);
44 field.setAccessible(true);
45 field.set(objBean, value);
46 field.setAccessible(false);
47
48 log.info("DCC 节点监听,动态设置值 {} {}", key,value);
49 } catch (Exception e) {
50 throw new RuntimeException(e);
51 }
52
53 }));
54 return topic;
55}
56
57
58// implements BeanPostProcessor 重写方法 -- 并在每个 bean 初始化完成后自动调用该方法。
59@Override
60public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
61 // 注意:增加 AOP 代理后,获得类的方式要通过 AopProxyUtils.getTargetClass(bean); 不能直接 bean.class 因为代理后类的结构发生了变化,这样不能获得自己的自定义注解了
62 Class<?> targetBeanClass = bean.getClass();
63 Object targetBeanObject = bean;
64 if (AopUtils.isAopProxy(targetBeanObject)) {
65 targetBeanClass = AopUtils.getTargetClass(targetBeanObject);
66 targetBeanObject = AopProxyUtils.getSingletonTarget(bean);
67 }
68
69 // 这里判断该类是否有对应的自定义注解
70 Field[] fields = targetBeanClass.getDeclaredFields();
71 for (Field field : fields) {
72 if (!field.isAnnotationPresent(DCCValue.class)) {
73 continue;
74 }
75
76 DCCValue dccValue = field.getAnnotation(DCCValue.class);
77 String value = dccValue.value();
78 if (StringUtils.isBlank(value)) {
79 throw new RuntimeException(field.getName() + "@DCCValue is not config value config case「isSwitch/isSwitch:1」");
80 }
81
82 String[] split = value.split(":");
83 String key = BASE_CONFIG_PATH.concat(split[0]);
84 String defaultValue = split[1];
85
86 String setValue = defaultValue;
87
88 try {
89 if (StringUtils.isBlank(defaultValue)) {
90 throw new RuntimeException("dcc config error " + key + " is not null - 请配置默认值!");
91 }
92
93 RBucket<String> bucket = redissonClient.getBucket(key);
94 boolean exists = bucket.isExists();
95 if (!exists) {
96 bucket.set(defaultValue);
97 } else {
98 setValue = bucket.get();
99 }
100
101 field.setAccessible(true);
102 field.set(targetBeanObject, setValue);
103 field.setAccessible(false);
104 log.info("初始化值 key:{} value:{}", key, setValue);
105 } catch (IllegalAccessException e) {
106 throw new RuntimeException(e);
107 }
108
109 dccObjGroup.put(key, targetBeanObject);
110 }
111 return bean;
112}
Controller
1@Override
2@GetMapping("update_config")
3public Response<Boolean> updateConfig(@RequestParam String key, @RequestParam String value) {
4 try {
5 log.info("DCC 动态配置值变更 key:{} value:{}", key, value);
6 dccTopic.publish(key + "," + value);
7 return Response.<Boolean>builder()
8 .code(ResponseCode.SUCCESS.getCode())
9 .info(ResponseCode.SUCCESS.getInfo())
10 .build();
11 } catch (Exception e) {
12 log.error("DCC 动态配置值变更失败! key:{} value:{} {}", key, value, e.getMessage());
13 return Response.<Boolean>builder()
14 .code(ResponseCode.UN_ERROR.getCode())
15 .info(ResponseCode.UN_ERROR.getInfo())
16 .build();
17 }
18}
⏱️25/3/24-23:34
第2-9节 拼团交易营销锁单
25/3/25/ 20:37
完整的业务流程如下:
- 首先,团购商品下单。下单过程分为:创建流水单、锁定营销优惠(拼团、积分、卷),创建支付订单、唤起收银台支付、用户扫码支付、支付完成核销优惠等。
- 用户以拼团的方式下单,创建流水单完成后,需要与拼团系统交互,锁定营销优惠。更新流水单优惠金额和支付金额。创建订单使用最终的支付金额。
- 拼团表(目标量、完成量、锁单量),锁单量达到目标量后,不能再参与该拼团。拼团超时,释放空闲位置让其他用户参与.
Trade Repository / rɪˈpɒzət(ə)ri /
1// 简单实现步骤
2// OutTradeNO 模拟生成 订单ID模拟生成, TeamID模拟生成. 两张表:1.记录拼团中每个用户的信息 2. 记录该拼团信息.
3
4// ⭐仓储业务实现
5// 判断是否有团 -- 一行记录存储每个拼团的信息
6// 首次则创建拼团订单. 初始化表2
7// 否则加入其他人的团 表2 拼团人数+1
8
9// 插入该用户的拼团信息到表1; 每个用户一行
10
11// ⭐控制器业务实现
12// 1. 检查用户信息
13// 2. 查询用户outTradeNo是否已经使用过,幂等性
14// 3. 查询拼团是否人数已满足
15// 4. 营销优惠试算
16// 5. 判断用户是否能够看见拼团和参与
17// 6. 拼团锁单 => 仓储业务
18// 7. 返回结果
第2-10节 责任链抽象模板设计
25/4/8/ 15:53
先前的责任链模板都是写死的。本章内容为:将每个节点的执行逻辑和链接逻辑解耦。
抽象
简单的单例链
1// 1. IlogicChainArmory: 装配链,提供添加节点方法和获取节点
2// next()
3// appendNext()
4// 2. ILogicLink extends IlogicChainArmory: 并提供一个受理业务逻辑的方法
5// apply()
6// 3. AbstractLogicLink extends ILogicLink: 封装添加节点和执行 next 下一个节点的方法
7// next()
8// appendNext()
9// apply()
这里是由统一的类维护责任链,也就是一份责任链。如果有2个独立的链要处理,就需要使用到非单例的类进行填充,否则会修改同一份链。也就是 ILogicLink<T, D, R> next 被几份链反复调整,也就不是一个单独的链了。
多例-链路模式
多例链的设计要解耦链路和执行,把链路当做一个 LinkedList 列表处理,之后执行当做是单独的 for 循环。
1// 需要设计的类
2// 1. Node & LinkedList extends ILink
3// 双向链表节点 & 双向链表
4
5// 2. BusinessLinkedList extends LinkedList
6// 从头到尾执行各个节点
7
8// 3. ILogicHandler -- 定义任务
9
10// 4. LinkArmory -- 创建链表并装配节点
11// public LinkArmory(String linkName, ILogicHandler<T, D, R>... logicHandlers)
12// private final BusinessLinkedList<T, D, R> logicLink;
1// 用法
2// 1. 定义 RuleLogicNode extends ILogicHandler
3// 2. new LinkArmory<>("demo01", ruleLogic201, ruleLogic202, ...);
具体伪代码
1public class LinkedList<E> implements ILink<E> {
2
3 /**
4 * 责任链名称
5 * */
6 @Getter
7 private final String name;
8 transient int size = 0;
9 transient Node<E> first;
10 transient Node<E> last;
11
12 public LinkedList(String name) {
13 this.name = name;
14 }
15
16 // 头插节点
17 void linkFirst(E e) {
18 final Node<E> f = first;
19 final Node<E> newNode = new Node<>(null, e, f);
20 first = newNode;
21 if (f == null)
22 last = newNode;
23 else
24 f.prev = newNode;
25 size++;
26 }
27
28 // 尾插法
29 void linkLast(E e) {
30 final Node<E> l = last;
31 final Node<E> newNode = new Node<>(l, e, null);
32 last = newNode;
33 if (l == null) {
34 first = newNode;
35 } else {
36 l.next = newNode;
37 }
38 size++;
39 }
40
41 @Override
42 public boolean add(E e) {
43 linkLast(e);
44 return true;
45 }
46
47 @Override
48 public boolean addFirst(E e) {
49 linkFirst(e);
50 return true;
51 }
52
53 @Override
54 public boolean addLast(E e) {
55 linkLast(e);
56 return true;
57 }
58
59 @Override
60 public boolean remove(Object o) {
61 if (o == null) {
62 for (Node<E> x=first; x!=null; x=x.next) {
63 if (x.item == null) {
64 unlink(x);
65 return true;
66 }
67 }
68 } else {
69 for (Node<E> x=first; x!=null; x=x.next) {
70 if (o.equals(x.item)) {
71 unlink(x);
72 return true;
73 }
74 }
75 }
76 return false;
77 }
78
79 // 删除节点,注意是双向链表
80 E unlink(Node<E> x) {
81 final E element = x.item;
82 final Node<E> next = x.next;
83 final Node<E> prev = x.prev;
84
85 // x为头节点
86 if (prev == null) {
87 first = next;
88 } else {
89 prev.next = next;
90 x.prev = null;
91 }
92
93 // x为末尾
94 if (next == null) {
95 last = x.prev;
96 } else {
97 next.prev = prev;
98 x.next = null;
99 }
100
101 x.item = null;
102 size--;
103 return element;
104 }
105
106 @Override
107 public E get(int index) {
108 return node(index).item;
109 }
110
111 // 根据index,判断是前向还是后向
112 private Node<E> node(int index) {
113 if (index < (size >> 1)) {
114 Node<E> x = first;
115 for (int i = 0; i < index; i++) {
116 x = x.next;
117 }
118 return x;
119 } else {
120 Node<E> x = last;
121 for (int i = size-1; i > index; i--) {
122 x = x.prev;
123 }
124 return x;
125 }
126 }
127
128 @Override
129 public void printLinkList() {
130 if (this.size == 0) {
131 System.out.printf("链表为空");
132 } else {
133 Node<E> temp = first;
134 System.out.print("目前的列表,头节点:" + first.item + " 尾节点:" + last.item + " 整体:");
135 while (temp != null) {
136 System.out.print(temp.item + ",");
137 temp = temp.next;
138 }
139 System.out.println();
140 }
141 }
142}
第2-11节 交易规则责任链过滤
2025-4-8 17:30
该章节补充:过滤拼团活动配置的规则。包括;活动的有效期、状态,以及个人参与拼团的次数。
1// Logic chain
2// 1. 检查参数
3// 2. 检查是否存在交易记录,根据外部OrderId
4// 3. 判断拼团活动是否满
5// 4. 营销试算 -- 给出折扣价格
6// 5. 人群限定 -- 特定人群才能拼团
7// 6. 拼团锁单 -- 1. 修改活动表对应的信息 2. 插入一条拼团记录
8// 6.1. 交易规则过滤 -- 检查活动时间 + 检查拼团次数 ⭐本节主要这部分
具体就是定义一些 责任链节点(拦截器)
1@Slf4j
2@Service
3public class ActivityUsabilityRuleFilter implements ILogicHandler<TradeRuleCommandEntity, TradeRuleFilterFactory.DynamicContext, TradeRuleFilterBackEntity> {
4
5 @Resource
6 private ITradeRepository repository;
7
8 @Override
9 public TradeRuleFilterBackEntity apply(TradeRuleCommandEntity requestParameter, TradeRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
10 log.info("交易规则过滤-活动的可用性校验{} activityId:{}", requestParameter.getUserId(), requestParameter.getActivityId());
11
12 // 查询拼团活动
13 GroupBuyActivityEntity groupBuyActivityEntity = repository.queryGroupBuyActivityEntityByActivityId(requestParameter.getActivityId());
14
15 // 校验:活动状态
16 if (!ActivityStatusEnumVO.EFFECTIVE.equals(groupBuyActivityEntity.getStatus())) {
17 log.info("活动的可用性校验,非生效状态 activityId:{}", requestParameter.getActivityId());
18 throw new AppException(ResponseCode.E0101);
19 }
20
21 // 校验;活动时间
22 Date now = new Date();
23 if (now.before(groupBuyActivityEntity.getStartTime()) || now.after(groupBuyActivityEntity.getEndTime())) {
24 log.info("活动的可用性校验,非可参与时间范围 activityId:{}", requestParameter.getActivityId());
25 throw new AppException(ResponseCode.E0102);
26 }
27
28 dynamicContext.setGroupBuyActivity(groupBuyActivityEntity);
29
30 // 走到下一个责任链节点
31 return next(requestParameter, dynamicContext);
32 }
33}
1@Slf4j
2@Service
3public class UserTakeLimitRuleFilter implements ILogicHandler<TradeRuleCommandEntity, TradeRuleFilterFactory.DynamicContext, TradeRuleFilterBackEntity> {
4
5 @Resource
6 private ITradeRepository repository;
7
8 @Override
9 public TradeRuleFilterBackEntity apply(TradeRuleCommandEntity requestParameter, TradeRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
10 log.info("交易规则过滤-用户参与次数校验{} activityId{}", requestParameter.getUserId(), requestParameter.getActivityId());
11
12 GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity();
13
14 Integer count = repository.queryOrderCountByActivityId(requestParameter.getActivityId(), requestParameter.getUserId());
15
16 if (groupBuyActivity.getTakeLimitCount() != null && count >= groupBuyActivity.getTakeLimitCount()) {
17 log.info("用户参数次数校验,已达可参与上限 activityId:{}", requestParameter.getActivityId());
18 throw new AppException(ResponseCode.E0103);
19 }
20
21 return TradeRuleFilterBackEntity.builder()
22 .userTakeOrderCount(count)
23 .build();
24 }
25}
组装责任链 – BusinessLinkedList 具体业务
1@Bean("tradeRuleFilter")
2public BusinessLinkedList<TradeRuleCommandEntity, TradeRuleFilterFactory.DynamicContext, TradeRuleFilterBackEntity>
3 tradeRuleFilter(ActivityUsabilityRuleFilter activityUsabilityRuleFilter, UserTakeLimitRuleFilter userTakeLimitRuleFilter) {
4 // 组装链
5 LinkArmory<TradeRuleCommandEntity, DynamicContext, TradeRuleFilterBackEntity> filter = new LinkArmory<>("交易规则过滤链", activityUsabilityRuleFilter, userTakeLimitRuleFilter);
6 // 链对象
7 return filter.getLogicLink();
8}
LinkArmory – 真正的抽象装配责任链
1@SafeVarargs
2public LinkArmory(String linkName, ILogicHandler<T, D, R>... logicHandlers) {
3 logicLink = new BusinessLinkedList<>(linkName);
4 for (ILogicHandler<T, D, R> logicHandler: logicHandlers){
5 logicLink.add(logicHandler);
6 }
7}
8public BusinessLinkedList<T, D, R> getLogicLink() {
9 return logicLink;
10}
用法
1@Resource
2private BusinessLinkedList<TradeRuleCommandEntity, TradeRuleFilterFactory.DynamicContext, TradeRuleFilterBackEntity> tradeRuleFilter;
3
4tradeRuleFilter.aaply(...)
第2-12节:拼团组队结算统计
25/4/8 21:19
拼团的过程是用户在商城下单,锁定拼团优惠(也就是拼团系统里锁单的过程)。之后就是用户给这笔商品完成支付交易,交易后不会直接发货,直至拼团组队完成后才会发货。
那么,这里有一个流程,就是支付完成后,需要做拼团数量的统计结算。如,拼团需要3个用户一起下单,那么每完成一笔支付,就要给拼团的组队加上一笔记录。这个就是本节要实现的流程。
⭐⭐⭐ 所有用户完成支付后, 最后一个用户的结算触发回调事件!
1@Transactional(timeout = 500)
2@Override
3public void settlementMarketPayOrder(GroupBuyTeamSettlementAggregate groupBuyTeamSettlementAggregate) {
4 UserEntity userEntity = groupBuyTeamSettlementAggregate.getUserEntity();
5 GroupBuyTeamEntity groupBuyTeamEntity = groupBuyTeamSettlementAggregate.getGroupBuyTeamEntity();
6 TradePaySuccessEntity tradePaySuccessEntity = groupBuyTeamSettlementAggregate.getTradePaySuccessEntity();
7
8 // 1. 更新拼团订单明细状态
9 GroupBuyOrderList groupBuyOrderList = new GroupBuyOrderList();
10 groupBuyOrderList.setUserId(userEntity.getUserId());
11 groupBuyOrderList.setOutTradeNo(tradePaySuccessEntity.getOutTradeNo());
12 int updateOrderStatus2COMPLETE = groupBuyOrderListDao.updateOrderStatus2COMPLETE(groupBuyOrderList);
13 if (1 != updateOrderStatus2COMPLETE) {
14 throw new AppException(ResponseCode.UPDATE_ZERO);
15 }
16
17 // 2. 更新拼团达成数量
18 int updateAddCount = groupBuyOrderDao.updateAddCompleteCount(groupBuyTeamEntity.getTeamId());
19 if (1 != updateAddCount) {
20 throw new AppException(ResponseCode.UPDATE_ZERO);
21 }
22
23 // 3. 更新拼团完成状态
24 if (groupBuyTeamEntity.getTargetCount() - groupBuyTeamEntity.getCompleteCount() == 1) {
25 int i = groupBuyOrderDao.updateOrderStatus2COMPLETE(groupBuyTeamEntity.getTeamId());
26 if (1 != i) {
27 throw new AppException(ResponseCode.UPDATE_ZERO);
28 }
29
30 // 查询拼团交易外部订单号列表
31 List<String> outOrderNoList = groupBuyOrderListDao.queryGroupBuyCompleteOrderOutTradeNoListByTeamId(groupBuyTeamEntity.getTeamId());
32
33 // 拼团完成写入回调任务记录
34 NotifyTask notifyTask = NotifyTask.builder()
35 .activityId(groupBuyTeamEntity.getActivityId())
36 .teamId(groupBuyTeamEntity.getTeamId())
37 .notifyUrl("暂无")
38 .notifyCount(0)
39 .notifyStatus(0)
40 .build();
41 notifyTask.setParameterJson(JSON.toJSONString(new HashMap<String, Object>() {{
42 put("teamId", groupBuyTeamEntity.getTeamId());
43 put("outTradeNoList", outOrderNoList);
44 }}));
45
46 notifyTaskDao.insert(notifyTask);
47 }
48
49}
这里算是支付成功后的回调
第2-13节:交易结算责任链过滤
⏱️25/4/9 12:31
本节诉求:
拼团交易结算的过程,需要一些列的规则过滤。包括;我们上一节提到的校验外部交易单的时间是否在拼团有效时间内,同时还有关于这笔外部交易单是否为有效的拼团锁单订单。另外像是 SC 渠道的有效性也需要在结算时进行校验。
所以,本节我们需要实现一套规则链,来处理这些业务规则。因为规则链已经被抽取为通用的模板了,那么本节使用起来会非常容易。
责任链流程:1. 校验渠道是否为黑名单 2. 外部订单号是否正确 3. 交易时间是否在拼团有效期内 4. 打包信息到上下文
责任链包装对象
1@Bean("tradeSettlementRuleFilter")
2public BusinessLinkedList<TradeSettlementRuleCommandEntity,
3TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> tradeSettlementRuleFilter(
4 SCRuleFilter scRuleFilter, OutTradeNoRuleFilter outTradeNoRuleFilter, SettableRuleFilter settableRuleFilter, EndRuleFilter endRuleFilter
5) {
6 LinkArmory<TradeSettlementRuleCommandEntity, DynamicContext, TradeSettlementRuleFilterBackEntity> tradeSettlementFilter =
7 new LinkArmory<>("交易结算规则过滤链", scRuleFilter, outTradeNoRuleFilter, settableRuleFilter, endRuleFilter);
8
9 return tradeSettlementFilter.getLogicLink();
10
11}
1 SC过滤器
1@Slf4j
2@Service
3public class SCRuleFilter implements ILogicHandler<TradeSettlementRuleCommandEntity,
4 TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> {
5
6 @Resource
7 private ITradeRepository repository;
8
9 @Override
10 public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
11 // 1. 打印函数流程信息
12 log.info("结算规则过滤-渠道黑名单校验{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo());
13
14 // 2. 校验黑名单信息
15 Boolean intercept = repository.isSCBlackIntercept(requestParameter.getSource(), requestParameter.getChannel());
16 if (intercept) {
17 log.info("{}-{} 渠道黑名单拦截", requestParameter.getSource(), requestParameter.getChannel());
18 throw new AppException(ResponseCode.E0105);
19 }
20
21 return next(requestParameter, dynamicContext);
22 }
23}
2 订单号校验过滤器
1@Slf4j
2@Service
3public class OutTradeNoRuleFilter implements ILogicHandler<TradeSettlementRuleCommandEntity,
4 TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> {
5
6 @Resource
7 private ITradeRepository repository;
8
9 @Override
10 public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
11 log.info("结算规则过滤-外部单号校验{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo());
12
13 MarketPayOrderEntity marketPayOrderEntity = repository.queryMarketPayOrderEntityByOutTradeNo(requestParameter.getUserId(), requestParameter.getOutTradeNo());
14
15 if (null == marketPayOrderEntity) {
16 log.info("不存在的外部交易单号或用户已退单,不需要做支付订单结算:{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo());
17 throw new AppException(ResponseCode.E0104);
18 }
19
20 dynamicContext.setMarketPayOrderEntity(marketPayOrderEntity);
21
22 return next(requestParameter, dynamicContext);
23 }
24}
3 有效期校验器
1@Slf4j
2@Service
3public class SettableRuleFilter implements ILogicHandler<TradeSettlementRuleCommandEntity,
4 TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> {
5
6 @Resource
7 private ITradeRepository repository;
8
9 @Override
10 public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
11 log.info("结算规则过滤-有效时间校验:{}, {}", requestParameter.getUserId(), requestParameter.getOutTradeNo());
12
13 // 1. 上下文获取数据
14 MarketPayOrderEntity marketPayOrderEntity = dynamicContext.getMarketPayOrderEntity();
15
16 // 2. 查询拼团对象
17 GroupBuyTeamEntity groupBuyTeamEntity = repository.queryGroupBuyTeamByTeamId(marketPayOrderEntity.getTeamId());
18
19 // 3. 外部交易时间 - 支付时间需要在拼团有效时间内完成
20 if (requestParameter.getOutTradeTime().after(groupBuyTeamEntity.getValidEndTime())){
21 // 4. 判断,外部交易时间是否小于拼团结束时间
22 log.info("订单交易时间不在拼团有效时间范围内");
23 throw new AppException(ResponseCode.E0106);
24 }
25
26 // 5. 设置上下文
27 dynamicContext.setGroupBuyTeamEntity(groupBuyTeamEntity);
28
29 return next(requestParameter, dynamicContext);
30 }
31}
4 打包节点
1@Slf4j
2@Service
3public class EndRuleFilter implements ILogicHandler<TradeSettlementRuleCommandEntity,
4 TradeSettlementRuleFilterFactory.DynamicContext, TradeSettlementRuleFilterBackEntity> {
5 @Override
6 public TradeSettlementRuleFilterBackEntity apply(TradeSettlementRuleCommandEntity requestParameter, TradeSettlementRuleFilterFactory.DynamicContext dynamicContext) throws Exception {
7 log.info("结算规则过滤-结束节点{} outTradeNo:{}", requestParameter.getUserId(), requestParameter.getOutTradeNo());
8
9 GroupBuyTeamEntity groupBuyTeamEntity = dynamicContext.getGroupBuyTeamEntity();
10
11 return TradeSettlementRuleFilterBackEntity.builder()
12 .teamId(groupBuyTeamEntity.getTeamId())
13 .activityId(groupBuyTeamEntity.getActivityId())
14 .completeCount(groupBuyTeamEntity.getCompleteCount())
15 .targetCount(groupBuyTeamEntity.getTargetCount())
16 .validStartTime(groupBuyTeamEntity.getValidStartTime())
17 .validEndTime(groupBuyTeamEntity.getValidEndTime())
18 .lockCount(groupBuyTeamEntity.getLockCount())
19 .status(groupBuyTeamEntity.getStatus())
20 .build();
21 }
22}
5 SC来源渠道黑名单动态配置
1@Slf4j
2@Service
3public class DCCService
4
5@DCCValue("scBlacklist:s02c02") // 默认黑名单渠道
6private String scBlacklist;
7
8public Boolean isSCBlackIntercept(String source, String channel) {
9 List<String> list = Arrays.asList(scBlacklist.split(Constants.SPLIT));
10 return list.contains(source + channel);
11}
12
13// Http请求key=scBlacklist&value=s02c02;s03c03
14
15// => Redis (scBlacklist) => scBlacklist,s02c02;s03c03
第2-14节:拼团回调通知任务
⏱️25/4/9 15:32
拼团组队交易结算完结后,实现一个回调通知的任务处理。告知另外的微服务系统可以进行后续的流程了。
RPC、MQ,这一类的都是需要有一个公用的注册中心,它的技术架构比较适合于公司内部的统一系统使用。如果是有和外部其他系统的对接,通常我们会使用 HTTP 这样统一标准协议的接口进行使用。
注意:微信支付,支付宝支付,也是在完成支付后,做的这样的回调处理。
💡流程逻辑 1:锁单
1// 1. 检查锁单请求信息 - 请求参数是否为Null or 空串
2// 2. 检查outTradeNo是否重复锁单 - 检查检查outTradeNo是否重复锁单是否存在
3// 3. 拼团是否能够加入 - 判断拼团当前人数小于目标人数
4// 4. 营销优惠试算
5// 4.1. RootNode => SwitchRoot => MarketNode => TagNode => EndNode
6// => ErrorNode
7// 4.1.1 RootNode检查信息
8// 4.1.2 SwitchRoot 降级和限流
9// 4.1.3 MarketNode 1:异步查询商品信息+活动信息 2:进行折扣验算(根据Map拿到对应的折扣对象 - 每个折扣对象实现同一接口)
10// 4.1.4-1 TagNode:根据bitset,检查用户Id是否有资格参与or可见该折扣活动
11// 4.1.4-2 ErrorNode:无折扣活动返回
12// 4.1.5 EndNode 组装试算结果
13// 5. 根据试算结果判断是否可见和可参与
14// 6. 进行锁单
15// 6.1. 交易规则检查 - 校验活动状态+活动时间 && 校验参与次数 -责任链
16// LinkArmory(BusinessLinkedList:ActivityUsabilityRuleFilter+UserTakeLimitRuleFilter)
17// 6.1.1 ActivityUsabilityRuleFilter: 检查活动状态和时间
18// 6.1.2 UserTakeLimitRuleFilter: 检查用户参与次数
19// 6.2. 判断是否有团 - 生成拼团订单 or 加入别人的订单 => 插入or修改记录 GroupBuyOrder
20// 6.3. 生成订单明细 => 插入记录到GroupBuyOrderList
💡流程逻辑 2:结算
1// 1. 检查拼团信息 - 结算规则
2// 1.1 检查 渠道合法性=>外部订单合法性=>拼团时间合法性=>包装检查结果
3// 1.1.1 SCRuleFilter: 检查source和channel是否在黑名单中,@DCCValue("scBlacklist:s02c02"), 可动态配置
4// 1.1.2 OutTradeNoRuleFilter:检查OutTradeNo是否合法,是否GroupBuyOrderList存在初始锁单的拼团订单
5// (这里,外部进行了支付,但是系统还没修改结算状态呢)
6// 1.1.3 SettableRuleFilter:检查订单交易时间是否在拼团有效时间内
7// 1.1.4 EndRuleFilter:包装结果
8// 2. 进行拼团结算
9// 2.1 更新拼团订单明细 => GroupBuyOrderList对应用户的完成状态
10// 2.2 更新拼团达成数量 => GroupBuyOrder对应的用户完成支付数量
11// 2.3 更新拼团完成状态 => 最后一个人支付完成触发 => GroupBuyOrder拼团信息状态 => 写入回调任务记录(包装TeamId+所有的外部订单ID)
12// 3. 组队回调处理 - 这里显示的执行,使得实时性比较好,只有最后一个人完成才会执行成功,不过有定时任务进行兜底
13// 3.1 发送Http远程调用,将(TeamId+所有的外部订单ID)发送给第三方服务,通知发货or其他
14// 3.2 定时任务:查询所有初始状态和重试状态的 通知任务
🏷️本章节完成 3 触发回调部分
1// 指定触发回调通知任务 - 实时性
2@Override
3public Map<String, Integer> execSettlementNotifyJob(String teamId) throws Exception {
4 log.info("拼团交易-执行结算通知回调,指定 teamId:{}", teamId);
5
6 List<NotifyTaskEntity> notifyTaskEntityList = repository.queryUnExecutedNotifyTaskList(teamId);
7 return execSettlementNotifyJob(notifyTaskEntityList);
8}
9
10// 辅助定时扫描,进行通知 - 兜底
11@Override
12public Map<String, Integer> execSettlementNotifyJob() throws Exception {
13 log.info("拼团交易-执行结算通知任务");
14
15 // 查询未执行任务
16 List<NotifyTaskEntity> notifyTaskEntities = repository.queryUnExecutedNotifyTaskList();
17
18 return execSettlementNotifyJob(notifyTaskEntities);
19}
20
21// 公用通知函数
22private Map<String, Integer> execSettlementNotifyJob(List<NotifyTaskEntity> notifyTaskEntities) throws Exception {
23 int successCount = 0, errorCount = 0, retryCount = 0;
24 for (NotifyTaskEntity notifyTask : notifyTaskEntities) {
25 // 回调处理
26 String response = port.groupBuyNotify(notifyTask);
27
28 if (NotifyTaskHTTPEnumVO.SUCCESS.getCode().equals(response)) {
29 int i = repository.updateNotifyTaskStatusSuccess(notifyTask.getTeamId());
30 if (1 == i) {
31 successCount += 1;
32 }
33 } else if (NotifyTaskHTTPEnumVO.ERROR.getCode().equals(response)) {
34 int i = repository.updateNotifyTaskStatusError(notifyTask.getTeamId());
35 if (1 == i) {
36 errorCount += 1;
37 }
38 } else {
39 int i = repository.updateNotifyTaskStatusRetry(notifyTask.getTeamId());
40 if (1 == i) {
41 retryCount += 1;
42 }
43 }
44 }
45
46 Map<String, Integer> resultMap = new HashMap<>();
47 resultMap.put("waitCount", notifyTaskEntities.size());
48 resultMap.put("successCount", successCount);
49 resultMap.put("errorCount", errorCount);
50 resultMap.put("retryCount", retryCount);
51
52 return resultMap;
53}
54
55// 远程调用
56@Override
57public String groupBuyNotify(NotifyTaskEntity notifyTask) throws Exception {
58 RLock lock = redisService.getLock(notifyTask.lockKey());
59 try {
60 // 拼团服务器部署在多台应用服务器上,多任务执行,避免任务被多次执行,锁的粒度 xxx:teamId
61 if (lock.tryLock(3, 0, TimeUnit.SECONDS)) {
62 try {
63 if (StringUtils.isBlank(notifyTask.getNotifyUrl()) || "暂无".equals(notifyTask.getNotifyUrl())) {
64 return NotifyTaskHTTPEnumVO.SUCCESS.getCode();
65 }
66 return ⭐groupBuyNotifyService.groupBuyNotify(notifyTask.getNotifyUrl(), notifyTask.getParameterJson());⭐ // 这里进行远程调用
67 } finally {
68 if (lock.isLocked() && lock.isHeldByCurrentThread())
69 lock.unlock();
70 }
71 }
72 return NotifyTaskHTTPEnumVO.NULL.getCode();
73 }catch (Exception e) {
74 Thread.currentThread().interrupt();
75 return NotifyTaskHTTPEnumVO.NULL.getCode();
76 }
77}
78
79// OKHttp发送请求
80public String groupBuyNotify(String apiUrl, String notifyRequestDTOJSON) {
81 try {
82 // 1. 构建参数
83 MediaType mediaType = MediaType.parse("application/json");
84 RequestBody body = RequestBody.create(mediaType, notifyRequestDTOJSON);
85 Request request = new Request.Builder()
86 .url(apiUrl)
87 .post(body)
88 .addHeader("content-type", "application/json")
89 .build();
90
91 // 2. 调用接口
92 Response response = okHttpClient.newCall(request).execute();
93
94 // 3. 返回结果
95 return response.body().string();
96 } catch (IOException e) {
97 log.info("拼团回调 HTTP 接口服务异常 {}", apiUrl, e);
98 throw new AppException(ResponseCode.HTTP_EXCEPTION);
99 }
100}
微信扫描登录流程

access_token是公众号的全局唯一接口调用凭据; 公众号和小程序均可以使用AppID和AppSecret调用本接口来获取access_token。
- 用户点击Web登录 => 商户后端 => 获取公众号登录凭证(请求微信二维码接口,获取对应的ticket,用于获取微信公众号的二维码)
- 然后,前端根据ticket去微信系统获取对应的二维码,进行登录
- 登录成功后,微信会回调公众号对应的回调接口,这时候,后端可以保存用户信息,比如<ticket, openid> (openid标识用户)
- 前端再拿ticket进行登录校验,登录成功后,返回Token令牌就好了。
⭐
- 需要实现请求二维码登录凭证的Api接口;
- 需要实现微信登录回调的Api接口,保存用户登录信息;
- 需要实现用户登录校验的接口,生成Token令牌,后续请求用这个。
微信支付流程

- 请求下单接口 => 后端创建订单(创建) => 请求微信支付系统(创建支付订单),后端订单(待付款);最后返回拉起支付的参数;

- 小程序拉起微信收银台

- 回调小程序支付状态,小程序查询商户后端订单状态 => 后端查询微信支付系统 => … => 返回支付状态
- 用户支付成功,会回调通知商户后端。⭐需要实现回调的接口,保持支付通知(改后端订单状态: 已支付)
- 未收到回调通知,需要商户后端主动查询微信支付的查询接口。
- 超时 => 商户后端主动调用微信关单接口 (订单状态:支付失败|超时)
- 退款 => 商户后端请求微信支付退款接口 (订单状态:退款)
⭐
需要实现的接口:
- 商户后端创建初始订单,并请求微信支付生成支付订单的Api接口;(检查订单幂等性)
- 接收微信支付回调的Api接口,修改订单支付状态;
- 主动查询订单支付状态的接口,调用微信支付的查询接口。
- 超时 => 商户后端主动调用微信关单接口 (订单状态:支付失败|超时)的Api接口
- 退款 => 商户后端请求微信支付退款接口 (订单状态:退款)的Api接口
第2-15节:根据UI展示封装接口
-
紫色圈:拼团的统计信息;
-
灰色圈:商品信息;
-
红色圈:参与拼团,随机挑选几个团;
-
绿色圈:参与拼团;单独开始一个新的团,还是参与其他人的团
-
黄色圈:拼团结算。
ResponseDTO
1@Data
2@Builder
3@AllArgsConstructor
4@NoArgsConstructor
5public class GoodsMarketResponseDTO {
6
7 private Goods goods;
8 private List<Team> teamList;
9 private TeamStatistic teamStatistic;
10
11 /**
12 * 商品信息
13 */
14 @Data
15 @Builder
16 @AllArgsConstructor
17 @NoArgsConstructor
18 public static class Goods {
19 // 商品ID
20 private String goodsId;
21 // 原始价格
22 private BigDecimal originalPrice;
23 // 折扣金额
24 private BigDecimal deductionPrice;
25 // 支付价格
26 private BigDecimal payPrice;
27 }
28
29 /**
30 * 组队信息
31 */
32 @Data
33 @Builder
34 @AllArgsConstructor
35 @NoArgsConstructor
36 public static class Team {
37 // 用户ID
38 private String userId;
39 // 拼单组队ID
40 private String teamId;
41 // 活动ID
42 private Long activityId;
43 // 目标数量
44 private Integer targetCount;
45 // 完成数量
46 private Integer completeCount;
47 // 锁单数量
48 private Integer lockCount;
49 // 拼团开始时间 - 参与拼团时间
50 private Date validStartTime;
51 // 拼团结束时间 - 拼团有效时长
52 private Date validEndTime;
53 // 倒计时(字符串) validEndTime - validStartTime
54 private String validTimeCountdown;
55 /** 外部交易单号-确保外部调用唯一幂等 */
56 private String outTradeNo;
57
58 public static String differenceDateTime2Str(Date validStartTime, Date validEndTime) {
59 if (validStartTime == null || validEndTime == null) {
60 return "无效的时间";
61 }
62
63 long diffInMilliseconds = validEndTime.getTime() - validStartTime.getTime();
64
65 if (diffInMilliseconds < 0) {
66 return "已结束";
67 }
68
69 long seconds = TimeUnit.MILLISECONDS.toSeconds(diffInMilliseconds) % 60;
70 long minutes = TimeUnit.MILLISECONDS.toMinutes(diffInMilliseconds) % 60;
71 long hours = TimeUnit.MILLISECONDS.toHours(diffInMilliseconds) % 24;
72 long days = TimeUnit.MILLISECONDS.toDays(diffInMilliseconds);
73
74 return String.format("%02d:%02d:%02d", hours, minutes, seconds);
75 }
76
77 }
78
79
80 /**
81 * 组队统计
82 */
83 @Data
84 @Builder
85 @AllArgsConstructor
86 @NoArgsConstructor
87 public static class TeamStatistic {
88 // 开团队伍数量
89 private Integer allTeamCount;
90 // 成团队伍数量
91 private Integer allTeamCompleteCount;
92 // 参团人数总量 - 一个商品的总参团人数
93 private Integer allTeamUserCount;
94 }
95
96}
1// 增删改查
2// 注意:Team的查询
3// 1. 先查自己的拼团
4// 2. 查其他人的拼团。对应活动,非当前用户,状态为可加入拼团,时间有效,队伍in(对应活动,且锁单人数小于目标人数)
5
6@Override
7public List<UserGroupBuyOrderDetailEntity> queryInProgressUserGroupBuyOrderDetailListByRandom(Long activityId, String userId, Integer randomCount) {
8 // 1. 根据用户ID、活动ID、查询非当前用户参与的拼团队伍
9 GroupBuyOrderList buyOrderListReq = GroupBuyOrderList.builder()
10 .activityId(activityId)
11 .userId(userId)
12 .build();
13 buyOrderListReq.setCount(randomCount * 2); // 查询两倍的量,再随机取一半
14
15 // 1. ==ActivityId; 2. !=userId 3; status == 0 or 1; 4. endTime > now(); 5. teamId in (select teamId from group_buy_order where status = 0 and activity = #{activity})
16 List<GroupBuyOrderList> groupBuyOrderLists = groupBuyOrderListDao.queryInProgressUserGroupBuyOrderDetailListByRandom(buyOrderListReq);
17 if (groupBuyOrderLists == null || groupBuyOrderLists.isEmpty())
18 return null;
19
20 // 随机选 randomCount 条
21 if (groupBuyOrderLists.size() > randomCount) {
22 Collections.shuffle(groupBuyOrderLists);
23 groupBuyOrderLists = groupBuyOrderLists.subList(0, randomCount);
24 }
25
26 // 2. 过滤队伍获取 TeamId
27 Set<String> teamIds = groupBuyOrderLists.stream().map(GroupBuyOrderList::getTeamId)
28 .filter(teamId -> teamId != null && !teamId.isEmpty())
29 .collect(Collectors.toSet());
30
31
32 // 3. 查询队伍明细,组装Map结构
33 List<GroupBuyOrder> groupBuyOrders = groupBuyOrderDao.queryGroupBuyProgressByTeamIds(teamIds);
34 if (groupBuyOrders == null || groupBuyOrders.isEmpty()) return null;
35
36 Map<String, GroupBuyOrder> groupBuyOrderMap = groupBuyOrders.stream()
37 .collect(Collectors.toMap(GroupBuyOrder::getTeamId, order -> order));
38
39 // 4. 转换数据
40 ArrayList<UserGroupBuyOrderDetailEntity> userGroupBuyOrderDetailEntities = new ArrayList<>();
41 // ... 根据 groupBuyOrders 和 groupBuyOrderLists组装结果
42
43 return userGroupBuyOrderDetailEntities;
44}
1// 锁单 接口
2⬇️
3// outTradeNo - 调用第三方支付生成
4⬇️
5// 结算 接口
第2-16节 独占锁和无锁化
一个库存一个锁,最小化锁的粒度,使用occupyCount + recoveryCount 判断库存。因为之前的线程抢占了库存,但是执行失败,需要恢复被抢占的库存量。

**独占锁:**多个服务均有定时任务,仅一个服务执行即可。
1@Scheduled(cron = "0 0 0 * * ?")
2public void exec() {
3 RLock lock = redissonClient.getLock("group_buy_market_notify_job_exec");
4 try {
5 // 等待时间(若被占用等待3秒),锁的过期时间(0表示需要手动释放)
6 boolean isLocked = lock.tryLock(3, 0, TimeUnit.SECONDS);
7 if (!isLocked) return;
8
9 Map<String, Integer> result = tradeSettlementOrderService.execSettlementNotifyJob();
10 log.info("定时任务,回调通知拼团完结任务 result:{}", JSON.toJSONString(result));
11 } catch (Exception e) {
12 log.error("定时任务,回调通知拼团完结任务失败", e);
13 } finally {
14 if (lock.isLocked() && lock.isHeldByCurrentThread()) {
15 lock.unlock();
16 }
17 }
18}
乐观锁:
将库存的隔离进行分段化,一个库存表示一个锁
1// 拼团人数检查-是否已满
2// 1. teamId 为空,则为首次开团,不做拼团组队目标量库存限制
3String teamId = requestParameter.getTeamId();
4if (StringUtils.isBlank(teamId)) {
5 return TradeLockRuleFilterBackEntity.builder()
6 .userTakeOrderCount(dynamicContext.getUserTakeOrderCount())
7 .build();
8}
9
10// 2. 抢占库存;通过抢占 Redis 缓存库存,来降低对数据库的操作压力。
11// 上下文信息
12GroupBuyActivityEntity groupBuyActivity = dynamicContext.getGroupBuyActivity();
13Integer target = groupBuyActivity.getTarget();
14Integer validTime = groupBuyActivity.getValidTime();
15String teamStockKey = dynamicContext.generateTeamStockKey(teamId);
16String recoveryTeamStockKey = dynamicContext.generateRecoveryTeamStockKey(teamId);
17
18// 抢占库存
19boolean status = repository.occupyTeamStock(teamStockKey, recoveryTeamStockKey, target, validTime);
抢占库存
1public boolean occupyTeamStock(String teamStockKey, String recoveryTeamStockKey, Integer target, Integer validTime) {
2 // 失败恢复量
3 Long recoveryCount = redisService.getAtomicLong(recoveryTeamStockKey);
4 recoveryCount = null == recoveryCount ? 0 : recoveryCount;
5
6 // 1. incr 得到值,与总量和恢复量做对比。恢复量为系统失败时候记录的量。
7 // 2. 从有组队量开始,相当于已经有了一个占用量,所以要 +1
8 long occupy = redisService.incr(teamStockKey) + 1;
9
10 if (occupy >= target + recoveryCount) {
11 redisService.setAtomicLong(teamStockKey, target);
12 return false;
13 }
14
15 // 1. 给每个产生的值加锁为兜底设计,虽然incr操作是原子的,基本不会产生一样的值。但在实际生产中,遇到过集群的运维配置问题,以及业务运营配置数据问题,导致incr得到的值相同。
16 // 2. validTime + 60分钟,是一个延后时间的设计,让数据保留时间稍微长一些,便于排查问题。
17 String lockKey = teamStockKey + Constants.UNDERLINE + occupy;
18 Boolean lock = redisService.setNx(lockKey, validTime + 60, TimeUnit.MINUTES);
19
20 if (!lock) {
21 log.info("组队库存加锁失败 {}", lockKey);
22 }
23
24 return lock;
25}
恢复库存 <= 业务执行失败,需要进行库存的恢复!
1@Override
2public void recoveryTeamStock(String recoveryTeamStockKey, Integer validTime) {
3 // 首次组队拼团,是没有 teamId 的,所以不需要这个做处理。
4 if (StringUtils.isBlank(recoveryTeamStockKey)) return;
5 redisService.incr(recoveryTeamStockKey);
6}