顾乔芝士网

持续更新的前后端开发技术栈

zk源码—1.数据节点与Watcher机制及权限一

大纲

1.ZooKeeper的数据模型、节点类型与应用

(1)数据模型之树形结构

(2)节点类型与特性(持久 + 临时 + 顺序 )

(3)节点的状态结构(各种zxid + 各种version)

(4)节点的版本(version + cversion + aversion)

(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)

2.发布订阅模式:用Watcher机制实现分布式通知

(1)Watcher机制是如何实现的

(2)Watcher机制的底层原理

(3)客户端Watcher注册实现过程

(4)服务端处理Watcher过程

(5)服务端Watch事件的触发过程

(6)客户端回调Watcher的处理过程

(7)利用Watcher实现发布订阅

(8)Watcher具有的特性

3.ACL权限控制:避免未经授权的访问

(1)ACL的使用(scheme:id:permission)

(2)实现自己的权限控制

(3)ACL内部实现原理之客户端处理过程

(4)ACL内部实现原理之服务端实现过程

(5)ACL权限总结


使用3.5.10版本


1.ZooKeeper的数据模型、节点类型与应用

(1)数据模型之树形结构

(2)ZNode节点类型与特性(持久 + 临时 + 顺序)

(3)节点的状态结构(各种zxid + 各种version)

(4)节点的版本(version + cversion + aversion)

(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)

(6)总结


zk基础有三大模块:数据模型ACL权限控制Watch监控。其中数据模型最重要,zk很多典型的应用场景都是利用其数据模块实现的。比如可利用数据模型中的临时节点Watcher监控实现分布式发布订阅


(1)数据模型之树形结构

计算机最根本的作用其实就是处理和存储数据,作为一款分布式一致性框架,zk也是如此。数据模型就是zk用来存储和处理数据的一种逻辑结构,zk数据模型最根本的功能就像一个数据库。接下来按如下步骤做一些简单操作,最终会在zk服务器上得到一个具有层级关系的数据结构。

//步骤一:配置文件zoo.cfg
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181

//步骤二:服务启动
bin.zkServer.sh start

//步骤三:使用客户端连接服务器
bin/zkCli.sh -server 127.0.0.1:2181

//步骤四:完成单机版的开发环境构建之后,便通过zk提供的create命令来创建几个节点 
create /locks
create /servers
create /works

zk中的数据模型是一种树形结构,有一个根文件夹,下面有很多子文件夹。zk的数据模型也具有一个固定的根节点"/",可以在根节点下创建子节点,并在子节点下继续创建下一级节点。zk树中的每一层级用斜杠"/"分隔开只能用绝对路径的方式查询zk节点,如"get /work/task1",不能用相对路径的方式查询zk节点


(2)ZNode节点类型与特性(持久 + 临时 + 顺序)

zk中的数据节点分为持久节点临时节点顺序节点三种类型。


一.持久节点

持久节点在zk最为常用的,几乎所有业务场景中都会包含持久节点的创建。之所以叫作持久节点是因为:一旦将节点创建为持久节点节点数据就会一直存储在ZK服务器上。即使创建该节点的客户端与服务端的会话关闭了,该节点依然不会被删除。如果想要删除持久节点,需要显式调用delete函数进行删除。


二.临时节点

临时节点从名称上可以看出它最重要的一个特性就是临时性。所谓临时性是指:如果将节点创建为临时节点那么该节点数据不会一直存储在zk服务器上。当创建该临时节点的客户端,因会话超时发生异常而关闭时,该节点也相应在zk服务器上被删除。同样,可以像删除持久节点一样主动删除临时节点。


可以利用临时节点的临时特性来进行服务器集群内机器运行情况的统计。比如将集群设置为"/servers"节点,并为集群下的每台服务器创建一个临时节点"/servers/host",当服务器下线时其对应的临时节点就会自动被删除,最后统计临时节点个数就可以知道集群中的运行情况。


三.顺序节点

