基于zookeeper实现分布式锁


对于锁大家肯定不会陌生,在Java中synchronized关键字和ReentrantLock可重入锁在我们的代码中是经常见的,一般我们用其在多线程环境中控制对资源的并发访问,但是随着分布式的快速发展,本地的加锁往往不能满足我们的需要,在我们的分布式环境中上面加锁的方法就会失去作用。于是人们为了在分布式环境中也能实现本地锁的效果,也是纷纷各出其招,今天让我们来聊一聊一般分布式锁实现的套路。

分布式锁概念

Martin Kleppmann是英国剑桥大学的分布式系统的研究员,之前和Redis之父Antirez进行过关于RedLock(红锁,后续有讲到)是否安全的激烈讨论。Martin认为一般我们使用分布式锁有两个场景:

  • 效率:使用分布式锁可以避免不同节点重复相同的工作,这些工作会浪费资源。比如用户付了钱之后有可能不同节点会发出多封短信。
  • 正确性:加分布式锁同样可以避免破坏正确性的发生,如果两个节点在同一条数据上面操作,比如多个节点机器对同一个订单操作不同的流程有可能会导致该笔订单最后状态出现错误,造成损失。

当我们确定了在不同节点上需要分布式锁,那么我们需要了解分布式锁到底应该有哪些特点:

  • 互斥性:和我们本地锁一样互斥性是最基本,但是分布式锁需要保证在不同节点的不同线程的互斥。
  • 可重入性:同一个节点上的同一个线程如果获取了锁之后那么也可以再次获取这个锁。
  • 锁超时:和本地锁一样支持锁超时,防止死锁。
  • 高效,高可用:加锁和解锁需要高效,同时也需要保证高可用防止分布式锁失效,可以增加降级。
  • 支持阻塞和非阻塞:和ReentrantLock一样支持lock和trylock以及tryLock(long timeOut)。
  • 支持公平锁和非公平锁(可选):公平锁的意思是按照请求加锁的顺序获得锁,非公平锁就相反是无序的。这个一般来说实现的比较少。

我们了解了一些特点之后,我们一般实现分布式锁有以下几个方式:

  • ectd
  • Zk
  • Redis
  • 自研分布式锁

使用zk实现分布式锁

使用zk的临时节点,多个JVM在zookeeper上创建同一个相同节点,zookeeper节点是唯一的,那么只有一个JVM可以创建成功。
如果JVM已经使用完毕,当前JVM1的zk已经关闭了当前session,其他的JVM通过事件通知得知节点已经被删除,这时候重新获取锁。

  1. 定义一个锁的接口
public interface Lock {


    /**
     * 获取锁
     */
    void lock();


    /**
     * 释放锁
     */
    void unlock();


}
  1. 锁的抽象类
public abstract class AbstractLock implements Lock {


    private static final String CONNECTION = "127.0.0.1:2181";
    protected ZkClient zkClient = new ZkClient(CONNECTION);
    protected String lockNode = "/lock";
    protected CountDownLatch countDownLatch;


    public void lock() {
        // 连接zk,在zk上创建一个node,临时节点。如果获取锁成功,执行业务逻辑,否则等待
        if (tryLock()) {
            System.out.println("==========获取锁==============");
        } else {
            // 等待, 使用事件通监听该节点是否删除,如果删除重新获取锁
            waitLock();
            lock();
        }


    }


    public void unlock() {
        if (zkClient != null) {
            zkClient.close();
            System.out.println("==========释放锁============");
        }
    }


    /**
     * 获取锁资源
     */
    public abstract boolean tryLock();


    /**
     * 如果节点创建失败,进行等待,使用事件监听通知该节点是否删除,如果被删除进入获取锁的资源
     */
    public abstract void waitLock();
}
  1. 具体子类
public class ZookeeperDistrbuteLock extends AbstractLock {




    @Override
    public boolean tryLock() {
        try {
            zkClient.createEphemeral(lockNode);
            return true;
        } catch (Exception e) {
            // 创建失败
            return false;
        }
    }


    @Override
    public void waitLock() {
        // 事件通知
        IZkDataListener iZkDataListener = new IZkDataListener() {


            /**
             * 节点改变
             */
            public void handleDataChange(String s, Object o) throws Exception {


            }


            /**
             * 节点删除
             */
            public void handleDataDeleted(String s) throws Exception {
                if (countDownLatch != null) {
                    countDownLatch.countDown();
                }
            }
        };
        zkClient.subscribeDataChanges(lockNode, iZkDataListener);
        try {
            countDownLatch = new CountDownLatch(1);
            if (zkClient.exists(lockNode)) {
                countDownLatch.await();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
        // 删除事件监听
        zkClient.unsubscribeDataChanges(lockNode, iZkDataListener);


    }
}
  1. 测试代码:
//生成订单类
public class OrderNumGenerator {
    //全局订单id
    public static int count = 0;


    public String getNumber() {
        SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
        return simpt.format(new Date()) + "-" + ++count;
    }
}
//使用多线程模拟生成订单号
public class OrderService implements Runnable {
    private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
    private Lock lock = new ZookeeperDistrbuteLock();


    public static void main(String[] args) {
        System.out.println("####生成唯一订单号###");
        for (int i = 0; i < 100; i++) {
            new Thread(new OrderService()).start();
        }


    }


    public void run() {
        getNumber();
    }


    public void getNumber() {
        try {
            lock.lock();
            String number = orderNumGenerator.getNumber();
            System.out.println(Thread.currentThread().getName() + ",生成订单ID:" + number);
        } finally {
            lock.unlock();
        }
    }
}

Author: Re:0
Reprint policy: All articles in this blog are used except for special statements CC BY 4.0 reprint policy. If reproduced, please indicate source Re:0 !
 Previous
LVS+Keepalived+nginx实现负载均衡集群 LVS+Keepalived+nginx实现负载均衡集群
LVS:一种四层负载均衡器,软负载均衡,完成所有负载均衡业务需求,比如数据库、web服务、虚拟化技术。Linux2.4内核之后,默认集成。 Keepalived:LVS基础之上,实现心跳检测、监控服务器实现故障转移,如果服务器发生宕机,可
2022-04-22
Next 
Spring Boot启动之前做了哪些事 Spring Boot启动之前做了哪些事
探究一下在调用我们写的main方法之前,SpringBoot框架为我们做了哪些事情。
2022-04-13
  TOC