zookeeper详解_zookeeper

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6

一 zookeeper介绍

        首先需要了解zookeeper是什么zookeeper是一个分布式协调服务。所谓分布式协调主要是来解决分布式系统中多个进程之间的同步限制防止出现脏读例如我们常说的分布式锁。

        zookeeper中的数据是存储在内存当中的因此它的效率十分高效。它内部的存储方式十分类似于文件存储结构采用了分层存储结构。但是它和文件存储结构的区别是它的各个节点中是允许存储数据的需要注意的是zk的每个节点存储数据不能超过1M。它的内存数据结果如下图

我们可以通过不同的路径访问到不同的节点因为它是分层结构我们也可以通过某一个父节点获取到该节点下的所有子节点信息。

        zk只提供了几个简单的api但是我们可以通过灵活使用这些api的组合来实现我们复杂的业务要求

        1create创建一个新节点通过指定路径的方式创建节点例如创建路径为/A/A1/demo则会在A1节点下创建一个demo节点

        2delete删除节点通过路径的方式删除节点如果删除路径为/A/A1/demo则会删除A1节点下的demo节点

        3exists判断指定路径下的节点是否存在例如判断路径为/A/A1/demo则会判断A1节点下的demo节点是否存在

        4get获取指定路径下某个节点的值是什么例如获取路径为/A/A1/demo则会获取A1节点下的demo节点的值什么

        5set为指定路径的节点进行赋值操作例如修改路径为/A/A1/demo则会修改A1节点下的demo节点的值

        6get children获取指定路径节点下的子节点信息例如获取路径为/A则会获取A节点下的A1和A2节点

        7sync获取到同步数据这个涉及到了zk的原理zk集群属于最终一致性调用该方法可以获取到最终的结果值如果不使用该方法在查询的时候可能获取到的值是中间值

        zk中创建的节点分为两种永久性节点和临时性节点。永久性节点即创建以后在不执行delete命令的前提下该节点是永久存在的而临时节点与session有关每个客户端与zk建立链接的时候会生成一个session这个session不会因为链接zk服务器节点的变化而变化只有当客户端断开连接以后该session才会消失而临时节点会随着session的消失而消失。

        zk拥有watch机制也就是监视机制可以支持响应式编程模式它可以对某个路径的终节点及其子节点的变更进行监视当其发生变更以后会调用注册的callback方法然后进行具体的业务逻辑。例如监测路径为/A/A1那么它会加测A1节点以及附属于A1的所有子节点这个子不单单只一层子节点是指所有层的子节点。

        zk拥有以下几个重要特性

        1顺序一致性来自客户端的相关指令会按照顺序执行不会出现乱序的情况客户端发送到服务的指令1->2->3->4那个这些指令就会按照顺序执行

        2原子性更新只有成功和失败没有中间状态

        3可靠性也可以称之为持久性节点更新以后在下次更新之前它的数据不会发生变更

        4准实时性也可以称之为最终一致性在zk集群中一个客户端修改了其中的一个节点一定时间以后所有可用的服务对应的节点都会变成更新以后的值。

二 zk选主流程

        zk的设计目标就是高可用性那么也就意味着在使用zk的时候一般都是使用集群而不是单点模式。首先来看一下zk的集群模式,如下图

