品味Zookeeper之客户端对比选择
本文思维导图
使用框架的好处是自带一套实用的API,但是Zookeeper虽然非常强大,但是社区却安静的可怕,版本更新较慢,下面会先从zookeeper原生API的不足说起,然后引出现在流行的开源客户端工具。
1.原生API
- 1.创建连接的时候是异步的,所以我们在开发的时候需要人工的写代码等待创建节点的状态,如果需要的话。
- 2.连接时无超时重连机制。本人觉得这个非常重要,因为在现实使用中,网络是不可信的,在创建节点的时候要考虑到网络的不稳定性。因此,超时重连机制是非常必要的。
- 3.zookeepr的通信是网络通信,因此在数据通信的时候会消耗一定的网络IO和带宽。但zookeeper没有序列化机制,需要开发者自行开发。
- 4.Watcher注册一次,触发后会自动失效。
- 5.不支持递归创建树形节点。这点是比较有用的,类似Linux的命令:
mkdir -p /xxx/xxx/
基于以上的一些不足,引起了业界一些大佬的不满,因此它们自行开发了一些开源的客户端工具。比如ZkClient和Curator。对前者简单介绍,现在使用最多的是后者。
2.ZkClient
2.1 ZkClient概述
ZkClient是Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发(最仰慕的就是这类大佬,类似Linus那样,一不爽写个开源版本(git)出来…)
- 解决session会话超时重连。
- Watcher反复注册。
- 简化开发api。
当然还有很多的很多修改的功能,使用也很简单,但是社区不活跃,连api文档都不完善,对于我们来说只能看源码来开发应用了,也略有麻烦的。有兴趣的开源上github看看。 https://github.com/sgroschupf/zkclient
2.2 ZkClien API
- 创建客户端
ZkClient zkclient = new ZkClient("192.168.17.128:2181,192.168.17.129:2181,192.168.17.130:2181",5000);
- 创建节点
zkclient.create(path, data, CreateMode.PERSISTENT);
- 删除节点
zkclient.delete(path);
- 获取子节点
zkclient.getChildren(path);
- 关闭客户端
zkclient.close();
- 读节点数据
zkclient.readData(path);
- 判断节点是否存在
zkclient.exists(path);
3.Curator
3.1 Curator概述
Curator是Apache基金会的顶级项目之一。Apache基金会就类似万能储备室,把全球顶级的开源项目收纳其中,造福一方百姓啊。
- 下面是Curator对比原生zk的API和ZkClient比较重要的完善点:
- 解决session会话超时重连。
- Watcher反复注册。
- 简化开发api。
- 遵循Fluent风格Api规范。
- NodeExistsException异常处理。
- ……
3.2 Curator API介绍
- 创建会话
- 1.使用CuratorFrameworkFactory工厂的两个静态方法创建客户端
-
2.Start()方法启动
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(“localhost:2181,localhost:2182”) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace(“base”).build(); client.start();
-
1.重试策略(实现接口RetryPolicy可以自定义重试策略)
boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)
-
2.默认四种重试策略: Exponential BackoffRetry、RetryNTimes、RetryOneTime、 RetryUntilElapsed
- 2.1 ExponentialBackoffRetry
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
-
当前应该sleep的时间:
baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))
- 2.2 RetryNTimes
-
RetryNTimes(int n, int sleepMsBetweenRetries)
- 2.3 RetryOneTime
-
RetryOneTime(int sleepMsBetweenRetry)
-
2.4 RetryUntilElapsed
RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)
- 3.Fluent风格的API
- 定义:一种面向对象的开发方式,目的是提高代码的可读性
- 实现方式通过方法的级联或者方法链的方式实现
-
例子:
client = CuratorFrameworkFactory.builder() .connectString("localhost:2181,localhost:2182") .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace("base").build();
- 4.常用API
-
4.1创建节点
public void createNode(String path, byte[] data) throws Exception { client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", "test:123456".getBytes()); client.create().creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT).withACL(Ids.CREATOR_ALL_ACL) .forPath(path, data); }
-
4.2删除节点
public void deleteNode(String path, int version) throws Exception { client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version) .inBackground(new DeleteCallBack()).forPath(path); }
-
4.3读取节点
public void readNode(String path) throws Exception { byte[] data = client.getData().inBackground(new DeleteCallBack()).forPath(path); System.out.println(path + "的数据:" + new String(data)); }
-
4.4更新节点数据
public void updateNode(String path, byte[] data, int version) throws Exception { client.setData().withVersion(version).inBackground(new DeleteCallBack()).forPath(path, data); }
-
4.5获取子节点
public void getChildren(String path) throws Exception { List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/test"); for (String pth : children) { System.out.println("child=" + pth); } }
4.6为节点添加监听
- NodeCache
- 监听数据节点的内容变更
- 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听
- PathChildrenCache
- 监听指定节点的子节点变化情况
-
包括新增子节点,子节点数据变更和子节点删除
public void addNodeDataWatcher(String path) throws Exception { final NodeCache nodeC = new NodeCache(client, path); nodeC.start(true); nodeC.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { String data = new String(nodeC.getCurrentData().getData()); System.out.println("path=" + nodeC.getCurrentData().getPath() + ":data=" + data); } }); }
4.7异步回调
inBackground()
inBackground(Object context)
inBackground(BackgroundCallback callback)
inBackground(BackgroundCallback callback, Object context)
inBackground(BackgroundCallback callback, Executor executor)
inBackground(BackgroundCallback callback, Object context, Executor executor)
Curator的回调与zk的原生异步api相同,多了一个线程池,用于执行回调。
异步操作事件状态:event.getType() 异步操作事件状态码:event.getResultCode()
以上就是Curator常用的API,都是利用这些API来进行更加复杂的应用开发的,比如分布式锁,集群管理等应用。
4.小结
好的开源工具解放我们的开发,但是不要在其中迷失,还要深入的了解其中的设计原理,不能只是调用API,要学习人家的思想,这样才能跟随大神的脚步,提升自己的实力。