其实顺序节点并不算是一种单独种类的节点,而是在持久节点和临时节点特性的基础上,增加一个节点有序的性质。所谓节点有序是指:创建顺序节点时,zk服务器会自动使用一个单调递增的数字作为后缀追加到所创建节点后面


例如一个客户端创建了一个路径为"works/task-"的有序节点,那么zk将会生成一个序号并追加到该节点的路径后,最后该节点的路径就会变为"works/task-1"。通过这种方式可以直观查看到节点的创建顺序。


小结:

zk服务器上存储数据的模型是一种树形结构。zk中的数据节点类型有:持久节点持久顺序节点临时节点临时顺序节点。这几种数据节点虽然类型不同,但每个数据节点都有一个二进制数组(byte data[])每个数据节点都有一个记录自身状态信息的字段stat。其中二进制数组会用来存储节点的数据、ACL访问控制信息、子节点数据。


注意:因为临时节点不允许有子节点,所以临时节点的子节点为null


(3)节点的状态结构(各种zxid + 各种version)

每个节点都有属于自己的状态信息,就像每个人都有其身份证信息一样。打开zk客户端,执行"stat /zk_test",可以看到控制台输出一些信息。如下这些信息就是节点状态信息:

cZxid = 5
ctime = Wed Feb 09 18:17:23 CST 2022
mZxid = 5
mtime = Wed Feb 09 18:17:23 CST 2022
pZxid = 5
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x100011d485f0007
dataLength = 2
numChildren = 0

每个节点都有一个自己的状态属性,这些状态属性记录了节点本身的一些信息,包括如下内容:

(4)节点的版本(version + cversion + aversion)

zk为数据节点引入了版本的概念。每个数据节点有3种类型的版本信息:versioncversionaversion,对数据节点的任何更新操作都会引起版本号的变化。


zk的版本信息表示的是:对节点数据内容、子节点信息或者是ACL信息的修改次数。


(5)使用ZooKeeper实现锁(悲观锁 + 乐观锁)

一.悲观锁

悲观锁认为线程对数据资源的竞争总是会出现。为了保证线程在操作数据时,该条数据不会被其他线程修改,那么该条数据要一直处于被锁定的状态。


假设有n个线程同时访问和修改某一数据资源,为了实现线程安全,可以让线程通过创建zk节点"/locks"的方式获取锁。线程A成功创建zk节点"/locks"后获取到锁继续执行,这时线程B也要访问该数据资源,于是线程B尝试创建zk节点"/locks"来尝试获取锁。但是因为线程A已创建该节点,所以线程B创建节点失败而无法获得锁。这样线程A操作数据时,数据就不会被其他线程修改,从而实现了一个简单的悲观锁。


不过存在一个问题:

就是如果进程A因为异常而中断,那么就会导致"/locks"节点始终存在。此时其他线程就会因为无法再次创建节点无法获取锁,从而产生死锁问题。针对这个问题,可以通过将节点设置为临时节点来进行避免,并通过在服务器端添加监听事件来实现锁被释放时可以通知其他线程重新获取锁


二.乐观锁

乐观锁认为线程对数据资源的竞争不会总是出现。所以相对悲观锁而言,加锁方式没有那么激烈,不会全程锁定数据。而是在数据进行提交更新时,才对数据进行冲突检测,如果发现冲突才拒绝操作。


乐观锁基本可以分为读取校验写入三个步骤。CAS(Compare-And-Swap即比较并替换)就是一个乐观锁的实现。CAS有3个操作数:内存值V旧的预期值A要修改的新值B,当且仅当预期值A和内存值V相同时,才会将内存值V修改为B


zk中的version属性就是用来实现乐观锁机制中的"校验"的,zk每个节点都有数据版本的概念。在调用更新操作时,假如有一个客户端试图进行更新操作,该客户端会携带上次获取到的version值进行更新。如果在这段时间内,zk服务器上该节点的数值恰好被其他客户端更新了,那么该节点的数据版本version值一定也会发生变化,从而导致与客户端携带的version无法匹配,于是无法成功更新。因此有效避免了分布式更新的并发安全问题。


