引言
随着业务规模的不断扩张和技术架构的演进,分布式系统已经成为支撑高并发、海量数据处理的关键基础设施。在分布式环境中,各个节点相对独立且可能并发地执行任务,这极大地提升了系统的整体性能和可用性。当涉及到对共享资源的访问和修改时,为了确保数据的一致性和正确性,我们需要一种能在多节点间协调并发操作的技术手段,也就是分布式锁。
传统的单机环境下,进程内可以通过本地锁轻松实现对临界区资源的互斥访问。但是,这一方法在分布式系统中不再适用,因为单机锁无法跨越网络边界,无法保证不同节点间的并发控制。分布式锁正是在这种背景下产生,它是一种能够实现在分布式系统中多个节点之间协同工作的锁机制,旨在保护共享资源不受并发冲突的影响,确保在复杂的分布式场景下数据操作的有序性和一致性。
库存扣减
我们以WMS系统中,订单出入库操作库存为例。
CREATE TABLE `tb_inventory`(`id`BIGINTNOT NULL AUTO_INCREMENT,`account_id`BIGINTNOT NULL DEFAULT 0 COMMENT '帐套ID',`sku`VARCHAR(128)NOT NULL DEFAULT '' COMMENT '商品sku编码',`warehouse_code`VARCHAR(16)NOT NULL DEFAULT '' COMMENT '库存编码',`available_inventory` INT UNSIGNEDNOT NULL DEFAULT 0 COMMENT '可用库存',`create_time`DATETIMENOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',`update_time`DATETIMENOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',`deleted`TINYINT UNSIGNED NULLDEFAULT 0 COMMENT '0-未删除 1/null-已删除',PRIMARY KEY (`id`) USING BTREE,UNIQUE KEY uk_warehouse_code (customer_no, warehouse_code, sku, deleted)) ENGINE = InnoDBAUTO_INCREMENT = 1CHARACTER SET = utf8mb4 COMMENT = '库存表';
关于操作库存,常见有以下一些错误做法:
1、内存中判断库存是否充足,并完成扣减
直接在内存中判断是否有库存,计算扣减之后的值更新数据库,并发的情况下会导致库存相互覆盖发。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 忽略 订单信息校验等,,,// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 剩余库存Integer remainInventory = availableInventory - qty;// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);updateInventory.setAvailableInventory(remainInventory);tbInventoryMapper.updateInventory(updateInventory);}
sql中直接执行更新库存
<update>UPDATE tb_inventorySET available_inventory = #{availableInventory}WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0</update>
库存SKU的库存已经变成了负数:
2、内存中判断库存是否充足,Sql中执行库存扣减
在InnoDB存储引擎下,UPDATE通常会应用行锁,所以在SQL中加入运算避免值的相互覆盖,但是库存的数量还是可能变为负数。因为校验库存是否充足在内存中执行,并发情况下都会读到有库存。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 忽略 订单信息校验等,,,// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
库存扣减在sql中进行
<update>UPDATE tb_inventorySET available_inventory = available_inventory - #{diffInventory}WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0</update>
库存SKU的库存已经变成了负数:
在操作库存方法上使用synchronized
虽然synchronized可以防止在多并发环境下,多个线程并发访问这个库存操作方法,但是synchronized的作用在方法结束之后就失效了,可能此时事务并没有提交,导致可能其他的线程会在拿到锁之后读取到旧库存数据,在执行扣除时,依然可能会造成库存扣减不对。
/*** 确认订单出库** @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 忽略 订单信息校验等,,,// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
库存SKU的库存已经变成了负数:
从上面的错误案例来看,在操作库存时,不是原子性的,导致库存操作失败。以下我们从单体以及分布式系统两个方向探讨如何保证数据的一致性和正确性。
单机系统
在单机系统中,数据和业务逻辑都集中在一个进程中,面对并发访问共享资源的情况,需要依靠锁机制和数据库的事务管理(行锁)来维护数据的正确性和一致性。
对于锁机制,我们不管是采用synchronized还是Lock等,我们要保证的一个条件就是:要让数据库的事务在锁的控制范围之内。
针对上述错误案例,我们可以将锁作用于事务之外,即将锁放在库存操作方法的上一层(例如service层)。
@Servicepublic class OrderServiceImpl implements IOrderService {private IOrderManager orderManager;/*** 确认订单出库** @param customerNo* @param orderNo*/@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {orderManager.confirmOrder(customerNo, orderNo);}@Autowiredpublic void setOrderManager(IOrderManager orderManager) {this.orderManager = orderManager;}}
此时我们在操作库存,会因为库存不够,导致库存操作失败:
这种方式虽然可以实现数据一致性和正确性,但是并不是很推荐,因为我们的事务要控制的粒度尽可能的小。
推荐的方式,是我们再锁的控制范围去提交事务。即手动提交事务。使用TransactionTemplate或直接在代码中调用PlatformTransactionManager的getTransaction和commit方法来手动管理事务。
@Autowiredprivate PlatformTransactionManager transactionManager;/*** 确认订单出库** @param customerNo* @param orderNo*/@Overridepublic synchronized void confirmOrder(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());// 忽略 订单信息校验等,,,// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);// 提交事务transactionManager.commit(status);}
此时我们再去执库存操作,会因为库存不够,导致库存操作失败:
对于上述同步锁的实现,我们最好使用Lock得方式去实现,可以更精细控制同步逻辑。
@Autowiredprivate PlatformTransactionManager transactionManager;private final Lock orderLock = new ReentrantLock();/** * 确认订单出库 * * @param customerNo * @param orderNo */@Overridepublic void confirmOrder(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();try {// 尝试获取锁,最多等待timeout时间if (orderLock.tryLock(1, TimeUnit.SECONDS)) {// 成功获取到锁,执行确认订单的逻辑TransactionStatus status = transactionManager.getTransaction(new DefaultTransactionDefinition());try {// 忽略 订单信息校验等,,,// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);// 提交事务transactionManager.commit(status);}catch (Exception e){// 回滚事务transactionManager.rollback(status);// 处理异常e.printStackTrace();}finally {// 释放锁orderLock.unlock();}} else {// 获取锁超时System.out.println("Failed to confirm order within the timeout period: " +orderNo);// 处理超时情况,比如记录日志、通知用户等}} catch (InterruptedException e) {// 如果在等待锁的过程中线程被中断,处理中断异常Thread.currentThread().interrupt();// ... 处理中断逻辑 ...}}
在单机系统中,上述方法可以保证数据一致性以及正确性,但是实际业务中,我们应用通常都部署在多个服务器中,此时上述方案就不能保证了,就需要分布式锁来解决了。
分布式锁的实现
在单机系统中,锁是一种基本的同步机制,用于控制多个线程对共享资源的并发访问。当我们升级到分布式系统时,由于服务分散在多个节点之上,原本在单机环境下使用的锁机制无法直接跨越多个节点来协调资源访问。所以此时,分布式锁作为一种扩展的锁概念应运而生。分布式锁是一种跨多个节点、进程或服务的同步原语,它允许在分布式系统中协调对共享资源的访问,确保在任何时候只有一个节点能够独占地执行操作,即使这些节点分布在不同的物理或虚拟机器上。
分布式锁的基本要素
1.互斥性: 这是分布式锁最基本的要求,意味着在任意时刻,只有一个客户端(无论是进程、线程还是服务实例)能够持有并使用锁,从而确保共享资源不会同时被多个客户端修改。
2.持久性: 分布式锁必须具备一定的持久化能力,即便服务重启或网络短暂断开,锁的状态仍然能够得到保持。
3.可重入性:类似于单机环境下的可重入锁,分布式锁也应该支持同一客户端在持有锁的同时再次请求锁而不被阻塞,这对于递归调用或涉及多个资源访问的操作至关重要。
4.公平性(Fairness): 在某些场景下,要求锁分配遵循一定的公平原则,即等待最久的客户端在锁释放时优先获得锁。虽然不是所有分布式锁实现都需要考虑公平性,但在某些高性能或高并发的系统中,公平性是非常重要的。
5.容错性:分布式锁服务应当具备一定的容错能力,即即使一部分服务节点发生故障,仍能保证锁功能的正确运行,防止死锁和数据不一致。这通常通过服务冗余和复制机制来实现,如使用Raft、Paxos等一致性协议或基于ZooKeeper、etcd等分布式协调服务。
常见分布式锁解决方案
基于数据库实现
1.数据库悲观锁
悲观锁以预防性策略处理并发冲突,它假设并发访问导致的数据冲突是常态。因此,在访问数据之前,它会积极地获取并持有锁,确保在锁未释放时,其他事务无法对同一数据进行访问。通过运用SELECT ... FOR UPDATESQL语句,能够在查询阶段即锁定相关行,实现数据的独占访问。然而,重要的是要注意,此操作应仅针对唯一键执行,否则可能会大幅增加锁定范围和潜在的锁表风险,从而影响系统的并发性能与效率。
最常见的做法是直接在业务数据上使用SELECT ... FOR UPDATE,例如:
<select resultType="com.springboot.mybatis.entity.TbInventoryDO">SELECT *FROM tb_inventoryWHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND deleted = 0FOR UPDATE</select>
在一个事务中,先使用SELECT ... FOR UPDATE后,在执行更新。
/*** 使用SELECT... FOR UPDATE 实现分布式锁,扣减库存* @param customerNo* @param orderNo*/@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithLock(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}
但是,这种实现方式,很容易造成业务表的锁压力,特别是数据量大,并发量高的时候。所以,还有一种做法是,专门维护一张锁的表,而不是直接在业务数据表上使用SELECT FOR UPDATE。这种方式在某些场景下可以帮助简化锁的管理,并且可以在一定程度上减轻对业务数据表的锁定压力。(其实实现方式,类似Redis实现的分布式锁,只是用数据库实现了而已)。其实现流程,如下:
数据库实现悲观锁流程
1.创建锁表:首先,创建一张锁表,例如lock_table,包含lock_key(用于标识需要锁定的业务资源)、lock_holder(持有锁的客户端标识,如用户ID或事务ID)、acquire_time(获取锁的时间)等字段。
CREATE TABLE `tb_lock`(idBIGINT AUTO_INCREMENTPRIMARY KEY,lock_keyVARCHAR(255)NOT NULL DEFAULT '' COMMENT '锁的业务编码。对应业务表的唯一键',lock_holderVARCHAR(32)NOT NULL DEFAULT '' COMMENT '持有锁的客户端标识',acquire_time DATETIMENOT NULL COMMENT '获取锁的时间',create_timeDATETIMEDEFAULT CURRENT_TIMESTAMP NOT NULL COMMENT '创建时间',update_timeDATETIMEDEFAULT CURRENT_TIMESTAMP NOT NULL ON UPDATE CURRENT_TIMESTAMP COMMENT '修改时间',deletedTINYINT UNSIGNED DEFAULT '0'NULL COMMENT '0-未删除 1/null-已删除',UNIQUE KEY uk_lock (lock_key, deleted)) ENGINE = InnoDBAUTO_INCREMENT = 1CHARACTER SET = utf8mb4 COMMENT = 'Lock表';
<insert>INSERT INTO tb_lock(lock_key,lock_holder,acquire_time)VALUES(#{lockKey},#{lockHolder},#{acquireTime})</insert>
<select resultType="com.springboot.mybatis.entity.TbLockDO">SELECT *FROM tb_lockWHERE lock_key = #{lockKey} AND deleted = 0FOR UPDATE</select>
4. 检查锁状态:在获取锁时,可以检查锁是否已被持有,比如检查lock_holder字段,如果已有其他事务持有锁,则获取锁失败,需要等待或重试。
// 尝试获取锁tryLock(lockKey, lockHolder);// 使用SELECT FOR UPDATE锁定锁表记录TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);if (!tbLockDO.getLockHolder().equals(lockHolder)) {// 锁已被其他客户端持有,获取锁失败,需要处理此异常情况throw new IllegalStateException("Lock is held by another client.");}
<delete parameterType="java.lang.String">DELETE FROM tb_lockWHERE lock_key = #{lockKey}AND lock_holder = #{lockHolder}AND deleted = 0</delete>
基于数据库悲观锁实现,代码如下:
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithLock(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);String lockHolder = Thread.currentThread().getName();try {// 尝试获取锁tryLock(lockKey, lockHolder);// 使用SELECT FOR UPDATE锁定锁表记录TbLockDO tbLockDO = tbLockMapper.selectLockByLockKey(lockKey);if (!tbLockDO.getLockHolder().equals(lockHolder)) {// 锁已被其他客户端持有,获取锁失败,需要处理此异常情况throw new IllegalStateException("Lock is held by another client.");}// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventoryForUpdate(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);}finally {unlock(lockKey, lockHolder);}}/*** 尝试获取锁* @param lockKey 锁的key 业务编码* @param lockHolder 锁的持有者* @return 是否获取成功*/private void tryLock(String lockKey, String lockHolder) {TbLockDO tbLockDO = new TbLockDO();tbLockDO.setLockKey(lockKey);tbLockDO.setLockHolder(lockHolder);tbLockDO.setAcquireTime(LocalDateTime.now());//插入一条数据insert intotbLockMapper.insertLock(tbLockDO);}/*** 锁释放* @param lockKey 锁的key 业务编码*/private void unlock(String lockKey, String lockHolder){tbLockMapper.deleteLockByLockKey(lockKey, lockHolder);}
数据库悲观锁实现分布式锁可以防止并发冲突,确保在事务结束前,这些记录不会被其他并发事务修改。它还可以控制锁的粒度,提供行级别的锁定,减少锁定范围,提高并发性能。这种方式非常适合于处理需要更新的事务场景,特别是银行转账、库存扣减等需要保证数据完整性和一致性的操作。
但是,需要注意的是,过度或不当使用SELECT FOR UPDATE会导致更多的行被锁定,在高并发场景下,如果大量事务都在等待获取锁,可能会导致锁等待和死锁问题,并且当事务持有SELECT FOR UPDATE的锁时,其他事务尝试修改这些锁定的行会陷入等待状态,直至锁释放。这可能导致其他事务的延迟和系统吞吐量下降,长时间持有锁会导致数据库资源(如内存、连接数等)消耗增大,特别是长事务中持有锁时间较长,会影响系统的总体性能。所以我们在使用时要特别注意不要再长事务中使用悲观锁。
2.数据库乐观锁
乐观锁假定并发冲突不太可能发生,因此在读取数据时不锁定资源,而是在更新数据时验证数据是否被其他事务修改过。
在数据库表中添加一个version字段。
ALTER TABLE `tb_inventory` ADD COLUMN `version` INT NOT NULL DEFAULT 0 COMMENT '乐观锁版本' AFTER available_inventory;
每次更新时将version字段加1。在更新数据时,通过UPDATE语句附带WHERE version = oldVersion条件,只有当version值不变时更新操作才会成功。若version已变,则表示数据已被其他事务修改,此次更新失败。
<update>UPDATE tb_inventorySET available_inventory = available_inventory - #{diffInventory},version = #{version} + 1WHERE sku = #{sku}AND customer_no = #{customerNo}AND warehouse_code = #{warehouseCode}AND version = #{version}AND deleted = 0</update>
基于乐观锁实现的方案:
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithVersion(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();Integer curVersion = inventoryDO.getVersion();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 设置当前数据版本号updateInventory.setVersion(curVersion);// 库存差值updateInventory.setDiffInventory(qty);updateInventory.setVersion(inventoryDO.getVersion());int updateRows = tbInventoryMapper.updateInventorWithVersion(updateInventory);if (updateRows != 1){System.err.println("更新库存时发生并发冲突,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试");}}
乐观锁假定大多数情况下不会有并发冲突,所以在读取数据时不立即加锁,而是等到更新数据时才去检查是否有其他事务进行了改动,这样可以减少锁的持有时间,提高了系统的并发性能。并且,乐观锁在数据更新时才检查冲突,而不是在获取数据时就加锁,所以大大降低了死锁的风险。并且因为不常加锁,所以减少了数据库级别的锁管理开销,非常适合对于读多写少的场景。
但是,当并发写入较多时,可能出现大量更新冲突,需要不断地重试事务以获得成功的更新。过多的重试可能导致性能下降,特别是在并发度极高时,可能会形成“ABA”问题。并且 在极端并发条件下,如果没有正确的重试机制或超时机制,乐观锁可能无法保证强一致性。尤其是在涉及多个表的复杂事务中,单个乐观锁可能不足以解决所有并发问题。
基于Redis实现
1.Redis的setNX实现
Redis的setNX(set if not exists)命令是原子操作,当键不存在时才设置值,设置成功则返回true,否则返回false。通过这个命令可以快速地在Redis中争夺一把锁。
利用Redis,我们可以生成一个唯一的锁ID作为key的一部分。然后使用setNX尝试设置key-value对,value可以是过期时间戳。若设置成功,则认为获取锁成功,执行业务逻辑。在业务逻辑完成后,删除对应key释放锁,或设置过期时间自动释放。
@Slf4jpublic class RedisDistributedLock implements AutoCloseable{private final StringRedisTemplate stringRedisTemplate;private final DefaultRedisScript<Boolean> unlockScript;/**锁的key*/private final String lockKey;/**锁过期时间*/private final Integer expireTime;private static final String UNLOCK_LUA_SCRIPT = "if redis.call(\"get\", KEYS[1]) == ARGV[1] then\n" +"return redis.call(\"del\", KEYS[1])\n" +"else\n" +"return 0\n" +"end";public RedisDistributedLock(StringRedisTemplate stringRedisTemplate, String lockKey, Integer expireTime) {this.stringRedisTemplate = stringRedisTemplate;this.lockKey = lockKey;this.expireTime = expireTime;// 初始化Lua解锁脚本this.unlockScript = new DefaultRedisScript<>();unlockScript.setScriptText(UNLOCK_LUA_SCRIPT);unlockScript.setResultType(Boolean.class);}/*** 获取锁* @return 是否获取成功*/public Boolean getLock() {String value = UUID.randomUUID().toString();try {return stringRedisTemplate.opsForValue().setIfAbsent(lockKey, value, expireTime, TimeUnit.SECONDS);} catch (Exception e) {log.error("获取分布式锁失败: {}", e.getMessage());return false;}}/*** 释放锁* @return 是否释放成功*/public Boolean unLock() {// 使用Lua脚本进行解锁操作List<String> keys = Collections.singletonList(lockKey);Object result = stringRedisTemplate.execute(unlockScript, keys, stringRedisTemplate.opsForValue().get(lockKey));boolean unlocked = (Boolean) result;log.info("释放锁的结果: {}", unlocked);return unlocked;}@Overridepublic void close() throws Exception {unLock();}}
然后,我们在处理库存时,先尝试获取锁,如果获取到锁,则就可以更新库存。
@Transactional(rollbackFor = Exception.class)@Overridepublic void confirmOrderWithRedisNx(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过期try (RedisDistributedLock lock = new RedisDistributedLock(stringRedisTemplate, lockKey, 30)) {if (lock.getLock()) {// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);} else {log.error("更新库存时发生并发冲突,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试");}} catch (Exception e) {log.error("处理分布式锁时发生错误: {}", e.getMessage());}}
Redis作为内存数据库,其操作速度快,setNX的执行时间几乎可以忽略不计,尤其适合高并发场景下的锁请求。Redis作为一个可以独立的服务,可以轻松实现不同进程或服务器之间的互斥锁。而setNX命令是原子操作,能够在Redis这一单线程环境下以原子性的方式实现锁的获取,简单一行命令即可实现锁的争抢。同时可以通过EX或PX参数,可以在设置锁时一并设定过期时间,避免因意外情况导致的死锁。
但是单纯使用setNX并不能自动续期,一旦锁过期而又未主动释放,可能出现锁被其他客户端误获取的情况,需要额外实现锁的自动续期机制,例如使用WATCH和MULTI命令组合,或者SET命令的新参数如SET key value PX milliseconds NX XX。而setNX在获取不到锁时会立即返回失败,所以我们必须轮询或使用某种延时重试策略来不断尝试获取锁。并且如果多个客户端同时请求锁,Redis并不会保证特定的排队顺序,可能导致“饥饿”现象(即某些客户端始终无法获取锁)。
虽然Redis的setNX命令在实现分布式锁方面提供了便捷性和高性能,但要构建健壮、可靠的分布式锁解决方案,往往还需要结合其他命令(如expire、watch、multi/exec等)以及考虑到各种边缘情况和容错机制。一些成熟的Redis客户端库(如Redisson、Jedis)提供了封装好的分布式锁实现,解决了上述许多问题。
基于Redisson实现
Redisson是一个高性能、开源的Java驻内存数据网格,它基于Redis,并提供了众多分布式数据结构和一套分布式服务,例如分布式锁、信号量、闭锁、队列、映射等。Redisson使得开发者能够更容易地在Java应用程序中使用Redis,特别是对分布式环境下的同步原语提供了丰富的API支持。
Redisson的分布式锁核心原理基于Redis命令,但进行了增强和封装,提供了一种更加可靠和易于使用的分布式锁实现。他实现分布式锁的思路与Redis的setNx实现类似。但是,相比较与Redis的setNx实现分布式锁,Redisson还支持可重入锁,即同一个线程在已经获得锁的情况下可以再次获取锁而不被阻塞。内部通过计数器记录持有锁的次数,每次成功获取锁时计数器递增,释放锁时递减,只有当计数器归零时才真正释放锁。Redisson使用了看门狗(Watchdog)机制来监控锁的状态,定期自动延长锁的有效期,这样即使持有锁的客户端暂时冻结或网络抖动,锁也不会因为超时而被提前释放。并且,对于Redis集群,Redisson还可以实现RedLock算法,通过在多个Redis节点上分别获取锁,增加分布式锁的可用性和容错能力。
我们使用Redisson实现分布式锁,实现库存扣减。
<dependency><groupId>org.redisson</groupId><artifactId>redisson-spring-boot-starter</artifactId><version>3.17.7</version></dependency>
spring:redisson:address: "redis://127.0.0.1:6379"password:
@Overridepublic void confirmOrderWithRedisson(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过期RLock lock = redissonClient.getLock(lockKey);try {if (lock.tryLock(30, TimeUnit.SECONDS)) {// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);} else {log.error("更新库存时发生并发冲突,请重试");throw new ServiceException(StatusEnum.SERVICE_ERROR, "更新库存时发生并发冲突,请重试");}}catch (Exception e){throw new ServiceException(StatusEnum.SERVICE_ERROR, "获取分布式锁时被中断");}finally {// 无论成功与否,都要释放锁if (lock.isLocked() && lock.isHeldByCurrentThread()) {lock.unlock();}}}
Redisson支持多种类型的分布式锁,包括可重入锁(RLock)、读写锁(RReadWriteLock)、公平锁(RFairLock)等,满足不同业务场景的需求。Redisson支持锁的自动续期功能,可以防止因为锁持有者在业务处理过程中长时间未完成而导致锁过期被其他客户端获取。对于Redisson RedLock算法(多节点部署时),即使部分Redis节点失效,也能在大多数Redis节点存活的情况下维持锁的稳定性,增强了系统的容错性和高可用性。
相较于简单的数据库悲观锁,Redisson的分布式锁实现更为复杂。虽然Redisson提供了自动续期机制,但如果客户端在获取锁后突然崩溃且没有正常释放锁,理论上仍然有可能导致锁泄漏。虽然Redisson也提供了超时设置,但极端情况下仍需结人工清理机制或者其他的方案来预防此类问题。
使用Zookeeper
在Zookeeper中实现分布式锁的基本原理是利用Zookeeper的临时节点和Watcher监听机制。
客户端在Zookeeper中指定的某个路径下创建临时有序节点,每个节点名称后都会附加一个唯一的递增数字,表示节点的顺序。当多个客户端同时请求锁时,它们都会创建各自的临时有序节点。
客户端按照节点顺序判断自己是否可以获得锁。节点顺序最小的客户端被认为是锁的持有者,它观察到的序号比自己大的所有节点都是待解锁的队列。锁的持有者继续执行业务逻辑,其它客户端则会注册Watcher监听比自己序号小的那个节点。
当锁持有者完成业务处理后,会删除它创建的临时节点,Zookeeper会触发Watcher通知等待队列中的下一个节点。接收到通知的下一个节点发现其观察的节点已删除,于是重新检查当前路径下剩余节点的顺序,如果自己是现在最小的节点,则认为获得了锁。
Watcher机制允许客户端监听Zookeeper上的节点变化事件,当节点被创建、删除、更新时,Zookeeper会向注册了相应事件的客户端发送通知。在分布式锁场景中,客户端通过注册Watcher来监听锁持有者的节点状态,以便在锁释放时及时获取锁。
而我们使用Apache Curator框架作为Zookeeper客户端实现分布式锁。Curator拥有良好的架构设计,提供了丰富的recipes(即预制模板)来实现常见的分布式协调任务,包括共享锁、互斥锁、屏障、Leader选举等。Curator的分布式锁实现如InterProcessMutex和InterProcessSemaphoreMutex,直接提供了易于使用的API来获取和释放锁。
Curator在实现分布式锁时,充分考虑了ZooKeeper的特性,比如临时节点的生命周期关联会话、有序节点的排序机制以及Watcher事件的通知机制等,确保在各种异常情况下,锁的行为符合预期,例如客户端断线后锁能被正确释放。
Curator内部集成了重试策略和背压控制,当ZooKeeper操作遇到网络延迟或短暂的ZooKeeper集群不稳定时,Curator能够自动进行重试,而不是立即抛出异常。
@Componentpublic class ZkLock {private final CuratorFramework client;private final InterProcessMutex lock;@Value("${curator.zookeeper.connect-string}")private String zookeeperConnectString;public ZkLock() {RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);client = CuratorFrameworkFactory.newClient(zookeeperConnectString, retryPolicy);client.start();// 分布式锁路径String lockPath = "/locks/product_stock";lock = new InterProcessMutex(client, lockPath);}public void acquireLock(Runnable task) throws Exception {// 尝试获取锁,超时时间为30秒if (lock.acquire(30, TimeUnit.SECONDS)) {try {task.run();// 在持有锁的情况下执行任务} finally {lock.release();// 无论是否出现异常,都要确保释放锁}} else {throw new Exception("获取分布是锁失败");}}}
使用ZkLock:
@Overridepublic void confirmOrderWithZk(String customerNo, String orderNo) {// 查询订单信息OutboundOrderDO outboundOrderDO = outboundOrderMapper.selectOrderByOrderNo(customerNo, orderNo);String warehouseCode = outboundOrderDO.getWarehouseCode();// 查询订单明细假设我们的出库订单是一单一件OutboundOrderDetailDO detailDO = orderDetailMapper.selectDetailByOrderNo(outboundOrderDO.getOrderNo());String sku = detailDO.getSku();Integer qty = detailDO.getQty();String lockKey = String.format("inventory:%s_%s_%s", customerNo, warehouseCode, sku);// 30秒过期zkLock.acquireLock(() -> {// 查询库存TbInventoryDO inventoryDO = tbInventoryMapper.selectSkuInventory(customerNo, warehouseCode, sku);Integer availableInventory = inventoryDO.getAvailableInventory();// 判断库存是否足够if (qty > availableInventory){System.err.println("库存不足,不能出库");throw new ServiceException(StatusEnum.SERVICE_ERROR, "库存不足,不能出库");}// 扣减库存TbInventoryDO updateInventory = new TbInventoryDO();updateInventory.setCustomerNo(customerNo);updateInventory.setWarehouseCode(warehouseCode);updateInventory.setSku(sku);// 库存差值updateInventory.setDiffInventory(qty);tbInventoryMapper.updateInventory(updateInventory);});}
Apache Curator实现的分布式锁适用于需要在分布式环境中实现强一致性和高可靠性的并发控制场景,但是它对ZooKeeper的依赖就涉及到了一些网络开销以及运维复杂性等方面的缺点。
总结
分布式锁是一种在分布式系统中实现互斥控制的机制,确保在多台机器间,某一资源在同一时刻只被一个服务或者一个请求所访问或修改。它的核心挑战在于如何保证在无中心化环境下的全局唯一性和一致性。
其实现主要依赖分布式存储系统或协调服务。常见的实现方式有如下几种方式:
而实际业务开发中,我们需要根据具体的业务以及系统资源等考虑,选择合适的分布式锁实现方式。
本网站的文章部分内容可能来源于网络和网友发布,仅供大家学习与参考,如有侵权,请联系站长进行删除处理,不代表本网站立场,转载者并注明出处:https://jmbhsh.com/baihuokuaixun/34350.html