该图为zk集群的可用状态从上图中可以看到zk的集群是主从集群客户端可以随意与任何zk服务节点进行连接并且各个客户端都可以进行读写操作这是一个核redis主从集群的区别redis的主从集群如果客户端是写操作那么只能连接redis的主节点才可以。zk的每个客户端是随机连接到zk服务节点的并且每个客户端都可以进行读写操作读操作都是在客户端连接的zk节点进行操作而写操作是有区别的如果该客户端连接的是leader节点那么直接进行写操作如果该客户端连接的是follower节点那么zk的服务节点会自动将该写操作转到leader节点进行。

        zk的集群为主从集群那么也就意味着主节点只有一个那么当主节点挂了以后该zk集群则会处于不可用状态既然zk的设计目的是高可用也就意味着当主节点挂了以后zk会有一定的方式来快速的选出主节点让服务恢复可用状态zk的官方文档中给出的压测报告7台zk服务选主耗时大概200ms。

        介绍zk的选举流程之前需要先解释两个概念zxid以及myid。zxid指的是当前节点的事物id通俗点说就是当前节点完成的数据同步情况该值越大越能说明该节点的数据同步情况越完整丢失数据的情况越小或者丢失数据越少。myid是在创建zk集群的时候我们给它的赋值。

        zk的follower节点和leader节点是通过心跳来查看服务是否可用。在这其中只要有有一台follower节点发现主节点挂掉他就开始向其它follower节点发送选主请求整个集群进入选主流程不再向外提供服务。

        先假设现在有4个zk节点分别为node1node2node3node4他们的myid分别为1,2,3,4选主流程主要分为以下两种情况

        1.初始启动在启动阶段时此时各个服务节点的zxid都为0只与myid有关。假设启动顺序为node1->node2->node3->node4当启动动1和2的时候该zk集群是不可用状态因为zk的选主必须是过半服务节点同意包含自己最低需要启动三个节点才可以进行选举因此只有node1和node2启动的时候此时只有两台服务不满足条件当第三台节点启动以后才满足了选主的最低条件然后进入到选举流程因为node3的myid最大所以此时3号节点为leader然后启动node4由于此时已经选举出3位leader节点并且过半通过则不再选取新的主节点。则该集群的leader节点为node3。

        2.运行过程中初始启动过程中的leadernode3节点挂掉假设此时只有node4节点发现leader已经挂掉node1和node2的Zxid都是10node4的Zxid为9选主的时候需要比较zxid和myid需要注意他们的优先级zxid为第一优先级myid为第二优先级选举流程大致分为以下几步

        1)node4节点给自己投票然后将自己的zxid和myid发送给node1和node2节点

         2node1和node2通过比较zxid和myid发现node4不能成为leader节点将各自的zxid和myid发送给node4然后node4接收到以后发现node1和node2都比自己时候成为leader节点会给它们进行投票

         3node1和node2反驳完node4的选主请求以后开始进行各自的选主流程起过程与node4的过程一致通过上面的优先级我们可以知道最终node2会成为leader节点那么以node2为例说一下接下来的流程。node2首先给自己投票然后将自己zxid和myid推送给node1和node4此时会发现node2适合成为主节点则会给node2节点进行投票最终选出node2成为主节点zk集群恢复成可用状态。

三 zk数据一致性

        zk服务一般上是以集群状态提供服务多个zk节点之间的数据一致性是通过zap原子广播协议来保证的。zk的数据一致性为最终一致性需要注意的是他不是实时的比如node1node2node3其中node3为leadernode1和node2为follower当node1进行节点创建以后leader节点肯定为实时更新但是follower节点不一定为实时更新因为只要过半通过就算节点已经创建成功可能会有的节点当前的数据还不是最终态但是它的更新指令是存在只是可能还没执行。我们的客户端如果想要读取最终态的数据那么可以通过使用上面的sync命令来获取最终数据。

        先看一下下面的流程图然后再进行详细解释

        1首先由客户端发送创建节点的指令给到zk节点假设这个zk节点为follower1节点

        2follower1节点发现是写操作节点则将该指令通过2888端口转发到leader节点执行

        3leader节点更新自己zxid信息也就是事务id信息

        4leader节点先将创建节点信息同步到log日志中然后再follower1和follower2各自的队列中放入创建节点写日志的指令当follower节点接收到指令以后执行写日志操作写入日志成功以后告诉leader写入完成leader会判断目前是否已经有过半的节点包含自己已经写入完成如果完成则先在自己的内存中创建节点然后将在follower对应的节点中加入在内存中创建节点的指令然后follower接收到指令以后进行内存操作操作完成以后告诉leader写入完成同样需要过半完成

        5将创建结束的消息返回给调用的follower然后返回给客户端节点创建结束。

        上面步骤中的第四步其实就是对原子广播协议的一个大致解释原子广播协议可以看成两部分首先原子就代表这只有成功或者失败没有中间状态而广播就是并不意味着所有节点都完成相关操作才算完成只要过半节点是成功的那么本次操作就算成功完成了。在第四步中提到的队列就是对最终一致性的一个解释leader会将所有指令按照顺序放入每个follower对应的队列中每个follower按顺序去执行队列中的指令达到一个最终一致性的结果。