在zk的底层实现中,当服务端处理每一个数据更新请求(SetDataRequest)时,首先会调用checkAndIncVersion()方法进行数据版本校验

步骤一:从SetDataRequest请求中获取version

步骤二:通过getRecordForPath()方法获取服务器数据记录nodeRecord,然后从nodeRecord中获取当前服务器上该数据的最新版本currentversion。如果version是-1,表示该请求操作不使用乐观锁,可以忽略版本对比。如果version不是-1,则对比version和currentversion。若相等则进行更新,否则抛出BadVersionException异常中断。


(6)总结

zk的基础内容包括:数据模型节点类型stat状态属性等。利用这些内容可解决:集群中服务器运行情况统计悲观锁乐观锁等。


了解zk数据模型的基本原理后,有一个问题:为什么zk不能采用相对路径查找节点


因为zk大多应用场景是定位数据模型上的节点,并在相关节点上进行操作。对于这种查找与给定值相等的问题,最适合用散列表来解决。因此zk在底层实现时,使用了一个ConcurrentHashMap来存储节点数据,用节点的完整路径作为key去存储节点数据,可以大大提高zk的查找性能

public class ZooKeeperServer implements SessionExpirer, ServerStats.Provider {
    private ZKDatabase zkDb;
    private FileTxnSnapLog txnLogFactory = null;
    ...
}

public class ZKDatabase {
    protected DataTree dataTree;
    protected FileTxnSnapLog snapLog;
    ...
}

public class DataTree {
    private final ConcurrentHashMap<String, DataNode> nodes = new ConcurrentHashMap<String, DataNode>();
    private final WatchManager dataWatches = new WatchManager();
    private final WatchManager childWatches = new WatchManager();
    ...
}


2.发布订阅模式:用Watcher机制实现分布式通知

(1)Watcher机制是如何实现的

(2)Watcher机制的底层原理

(3)客户端Watcher注册实现过程

(4)服务端处理Watcher过程

(5)服务端Watch事件的触发过程

(6)客户端回调Watcher的处理过程

(7)利用Watcher实现发布订阅

(8)Watcher具有的特性


下面介绍zk另一关键技术——Watcher机制,并用它实现一个发布订阅功能


zk的Watcher机制的整个流程:客户端在向zk服务端注册Watcher的同时,会将Watcher对象存储在客户端的WatchManger中。当zk服务端触发Watcher事件后,会向客户端发送通知。客户端线程会从WatchManager中取出对应的Watcher对象来执行回调逻辑。


zk的Watcher机制主要包括三部分客户端线程客户端WatchManagerzk服务端


zk的Watcher机制主要包括三个过程:即客户端注册Watcher服务端处理Watcher客户端回调Watcher,这三个过程其实也是发布订阅功能的几个核心点


(1)Watcher机制是如何实现的

一.客户端向服务端添加Watcher监控事件的方式

二.触发服务端Watcher监控事件通知的条件


一.客户端向服务端添加Watcher监控事件的方式

zk的客户端可以通过Watcher机制订阅服务端上某一节点的数据,以便当该节点的数据状态发生变化时能收到相应的通知


比如可以通过向zk客户端的构造方法传递Watcher参数的方式实现添加Watcher监控事件。下面代码的意思是定义了一个了zk客户端对象实例,并传入三个参数。这个Watcher将作为整个zk会话期间的默认Watcher,一直被保存客户端ZKWatchManagerdefaultWatcher中。

new ZooKeeper(String connectString, int sessionTimeout, Watcher watcher);

connectString:服务端地址
sessionTimeout:超时时间
Watcher:监控事件

除此之外,zk客户端也可以通过getData()、getChildren()和exists()这三个接口,向zk服务端注册Watcher,从而可以方便地在不同的情况下添加Watcher事件。

getData(String path, Watcher watcher, Stat stat);

二.触发服务端Watcher监控事件通知的条件

