redis分布式锁应用场景
订单中心接收MQ下发出库信息(中台接收云POS的出库信息BMS125,写到库存移动单据表)
场景1
订单中心接收MQ下发出库信息
(中台接收云POS的出库信息BMS125,写到库存移动单据表)
存在问题
MQ重复下发,导致重复消费
解决方案
redis分布式锁
加锁维度key的定义
1,MQ key
2,下发内容,如订单号
具体处理方案
@Autowired private ValueOperations<String, String> valueOperations;
1,通过key获取value
String value = (String)this.valueOperations.get(key);
2,存在,return,即已消费,流程结束
3,不存在,设置key,过期时间10天
redisUtils.set(key, value,expired);
4,执行业务逻辑
5,在catch中删除key,释放锁
} catch (Exception e) {
redisUtils.delete(key);
log.error("");
throw new OrderCenterException("");
}
思考,以上方案为了解决什么问题
以上解决方案为了解决什么问题,应该就是避免重复消费的问题,只要正常消费一次后(无异常,不走catch),后面10天内,再下发相同的订单,不再处理。
关于过期时间10天,为什么10天,这个具体时间,根据实际业务场景定的。10天以后,就不可能再次下发,或者即便下发了,继续处理,不管是否重复的问题了。
分布式锁漏洞
1,加锁与设置过期时间非原子操作,可能加锁失败,导致永不过期。代码如下
public void set(String key, Object value, long expire) {
this.valueOperations.set(key, JsonUtils.toJson(value));
if (expire != -1L) {
this.expire(key, expire, TimeUnit.SECONDS);
}
}
2,在catch中释放锁,如果加锁后,服务挂了,导致永不过期
针对以上问题解决方案
1,使用setNX
2,只有通过准备控制过期时间,来自动释放锁了。(注意在finally中释放锁,解决的程序异常时,无法释放锁的问题,这里catch中释放锁刚好解决了异常无法释放锁的问题
场景2
手动创建交车计划单
存在问题
前端重复提交
解决方案
如果重复提交,提示 - 请勿重复提交
redission
@Autowired private RedissonClient redissonClient;
String workOrderNo = reqDTO.getWorkOrderNo();
RLock rLock = redissonClient.getLock(Constant.DELIVERY_PLAN_CREATE_WORK_ORDER_LOCK + workOrderNo);
try {
boolean isLocked = rLock.tryLock();
if (!isLocked) {
throw new LeaseServiceException(ErrConstant.OPERATION_FAILED, "请勿重复提交");
}
// todo业务逻辑
} finally {
if (rLock.isLocked() && rLock.isHeldByCurrentThread()) {
rLock.unlock();
}
}
场景3
订单order创建及状态流转
系统流转
DOL ->调度中心 -> order(状态初始化及变更)
task -> order(调度生效,发运,到达,完成,取消)
订单变化,分两种
1,状态初始化及变更(DOL->dps->order),如DOL下发创建,DOL下发修改
2,状态流转(task->order),司机操作运单,状态回传/流转
调度生效,发运,到达,完成,取消
-------------------------------------------------------
为了解决什么问题
具体思考一下,如果不加分布式会存在什么问题,加了,为了解决什么问题
具体场景
1,并发情况下,比如有20个订单同时过来,不加锁,同时触发状态机,都执行了业务逻辑,
有问题吗,好像也没问题
2,重复下发(同时下发),比如由于上游没控制好,同一个订单,同时下发5次
如果不加锁,会有问题吗,有,订单重复创建了
3,重复下发(不是同时下发,过了一段时间再次下发),具体场景比如,order服务异常了,
esb->order,订单消息消费失败了,要在esb上重试,重复下发,这个时候,就不该控制幂等,应该正常处理。
所以,加分布式锁,为了解决上面第二种情况,同时避免影响第三中情况。
自己如何实现
Boolean lock = setNx
if(lock)
// todo
finally{
if(lock){unLock}
}
相同订单下发时,第一个setNx成功,处理业务逻辑,未删除
后面几个订单下发时,setNx是失败的,也就不会处理业务,正常的。
分布式锁具体怎么用的
1,加锁维度,订单号
2,触发状态机时,当发生如上状态变更,如下状态改变事件时
/**
* 订单状态改变事件
*/
public enum OrderStatusChangeEvent {
//取消
CANCEL,
//创建订单
CREATE_ORDER,
//调度
DISPATCH,
//司机接单
RECEIVING_ORDERS,
//出发
SET_OUT,
//到达目的地
ARRIVE,
//dol下发修改订单
DOL_UPDATE,
//确认完成
FINSH;
}
代码如下
1,事件 - > 触发点
@Override
public void fire(OrderStatusChangeEvent event, OrderContext context) {
OrderInfoDTO orderInfoDTO = context.getOrderInfoDTO();
UntypedStateMachine stateMachine = stateMachineBuilder.newUntypedStateMachine(
orderInfoDTO.getStatus(),
applicationContext);
String ylOrderNo = orderInfoDTO.getYlOrderNo();
String outOrderNo = orderInfoDTO.getOutOrderNo();
String lockName = YL_ORDER_NO_LOCK + ylOrderNo;
if (StringUtils.isBlank(ylOrderNo)) {
lockName = YL_ORDER_OUT_NO_LOCK + outOrderNo;
}
distributedLockFire("ESB订单状态流转分布式锁",lockName,event,context,stateMachine);
}
2, 状态机触发点
stateMachine.fire(event, context);
protected void distributedLockFire(String handlerName, String lockName, R event, E context, UntypedStateMachine stateMachine, RedisTemplate redis) {
if (ToolUtil.isOneEmpty(handlerName, lockName, event, context, stateMachine)) {
throw new OrderServiceException(OrderServiceErrorCode.LOCK_PARAMETER_NOT_FOUND);
}
if (null == redis) {
redis = redisTemplate;
}
NewRedisDistributedLock newRedisDistributedLock = new NewRedisDistributedLock(redis);
RedisLock lock = newRedisDistributedLock.getLock(lockName, 60L);
try {
if (lock != null) {
logger.debug("分布式锁处理器开始执行[{}},{}]", handlerName, lockName);
stateMachine.fire(event, context);
}
} finally {
logger.debug("分布式锁处理器执行[{},{}]结束", handlerName, lockName);
assert lock != null;
lock.unlock();
}
}
解锁
判断解的是自己的锁,获取锁的值
@Override
public void unlock() {
List<String> keys = Collections.singletonList(this.key);
NewRedisDistributedLock.logger.debug("线程[{}]开始释放锁Hash值[{}]", Thread.currentThread().getName(), this.expectedValue);
Long result = (Long)this.stringRedisTemplate.execute(new DefaultRedisScript(
"if redis.call('get',KEYS[1]) == ARGV[1]\nthen\n " +
"return redis.call('del',KEYS[1])\nelse\n " +
" return 0\nend", Long.class), keys, new Object[]{this.expectedValue});
if (result == 0L) {
NewRedisDistributedLock.logger.error("释放锁失败!Hash[{}]", this.expectedValue);
}
}
3,锁的实现方案
public RedisLock getLock(String key, long expireSeconds) {
return this.getLock(key, expireSeconds, 480, 8L);
}
public RedisLock getLock(final String key, final long expireSeconds, int maxRetryTimes, long retryIntervalTimeMillis) {
final String value = UUID.randomUUID().toString();
for(int i = 0; i < maxRetryTimes; ++i) {
String status = (String)this.stringRedisTemplate.execute(new RedisCallback<String>() {
@Override
public String doInRedis(RedisConnection connection) throws DataAccessException {
Jedis jedis = (Jedis)connection.getNativeConnection();
String status = jedis.set(key, value, "nx", "ex", expireSeconds);
return status;
}
});
if ("OK".equals(status) && value.equals(this.stringRedisTemplate.opsForValue().get(key))) {
logger.debug("线程[{}]获取了锁,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, i + 1);
return new RedisLockInner(this.stringRedisTemplate, key, value);
}
try {
if (retryIntervalTimeMillis > 0L) {
Thread.sleep(retryIntervalTimeMillis);
} else {
Thread.sleep(10L);
}
} catch (InterruptedException var11) {
logger.error("线程[{}]中断,竞争锁失败,Hash值[{}]", Thread.currentThread(), value);
break;
}
if (Thread.currentThread().isInterrupted()) {
break;
}
}
logger.debug("线程[{}]竞争锁失败,Hash值[{}],尝试次数[{}]", Thread.currentThread().getName() + " :" + value, maxRetryTimes);
return null;
}
实现方案与实际场景思考
代码中增加了重试机制,为什么加,为了解决什么问题
猜测
1,并发重复下发时,排队处理,目的让所有相同订单都处理。但显然不符合实际业务场景。为什么没有问题呢,重试机制是,重试次数480,频率8毫秒。总耗时480*8=3S,如果业务执行时间大于3s,其他订单重试是失败的,因为在此期间,锁未释放
但是,3s是一个比较长的时间了,业务执行时间如果小于3s,是可以成功的,这会导致,重复订单,重复消费。
那为什么没有重复消费呢,原因可能只是没有并发下发重复订单而已。
总结:显然不是为了,可以重复消费,重复下发的订单
2,由于某种原因,获取锁失败了
需要重试,再次获得锁。失败原因,排除setNx。
redisson
解决了什么问题
1,锁续期问题
魔乐社区(Modelers.cn) 是一个中立、公益的人工智能社区,提供人工智能工具、模型、数据的托管、展示与应用协同服务,为人工智能开发及爱好者搭建开放的学习交流平台。社区通过理事会方式运作,由全产业链共同建设、共同运营、共同享有,推动国产AI生态繁荣发展。
更多推荐



所有评论(0)