[TOC]

Original: https://www.jianshu.com/p/70151fc0ef5d

Zookeeper Client Curator: Advanced Features (Recipes)

Note: you must first add the curator-recipes dependency. The text below only explains and demonstrates the use of some recipes features; it is not intended as a source-level analysis.

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

Important: it is strongly recommended to use a ConnectionStateListener to monitor the connection state. When the connection state is LOST, all APIs under curator-recipes will fail or time out, even though none of the examples below register a ConnectionStateListener.
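
As a minimal sketch (not from the original article; the handling shown is only illustrative), such a listener can be registered on the CuratorFramework instance like this:

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework curatorClient, ConnectionState newState) {
        // real code should stop relying on recipes while SUSPENDED/LOST and re-acquire them afterwards
        if (newState == ConnectionState.LOST) {
            System.err.println("connection LOST - recipes held by this client are no longer valid");
        } else if (newState == ConnectionState.RECONNECTED) {
            System.out.println("reconnected - recipes may need to be restarted or re-acquired");
        }
    }
});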

Zookeeper Client Curator: A Detailed Guide

Caches

ZooKeeper natively supports event listening by registering Watchers, but the developer has to re-register them over and over (a Watcher is registered once and fires once). Cache is Curator's wrapper around event listening; it can be viewed as a local cached view of the events and automatically re-registers the listeners for the developer. Curator provides three kinds of Watcher-based caches to listen for node changes.

Introduction

Curator is an open-source ZooKeeper client framework from Netflix. It takes care of many of the very low-level details of ZooKeeper client development, including connection re-establishment, repeated Watcher registration, NodeExistsException handling and so on. Patrick Hunt (of ZooKeeper) praised it highly with the line "Guava is to Java what Curator is to ZooKeeper".
An aside, and a bit of trivia:
The origin of the name ZooKeeper is rather amusing. The following is adapted from the book "From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency":
ZooKeeper originated in a research group at Yahoo!'s research institute. At the time, the researchers found that many large systems inside Yahoo! relied on some similar system for distributed coordination, but those systems usually had single points of failure. So Yahoo!'s developers set out to build a generic distributed coordination framework without a single point of failure. Early in the project, considering that many projects were named after animals (for example the well-known Pig project), Yahoo!'s engineers wanted to give this project an animal name as well. Raghu Ramakrishnan, then chief scientist of the institute, joked: "If we keep going like this, this place will turn into a zoo." Everyone promptly agreed to call it the zookeeper, because with all the animal-named distributed components put together, Yahoo!'s entire distributed system looked like a large zoo, and ZooKeeper was exactly the piece meant to coordinate that distributed environment. That is how the name ZooKeeper was born.

Curator is undoubtedly the Swiss Army knife among ZooKeeper clients. The word translates to "curator" or "custodian"; whether or not the development team intended it, the author's guess is that the name is meant to say that Curator is ZooKeeper's keeper (stretching the metaphor: Curator is the zoo's curator).
Curator consists of several modules:
curator-framework: a wrapper around ZooKeeper's low-level API
curator-client: client-side utilities such as retry policies
curator-recipes: packaged high-level features, such as cache-based event listening, leader election, distributed locks, distributed counters, distributed barriers and so on
Maven dependencies (Curator version used here: 2.12.0, which corresponds to ZooKeeper 3.4.x; mixing incompatible versions can cause compatibility problems and very likely make node operations fail):

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-framework</artifactId>
            <version>2.12.0</version>
        </dependency>
        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-recipes</artifactId>
            <version>2.12.0</version>
        </dependency>

Background

I happened to be using ZooKeeper recently and built a small visual web console based on Spring Boot, Curator and Bootstrap:

zookeeper-console

You are welcome to use it and give it a star.

Path Cache

A Path Cache watches the children of a ZNode. Whenever a child node is added, updated or deleted, the Path Cache updates its state to contain the current set of children, their data and their state, and the state changes are reported through a PathChildrenCacheListener.

In practice, four classes are involved:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

A Path Cache is created with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache you must call its start method, and call close when you are done. The start method can also be given a StartMode that controls how the cache is initialized:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called before start() returns.
  3. POST_INITIALIZED_EVENT: a PathChildrenCacheEvent.Type#INITIALIZED event is sent once the cache has finished initializing.

public void addListener(PathChildrenCacheListener listener)
adds a listener that is notified of changes to the cache.

The getCurrentData() method returns a List<ChildData> that can be used to iterate over all of the child nodes.

Creating/updating and removing nodes is done through the client (CuratorFramework) itself, not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter of new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null, i.e. the cache does not cache node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but if they are, some listener events will be missed; this is probably related to how PathChildrenCache is implemented internally, so events must not be fired too rapidly.
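
The demo above starts the cache with the no-argument start(). As a minimal sketch (POST_INITIALIZED_EVENT is a real PathChildrenCache.StartMode constant; the rest is illustrative), the start mode can also be chosen explicitly:

PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
// ask for an INITIALIZED event so we know when getCurrentData() reflects the existing children
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);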

Curator Basic API

Node Cache

A Node Cache is similar to a Path Cache, except that it watches one specific node. It involves the following three classes:

  • NodeCache – the Node Cache implementation class
  • NodeCacheListener – the node listener
  • ChildData – node data

Note: as with the other caches, you must call start() before using it and close() when you are done.

getCurrentData() returns the node's current state, from which you can obtain its current value.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(100) calls in the example can be commented out, but if they are, some listener events will be missed; this is probably related to how NodeCache is implemented internally, so events must not be fired too rapidly.

Note: a NodeCache can only watch the state changes of a single node.

Creating a Session

Tree Cache

A Tree Cache can watch all nodes of an entire subtree; it behaves like a combination of Path Cache and Node Cache. It mainly involves the following four classes:

  • TreeCache – the Tree Cache implementation class
  • TreeCacheListener – the listener class
  • TreeCacheEvent – the event class that gets fired
  • ChildData – node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the events are triggered the expected number of times.

Note: when a TreeCache is initialized (when start() is called) it calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point event.getData().getPath() is very likely to throw a NullPointerException, so this case should be handled explicitly.
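
A minimal sketch of such a guard inside the listener (illustrative only; TreeCacheEvent.Type.INITIALIZED is the real enum constant):

TreeCacheListener listener = (curatorClient, event) -> {
    if (event.getType() == TreeCacheEvent.Type.INITIALIZED || event.getData() == null) {
        // INITIALIZED carries no ChildData, so do not dereference event.getData() here
        System.out.println("event without data: " + event.getType());
        return;
    }
    System.out.println(event.getType() + " | " + event.getData().getPath());
};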

1. Creating a client with the static factory method

An example:

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client =
CuratorFrameworkFactory.newClient(
                        connectionInfo,
                        5000,
                        3000,
                        retryPolicy);

The newClient static factory method takes four main parameters:

Parameter Description
connectionString server list, in the form host1:port1,host2:port2,…
retryPolicy retry policy; several policies are built in, and you can also implement the RetryPolicy interface yourself
sessionTimeoutMs session timeout in milliseconds, default 60000 ms
connectionTimeoutMs connection establishment timeout in milliseconds, default 15000 ms

Leader Election

In distributed computing, leader election is an important capability. The election process goes like this: one process is designated as the organizer and distributes tasks to the nodes. Before the work starts, no node knows who the leader (or coordinator) is. After the election algorithm has run, every node ends up recognizing one unique node as the task leader. In addition, an election also happens when the leader crashes unexpectedly and a new leader has to be elected.

In a ZooKeeper cluster, the leader handles write operations and synchronizes the followers through the Zab protocol; both the leader and the followers can serve read operations.

Curator provides two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients keep taking turns as leader. With the latter, once a leader has been elected it does not give up leadership unless a client goes down and triggers a new election.

2. Creating a session with the Fluent-style API

The core parameters become fluent-style settings; an example:

        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start( );

Once started, a LeaderLatch negotiates with all other LeaderLatches that use the same latch path, and one of them is eventually elected leader. You can check whether a LeaderLatch instance is the leader with the hasLeadership method:

leaderLatch.hasLeadership( );

It returns true if the current instance is the leader.

Similar to the JDK's CountDownLatch, a LeaderLatch blocks while requesting leadership. Once you no longer need the LeaderLatch you must call its close method; if it is the leader, leadership is released and the remaining participants elect a new leader.

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Error handling:
A ConnectionStateListener can be added to a LeaderLatch instance to watch for connection problems. On SUSPENDED or LOST the instance must no longer assume that it is the leader. If the connection is LOST and then RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. Users of LeaderLatch must take into account the connection problems that can cause leadership to be lost.
Using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

You can add the test module dependency to make testing easier; a real ZooKeeper server is then not required:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-test</artifactId>
    <version>2.12.0</version>
</dependency>

率先大家创立了拾1个LeaderLatch,运行后它们中的二个会被推举为leader。
因为大选会开销一些时间,start 后并无法立刻就收获 leader。
通过hasLeadership()翻看自个儿是还是不是是leader, 要是是的话再次来到true。
可以经过.getLeader().getId()可以拿走当前的leader的ID。
只好经过close()自由当前的政权。
await()是3个打断方法, 尝试获取leader地位,可是未必能上位。
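
A minimal illustrative sketch of waiting for leadership with a timeout, assuming latch is one of the LeaderLatch instances created above:

// block for at most 10 seconds; returns true only if this instance became the leader
if (latch.await(10, TimeUnit.SECONDS)) {
    System.out.println(latch.getId() + " is now the leader");
} else {
    System.out.println(latch.getId() + " did not obtain leadership in time");
}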

3. Creating a session with an isolated namespace

To isolate different ZooKeeper-based applications from one another, each application needs its own namespace, i.e. a dedicated ZooKeeper root path (in official terms, a "chroot" for the client). For example (see below), when a client specifies the namespace "/base", every data-node operation performed by that client is relative to that path. Setting a chroot maps the client application onto one subtree of the ZooKeeper server, which is very useful for isolating applications that share a single ZooKeeper cluster.

RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        CuratorFramework client =
        CuratorFrameworkFactory.builder()
                .connectString(connectionInfo)
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base")
                .build();

LeaderSelector

Using a LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Similar to LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when this instance acquires leadership, and takeLeadership() should return only when leadership is being relinquished.
When you no longer need the LeaderSelector instance, call its close method.

Error handling:
LeaderSelectorListener extends ConnectionStateListener. A LeaderSelector must watch for connection state changes. If an instance has become leader, it must respond to SUSPENDED or LOST. When SUSPENDED appears, the instance must assume that it may no longer be the leader until the connection is re-established. If LOST appears, the instance is no longer the leader and its takeLeadership method should return.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or LOST is received. This interrupts and cancels the thread that is executing takeLeadership in the LeaderSelector instance. This is important enough that you should consider extending LeaderSelectorListenerAdapter, which provides exactly this recommended handling.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute work and so on inside takeLeadership, and simply not return (for example with an endless loop) if you want this instance to stay leader forever. Calling leaderSelector.autoRequeue() guarantees that this instance may obtain leadership again after it has released it.
Here an AtomicInteger records how many times this client has obtained leadership; the process is "fair", so every client has the same chance of becoming leader.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, a LeaderLatch releases leadership only when close() is called, whereas a LeaderSelector lets you control leadership through the LeaderSelectorListener and release it at an appropriate moment, so every node has a chance to lead. LeaderSelector is therefore more flexible and controllable, and in leader-election scenarios it is recommended to prefer LeaderSelector.

Starting the client

Once the session has been created and you hold the client instance, simply call its start() method:

client.start();

Distributed Locks

Reminders:

  1. It is recommended to use a ConnectionStateListener to monitor the connection state, because once the connection is LOST you no longer hold the lock.

  2. Distributed locks are globally synchronized, which means that at any point in time no two clients hold the same lock.

Data Node Operations

Reentrant Shared Lock — Shared Reentrant Lock

Shared means the lock is globally visible and any client can request it. Reentrant means, as with the JDK's ReentrantLock, that the same client can acquire the lock repeatedly while already holding it, without blocking itself. It is implemented by the class InterProcessMutex, whose constructor is:

public InterProcessMutex(CuratorFramework client, String path)

The lock is acquired with acquire(), and an overload with a timeout is provided:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

The lock is released with release(). An InterProcessMutex instance is reusable.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

To ask for the current lock to be revoked, call the attemptRevoke() method; note that the RevocationListener is called back when the lock should be released.

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
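
A minimal sketch of the two sides of revocation, assuming a lock created under a made-up lockPath (makeRevocable, getParticipantNodes and Revoker.attemptRevoke are the real Curator APIs):

InterProcessMutex mutex = new InterProcessMutex(client, lockPath);
// holder side: release cooperatively when another party asks for the lock
mutex.makeRevocable(revokedLock -> {
    try {
        revokedLock.release();
    } catch (Exception e) {
        e.printStackTrace();
    }
});
mutex.acquire();

// requesting side (possibly another process): ask the current holder to release
for (String node : mutex.getParticipantNodes()) {
    Revoker.attemptRevoke(client, node);
}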

One more reminder about error handling: it is strongly recommended to use a ConnectionStateListener to react to connection state changes; once the connection is LOST you no longer hold the lock.

First let's create a simulated shared resource. The resource is expected to be accessed by only one client at a time, otherwise there will be concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // in a real system we would access/maintain a shared resource here
        // with the lock in place this example never hits the IllegalStateException below,
        // but without the lock the sleep makes a concurrent access violation very likely
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Then we create an InterProcessMutexDemo class that is responsible for a complete access cycle: request the lock, use the resource, release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is straightforward: it creates 5 clients, and each client repeats the acquire lock – access resource – release lock cycle 50 times, each client running in its own thread. The output shows that the lock is used exclusively by one instance at a time, in random order.

Since the lock is reentrant, you can call acquire() repeatedly in one thread, and it keeps returning true while that thread holds the lock.

You should not use the same InterProcessMutex from multiple threads. Instead, create a new InterProcessMutex instance in each thread with the same path; they will then share the same distributed lock.
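
A minimal sketch of that per-thread pattern (the path and the task body are made up for illustration):

Runnable worker = () -> {
    // each thread builds its own InterProcessMutex on the same ZNode path
    InterProcessMutex threadLocalLock = new InterProcessMutex(client, "/examples/locks");
    try {
        threadLocalLock.acquire();
        try {
            // ... critical section ...
        } finally {
            threadLocalLock.release();
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
};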

Creating data nodes

ZooKeeper node creation modes:

  • PERSISTENT: persistent
  • PERSISTENT_SEQUENTIAL: persistent, with a sequence number appended
  • EPHEMERAL: ephemeral
  • EPHEMERAL_SEQUENTIAL: ephemeral, with a sequence number appended

Create a node with empty initial content:

client.create().forPath("path");

Note: if no node attributes are set, the creation mode defaults to a persistent node and the content defaults to empty.

Create a node with initial content:

client.create().forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral), with empty content:

client.create().withMode(CreateMode.EPHEMERAL).forPath("path");

Create a node with a specified creation mode (ephemeral) and initial content:

client.create().withMode(CreateMode.EPHEMERAL).forPath("path","init".getBytes());

Create a node with a specified creation mode (ephemeral) and initial content, recursively creating parent nodes as needed:

client.create()
      .creatingParentContainersIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .forPath("path","init".getBytes());

The creatingParentContainersIfNeeded() call is very useful: normally, before creating a child node the developer must check whether its parent exists, because creating it directly would otherwise throw a NoNodeException; with creatingParentContainersIfNeeded() Curator automatically creates all required parent nodes recursively.

多少节点操作

Non-Reentrant Shared Lock — Shared Lock

Compared with the InterProcessMutex above, this lock simply lacks the reentrant capability, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex and it is used in the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // the lock must be released as many times as it was acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

When you run it you will find that exactly one client manages to acquire the first lock (its first acquire() returns true) and then blocks on its second acquire(), which times out; all other clients block on their first acquire(), time out and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.

Deleting data nodes

Delete a node:

client.delete().forPath("path");

Note: this call can only delete leaf nodes, otherwise an exception is thrown.

Delete a node and recursively delete all of its children:

client.delete().deletingChildrenIfNeeded().forPath("path");

Delete a node, forcing a specific version:

client.delete().withVersion(10086).forPath("path");

Delete a node with guaranteed deletion:

client.delete().guaranteed().forPath("path");

guaranteed() is a safeguard: as long as the client session is valid, Curator keeps retrying the delete operation in the background until the node is successfully deleted.

Note: the fluent calls above can be combined freely, for example:
client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(10086).forPath("path");

Reentrant Read-Write Lock — Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock: one read-write lock manages a pair of related locks, one for read operations and one for write operations. As long as the write lock is not in use, the read lock can be held by multiple processes at the same time; while the write lock is in use, reading is not allowed (it blocks).

The lock is reentrant. A thread that holds the write lock can re-enter the read lock, but the read lock cannot enter the write lock. This also means the write lock can be downgraded to a read lock, for example: acquire the write lock -> acquire the read lock -> release the read lock -> release the write lock. Upgrading from the read lock to the write lock is not possible.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. To use it, first create an InterProcessReadWriteLock instance and then obtain the read lock or the write lock as needed; both are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // note: you must acquire the write lock before the read lock, never the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到写锁");
        }
        System.out.println(clientName + " 已得到写锁");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " 不能得到读锁");
        }
        System.out.println(clientName + " 已得到读锁");
        try {
            resource.use(); // 使用资源
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " 释放读写锁");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Reading node data

Read the data content of a node:

client.getData().forPath("path");

Note: the return value of this call is byte[].

Read the data content of a node and also obtain the node's stat:

Stat stat = new Stat();
client.getData().storingStatIn(stat).forPath("path");

Semaphore — Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore. Where the JDK Semaphore maintains a set of permits, Curator maintains leases. There are two ways to determine the maximum number of leases a semaphore may hand out. The first is for the user to give the path and a maximum lease count. The second is for the user to give the path and a SharedCountReader. If you do not use a SharedCountReader, you must make sure that all instances in all processes use the same maximum lease count; otherwise you could end up with the instances in process A using a maximum of 10 while those in process B use 20, and the leases would lose their meaning.
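
A minimal sketch of the SharedCountReader variant (InterProcessSemaphoreV2, SharedCount and Lease are the real Curator classes; the paths are made up):

// the maximum lease count lives in ZooKeeper itself, so every process sees the same limit
SharedCount maxLeases = new SharedCount(client, "/examples/semaphore/maxLeases", 10);
maxLeases.start();

InterProcessSemaphoreV2 semaphore =
        new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);
Lease lease = semaphore.acquire();
try {
    // ... use the guarded resource ...
} finally {
    semaphore.returnLease(lease);
    maxLeases.close();
}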

Each call to acquire() returns a lease object. The client must close the lease in a finally block, otherwise the lease is lost. However, if the client session goes away for some reason, e.g. a crash, the leases held by that client are closed automatically so that other clients can use them. Leases can also be returned with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases at once; if the semaphore does not currently have enough leases available, the requesting thread blocks. Overloads with a timeout are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

The main classes used with a Shared Semaphore are the following:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, which we give back to the semaphore at the end. Then we request one more lease; since the semaphore still has leases available, the request is satisfied and one lease is returned, leaving 4. We then request 5 more leases; since not enough leases remain, the request blocks until the timeout and still cannot be satisfied, so it returns null (an unsatisfiable acquire blocks until the timeout and then returns null rather than throwing; without a timeout it would block forever).

All the locks discussed above are fair locks. From ZooKeeper's point of view, every client obtains the lock in the order in which it asked for it; there is no unfair preemption.

Updating node data

Update the data content of a node:

client.setData().forPath("path","data".getBytes());

Note: this call returns a Stat instance.

Update the data content of a node, forcing a specific version:

client.setData().withVersion(10086).forPath("path","data".getBytes());

Multi Shared Lock

A Multi Shared Lock is a container for locks. When acquire() is called, all of the contained locks are acquired; if that fails, everything that was acquired is released. Likewise, release() releases all of the locks (failures are ignored). Essentially it is a group lock: acquire and release requests on it are forwarded to every lock it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructors take either a collection of locks or a list of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

It is used in the same way as the Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

The example creates an InterProcessMultiLock containing a reentrant lock and a non-reentrant lock. After acquire() you can see that the thread holds both locks; after release() both locks are released.

Finally, once again: it is strongly recommended to use a ConnectionStateListener to monitor the connection state; when the connection state is LOST, the lock is lost as well.

Checking whether a node exists

client.checkExists().forPath("path");

Note: this call returns a Stat instance and is used to check whether a ZNode exists. Additional calls (watching or background processing) can be chained in before the final forPath() that specifies the ZNode to operate on.
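
A minimal sketch of chaining a watcher or a background callback onto the existence check (usingWatcher and inBackground are the real fluent calls; the path is made up):

// leave a one-shot watcher that fires when the node is created, deleted or changed
client.checkExists()
      .usingWatcher((CuratorWatcher) event ->
              System.out.println("watched event: " + event.getType()))
      .forPath("path");

// run the existence check asynchronously and inspect the Stat in the callback
client.checkExists()
      .inBackground((c, event) ->
              System.out.println("exists? " + (event.getStat() != null)))
      .forPath("path");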

Distributed Counters

As the name suggests, a counter counts things. With ZooKeeper you can implement a counter shared by a whole cluster: as long as the same path is used you always get the latest counter value, which is guaranteed by ZooKeeper's consistency. Curator has two counters, one that counts with an int (SharedCount) and one that counts with a long (DistributedAtomicLong).

Getting all child node paths of a node

client.getChildren().forPath("path");

Note: this call returns a List<String>, the list of the ZNode's child node paths. Additional calls (watch, background processing, or retrieving the stat) can be chained in before the final forPath() that specifies the parent ZNode to operate on.
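
A minimal illustrative sketch of listing the children while also registering a watcher for the next membership change (usingWatcher is the real fluent call; the path is made up):

// list the children and leave a watch that fires once when the set of children changes
List<String> children = client.getChildren()
        .usingWatcher((CuratorWatcher) event ->
                System.out.println("children changed: " + event.getType()))
        .forPath("/example");
System.out.println("current children: " + children);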

Distributed int counter — SharedCount

This class counts with an int. It mainly involves three classes:

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter. You can add a SharedCountListener to it, which is notified whenever the counter changes, while a SharedCountReader reads the latest value, both the plain value and the versioned value (VersionedValue).

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

In this example, baseCount is used to watch the counter value (a SharedCountListener is added with addListener). Any SharedCount using the same path sees this same counter value. We then use five threads, each adding a random number below 10 to the counter. Whenever a SharedCount on the same path changes the value, baseCount's SharedCountListener is called back.

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to update the counter. The first argument is the current VersionedValue; if another client has updated the value in the meantime, your update may fail, but your client will then have seen the latest value, so after a failure you can simply try again. setCount, in contrast, sets the counter value unconditionally.
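
A minimal sketch of the retry pattern this implies (trySetCount and getVersionedValue are the real SharedCount methods):

// optimistic update: re-read the versioned value and retry until the compare-and-set succeeds
boolean updated = false;
while (!updated) {
    VersionedValue<Integer> current = count.getVersionedValue();
    updated = count.trySetCount(current, current.getValue() + 1);
}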

Note that the counter must be started and, once you are done with it, closed with close().

Using a ConnectionStateListener is strongly recommended; in this example it is covered implicitly, since SharedCountListener extends ConnectionStateListener.

Transactions

CuratorFramework instances provide the inTransaction() method, which opens a ZooKeeper transaction. You can compose create, setData, check and/or delete operations and then call commit() to submit them as one atomic operation. An example:

client.inTransaction().check().forPath("path")
      .and()
      .create().withMode(CreateMode.EPHEMERAL).forPath("path","data".getBytes())
      .and()
      .setData().withVersion(10086).forPath("path","data2".getBytes())
      .and()
      .commit();

Distributed long counter — DistributedAtomicLong

Next, a counter that counts with a long. Apart from having a larger range than SharedCount, it first tries to set the counter optimistically; if that does not succeed (for example because the counter was updated by another client in the meantime), it uses an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

The counter provides a series of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a given value
  • subtract(): subtract a given value
  • trySet(): try to set the value
  • forceSet(): forcibly set the value

You must check succeeded() on the returned result; it tells you whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}

Asynchronous API

The create, delete, update and read calls described above are all synchronous. Curator also provides an asynchronous API and introduces the BackgroundCallback interface to handle the result the server returns after an asynchronous call. The key callback value of BackgroundCallback is a CuratorEvent, which contains the event type, the result code and the node details.

CuratorEventType

Event type Corresponding CuratorFramework method
CREATE #create()
DELETE #delete()
EXISTS #checkExists()
GET_DATA #getData()
SET_DATA #setData()
CHILDREN #getChildren()
SYNC #sync(String,Object)
GET_ACL #getACL()
SET_ACL #setACL()
WATCHED #Watcher(Watcher)
CLOSING #close()

Result codes (#getResultCode())

Result code Meaning
0 OK, the call succeeded
-4 ConnectionLoss, the client was disconnected from the server
-110 NodeExists, the node already exists
-112 SessionExpired, the session has expired

An example of creating a node asynchronously:

Executor executor = Executors.newFixedThreadPool(2);
client.create()
      .creatingParentsIfNeeded()
      .withMode(CreateMode.EPHEMERAL)
      .inBackground((curatorFramework, curatorEvent) -> {      System.out.println(String.format("eventType:%s,resultCode:%s",curatorEvent.getType(),curatorEvent.getResultCode()));
      },executor)
      .forPath("path");

Note: if #inBackground() is not given an executor, Curator's EventThread is used for the asynchronous processing by default.

Distributed Queues

Curator also simplifies operations on ephemeral nodes, and it provides the ZK recipe distributed queue implementations. By using ZooKeeper PERSISTENT_SEQUENTIAL nodes, it can guarantee that the items put into the queue are ordered. If a single consumer takes data from the queue, it is first-in first-out, which is the essence of a queue. If you strictly require ordering you have to use a single consumer; you can use leader election so that only the leader acts as the consumer.

However, according to the author of Netflix's Curator, ZooKeeper really is not suited for use as a queue, or rather ZK cannot implement a good queue; see Tech Note 4 for the details.

There are five reasons:

  1. ZK has a 1MB transport limit. In practice ZNodes must be relatively small, whereas a queue holds many messages, some of them quite large.
  2. If there are many nodes, ZK starts up very slowly, and using a queue creates many ZNodes. You would need to increase initLimit and syncLimit considerably.
  3. A very large ZNode tree is hard to clean up; Netflix had to write a dedicated program to do it.
  4. When a ZNode has a very large number of children, ZooKeeper's performance suffers badly.
  5. ZK's database is kept entirely in memory, so a large amount of queued data means a lot of memory.

Nevertheless, Curator does provide several queue implementations. If the number of queued items and their size are modest, they can still be used, with due consideration.

Distributed queue — DistributedQueue

DistributedQueue is the most basic of the queues. It involves the following four classes:

  • QueueBuilder – queues are created with QueueBuilder, which is also the factory class for the other queue types
  • QueueConsumer – the consumer interface for messages in the queue
  • QueueSerializer – the serialization/deserialization interface for queue messages, providing serialization and deserialization of the objects in the queue
  • DistributedQueue – the queue implementation class

QueueConsumer is the consumer; it receives the data from the queue. The code that processes the queued data goes into QueueConsumer.consumeMessage().

Normally a message is removed from the queue first and then handed to the consumer, but these are two steps and not atomic. You can call lockPath() on the builder so that the consumer holds a lock while it is consuming a message; then other consumers cannot consume that message, and if consumption fails or the process dies, the message can be handed to another process. This costs a little performance; it is still best to use the queue with a single consumer.
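
A minimal sketch of building a queue with such a consumer lock (lockPath is the real QueueBuilder option; the paths are made up and the helper methods are the ones defined in the demo below):

QueueBuilder<String> builder =
        QueueBuilder.builder(client, createQueueConsumer("A"), createQueueSerializer(), "/example/queue");
// hold a lock under this path while a message is being consumed,
// so a crashed consumer does not lose the message
DistributedQueue<String> queue = builder.lockPath("/example/queue-lock").buildQueue();
queue.start();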

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10); // wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Defines the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("连接状态改变: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("消费消息(" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because they share the same PATH, the consumers compete for the messages.

Queue with IDs — DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that an ID can be assigned to every element in the queue, and any element can then be removed from the queue by its ID. It involves the following classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedQueue

It is created with:

builder.buildIdQueue()

Putting an element:

queue.put(aMessage, messageId);

Removing an element:

int numberRemoved = queue.remove(messageId);

In this example, some elements are removed before any consumer has consumed them, so the consumers never receive the removed messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}

Path Cache

A Path Cache watches the children of a ZNode. When a child node is added, updated, or deleted, the Path Cache changes its state so that it contains the current set of children together with their data and state; state changes are reported through a PathChildrenCacheListener.

In practice it involves four classes:

  • PathChildrenCache
  • PathChildrenCacheEvent
  • PathChildrenCacheListener
  • ChildData

A Path Cache is created with the following constructor:

public PathChildrenCache(CuratorFramework client, String path, boolean cacheData)

To use the cache you must call its start method, and call close when you are done. A StartMode can be passed to start to control how the cache is initialized (see the sketch after this list).

StartMode has three values:

  1. NORMAL: normal initialization.
  2. BUILD_INITIAL_CACHE: rebuild() is called so the cache is already populated when start() returns.
  3. POST_INITIALIZED_EVENT: after the cache has finished its initial load, a PathChildrenCacheEvent.Type.INITIALIZED event is posted.
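A minimal sketch of the third mode (my own illustration, assuming client is an already started CuratorFramework and PATH is the path used in the demo below):

PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
cache.getListenable().addListener((c, event) -> {
    // with POST_INITIALIZED_EVENT, exactly one INITIALIZED event is delivered
    // once the initial child data has been loaded
    if (event.getType() == PathChildrenCacheEvent.Type.INITIALIZED) {
        System.out.println("cache initialized, children: " + event.getInitialData().size());
    }
});
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);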

public void addListener(PathChildrenCacheListener listener) adds a listener that is notified of cache changes.

getCurrentData() returns a List<ChildData> that can be iterated to inspect all children.

Creating, updating, and deleting nodes is done through the client (CuratorFramework) itself, not through the PathChildrenCache:

public class PathCacheDemo {

    private static final String PATH = "/example/pathCache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        PathChildrenCache cache = new PathChildrenCache(client, PATH, true);
        cache.start();
        PathChildrenCacheListener cacheListener = (client1, event) -> {
            System.out.println("事件类型:" + event.getType());
            if (null != event.getData()) {
                System.out.println("节点数据:" + event.getData().getPath() + " = " + new String(event.getData().getData()));
            }
        };
        cache.getListenable().addListener(cacheListener);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test01", "01".getBytes());
        Thread.sleep(10);
        client.create().creatingParentsIfNeeded().forPath("/example/pathCache/test02", "02".getBytes());
        Thread.sleep(10);
        client.setData().forPath("/example/pathCache/test01", "01_V2".getBytes());
        Thread.sleep(10);
        for (ChildData data : cache.getCurrentData()) {
            System.out.println("getCurrentData:" + data.getPath() + " = " + new String(data.getData()));
        }
        client.delete().forPath("/example/pathCache/test01");
        Thread.sleep(10);
        client.delete().forPath("/example/pathCache/test02");
        Thread.sleep(1000 * 5);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: if the cacheData parameter in new PathChildrenCache(client, PATH, true) is set to false, then event.getData().getData() and data.getData() in the example return null, i.e. the cache does not keep the node data.

Note: the Thread.sleep(10) calls in the example can be commented out, but then fewer events are observed; this is probably related to how PathChildrenCache is implemented, which cannot keep up with events fired in very quick succession.


Distributed Priority Queue—DistributedPriorityQueue

A priority queue sorts its elements by priority: the smaller the priority value, the closer the element is to the head of the queue and the sooner it is consumed. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created through builder.buildPriorityQueue(minItemsBeforeRefresh).
When the priority queue learns that elements have been added or removed, it interrupts processing of the current batch and refreshes the queue. minItemsBeforeRefresh specifies the minimum number of items in the currently active batch before such a refresh happens; set it to the amount of out-of-order processing your application can tolerate.

Specify the priority when putting an element into the queue:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // do not swallow errors silently
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority has no effect. That is because priority only matters for elements that are backlogged in the queue; if consumption is fast enough, the previous element may already be consumed before the next one is enqueued, in which case DistributedPriorityQueue degenerates into a plain DistributedQueue.

Node Cache

Node Cache is similar to Path Cache, except that it watches one specific node only. It involves three classes:

  • NodeCache – the Node Cache implementation
  • NodeCacheListener – the node listener
  • ChildData – node data

Note: to use the cache you still have to call its start() method, and call close() when you are done.

getCurrentData() returns the node's current state, from which the current value can be read.

public class NodeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        final NodeCache cache = new NodeCache(client, PATH);
        NodeCacheListener listener = () -> {
            ChildData data = cache.getCurrentData();
            if (null != data) {
                System.out.println("节点数据:" + new String(cache.getCurrentData().getData()));
            } else {
                System.out.println("节点被删除!");
            }
        };
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: the Thread.sleep(100) calls in the example can be commented out, but then fewer events are observed; this is probably related to how NodeCache is implemented, which cannot keep up with events fired in very quick succession.

Note: NodeCache can only watch the state changes of a single node.


Distributed Delay Queue—DistributedDelayQueue

The JDK also has a DelayQueue, which you may be familiar with. DistributedDelayQueue provides similar functionality: every element carries a delay value, and a consumer only receives an element once its delay has expired. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element you specify delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now (such as 20 ms) but a future epoch timestamp, for example System.currentTimeMillis() + 10000 (10 seconds from now). If delayUntilEpoch already lies in the past, the message is delivered to the consumer immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {
            ex.printStackTrace(); // do not swallow errors silently
        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}

Tree Cache

Tree Cache can watch all nodes of an entire subtree; it is roughly a combination of PathCache and NodeCache. It mainly involves four classes:

  • TreeCache – the Tree Cache implementation
  • TreeCacheListener – the listener
  • TreeCacheEvent – the event that is fired
  • ChildData – node data

public class TreeCacheDemo {

    private static final String PATH = "/example/cache";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        client.start();
        client.create().creatingParentsIfNeeded().forPath(PATH);
        TreeCache cache = new TreeCache(client, PATH);
        TreeCacheListener listener = (client1, event) ->
                System.out.println("事件类型:" + event.getType() +
                        " | 路径:" + (null != event.getData() ? event.getData().getPath() : null));
        cache.getListenable().addListener(listener);
        cache.start();
        client.setData().forPath(PATH, "01".getBytes());
        Thread.sleep(100);
        client.setData().forPath(PATH, "02".getBytes());
        Thread.sleep(100);
        client.delete().deletingChildrenIfNeeded().forPath(PATH);
        Thread.sleep(1000 * 2);
        cache.close();
        client.close();
        System.out.println("OK!");
    }
}

Note: this example does not use Thread.sleep(10), yet the number of fired events is still as expected.

Note: when TreeCache is initialized (when start() is called) it calls the TreeCacheListener back with a TreeCacheEvent whose Type is INITIALIZED and whose ChildData is null. At that point event.getData().getPath() is very likely to cause a NullPointerException, so this case should be handled explicitly, for example as in the following sketch.
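A defensive listener sketch (my own illustration, not from the original article) that checks for the INITIALIZED event and for null data before touching event.getData():

TreeCacheListener safeListener = (c, event) -> {
    // guard against the INITIALIZED callback and any other event that carries no data
    if (event.getType() == TreeCacheEvent.Type.INITIALIZED || event.getData() == null) {
        System.out.println("事件类型:" + event.getType() + " | no node data");
        return;
    }
    System.out.println("事件类型:" + event.getType() + " | 路径:" + event.getData().getPath());
};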


SimpleDistributedQueue

The queues above implement various features, but you may have noticed that none of them offers a JDK-like interface. SimpleDistributedQueue provides an interface that is essentially consistent with the JDK (although it does not implement the Queue interface).
Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Removing an element:

public byte[] take() throws Exception

Further methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is a take method.

take() blocks until it succeeds, while poll() returns null immediately when the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {
            ex.printStackTrace(); // do not swallow errors silently
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

然则实际发送了100条音讯,消费完第②条之后,后边的音信不能消费,近来没找到原因。查看一下法定文档推荐的demo使用上面多少个Api:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

In practice, with a consumer that only calls take() once, consumption still stalls after the first message; looping over take() resolves this, as sketched below.
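A sketch of such a consumer loop (my own fix, not part of the original article), replacing Consumer.run() in the demo above so that every element is taken:

@Override
public void run() {
    try {
        while (!Thread.currentThread().isInterrupted()) {
            byte[] data = queue.take(); // blocks until an element is available
            System.out.println("消费一条消息成功:" + new String(data, "UTF-8"));
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}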

Leader Election

In distributed computing, leader election is an important capability. The election process goes like this: one process is designated as the organizer and distributes the work to the nodes. Before the work starts, no node knows who the leader (or coordinator) is; once the election algorithm has run, every node ends up agreeing on a single, unique node as the task leader. In addition, elections frequently also happen when the leader crashes unexpectedly and a new leader has to be elected.

In a ZooKeeper cluster, the leader handles write operations and synchronizes the followers through the Zab protocol; reads can be served by either the leader or the followers.

Curator has two leader-election recipes: LeaderSelector and LeaderLatch.

With the former, all live clients take turns acting as leader; with the latter, once a leader has been elected it keeps the leadership until a client failure triggers a new election.


Distributed Barrier—Barrier

A distributed barrier is a class that blocks the waiting processes on all nodes until a certain condition is satisfied, after which all of them continue together.

Think of a horse race: the horses gather at the starting line, and at the starting signal they all set off at once.

LeaderLatch

LeaderLatch has two constructors:

public LeaderLatch(CuratorFramework client, String latchPath)
public LeaderLatch(CuratorFramework client, String latchPath,  String id)

Starting a LeaderLatch:

leaderLatch.start();

Once started, the LeaderLatch negotiates with all other LeaderLatch instances that use the same latch path, and one of them is eventually elected leader. Whether an instance is the leader can be checked with the hasLeadership method:

leaderLatch.hasLeadership(); // returns true if this instance is currently the leader

Similar to the JDK's CountDownLatch, LeaderLatch blocks in await while requesting leadership. Once a LeaderLatch is no longer needed, its close method must be called; if the instance is the leader, close releases the leadership and the remaining participants elect a new leader.

public void await() throws InterruptedException,EOFException
/*Causes the current thread to wait until this instance acquires leadership
unless the thread is interrupted or closed.*/
public boolean await(long timeout,TimeUnit unit)throws InterruptedException

Error handling:
A LeaderLatch instance can register a ConnectionStateListener to watch for connection problems. On SUSPENDED or LOST the instance must no longer assume it is the leader. If a LOST connection is later RECONNECTED, the LeaderLatch deletes its previous ZNode and creates a new one. LeaderLatch users must take into account that connection problems can cause leadership to be lost; using a ConnectionStateListener is strongly recommended.

An example of using LeaderLatch:

public class LeaderLatchDemo extends BaseConnectionInfo {
    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderLatch> examples = Lists.newArrayList();
        TestingServer server=new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderLatch latch = new LeaderLatch(client, PATH, "Client #" + i);
                latch.addListener(new LeaderLatchListener() {

                    @Override
                    public void isLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am Leader");
                    }

                    @Override
                    public void notLeader() {
                        // TODO Auto-generated method stub
                        System.out.println("I am not Leader");
                    }
                });
                examples.add(latch);
                client.start();
                latch.start();
            }
            Thread.sleep(10000);
            LeaderLatch currentLeader = null;
            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
            currentLeader.close();

            Thread.sleep(5000);

            for (LeaderLatch latch : examples) {
                if (latch.hasLeadership()) {
                    currentLeader = latch;
                }
            }
            System.out.println("current leader is " + currentLeader.getId());
            System.out.println("release the leader " + currentLeader.getId());
        } finally {
            for (LeaderLatch latch : examples) {
                if (null != latch.getState())
                CloseableUtils.closeQuietly(latch);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
        }
    }
}

The curator-test module can be added as a dependency to make testing easier; it removes the need to run a real ZooKeeper server:

        <dependency>
            <groupId>org.apache.curator</groupId>
            <artifactId>curator-test</artifactId>
            <version>2.12.0</version>
        </dependency>

The example first creates 10 LeaderLatch instances; after they are started, one of them is elected leader. Because the election takes some time, the leader is not available immediately after start.
hasLeadership checks whether this instance is the leader and returns true if so.
.getLeader().getId() returns the ID of the current leader.
Leadership can only be released through close.
await is a blocking method that waits for this instance to acquire leadership, which may never happen; a timed variant is shown in the sketch below.
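If blocking indefinitely is not acceptable, the timed await variant can be used instead. A minimal sketch (my own illustration, assuming latch is a started LeaderLatch):

// wait at most 10 seconds to become leader
if (latch.await(10, TimeUnit.SECONDS)) {
    System.out.println(latch.getId() + " acquired leadership");
} else {
    System.out.println(latch.getId() + " did not become leader within 10 seconds");
}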


DistributedBarrier

The DistributedBarrier class implements the barrier. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First set the barrier; it will block the threads that wait on it:

setBarrier();

Then the threads that need to block call the wait method and wait for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads continue:

removeBarrier();

Error handling: DistributedBarrier monitors the connection state; waitOnBarrier() throws an exception when the connection is broken.

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier that sets and removes the barrier. Five threads are created that wait on the barrier; only after the barrier is removed do they all continue.

If the barrier is never set, none of the threads blocks.

LeaderSelector

Using LeaderSelector mainly involves the following classes:

  • LeaderSelector
  • LeaderSelectorListener
  • LeaderSelectorListenerAdapter
  • CancelLeadershipException

The core class is LeaderSelector; its constructors are:

public LeaderSelector(CuratorFramework client, String mutexPath,LeaderSelectorListener listener)
public LeaderSelector(CuratorFramework client, String mutexPath, ThreadFactory threadFactory, Executor executor, LeaderSelectorListener listener)

Similar to LeaderLatch, a LeaderSelector must be started: leaderSelector.start();
Once started, your listener's takeLeadership() method is called when the instance acquires leadership, and takeLeadership() should only return when the leadership is to be given up.
Call close when the LeaderSelector instance is no longer used.

Exception handling:
LeaderSelectorListener extends ConnectionStateListener. A LeaderSelector must pay attention to connection state changes. If an instance becomes the leader, it has to respond to SUSPENDED and LOST. When SUSPENDED occurs, the instance must assume it may no longer be the leader until it has successfully reconnected. If LOST occurs, the instance is no longer the leader and takeLeadership must return.

Important: the recommended handling is to throw a CancelLeadershipException when SUSPENDED or LOST is received. This interrupts the LeaderSelector instance and cancels the executing takeLeadership method. This is important enough that you should consider extending LeaderSelectorListenerAdapter, which provides the recommended handling logic.

The following example is adapted from the official documentation:

public class LeaderSelectorAdapter extends LeaderSelectorListenerAdapter implements Closeable {
    private final String name;
    private final LeaderSelector leaderSelector;
    private final AtomicInteger leaderCount = new AtomicInteger();

    public LeaderSelectorAdapter(CuratorFramework client, String path, String name) {
        this.name = name;
        leaderSelector = new LeaderSelector(client, path, this);
        leaderSelector.autoRequeue();
    }

    public void start() throws IOException {
        leaderSelector.start();
    }

    @Override
    public void close() throws IOException {
        leaderSelector.close();
    }

    @Override
    public void takeLeadership(CuratorFramework client) throws Exception {
        final int waitSeconds = (int) (5 * Math.random()) + 1;
        System.out.println(name + " is now the leader. Waiting " + waitSeconds + " seconds...");
        System.out.println(name + " has been leader " + leaderCount.getAndIncrement() + " time(s) before.");
        try {
            Thread.sleep(TimeUnit.SECONDS.toMillis(waitSeconds));
        } catch (InterruptedException e) {
            System.err.println(name + " was interrupted.");
            Thread.currentThread().interrupt();
        } finally {
            System.out.println(name + " relinquishing leadership.\n");
        }
    }
}

You can distribute work inside takeLeadership and never return from it if you want this instance to stay leader forever, for example by adding an endless loop. Calling leaderSelector.autoRequeue() ensures that the instance can obtain leadership again after it has released it. Here an AtomicInteger records how many times this client has obtained leadership; the selection is "fair", so every client has an equal chance of becoming leader.

public class LeaderSelectorDemo {

    protected static String PATH = "/francis/leader";
    private static final int CLIENT_QTY = 10;


    public static void main(String[] args) throws Exception {
        List<CuratorFramework> clients = Lists.newArrayList();
        List<LeaderSelectorAdapter> examples = Lists.newArrayList();
        TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < CLIENT_QTY; i++) {
                CuratorFramework client
                        = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(20000, 3));
                clients.add(client);
                LeaderSelectorAdapter selectorAdapter = new LeaderSelectorAdapter(client, PATH, "Client #" + i);
                examples.add(selectorAdapter);
                client.start();
                selectorAdapter.start();
            }
            System.out.println("Press enter/return to quit\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } finally {
            System.out.println("Shutting down...");
            for (LeaderSelectorAdapter exampleClient : examples) {
                CloseableUtils.closeQuietly(exampleClient);
            }
            for (CuratorFramework client : clients) {
                CloseableUtils.closeQuietly(client);
            }
            CloseableUtils.closeQuietly(server);
        }
    }
}

By comparison, LeaderLatch only releases leadership when close() is called, whereas LeaderSelector lets you control the leadership through the LeaderSelectorListener and release it at an appropriate time, so that every node can get a turn. LeaderSelector is therefore more flexible and controllable; in leader-election scenarios it is recommended to prefer LeaderSelector.


Double Barrier—DistributedDoubleBarrier

A double barrier lets clients synchronize at the start and the end of a computation: when enough processes have entered the barrier, the computation starts, and when it finishes they leave the barrier together. The double-barrier class is DistributedDoubleBarrier. Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the caller blocks until all members have entered; when leave() is called, it likewise blocks the calling thread until all members have left. It is like a sprint: when the starting gun fires all runners start, and the race is only over once all runners have crossed the finish line.

DistributedDoubleBarrier monitors the connection state; enter() and leave() throw an exception when the connection is broken.

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }

}

Distributed Locks

Reminders:

1. It is recommended to monitor the connection state with a ConnectionStateListener, because when the connection is LOST you no longer hold the lock; a sketch follows below.

2. Distributed locks are globally synchronized: at any point in time no two clients hold the same lock.
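A minimal sketch of such a listener (my own illustration, assuming client is a started CuratorFramework); what to do on LOST is application specific and only hinted at here:

client.getConnectionStateListenable().addListener((c, newState) -> {
    if (newState == ConnectionState.LOST || newState == ConnectionState.SUSPENDED) {
        // the session is gone (or in doubt): assume any held lock is no longer valid
        System.out.println("connection " + newState + ", stop the protected work");
    }
});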


Reentrant Shared Lock—Shared Reentrant Lock

Shared means that the lock is globally visible and every client can request it. Reentrant means the same as the JDK's ReentrantLock: the client that already holds the lock can acquire it again without blocking. The lock is implemented by the class InterProcessMutex. Its constructor is:

public InterProcessMutex(CuratorFramework client, String path)

Acquire the lock with acquire(); a timed variant is also provided:

public void acquire()
Acquire the mutex - blocking until it's available. Note: the same thread can call acquire
re-entrantly. Each call to acquire must be balanced by a call to release()

public boolean acquire(long time,TimeUnit unit)
Acquire the mutex - blocks until it's available or the given time expires. Note: the same thread can call acquire re-entrantly. Each call to acquire that returns true must be balanced by a call to release()

Parameters:
time - time to wait
unit - time unit
Returns:
true if the mutex was acquired, false if not

Release the lock with release(). An InterProcessMutex instance can be reused.

Revoking: the ZooKeeper recipes wiki defines a cooperative revocation mechanism. To make a mutex revocable, call the following method:

public void makeRevocable(RevocationListener<T> listener)
Makes the lock revocable. The listener is called when another process or thread wants you to release the lock.
Parameters:
listener - the listener

To ask for the current lock to be revoked, call the static attemptRevoke() method; the RevocationListener is called back when the lock should be released (a sketch follows after the method description):

public static void attemptRevoke(CuratorFramework client,String path) throws Exception
Utility to mark a lock for revocation. Assuming that the lock has been registered
with a RevocationListener, it will get called and the lock should be released. Note,
however, that revocation is cooperative.
Parameters:
client - the client
path - the path of the lock - usually from something like InterProcessMutex.getParticipantNodes()
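A cooperative-revocation sketch (my own illustration, not from the original article; the flag-based hand-off is only one way the holder could choose to cooperate, and imports/throws are assumed from the surrounding demos):

AtomicBoolean revocationRequested = new AtomicBoolean(false);
InterProcessMutex lock = new InterProcessMutex(client, PATH);
// the listener runs when someone asks us to give the lock up;
// revocation is cooperative, so we only record the request here
lock.makeRevocable(forLock -> revocationRequested.set(true));

lock.acquire();
try {
    while (!revocationRequested.get()) {
        // do the protected work in small steps, checking the flag between steps
    }
} finally {
    lock.release();
}

// another process asks the current holder to release the lock:
Revoker.attemptRevoke(client, lock.getParticipantNodes().iterator().next());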

One more reminder about error handling: it is strongly recommended to handle connection state changes with a ConnectionStateListener; when the connection is LOST you no longer hold the lock.

First let us create a simulated shared resource. The resource is expected to be accessed by only one thread at a time, otherwise there are concurrency problems.

public class FakeLimitedResource {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    public void use() throws InterruptedException {
        // In a real system we would access/maintain a shared resource here.
        // With the lock in place this example never throws the IllegalStateException below,
        // but without the lock the sleep makes the exception easy to trigger.
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Needs to be used by one client at a time");
        }
        try {
            Thread.sleep((long) (3 * Math.random()));
        } finally {
            inUse.set(false);
        }
    }
}

Next, create an InterProcessMutexDemo class that performs one complete access cycle: request the lock, use the resource, release the lock.

public class InterProcessMutexDemo {

    private InterProcessMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the lock");
        }
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessMutexDemo example = new InterProcessMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

The code is straightforward: it creates QTY (5) clients, and each client repeats the acquire lock – access resource – release lock cycle REPETITIONS (50) times, each client in its own thread.
The output shows that the lock is used exclusively, by one instance at a time, in arbitrary order.

Since the lock is reentrant, acquire() can be called several times in one thread, and it keeps returning true while the thread holds the lock.

You should not use the same InterProcessMutex from multiple threads. Instead, create a new InterProcessMutex instance in each thread with the same path; the instances then share the same lock.


Non-reentrant Shared Lock—Shared Lock

Compared with InterProcessMutex above, this lock simply lacks the Reentrant part, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex, and it is used in much the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 已获取到互斥锁");
        if (!lock.acquire(time, unit))
        {
            throw new IllegalStateException(clientName + " 不能得到互斥锁");
        }
        System.out.println(clientName + " 再次获取到互斥锁");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as the lock was acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

When you run it you will see that one and only one client acquires the first lock (its first acquire() returns true), and that this same client then blocks on the second acquire() and times out; all other clients block on the first acquire(), time out, and throw an exception.

This demonstrates that the lock implemented by InterProcessSemaphoreMutex is not reentrant.


Reentrant Read-Write Lock—Shared Reentrant Read Write Lock

Similar to the JDK's ReentrantReadWriteLock. A read-write lock manages a pair of related locks, one for read operations and one for write operations. As long as the write lock is not held, the read lock can be held by several processes at the same time; while the write lock is held, reads are blocked.

The lock is reentrant. A thread that holds the write lock can re-enter the read lock, but not the other way around. This also means the write lock can be downgraded to a read lock: acquire the write lock, then the read lock, then release the write lock while keeping the read lock. Upgrading from the read lock to the write lock is not possible.

The reentrant read-write lock is implemented mainly by two classes: InterProcessReadWriteLock and InterProcessMutex. First create an InterProcessReadWriteLock instance, then obtain the read lock or the write lock as needed; both are of type InterProcessMutex.

public class ReentrantReadWriteLockDemo {

    private final InterProcessReadWriteLock lock;
    private final InterProcessMutex readLock;
    private final InterProcessMutex writeLock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public ReentrantReadWriteLockDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        lock = new InterProcessReadWriteLock(client, lockPath);
        readLock = lock.readLock();
        writeLock = lock.writeLock();
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        // Note: acquire the write lock first and then the read lock, not the other way around!
        if (!writeLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the write lock");
        }
        System.out.println(clientName + " has the write lock");
        if (!readLock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the read lock");
        }
        System.out.println(clientName + " has the read lock");
        try {
            resource.use(); // use the resource
            Thread.sleep(1000);
        } finally {
            System.out.println(clientName + " releasing the read and write locks");
            readLock.release();
            writeLock.release();
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY ;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final ReentrantReadWriteLockDemo example = new ReentrantReadWriteLockDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
    }
}

Non-reentrant shared lock—Shared Lock

Compared with the InterProcessMutex above, this lock simply lacks reentrancy, which means it cannot be re-acquired within the same thread. The class is InterProcessSemaphoreMutex and it is used in much the same way as InterProcessMutex.

public class InterProcessSemaphoreMutexDemo {

    private InterProcessSemaphoreMutex lock;
    private final FakeLimitedResource resource;
    private final String clientName;

    public InterProcessSemaphoreMutexDemo(CuratorFramework client, String lockPath, FakeLimitedResource resource, String clientName) {
        this.resource = resource;
        this.clientName = clientName;
        this.lock = new InterProcessSemaphoreMutex(client, lockPath);
    }

    public void doWork(long time, TimeUnit unit) throws Exception {
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex");
        }
        System.out.println(clientName + " acquired the mutex");
        if (!lock.acquire(time, unit)) {
            throw new IllegalStateException(clientName + " could not acquire the mutex again");
        }
        System.out.println(clientName + " acquired the mutex a second time");
        try {
            System.out.println(clientName + " get the lock");
            resource.use(); //access resource exclusively
        } finally {
            System.out.println(clientName + " releasing the lock");
            lock.release(); // always release the lock in a finally block
            lock.release(); // release as many times as you acquired
        }
    }

    private static final int QTY = 5;
    private static final int REPETITIONS = QTY * 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        final FakeLimitedResource resource = new FakeLimitedResource();
        ExecutorService service = Executors.newFixedThreadPool(QTY);
        final TestingServer server = new TestingServer();
        try {
            for (int i = 0; i < QTY; ++i) {
                final int index = i;
                Callable<Void> task = new Callable<Void>() {
                    @Override
                    public Void call() throws Exception {
                        CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
                        try {
                            client.start();
                            final InterProcessSemaphoreMutexDemo example = new InterProcessSemaphoreMutexDemo(client, PATH, resource, "Client " + index);
                            for (int j = 0; j < REPETITIONS; ++j) {
                                example.doWork(10, TimeUnit.SECONDS);
                            }
                        } catch (Throwable e) {
                            e.printStackTrace();
                        } finally {
                            CloseableUtils.closeQuietly(client);
                        }
                        return null;
                    }
                };
                service.submit(task);
            }
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
        } finally {
            CloseableUtils.closeQuietly(server);
        }
        Thread.sleep(Integer.MAX_VALUE);
    }
}

After running it you will find that exactly one client succeeds in acquiring the first lock (its first acquire() returns true) and then blocks on its second acquire(), timing out on the second acquisition; every other client blocks on the first acquire(), times out and throws an exception.

This demonstrates that the lock provided by InterProcessSemaphoreMutex is not reentrant.

Semaphore—Shared Semaphore

A counting semaphore, similar to the JDK's Semaphore. Where the JDK Semaphore maintains a set of permits, Curator calls them leases (Lease). There are two ways to control the semaphore's maximum number of leases: the first is to give a path and specify the maximum lease count directly; the second is to give a path and use a SharedCountReader. If you do not use a SharedCountReader, you must make sure that every instance in every process uses the same maximum lease count, otherwise you could end up with an instance in process A using a maximum of 10 while process B uses 20, at which point the lease count is meaningless.
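
The two construction styles look roughly like this (a sketch; the paths and lease limits are made up, and client is assumed to be a started CuratorFramework):

// Style 1: the maximum lease count is given directly
InterProcessSemaphoreV2 semaphore1 = new InterProcessSemaphoreV2(client, "/examples/semaphore", 10);

// Style 2: the maximum lease count is read from a SharedCount that all processes observe
SharedCount maxLeases = new SharedCount(client, "/examples/semaphore-count", 10);
maxLeases.start();
InterProcessSemaphoreV2 semaphore2 = new InterProcessSemaphoreV2(client, "/examples/semaphore", maxLeases);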

Each call to acquire() returns a Lease object. The client must close the lease in a finally block, otherwise the lease is leaked. However, if the client session is lost for some reason, for example a crash, the leases held by that client are closed automatically so that other clients can use them. Leases can also be returned explicitly with:

public void returnAll(Collection<Lease> leases)
public void returnLease(Lease lease)

Note that you can request several leases in a single call; if the semaphore does not currently have enough leases available, the requesting thread blocks. Overloads with a timeout are also provided:

public Lease acquire()
public Collection<Lease> acquire(int qty)
public Lease acquire(long time, TimeUnit unit)
public Collection<Lease> acquire(int qty, long time, TimeUnit unit)

Shared Semaphore mainly involves the following classes:

  • InterProcessSemaphoreV2
  • Lease
  • SharedCountReader

public class InterProcessSemaphoreDemo {

    private static final int MAX_LEASE = 10;
    private static final String PATH = "/examples/locks";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {

            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessSemaphoreV2 semaphore = new InterProcessSemaphoreV2(client, PATH, MAX_LEASE);
            Collection<Lease> leases = semaphore.acquire(5);
            System.out.println("get " + leases.size() + " leases");
            Lease lease = semaphore.acquire();
            System.out.println("get another lease");

            resource.use();

            Collection<Lease> leases2 = semaphore.acquire(5, 10, TimeUnit.SECONDS);
            System.out.println("Should timeout and acquire return " + leases2);

            System.out.println("return one lease");
            semaphore.returnLease(lease);
            System.out.println("return another 5 leases");
            semaphore.returnAll(leases);
        }
    }
}

First we acquire 5 leases, which we return to the semaphore at the end. Then we request one more lease; since the semaphore still has 5 leases available the request succeeds, one lease is returned and 4 remain. We then try to acquire another 5 leases; because only 4 are left, the request blocks until the timeout and still cannot be satisfied, so it returns null (when leases are insufficient, acquire blocks until the timeout and then returns null rather than throwing an exception; without a timeout it simply blocks).

All of the locks discussed above are fair locks. From ZooKeeper's point of view, every client obtains the lock in the order in which it asked for it; there is no unfair preemption.


Multiple shared locks—Multi Shared Lock

Multi Shared Lock is a container of locks. When acquire() is called, all of the contained locks are acquired; if that fails, every lock that was acquired is released. Likewise, release() releases all of the locks (failures are ignored). In essence it is the representative of a group of locks: acquire and release operations on it are forwarded to every lock it contains.

It mainly involves two classes:

  • InterProcessMultiLock
  • InterProcessLock

Its constructor takes either the collection of locks to manage or a list of ZooKeeper paths:

public InterProcessMultiLock(List<InterProcessLock> locks)
public InterProcessMultiLock(CuratorFramework client, List<String> paths)

Usage is the same as Shared Lock.

public class MultiSharedLockDemo {

    private static final String PATH1 = "/examples/locks1";
    private static final String PATH2 = "/examples/locks2";

    public static void main(String[] args) throws Exception {
        FakeLimitedResource resource = new FakeLimitedResource();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            InterProcessLock lock1 = new InterProcessMutex(client, PATH1);
            InterProcessLock lock2 = new InterProcessSemaphoreMutex(client, PATH2);

            InterProcessMultiLock lock = new InterProcessMultiLock(Arrays.asList(lock1, lock2));

            if (!lock.acquire(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("could not acquire the lock");
            }
            System.out.println("has got all lock");

            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());

            try {
                resource.use(); //access resource exclusively
            } finally {
                System.out.println("releasing the lock");
                lock.release(); // always release the lock in a finally block
            }
            System.out.println("has got lock1: " + lock1.isAcquiredInThisProcess());
            System.out.println("has got lock2: " + lock2.isAcquiredInThisProcess());
        }
    }
}

Here we create an InterProcessMultiLock that wraps one reentrant lock and one non-reentrant lock. After acquire() you can see that the thread holds both locks; after release() both locks have been released.

Once more, for emphasis: it is strongly recommended to use a ConnectionStateListener to monitor the connection state; when the state becomes LOST, the lock is no longer held.
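
Registering such a listener is a one-liner on the client; a minimal sketch (how you react to LOST is application specific and only illustrative here):

client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
    @Override
    public void stateChanged(CuratorFramework client, ConnectionState newState) {
        if (newState == ConnectionState.LOST) {
            // the session is gone: locks, leases and ephemeral nodes owned by this client are no longer valid
            System.out.println("connection LOST - stop trusting previously acquired locks");
        }
    }
});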


Distributed counters

As the name suggests, a counter is used for counting. With ZooKeeper you can implement a counter that is shared by a whole cluster: every client that uses the same path obtains the latest counter value, which is guaranteed by ZooKeeper's consistency. Curator provides two counters: one that counts with an int (SharedCount) and one that counts with a long (DistributedAtomicLong).


Distributed int counter—SharedCount

This class counts with an int. Three classes are involved:

  • SharedCount
  • SharedCountReader
  • SharedCountListener

SharedCount represents the counter. You can add a SharedCountListener to it, which is notified whenever the counter changes, while a SharedCountReader reads the latest value, both as a plain int and as a VersionedValue that carries version information.

public class SharedCounterDemo implements SharedCountListener {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        final Random rand = new Random();
        SharedCounterDemo example = new SharedCounterDemo();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();

            SharedCount baseCount = new SharedCount(client, PATH, 0);
            baseCount.addListener(example);
            baseCount.start();

            List<SharedCount> examples = Lists.newArrayList();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final SharedCount count = new SharedCount(client, PATH, 0);
                examples.add(count);
                Callable<Void> task = () -> {
                    count.start();
                    Thread.sleep(rand.nextInt(10000));
                    System.out.println("Increment:" + count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10)));
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            for (int i = 0; i < QTY; ++i) {
                examples.get(i).close();
            }
            baseCount.close();
        }
        Thread.sleep(Integer.MAX_VALUE);
    }

    @Override
    public void stateChanged(CuratorFramework arg0, ConnectionState arg1) {
        System.out.println("State changed: " + arg1.toString());
    }

    @Override
    public void countHasChanged(SharedCountReader sharedCount, int newCount) throws Exception {
        System.out.println("Counter's value is changed to " + newCount);
    }
}

在那些例子中,大家利用baseCount来监听计数值(addListener办法来添加SharedCountListener
)。 任意的SharedCount, 只要采用同一的path,都可以拿走这几个计数值。
然后我们运用六个线程为计数值伸张二个10以内的随机数。相同的path的SharedCount对计数值举办转移,将会回调给baseCount的SharedCountListener。

count.trySetCount(count.getVersionedValue(), count.getCount() + rand.nextInt(10))

Here we use trySetCount to update the counter. The first argument is the current VersionedValue; if another client has updated the value in the meantime, your update may fail because that client has already written a newer value, in which case you can read the latest value and try again. setCount, by contrast, overwrites the counter unconditionally.
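
A typical compare-and-set retry loop therefore looks roughly like this (a sketch; the path and the increment of 1 are arbitrary, and client is assumed to be started):

SharedCount counter = new SharedCount(client, "/examples/counter", 0);
counter.start();
boolean updated = false;
while (!updated) {
    VersionedValue<Integer> current = counter.getVersionedValue();   // value plus its version
    updated = counter.trySetCount(current, current.getValue() + 1);  // fails if someone else wrote first
}
counter.close();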

Note that the counter must be start()-ed before use and close()-d once you are done with it.

A ConnectionStateListener is strongly recommended here as well; in this example SharedCountListener already extends ConnectionStateListener.


Distributed long counter—DistributedAtomicLong

Next, a counter based on long. Apart from having a larger range than SharedCount, it first tries to set the counter optimistically; if that fails (for example because the counter was updated by another client in the meantime), it falls back to an InterProcessMutex to update the value.

This can be seen in its internal implementation, DistributedAtomicValue.trySet():

   AtomicValue<byte[]>   trySet(MakeValue makeValue) throws Exception
    {
        MutableAtomicValue<byte[]>  result = new MutableAtomicValue<byte[]>(null, null, false);

        tryOptimistic(result, makeValue);
        if ( !result.succeeded() && (mutex != null) )
        {
            tryWithMutex(result, makeValue);
        }

        return result;
    }

This counter supports a series of operations:

  • get(): read the current value
  • increment(): add one
  • decrement(): subtract one
  • add(): add a given value
  • subtract(): subtract a given value
  • trySet(): try to set the value
  • forceSet(): force the value

You must check succeeded() on the returned result; it tells you whether the operation succeeded. If it did, preValue() is the value before the operation and postValue() the value after it.

public class DistributedAtomicLongDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/counter";

    public static void main(String[] args) throws IOException, Exception {
        List<DistributedAtomicLong> examples = Lists.newArrayList();
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedAtomicLong count = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

                examples.add(count);
                Callable<Void> task = () -> {
                    try {
                        AtomicValue<Long> value = count.increment();
                        System.out.println("succeed: " + value.succeeded());
                        if (value.succeeded())
                            System.out.println("Increment: from " + value.preValue() + " to " + value.postValue());
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE);
        }
    }
}
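
The other operations follow the same pattern of checking succeeded() on the result; a brief sketch (the delta and the target value are arbitrary, and client is assumed to be started):

DistributedAtomicLong counter = new DistributedAtomicLong(client, PATH, new RetryNTimes(10, 10));

AtomicValue<Long> added = counter.add(5L);             // add an arbitrary delta
if (added.succeeded()) {
    System.out.println(added.preValue() + " -> " + added.postValue());
}

AtomicValue<Long> trySetResult = counter.trySet(100L); // try to set an absolute value
if (!trySetResult.succeeded()) {
    counter.forceSet(100L);                            // forceSet always writes and returns nothing to check
}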


Distributed queues

Curator also simplifies working with ephemeral nodes, and it provides the ZK recipe implementations of distributed queues. Thanks to ZooKeeper's PERSISTENT_SEQUENTIAL nodes, items put into the queue are guaranteed to be ordered. If a single consumer takes items from the queue, the queue is first-in first-out, which is what you expect from a queue. If you strictly require ordering, you must use a single consumer; you can use leader election to make the leader the only consumer.

However, according to the Curator authors at Netflix, ZooKeeper is really not a good fit for queues, or rather ZK cannot implement a good queue; see Tech Note 4 for the details.

There are five reasons:

  1. ZK has a 1MB transfer limit. In practice ZNodes must stay relatively small, while a queue can hold many messages and grow very large.
  2. ZK starts up very slowly when there are many nodes, and using queues produces a lot of ZNodes. You need to increase initLimit and syncLimit significantly.
  3. A ZNode that has grown large is hard to clean up. Netflix had to build a dedicated program just for that.
  4. ZK performance degrades badly when a ZNode has a very large number of children.
  5. ZK's database lives entirely in memory, so a large number of queued items eats a lot of memory.

Nevertheless, Curator still provides several queue implementations. If the number of items and the size of the data are modest, the queues can still be used, at your own discretion.


Distributed queue—DistributedQueue

DistributedQueue is the most ordinary kind of queue. It involves the following four classes:

  • QueueBuilder – queues are created via QueueBuilder, which is also the factory for the other queue types
  • QueueConsumer – the interface for consumers of queue messages
  • QueueSerializer – the serialization interface used to serialize and deserialize the objects stored in the queue
  • DistributedQueue – the queue implementation itself

QueueConsumer is the consumer; it receives data from the queue. The logic that processes queue data goes into QueueConsumer.consumeMessage().

Normally a message is first removed from the queue and then handed to the consumer. These are two steps and are not atomic. You can call lockPath() on the builder so that a consumer holds a lock while it is consuming a message; other consumers then cannot consume that message, and if consumption fails or the process dies, the message can be handed to another process. This comes at some performance cost. If possible, prefer the single-consumer setup.
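
If you do want that per-message lock, it is enabled on the builder; a rough sketch, assuming client, consumer and serializer are set up as in the demo below (the lock path is made up):

DistributedQueue<String> queue = QueueBuilder.builder(client, consumer, serializer, "/example/queue")
        .lockPath("/example/queue-lock")   // a consumer holds this lock while it processes a message
        .buildQueue();
queue.start();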

public class DistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework clientA = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientA.start();
        CuratorFramework clientB = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
        clientB.start();
        DistributedQueue<String> queueA;
        QueueBuilder<String> builderA = QueueBuilder.builder(clientA, createQueueConsumer("A"), createQueueSerializer(), PATH);
        queueA = builderA.buildQueue();
        queueA.start();

        DistributedQueue<String> queueB;
        QueueBuilder<String> builderB = QueueBuilder.builder(clientB, createQueueConsumer("B"), createQueueSerializer(), PATH);
        queueB = builderB.buildQueue();
        queueB.start();
        for (int i = 0; i < 100; i++) {
            queueA.put(" test-A-" + i);
            Thread.sleep(10);
            queueB.put(" test-B-" + i);
        }
        Thread.sleep(1000 * 10);// wait for the messages to be consumed
        queueB.close();
        queueA.close();
        clientB.close();
        clientA.close();
        System.out.println("OK!");
    }

    /**
     * Queue message serializer implementation
     */
    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {
            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }
        };
    }

    /**
     * Define the queue consumer
     */
    private static QueueConsumer<String> createQueueConsumer(final String name) {
        return new QueueConsumer<String>() {
            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection state changed: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consumed message (" + name + "): " + message);
            }
        };
    }
}

The example defines two distributed queues and two consumers; because they share the same PATH, the consumers compete to consume the messages.


Distributed ID queue—DistributedIdQueue

DistributedIdQueue is similar to the queue above, except that you can assign an ID to every element in the queue and remove arbitrary elements from the queue by ID. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedIdQueue

It is created with:

builder.buildIdQueue()

Putting an element:

queue.put(aMessage, messageId);

Removing an element:

int numberRemoved = queue.remove(messageId);

In the example below, some elements are removed before a consumer gets to them, so the consumer never receives the deleted messages.

public class DistributedIdQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedIdQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildIdQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put(" test-" + i, "Id" + i);
                Thread.sleep((long) (15 * Math.random()));
                queue.remove("Id" + i);
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println("consume one message: " + message);
            }

        };
    }
}


Priority distributed queue—DistributedPriorityQueue

A priority queue orders the elements in the queue by priority: the smaller the priority value, the closer to the front the element is and the sooner it is consumed. It involves four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedPriorityQueue

It is created with builder.buildPriorityQueue(minItemsBeforeRefresh). When the priority queue is notified that elements have been added or removed, it stops processing its current batch of elements and refreshes the queue. minItemsBeforeRefresh specifies the minimum number of currently active items to process before that refresh; set it to the amount of out-of-order processing your application can tolerate.

When putting an element onto the queue you must specify its priority:

queue.put(aMessage, priority);

Example:

public class DistributedPriorityQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedPriorityQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildPriorityQueue(0);
            queue.start();

            for (int i = 0; i < 10; i++) {
                int priority = (int) (Math.random() * 100);
                System.out.println("test-" + i + " priority:" + priority);
                queue.put("test-" + i, priority);
                Thread.sleep((long) (50 * Math.random()));
            }

            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                Thread.sleep(1000);
                System.out.println("consume one message: " + message);
            }

        };
    }

}

Sometimes you may get the impression that the priority setting has no effect. That is because the priority only matters for elements that have accumulated in the queue: if consumption is fast enough, an earlier element may already have been consumed before the next one is even enqueued, and in that case DistributedPriorityQueue degenerates into a plain DistributedQueue.


Distributed delay queue—DistributedDelayQueue

The JDK has a DelayQueue, which you may already know. DistributedDelayQueue offers similar functionality: each element carries a delay value, and a consumer only receives the element after that delay has elapsed. It involves the following four classes:

  • QueueBuilder
  • QueueConsumer
  • QueueSerializer
  • DistributedDelayQueue

It is created with the following statements:

QueueBuilder<MessageType>    builder = QueueBuilder.builder(client, consumer, serializer, path);
... more builder method calls as needed ...
DistributedDelayQueue<MessageType> queue = builder.buildDelayQueue();

When putting an element you specify a delayUntilEpoch:

queue.put(aMessage, delayUntilEpoch);

Note that delayUntilEpoch is not an interval from now (for example 20 milliseconds), but a future epoch timestamp, such as System.currentTimeMillis() + 10 seconds. If delayUntilEpoch is already in the past, the message is delivered to the consumer immediately.

public class DistributedDelayQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        DistributedDelayQueue<String> queue = null;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));

            client.start();
            QueueConsumer<String> consumer = createQueueConsumer();
            QueueBuilder<String> builder = QueueBuilder.builder(client, consumer, createQueueSerializer(), PATH);
            queue = builder.buildDelayQueue();
            queue.start();

            for (int i = 0; i < 10; i++) {
                queue.put("test-" + i, System.currentTimeMillis() + 10000);
            }
            System.out.println(new Date().getTime() + ": already put all items");


            Thread.sleep(20000);

        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(queue);
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    private static QueueSerializer<String> createQueueSerializer() {
        return new QueueSerializer<String>() {

            @Override
            public byte[] serialize(String item) {
                return item.getBytes();
            }

            @Override
            public String deserialize(byte[] bytes) {
                return new String(bytes);
            }

        };
    }

    private static QueueConsumer<String> createQueueConsumer() {

        return new QueueConsumer<String>() {

            @Override
            public void stateChanged(CuratorFramework client, ConnectionState newState) {
                System.out.println("connection new state: " + newState.name());
            }

            @Override
            public void consumeMessage(String message) throws Exception {
                System.out.println(new Date().getTime() + ": consume one message: " + message);
            }

        };
    }
}


SimpleDistributedQueue

The previous queues all work, but you may have noticed that none of them implements a JDK-like interface. SimpleDistributedQueue offers an interface that is essentially the same as the JDK's (although it does not actually implement the Queue interface). Creating one is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Removing an element:

public byte[] take() throws Exception

Other methods are also provided:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is an extra take method.

take() blocks until it successfully returns an element; poll() returns null immediately when the queue is empty.
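
If you want a consumer that never blocks indefinitely, one option is to loop over the timed poll(); a rough sketch (the 1-second timeout and the exit condition are arbitrary, and client is assumed to be started):

SimpleDistributedQueue queue = new SimpleDistributedQueue(client, "/example/queue");
while (!Thread.currentThread().isInterrupted()) {
    byte[] data = queue.poll(1, TimeUnit.SECONDS);   // returns null if nothing arrives within 1 second
    if (data != null) {
        System.out.println("consumed: " + new String(data, StandardCharsets.UTF_8));
    }
}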

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {

        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("发送一条消息成功:" + "zjc-" + i);
                    } else {
                        System.out.println("发送一条消息失败:" + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                byte[] datas = queue.take();
                System.out.println("消费一条消息成功:" + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }


}

In practice, however, 100 messages are sent but only the first one is consumed; I have not pinned down the full cause yet. One likely contributing factor is that Consumer.run() above calls take() only once, so the consumer thread exits after its first message; looping over take() (or the timed poll()) should keep consuming. The demo recommended by the official documentation uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

But in actual use the consumption-blocking problem described above still shows up.


SimpleDistributedQueue

Although the earlier recipes implement various kinds of queues, you may have noticed that none of them exposes a JDK-style interface.
SimpleDistributedQueue provides an interface essentially consistent with the JDK one (although it does not actually implement the Queue interface).
Creating it is simple:

public SimpleDistributedQueue(CuratorFramework client,String path)

Adding an element:

public boolean offer(byte[] data) throws Exception

Taking an element:

public byte[] take() throws Exception

It also provides several other methods:

public byte[] peek() throws Exception
public byte[] poll(long timeout, TimeUnit unit) throws Exception
public byte[] poll() throws Exception
public byte[] remove() throws Exception
public byte[] element() throws Exception

There is no add method, but there is an extra take method.

The take method blocks until it succeeds.
The poll method returns null immediately if the queue is empty.

public class SimpleDistributedQueueDemo {

    private static final String PATH = "/example/queue";

    public static void main(String[] args) throws Exception {
        TestingServer server = new TestingServer();
        CuratorFramework client = null;
        SimpleDistributedQueue queue;
        try {
            client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.getCuratorListenable().addListener((client1, event) -> System.out.println("CuratorEvent: " + event.getType().name()));
            client.start();
            queue = new SimpleDistributedQueue(client, PATH);
            Producer producer = new Producer(queue);
            Consumer consumer = new Consumer(queue);
            new Thread(producer, "producer").start();
            new Thread(consumer, "consumer").start();
            Thread.sleep(20000);
        } catch (Exception ex) {
            // the original demo swallowed the exception; print it so failures are visible
            ex.printStackTrace();
        } finally {
            CloseableUtils.closeQuietly(client);
            CloseableUtils.closeQuietly(server);
        }
    }

    public static class Producer implements Runnable {

        private SimpleDistributedQueue queue;

        public Producer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            for (int i = 0; i < 100; i++) {
                try {
                    boolean flag = queue.offer(("zjc-" + i).getBytes());
                    if (flag) {
                        System.out.println("Successfully sent message: " + "zjc-" + i);
                    } else {
                        System.out.println("Failed to send message: " + "zjc-" + i);
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
    }

    public static class Consumer implements Runnable {

        private SimpleDistributedQueue queue;

        public Consumer(SimpleDistributedQueue queue) {
            this.queue = queue;
        }

        @Override
        public void run() {
            try {
                // note: take() is called only once here, so this consumer processes at most one message
                byte[] datas = queue.take();
                System.out.println("Successfully consumed message: " + new String(datas, "UTF-8"));
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

However, although 100 messages were actually sent, after the first one was consumed the remaining messages could not be consumed, and I have not yet found the cause. Looking at the demo recommended by the official documentation, it uses the following APIs:

Creating a SimpleDistributedQueue

public SimpleDistributedQueue(CuratorFramework client,
                              String path)
Parameters:
client - the client
path - path to store queue nodes
Add to the queue

public boolean offer(byte[] data)
             throws Exception
Inserts data into queue.
Parameters:
data - the data
Returns:
true if data was successfully added
Take from the queue

public byte[] take()
           throws Exception
Removes the head of the queue and returns it, blocks until it succeeds.
Returns:
The former head of the queue
NOTE: see the Javadoc for additional methods

However, in actual use the consumption-blocking problem still appears.
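
One thing worth noting about the demo above is that the Consumer calls take() only once, so it can process at most a single message in any case. As a possible workaround (a minimal sketch of my own, not part of the original demo; it assumes the same SimpleDistributedQueue instance and a java.util.concurrent.TimeUnit import), the consumer can loop and use the poll(timeout, unit) overload listed above, so that it keeps draining the queue without blocking forever once the queue is empty:

public static class PollingConsumer implements Runnable {

    private final SimpleDistributedQueue queue;

    public PollingConsumer(SimpleDistributedQueue queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        try {
            while (!Thread.currentThread().isInterrupted()) {
                // wait up to 5 seconds for the next element; null means nothing arrived in time
                byte[] data = queue.poll(5, TimeUnit.SECONDS);
                if (data == null) {
                    break;
                }
                System.out.println("Successfully consumed message: " + new String(data, "UTF-8"));
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}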

Distributed Barrier—Barrier

A distributed barrier is a class that blocks the waiting processes on every node until a certain condition is met, after which all of them continue together. Think of a horse race: the horses gradually line up at the starting gate, and at the starting signal they all burst out at once.

DistributedBarrier

The DistributedBarrier class implements this barrier behaviour. Its constructor is:

public DistributedBarrier(CuratorFramework client, String barrierPath)
Parameters:
client - client
barrierPath - path to use as the barrier

First you need to set the barrier; it will block the threads waiting on it:

setBarrier();

Then a thread that needs to block calls the following method to wait for the release condition:

public void waitOnBarrier()

When the condition is satisfied, remove the barrier and all waiting threads will continue to run:

removeBarrier();

Exception handling: DistributedBarrier monitors the connection state; when the connection is lost, the waitOnBarrier() method throws an exception.
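
Besides the no-argument version, DistributedBarrier also provides a timed waitOnBarrier(long maxWait, TimeUnit unit) overload that returns a boolean instead of waiting indefinitely. A small sketch of my own (confirm the exact signature against the Javadoc of your Curator version; it assumes a barrier built as in the demo below):

// wait at most 10 seconds for the barrier to be removed;
// true means it was released in time, false means the wait timed out
boolean released = barrier.waitOnBarrier(10, TimeUnit.SECONDS);
if (!released) {
    System.out.println("barrier was not released within 10 seconds");
}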

public class DistributedBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            DistributedBarrier controlBarrier = new DistributedBarrier(client, PATH);
            controlBarrier.setBarrier();

            for (int i = 0; i < QTY; ++i) {
                final DistributedBarrier barrier = new DistributedBarrier(client, PATH);
                final int index = i;
                Callable<Void> task = () -> {
                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " waits on Barrier");
                    barrier.waitOnBarrier();
                    System.out.println("Client #" + index + " begins");
                    return null;
                };
                service.submit(task);
            }
            Thread.sleep(10000);
            System.out.println("all Barrier instances should wait the condition");
            controlBarrier.removeBarrier();
            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);

            Thread.sleep(20000);
        }
    }
}

This example creates a controlBarrier to set and remove the barrier.
We create five threads that wait on this barrier.
Only after the barrier is removed do all the threads continue to run.

If you never set the barrier in the first place, the threads will not block at all.

Double Barrier—DistributedDoubleBarrier

A double barrier lets clients synchronize at both the start and the end of a computation. When enough processes have entered the double barrier, the computation starts;
when the computation finishes, they leave the barrier. The double barrier class is DistributedDoubleBarrier.
Its constructor is:

public DistributedDoubleBarrier(CuratorFramework client,
                                String barrierPath,
                                int memberQty)
Creates the barrier abstraction. memberQty is the number of members in the barrier. When enter() is called, it blocks until
all members have entered. When leave() is called, it blocks until all members have left.

Parameters:
client - the client
barrierPath - path to use
memberQty - the number of members in the barrier

memberQty is the number of members. When enter() is called, the caller blocks until all members have called enter().
When leave() is called, it likewise blocks the calling thread until all members have called leave().
It is like a 100-meter sprint: when the starting gun fires, all the runners start running,
and the race only ends once every runner has crossed the finish line.

DistributedDoubleBarrier monitors the connection state; when the connection is lost, the enter() and leave() methods throw exceptions.
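
enter() and leave() also have timed overloads that return a boolean instead of blocking indefinitely. Again a sketch of my own (verify the signatures against the Javadoc of your Curator version; it assumes a barrier created as in the demo below):

// wait at most 10 seconds for all memberQty members to enter
if (!barrier.enter(10, TimeUnit.SECONDS)) {
    System.out.println("not all members entered within 10 seconds");
}
// ... perform the computation ...
// wait at most 10 seconds for all members to leave
if (!barrier.leave(10, TimeUnit.SECONDS)) {
    System.out.println("not all members left within 10 seconds");
}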

public class DistributedDoubleBarrierDemo {

    private static final int QTY = 5;
    private static final String PATH = "/examples/barrier";

    public static void main(String[] args) throws Exception {
        try (TestingServer server = new TestingServer()) {
            CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new ExponentialBackoffRetry(1000, 3));
            client.start();
            ExecutorService service = Executors.newFixedThreadPool(QTY);
            for (int i = 0; i < QTY; ++i) {
                final DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(client, PATH, QTY);
                final int index = i;
                Callable<Void> task = () -> {

                    Thread.sleep((long) (3 * Math.random()));
                    System.out.println("Client #" + index + " enters");
                    barrier.enter();
                    System.out.println("Client #" + index + " begins");
                    Thread.sleep((long) (3000 * Math.random()));
                    barrier.leave();
                    System.out.println("Client #" + index + " left");
                    return null;
                };
                service.submit(task);
            }

            service.shutdown();
            service.awaitTermination(10, TimeUnit.MINUTES);
            Thread.sleep(Integer.MAX_VALUE); // keep the main thread (and the in-process TestingServer) alive; stop the process manually
        }
    }

}

References:
《从PAXOS到ZOOKEEPER:分布式一致性原理与实践》 (From Paxos to ZooKeeper: Principles and Practice of Distributed Consistency)
The "跟着实例学习ZooKeeper的用法" (Learning ZooKeeper by Example) blog series

Project repository:
https://github.com/zjcscut/curator-seed
(The Markdown source of this article actually contains a [TOC] navigation directory, which makes it easy to jump to each section, but Jianshu does not support it. The original Markdown is kept in the project's /resources/md directory; help yourself. It was written with Typora, and I recommend opening it with Typora.)

End on 2017-5-13 13:10.
Help yourselves!
I am throwable, working hard in Guangzhou: in the office during the day, occasional overtime in the evenings and at weekends, and writing this blog in my free evenings.
I hope my articles bring you something of value; let's keep improving together.

Author: zhrowable
Link: https://www.jianshu.com/p/70151fc0ef5d
Source: Jianshu
Copyright belongs to the author. For reproduction in any form, please contact the author for authorization and cite the source.