四 zk分布式锁

        zk作为分布式协调服务它的一个很大的作用就是用来实现分布式锁。zk节点存在临时节点它的生命周期与session有关它会随着session的消失而消失这就比较完美的解决了使用redis作为分布式锁时可能出现的死锁问题。

        下面看一下简单的分布式锁代码编写。

        第一部分代码为连接zk时的watch代码用于监测zk的连接情况它只需要实现Watcher即可。可以根据不同连接状态进行不同的处理我们本次只关心连接状态因为zk是异步连接为了保证zk连接成功以后再做接下来的加锁操作通过CountDownLatch进行阻塞。

/**
 * 连接watcher主要用来监测zk连接状态
 */
public class ConnectionWatch implements Watcher {
    /**
     * 由于zk获取信息为异步通过countDownLatch进行阻塞保证连接成功
     */
    private CountDownLatch countDownLatch;

    public void setCountDownLatch(CountDownLatch countDownLatch){
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void process(WatchedEvent watchedEvent) {
        System.out.println(watchedEvent.toString());
        switch (watchedEvent.getState()){

            case Unknown:
                break;
            case Disconnected:
                break;
            case NoSyncConnected:
                break;
            case SyncConnected:
                // 连接成功去除阻塞
                countDownLatch.countDown();
                break;
            case AuthFailed:
                break;
            case ConnectedReadOnly:
                break;
            case SaslAuthenticated:
                break;
            case Expired:
                break;
            case Closed:
                break;
        }

    }
}

          第二部分代码为zk的工具类用于获取zk实例用于业务代码调用。

public class ZkUtils {

    private static volatile ZooKeeper zooKeeper;
    /**
     * zk服务器节点地址以及锁的主目录
      */
    private final static String url = "127.0.0.1:2181,127.0.0.2:2181,127.0.0.3:2181/orderLock";

    private static ConnectionWatch watch = new ConnectionWatch();

    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 采创建zk
     * @return
     * @throws IOException
     * @throws InterruptedException
     */
    public static ZooKeeper getInstance() throws IOException, InterruptedException {
        watch.setCountDownLatch(countDownLatch);
        // 创建zk实例1000代表的是session过期时间
        zooKeeper = new ZooKeeper(url, 1000, watch);
        // 在zk连接成功之前进行阻塞
        countDownLatch.await();
        return zooKeeper;

    }
}

        第三部分为在加锁过程中相关操作的watch以及callback操作主要功能有创建节点获取子节点检查节点是否存在。zk的加锁过程就是创建节点的过程当创建节点成功并且成功返回则证明该线程加锁成功继续进行业务逻辑处理在加锁的时候一定要考虑锁的可重入性。下面这段代码实现的是公平锁谁先创建了临时节点那么谁就能先获得锁。加锁的大致逻辑是1先创建带有序列的临时节点2在回调函数中获取父节点的所有子节点判断当前线程创建的临时节点是否位于第一个如果是则获取锁如果不是则判断前一个节点是否存在然后一直循环该逻辑。

public class LockWatch implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback {
    private ZooKeeper zooKeeper;
    /**
     * 当前线程名称
      */
    private String threadName;
    /**
     * 当前线程创建的节点名称
     */
    private String nodeName;
    /**
     * 用来进行锁阻塞只有获取到锁才放行否则进行阻塞
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    public void setZooKeeper(ZooKeeper zooKeeper) {
        this.zooKeeper = zooKeeper;
    }

    public void setThreadName(String threadName) {
        this.threadName = threadName;
    }

    /**
     * 加锁操作也就是往zk的指定目录下插入带有序列的临时节点
     * 需要考虑锁的可重入
     */
    public void tryLock() throws InterruptedException {
        zooKeeper.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL,
        this, "orderLock");
        countDownLatch.await();

    }

    /**
     * 解锁操作
     * @throws InterruptedException
     * @throws KeeperException
     */
    public void unLock() throws InterruptedException, KeeperException {
        // -1代表不考虑版本号在zk中获取删除等相关操作允许版本号的传入
        zooKeeper.delete(nodeName, -1);
    }