zk中的接口类Watcher用于表示一个标准的事件处理器,Watcher接口类定义了事件通知的相关逻辑。Watcher接口类中包含了KeeperState和EventType两个枚举类,其中KeeperState枚举类代表会话通知状态EventType枚举类代表事件类型


下图列出了客户端在常见的会话状态下,服务器节点所能支持的事件类型。例如在客户端连接服务端时:可以对数据节点的创建删除数据变更子节点的更新等操作进行监控。

至此已从应用层的角度介绍完zk中的Watcher机制了,通过几个简单的API调用就可以对服务器的节点状态变更进行监控。但在实际生产环境中还会遇到很多意想不到的问题,要想解决好这些问题就要深入理解Watcher机制的底层实现原理。


(2)Watcher机制的底层原理

下面从设计模式角度出发来分析其底层实现:

Watcher机制的结构其实很像设计模式中的观察者模式。一个对象或者数据节点可能会被多个客户端监控,当对应事件被触发时,会通知这些对象或客户端。可以将Watcher机制理解为是分布式环境下的观察者模式,所以接下来以观察者模式的角度来看zk底层Watcher是如何实现的。


实现观察者模式时,最关键的代码就是创建一个列表来存放观察者。而在zk中则是在客户端和服务端分别实现了两个存放观察者列表:也就是客户端的ZKWatchManager服务端的WatchManager。zk的Watcher机制的核心操作其实就是围绕这两个列表展开的。


(3)客户端Watcher注册实现过程

先看客户端的实现过程。当发送一个带有Watcher事件的会话请求时,zk客户端主要会做两个工作:一.标记该会话请求是一个带有Watcher事件的请求二.将Watcher事件存储到ZKWatchManager中


整个客户端Watcher的注册流程如下:

以getData接口为例,当zk客户端发送一个带有Watcher事件的会话请求时:


一.首先将数据节点和Watcher的对应关系封装到DataWatchRegistration

然后把该请求标记为带有Watcher事件的请求,这里说的封装其实指的是new一个对象。


二.接着将请求封装成一个Packet对象并添加到发送队列outgoingQueue

Packet对象被添加到发送队列outgoingQueue后,会进行阻塞等待。也就是通过Packet对象的wait()方法进行阻塞,直到Packet对象标记为完成。


三.随后通过客户端SendThread线程向服务端发送outgoingQueue里的请求

也就是通过ClientCnxnSocket的doTransport()方法处理请求发送和响应。


四.完成请求发送后,客户端SendThread线程会监听并处理服务端响应

也就是由ClientCnxn的内部类SendThreadreadResponse()方法负责接处理务端响应,然后执行ClientCnxn的finishPacket()方法Packet对象中取出对应的Watcher。即通过调用WatchRegistration的register()方法,将Watcher事件注册到ZooKeeper的ZKWatchManager中。


因为客户端一开始将Watcher事件封装到DataWatchRegistration对象中,所以在调用WatchRegistration的register()方法时,客户端就会将之前封装在DataWatchRegistration的Watcher事件交给ZKWatchManager,并最终保存到ZKWatchManager的dataWatches中。


ZKWatchManager的dataWatches是一个Map<String, Set<Watcher>>,用于将数据节点的路径Watcher事件进行一一映射后管理起来。

public class ZooKeeper implements AutoCloseable {
    protected final ClientCnxn cnxn;
    protected final ZKWatchManager watchManager;
    private final ZKClientConfig clientConfig;
    ...
    public byte[] getData(final String path, Watcher watcher, Stat stat) {
        final String clientPath = path;
        PathUtils.validatePath(clientPath);
        WatchRegistration wcb = null;
        if (watcher != null) {
            //1.封装DataWatchRegistration对象暂存数据节点和Watcher的对应关系;
            wcb = new DataWatchRegistration(watcher, clientPath);
        }
        final String serverPath = prependChroot(clientPath);
        RequestHeader h = new RequestHeader();
        h.setType(ZooDefs.OpCode.getData);
        //1.客户端首先会把该会话请求标记为带有Watcher事件的请求
        GetDataRequest request = new GetDataRequest();
        request.setPath(serverPath);
        request.setWatch(watcher != null);
        GetDataResponse response = new GetDataResponse();
        //把封装好的请求和响应交给ClientCnxn客户端进行处理
        ReplyHeader r = cnxn.submitRequest(h, request, response, wcb);
        if (r.getErr() != 0) {
            throw KeeperException.create(KeeperException.Code.get(r.getErr()), clientPath);
        }
        if (stat != null) {
            DataTree.copyStat(response.getStat(), stat);
        }
        return response.getData();
    }
    ...
}

