场景1

订单中心接收MQ下发出库信息

 (中台接收云POS的出库信息BMS125,写到库存移动单据表)

存在问题

MQ重复下发,导致重复消费

解决方案

redis分布式锁

加锁维度key的定义

1,MQ key

2,下发内容,如订单号

具体处理方案

@Autowired
private ValueOperations<String, String> valueOperations;

1,通过key获取value

String value = (String)this.valueOperations.get(key);

2,存在,return,即已消费,流程结束

3,不存在,设置key,过期时间10天

redisUtils.set(key, value,expired);

4,执行业务逻辑

5,在catch中删除key,释放锁

} catch (Exception e) {
    redisUtils.delete(key);
    log.error("");
    throw new OrderCenterException("");
}

思考,以上方案为了解决什么问题

以上解决方案为了解决什么问题,应该就是避免重复消费的问题,只要正常消费一次后(无异常,不走catch),后面10天内,再下发相同的订单,不再处理。

关于过期时间10天,为什么10天,这个具体时间,根据实际业务场景定的。10天以后,就不可能再次下发,或者即便下发了,继续处理,不管是否重复的问题了。

分布式锁漏洞

1,加锁与设置过期时间非原子操作,可能加锁失败,导致永不过期。代码如下

public void set(String key, Object value, long expire) {
    this.valueOperations.set(key, JsonUtils.toJson(value));
    if (expire != -1L) {
        this.expire(key, expire, TimeUnit.SECONDS);
    }

}

2,在catch中释放锁,如果加锁后,服务挂了,导致永不过期

针对以上问题解决方案

1,使用setNX

2,只有通过准备控制过期时间,来自动释放锁了。(注意在finally中释放锁,解决的程序异常时,无法释放锁的问题,这里catch中释放锁刚好解决了异常无法释放锁的问题

场景2

手动创建交车计划单

存在问题

前端重复提交

解决方案

如果重复提交,提示 - 请勿重复提交

redission

@Autowired
private RedissonClient redissonClient;
String workOrderNo = reqDTO.getWorkOrderNo();
RLock rLock = redissonClient.getLock(Constant.DELIVERY_PLAN_CREATE_WORK_ORDER_LOCK + workOrderNo);

try {
    boolean isLocked = rLock.tryLock();
    if (!isLocked) {
        throw new LeaseServiceException(ErrConstant.OPERATION_FAILED, "请勿重复提交");
    }

         // todo业务逻辑

} finally {
    if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
        rLock.unlock();
    }
}

场景3

订单order创建及状态流转

系统流转

DOL ->调度中心 -> order(状态初始化及变更)

task -> order(调度生效,发运,到达,完成,取消)

订单变化,分两种

1,状态初始化及变更(DOL->dps->order),如DOL下发创建,DOL下发修改

2,状态流转(task->order),司机操作运单,状态回传/流转

调度生效,发运,到达,完成,取消

-------------------------------------------------------

为了解决什么问题

具体思考一下,如果不加分布式会存在什么问题,加了,为了解决什么问题

具体场景

1,并发情况下,比如有20个订单同时过来,不加锁,同时触发状态机,都执行了业务逻辑,

有问题吗,好像也没问题

2,重复下发(同时下发),比如由于上游没控制好,同一个订单,同时下发5次

如果不加锁,会有问题吗,有,订单重复创建了

3,重复下发(不是同时下发,过了一段时间再次下发),具体场景比如,order服务异常了,

esb->order,订单消息消费失败了,要在esb上重试,重复下发,这个时候,就不该控制幂等,应该正常处理。

所以,加分布式锁,为了解决上面第二种情况,同时避免影响第三中情况。

自己如何实现

Boolean lock = setNx

if(lock)

// todo

finally{

if(lock){unLock}

}

相同订单下发时,第一个setNx成功,处理业务逻辑,未删除

后面几个订单下发时,setNx是失败的,也就不会处理业务,正常的。

分布式锁具体怎么用的

1,加锁维度,订单号

2,触发状态机时,当发生如上状态变更,如下状态改变事件时

/**
 * 订单状态改变事件
 */
public enum OrderStatusChangeEvent {
    //取消
    CANCEL,
    //创建订单
    CREATE_ORDER,
    //调度
    DISPATCH,
    //司机接单
    RECEIVING_ORDERS,
    //出发
    SET_OUT,
    //到达目的地
    ARRIVE,
    //dol下发修改订单
    DOL_UPDATE,
    //确认完成
    FINSH;
}

代码如下

1,事件 - > 触发点

