品味ZooKeeper

客户端对比选择

Posted by Liangjf on March 10, 2018

品味Zookeeper之客户端对比选择

本文思维导图

使用框架的好处是自带一套实用的API,但是Zookeeper虽然非常强大,但是社区却安静的可怕,版本更新较慢,下面会先从zookeeper原生API的不足说起,然后引出现在流行的开源客户端工具。

1.原生API

  • 1.创建连接的时候是异步的,所以我们在开发的时候需要人工的写代码等待创建节点的状态,如果需要的话。
  • 2.连接时无超时重连机制。本人觉得这个非常重要,因为在现实使用中,网络是不可信的,在创建节点的时候要考虑到网络的不稳定性。因此,超时重连机制是非常必要的。
  • 3.zookeepr的通信是网络通信,因此在数据通信的时候会消耗一定的网络IO和带宽。但zookeeper没有序列化机制,需要开发者自行开发。
  • 4.Watcher注册一次,触发后会自动失效。
  • 5.不支持递归创建树形节点。这点是比较有用的,类似Linux的命令:mkdir -p /xxx/xxx/

基于以上的一些不足,引起了业界一些大佬的不满,因此它们自行开发了一些开源的客户端工具。比如ZkClient和Curator。对前者简单介绍,现在使用最多的是后者。

2.ZkClient

2.1 ZkClient概述

ZkClient是Github上一个开源的zk客户端,由datameer的工程师Stefan Groschupf和Peter Voss一起开发(最仰慕的就是这类大佬,类似Linus那样,一不爽写个开源版本(git)出来…)

  • 解决session会话超时重连。
  • Watcher反复注册。
  • 简化开发api。

当然还有很多的很多修改的功能,使用也很简单,但是社区不活跃,连api文档都不完善,对于我们来说只能看源码来开发应用了,也略有麻烦的。有兴趣的开源上github看看。 https://github.com/sgroschupf/zkclient

2.2 ZkClien API

  • 创建客户端 ZkClient zkclient = new ZkClient("192.168.17.128:2181,192.168.17.129:2181,192.168.17.130:2181",5000);
  • 创建节点 zkclient.create(path, data, CreateMode.PERSISTENT);
  • 删除节点 zkclient.delete(path);
  • 获取子节点 zkclient.getChildren(path);
  • 关闭客户端 zkclient.close();
  • 读节点数据 zkclient.readData(path);
  • 判断节点是否存在 zkclient.exists(path);

3.Curator

3.1 Curator概述

Curator是Apache基金会的顶级项目之一。Apache基金会就类似万能储备室,把全球顶级的开源项目收纳其中,造福一方百姓啊。

  • 下面是Curator对比原生zk的API和ZkClient比较重要的完善点:
  • 解决session会话超时重连。
  • Watcher反复注册。
  • 简化开发api。
  • 遵循Fluent风格Api规范。
  • NodeExistsException异常处理。
  • ……

3.2 Curator API介绍

  • 创建会话
  • 1.使用CuratorFrameworkFactory工厂的两个静态方法创建客户端
  • 2.Start()方法启动

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); client = CuratorFrameworkFactory.builder() .connectString(“localhost:2181,localhost:2182”) .sessionTimeoutMs(10000).retryPolicy(retryPolicy) .namespace(“base”).build(); client.start();

  • 1.重试策略(实现接口RetryPolicy可以自定义重试策略) boolean allowRetry(int retryCount, long elapsedTimeMs, RetrySleeper sleeper)

  • 2.默认四种重试策略: Exponential BackoffRetry、RetryNTimes、RetryOneTime、 RetryUntilElapsed

  • 2.1 ExponentialBackoffRetry
  • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries)
  • ExponentialBackoffRetry(int baseSleepTimeMs, int maxRetries, int maxSleepMs)
  • 当前应该sleep的时间: baseSleepTimeMs * Math.max(1, random.nextInt(1 << (retryCount + 1)))

  • 2.2 RetryNTimes
  • RetryNTimes(int n, int sleepMsBetweenRetries)

  • 2.3 RetryOneTime
  • RetryOneTime(int sleepMsBetweenRetry)

  • 2.4 RetryUntilElapsed RetryUntilElapsed(int maxElapsedTimeMs, int sleepMsBetweenRetries)

  • 3.Fluent风格的API
  • 定义:一种面向对象的开发方式,目的是提高代码的可读性
  • 实现方式͹通过方法的级联或者方法链的方式实现
  • 例子:

      client = CuratorFrameworkFactory.builder()
      		.connectString("localhost:2181,localhost:2182")
      		.sessionTimeoutMs(10000).retryPolicy(retryPolicy)
      		.namespace("base").build();
    
  • 4.常用API
  • 4.1创建节点

      public void createNode(String path, byte[] data) throws Exception {
      	client.getZookeeperClient().getZooKeeper().addAuthInfo("digest", "test:123456".getBytes());
      	client.create().creatingParentsIfNeeded()
      			.withMode(CreateMode.PERSISTENT).withACL(Ids.CREATOR_ALL_ACL)
      			.forPath(path, data);
      }
    
  • 4.2删除节点

      public void deleteNode(String path, int version) throws Exception {
          client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(version)
                  .inBackground(new DeleteCallBack()).forPath(path);
      }
    
  • 4.3读取节点

      public void readNode(String path) throws Exception {
          byte[] data = client.getData().inBackground(new DeleteCallBack()).forPath(path);
          System.out.println(path + "的数据:" + new String(data));
      }
    
  • 4.4更新节点数据

      public void updateNode(String path, byte[] data, int version)
              throws Exception {
          client.setData().withVersion(version).inBackground(new DeleteCallBack()).forPath(path, data);
      }
    
  • 4.5获取子节点

      public void getChildren(String path) throws Exception {
          List<String> children = client.getChildren().usingWatcher(new WatcherTest()).forPath("/test");
          for (String pth : children) {
              System.out.println("child=" + pth);
          }
      }
    

4.6为节点添加监听

  • NodeCache
  • 监听数据节点的内容变更
  • 监听节点的创建,即如果指定的节点不存在,则节点创建后,会触发这个监听
  • PathChildrenCache
  • 监听指定节点的子节点变化情况
  • 包括新增子节点,子节点数据变更和子节点删除

     public void addNodeDataWatcher(String path) throws Exception {
         final NodeCache nodeC = new NodeCache(client, path);
         nodeC.start(true);
         nodeC.getListenable().addListener(new NodeCacheListener() {
             public void nodeChanged() throws Exception {
                 String data = new String(nodeC.getCurrentData().getData());
                 System.out.println("path=" + nodeC.getCurrentData().getPath()
                         + ":data=" + data);
             }
         });
     }
    

4.7异步回调

  • inBackground()
  • inBackground(Object context)
  • inBackground(BackgroundCallback callback)
  • inBackground(BackgroundCallback callback, Object context)
  • inBackground(BackgroundCallback callback, Executor executor)
  • inBackground(BackgroundCallback callback, Object context, Executor executor)

Curator的回调与zk的原生异步api相同,多了一个线程池,用于执行回调。

异步操作事件状态:event.getType() 异步操作事件状态码:event.getResultCode()

以上就是Curator常用的API,都是利用这些API来进行更加复杂的应用开发的,比如分布式锁,集群管理等应用。

4.小结

好的开源工具解放我们的开发,但是不要在其中迷失,还要深入的了解其中的设计原理,不能只是调用API,要学习人家的思想,这样才能跟随大神的脚步,提升自己的实力。