    /**
     * 节点创建回调方法
     * @param i
     * @param path
     * @param ctx
     * @param name
     */
    @Override
    public void processResult(int i, String path, Object ctx, String name) {
        if (Objects.nonNull(name) && !"".equals(name)){
            System.out.println(threadName + " create node:" + name);
            nodeName = name.substring(1);
            zooKeeper.getChildren("/", false, this, ctx);
        }
    }

    /**
     * 获取子节点信息的回调方法
     * @param i
     * @param path
     * @param o
     * @param children
     * @param stat
     */
    @Override
    public void processResult(int i, String path, Object o, List<String> children, Stat stat) {
        if (children == null || children.isEmpty()){
            System.out.println("children is null......");
            return;
        }
        // 将子节点进行排序找序号由低到高
        Collections.sort(children);
        // 获取当前创建节点排序以后的下标
        int index = children.indexOf(nodeName);
        // 如果当前节点为第一个节点则加锁成功
        try {
            if (index < 1){
                System.out.println(threadName +" get lock...");
                // -1代表不考虑版本
                zooKeeper.setData("/", threadName.getBytes(), -1);
                countDownLatch.countDown();
            } else {
                System.out.println(threadName +" not get lock...");
                // 判断该节点的前一个节点是否存在目的是为了注册节点监控事件监测删除节点操作
                zooKeeper.exists("/" + children.get(i-1), this, this, o);
            }
        } catch (KeeperException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }

    /**
     * 判断节点是否存在的watch方法
     * @param watchedEvent
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        switch (watchedEvent.getType()){

            case None:
                break;
            case NodeCreated:
                break;
            case NodeDeleted:
                // 当节点删除的时候代表着解锁触发后续的抢锁操作
                zooKeeper.getChildren("/", false, this, "orderLock");
                break;
            case NodeDataChanged:
                break;
            case NodeChildrenChanged:
                break;
            case DataWatchRemoved:
                break;
            case ChildWatchRemoved:
                break;
            case PersistentWatchRemoved:
                break;
        }
    }

    /**
     * 校验节点是否存在回调方法
     * @param i
     * @param s
     * @param o
     * @param stat
     */
    @Override
    public void processResult(int i, String s, Object o, Stat stat) {

    }
}

        第四部分则为锁的简单应用使用了junit进行测试代码如下

public class ZkLock {

    private ZooKeeper zooKeeper;
    /**
     * 没有业务意义只是为了阻塞主线程
     */
    private CountDownLatch countDownLatch = new CountDownLatch(1);

    /**
     * 初始化的时候首先保证获取到zk的链接实例
     * @throws IOException
     * @throws InterruptedException
     */
    @BeforeAll
    public void connect() throws IOException, InterruptedException {
        zooKeeper = ZkUtils.getInstance();
    }

    /**
     * 使用完以后关闭zk连接
     * @throws InterruptedException
     */
    @AfterAll
    public void close() throws InterruptedException {
        zooKeeper.close();
    }

    /**
     * 使用10个线程模拟抢锁
     */
    @Test
    public void testLock() throws InterruptedException {
        for(int i = 0; i < 10; i++){
            new Thread(){
                @Override
                public void run() {
                    String threadName =Thread.currentThread().getName();
                    LockWatch lockWatch = new LockWatch();
                    lockWatch.setThreadName(threadName);
                    lockWatch.setZooKeeper(zooKeeper);

                    try {
                        lockWatch.tryLock();
                        System.out.println(threadName + "deal business...");
                        lockWatch.unLock();
                    } catch (InterruptedException | KeeperException e) {
                        e.printStackTrace();
                    }
                }
            }.start();
        }

        countDownLatch.await();
    }
}

        补充zk服务中2888端口用于follower调用leader进行写操作3888端口为选主使用端口2181端口为客户端连接zk服务节点端口。

阿里云国内75折 回扣 微信号:monov8
阿里云国际,腾讯云国际,低至75折。AWS 93折 免费开户实名账号 代冲值 优惠多多 微信号:monov8 飞机:@monov6