@Override
public void fire(OrderStatusChangeEvent event, OrderContext context) {
    OrderInfoDTO orderInfoDTO = context.getOrderInfoDTO();
    UntypedStateMachine stateMachine = stateMachineBuilder.newUntypedStateMachine(
            orderInfoDTO.getStatus(),
            applicationContext);
    String ylOrderNo = orderInfoDTO.getYlOrderNo();
    String outOrderNo = orderInfoDTO.getOutOrderNo();
    String lockName = YL_ORDER_NO_LOCK + ylOrderNo;
    if (StringUtils.isBlank(ylOrderNo)) {
        lockName = YL_ORDER_OUT_NO_LOCK + outOrderNo;
    }
    distributedLockFire("ESB订单状态流转分布式锁",lockName,event,context,stateMachine);
}

2, 状态机触发点

stateMachine.fire(event, context);

protected void distributedLockFire(String handlerName, String lockName, R event, E context, UntypedStateMachine stateMachine, RedisTemplate redis) {
    if (ToolUtil.isOneEmpty(handlerName, lockName, event, context, stateMachine)) {
        throw new OrderServiceException(OrderServiceErrorCode.LOCK_PARAMETER_NOT_FOUND);
    }
    if (null == redis) {
        redis = redisTemplate;
    }
    NewRedisDistributedLock newRedisDistributedLock = new NewRedisDistributedLock(redis);
    RedisLock lock = newRedisDistributedLock.getLock(lockName, 60L);
    try {
        if (lock != null) {
            logger.debug("分布式锁处理器开始执行[{}},{}]", handlerName, lockName);
            stateMachine.fire(event, context);
        }
    } finally {
        logger.debug("分布式锁处理器执行[{},{}]结束", handlerName, lockName);
        assert lock != null;
        lock.unlock();
    }
}

解锁

判断解的是自己的锁,获取锁的值


        @Override
        public void unlock() {
            List<String> keys = Collections.singletonList(this.key);
            NewRedisDistributedLock.logger.debug("线程[{}]开始释放锁Hash值[{}]", Thread.currentThread().getName(), this.expectedValue);
            Long result = (Long)this.stringRedisTemplate.execute(new DefaultRedisScript(
                    "if redis.call('get',KEYS[1]) == ARGV[1]\nthen\n    " +
                            "return redis.call('del',KEYS[1])\nelse\n   " +
                            " return 0\nend", Long.class), keys, new Object[]{this.expectedValue});
            if (result == 0L) {
                NewRedisDistributedLock.logger.error("释放锁失败!Hash[{}]", this.expectedValue);
            }

        }

3,锁的实现方案

    public RedisLock getLock(String key, long expireSeconds) {
        return this.getLock(key, expireSeconds, 480, 8L);
    }

    public RedisLock getLock(final String key, final long expireSeconds, int maxRetryTimes, long retryIntervalTimeMillis) {
        final String value = UUID.randomUUID().toString();

        for(int i = 0; i < maxRetryTimes; ++i) {
            String status = (String)this.stringRedisTemplate.execute(new RedisCallback<String>() {
                @Override
                public String doInRedis(RedisConnection connection) throws DataAccessException {
                    Jedis jedis = (Jedis)connection.getNativeConnection();
                    String status = jedis.set(key, value, "nx", "ex", expireSeconds);
                    return status;
                }
            });
            if ("OK".equals(status) && value.equals(this.stringRedisTemplate.opsForValue().get(key))) {
                logger.debug("线程[{}]获取了锁,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, i + 1);
                return new RedisLockInner(this.stringRedisTemplate, key, value);
            }

            try {
                if (retryIntervalTimeMillis > 0L) {
                    Thread.sleep(retryIntervalTimeMillis);
                } else {
                    Thread.sleep(10L);
                }
            } catch (InterruptedException var11) {
                logger.error("线程[{}]中断,竞争锁失败,Hash值[{}]", Thread.currentThread(), value);
                break;
            }

            if (Thread.currentThread().isInterrupted()) {
                break;
            }
        }

        logger.debug("线程[{}]竞争锁失败,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, maxRetryTimes);
        return null;
    }

实现方案与实际场景思考

代码中增加了重试机制,为什么加,为了解决什么问题

猜测

1,并发重复下发时,排队处理,目的让所有相同订单都处理。但显然不符合实际业务场景。为什么没有问题呢,重试机制是,重试次数480,频率8毫秒。总耗时480*8=3S,如果业务执行时间大于3s,其他订单重试是失败的,因为在此期间,锁未释放

但是,3s是一个比较长的时间了,业务执行时间如果小于3s,是可以成功的,这会导致,重复订单,重复消费。

那为什么没有重复消费呢,原因可能只是没有并发下发重复订单而已。

总结:显然不是为了,可以重复消费,重复下发的订单

2,由于某种原因,获取锁失败了

需要重试,再次获得锁。失败原因,排除setNx。

redisson

解决了什么问题

1,锁续期问题

Logo

魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。

更多推荐