public class ClientCnxn {
    private final LinkedBlockingDeque<Packet> outgoingQueue = new LinkedBlockingDeque<Packet>();
    ...
    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration) {
        return submitRequest(h, request, response, watchRegistration, null);
    }
    
    public ReplyHeader submitRequest(RequestHeader h, Record request, Record response, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
        ReplyHeader r = new ReplyHeader();
        //2.接着将请求封装成一个Packet对象并添加到一个发送队列outgoingQueue
        Packet packet = queuePacket(h, r, request, response, null, null, null, null, watchRegistration, watchDeregistration);
        synchronized (packet) {
            if (requestTimeout > 0) {
                //阻塞等待
                waitForPacketFinish(r, packet);
            } else {
                //阻塞等待
                while (!packet.finished) {
                    packet.wait();
                }
            }
        }
        if (r.getErr() == Code.REQUESTTIMEOUT.intValue()) {
            sendThread.cleanAndNotifyState();
        }
        return r;
    }
    
    public Packet queuePacket(RequestHeader h, ReplyHeader r, Record request, Record response, AsyncCallback cb, String clientPath, String serverPath, Object ctx, WatchRegistration watchRegistration, WatchDeregistration watchDeregistration) {
        Packet packet = null;
        //2.接着将请求封装成一个Packet对象
        packet = new Packet(h, r, request, response, watchRegistration);
        packet.cb = cb;
        packet.ctx = ctx;
        packet.clientPath = clientPath;
        packet.serverPath = serverPath;
        packet.watchDeregistration = watchDeregistration;
        synchronized (outgoingQueue) {
            if (!state.isAlive() || closing) {
                conLossPacket(packet);
            } else {
                if (h.getType() == OpCode.closeSession) {
                    closing = true;
                }
                //2.将封装好的Packet对象添加到一个发送队列outgoingQueue
                outgoingQueue.add(packet);
            }
        }
        sendThread.getClientCnxnSocket().packetAdded();
        return packet;
    }
    
    class SendThread extends ZooKeeperThread {
        ...
        @Override
        public void run() {
            //把outgoingQueue传入clientCnxnSocket中
            clientCnxnSocket.introduce(this, sessionId, outgoingQueue);
            ...
            while (state.isAlive()) {
                //建立和服务端的连接等
                if (!clientCnxnSocket.isConnected()) {
                    if (closing) {
                        break;
                    }
                    if (rwServerAddress != null) {
                        serverAddress = rwServerAddress;
                        rwServerAddress = null;
                    } else {
                        serverAddress = hostProvider.next(1000);
                    }
                    onConnecting(serverAddress);
                    startConnect(serverAddress);
                    clientCnxnSocket.updateNow();
                    clientCnxnSocket.updateLastSendAndHeard();
                }
                ...
                //3.将outgoingQueue里的请求发送出去 + 处理接收到的响应
                clientCnxnSocket.doTransport(to, pendingQueue, ClientCnxn.this);
                ...
            }
        }
        
        //读取服务端发送过来的输入流
        void readResponse(ByteBuffer incomingBuffer) {
            ...
            finishPacket(packet);
        }
    }
    ...
    protected void finishPacket(Packet p) {
        int err = p.replyHeader.getErr();
        if (p.watchRegistration != null) {
            p.watchRegistration.register(err);
        }
        ...
    }
}

public class ClientCnxnSocketNIO extends ClientCnxnSocket {
    ...
    @Override
    void doTransport(int waitTimeOut, List<Packet> pendingQueue, ClientCnxn cnxn) {
        ...
        doIO(pendingQueue, cnxn);
        ...
    }
    
    void doIO(List<Packet> pendingQueue, ClientCnxn cnxn) {
        SocketChannel sock = (SocketChannel) sockKey.channel();
        ...
        //处理接收响应
        if (sockKey.isReadable()) {
            ...
            sendThread.readResponse(incomingBuffer);
            ...
        }
        //处理发送请求
        if (sockKey.isWritable()) {
            Packet p = findSendablePacket(outgoingQueue, sendThread.tunnelAuthInProgress());
            ...
            p.createBB();
            sock.write(p.bb);
            outgoingQueue.removeFirstOccurrence(p);
            pendingQueue.add(p);
            ...
        }
        ...
    }
}

public class ZooKeeper implements AutoCloseable {
    protected final ClientCnxn cnxn;
    protected final ZKWatchManager watchManager;
    private final ZKClientConfig clientConfig;
    ...
    public abstract class WatchRegistration {
        private Watcher watcher;
        private String clientPath;
        ...
        public void register(int rc) {
            if (shouldAddWatch(rc)) {
                Map<String, Set<Watcher>> watches = getWatches(rc);
                synchronized(watches) {
                    Set<Watcher> watchers = watches.get(clientPath);
                    if (watchers == null) {
                        watchers = new HashSet<Watcher>();
                        watches.put(clientPath, watchers);
                    }
                    watchers.add(watcher);
                }
            }
        }
        abstract protected Map<String, Set<Watcher>> getWatches(int rc);
        protected boolean shouldAddWatch(int rc) {
            return rc == 0;
        }
    }
    
    class DataWatchRegistration extends WatchRegistration {
        ...
        @Override
        protected Map<String, Set<Watcher>> getWatches(int rc) {
            return watchManager.dataWatches;
        }
    }
    
    static class ZKWatchManager implements ClientWatchManager {
        private final Map<String, Set<Watcher>> dataWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> existWatches = new HashMap<String, Set<Watcher>>();
        private final Map<String, Set<Watcher>> childWatches =  new HashMap<String, Set<Watcher>>();
        ...
    }
}

注意:客户端每调用一次getData()接口,就会注册一个Watcher,但这些Watcher实体不会随着客户端请求被发送到服务端。如果客户端的所有Watcher都被发送到服务端的话,服务端可能就会出现内存紧张或其他性能问题。虽然封装Packet对象的时候会传入DataWatchRegistration对象,但是在底层实际的网络传输对Packet对象序列化的过程中,并没有将DataWatchRegistration对象序列化字节数组


具体来说,在Packet的createBB()方法中,zk只会将requestHeader和request两个属性进行序列化。尽管DataWatchRegistration对象被封装在了Packet中,但是并没有被序列化到底层字节数组里去,因此不会进行网络传输。

public class ClientCnxn {
    ...
    static class Packet {
        RequestHeader requestHeader;
        ReplyHeader replyHeader;
        Record request;
        Record response;
        ByteBuffer bb;
        ...
        public void createBB() {
            ByteArrayOutputStream baos = new ByteArrayOutputStream();
            BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
            boa.writeInt(-1, "len"); // We'll fill this in later
            if (requestHeader != null) {
                requestHeader.serialize(boa, "header");
            }
            if (request instanceof ConnectRequest) {
                request.serialize(boa, "connect");
                // append "am-I-allowed-to-be-readonly" flag
                boa.writeBool(readOnly, "readOnly");
            } else if (request != null) {
                request.serialize(boa, "request");
            }
            baos.close();
            this.bb = ByteBuffer.wrap(baos.toByteArray());
            this.bb.putInt(this.bb.capacity() - 4);
            this.bb.rewind();
        }
        ...
    }
    ...
}
控制面板
您好,欢迎到访网站!
  查看权限
网站分类
最新留言