Zkclient操作zookeeper案例

水深无声 2022-02-28 05:20 504阅读 0赞

Zkclient操作zookeeper

每天多学一点点~
话不多说,这就开始吧…

文章目录

  • Zkclient操作zookeeper
    • 1.Zookeeper的Java客户端API
    • 2.代码
      • 2.1. pom依赖
      • 2.2. zkCilent的序列化
      • 2.3 zkCilent的Util
      • 2.4 zkCilent的测试
    • 3.结语

1.Zookeeper的Java客户端API

有三种方式

  1. 原生
    (1)Zookeeper的Watcher是一次性的,每次触发之后都需要重新进行注册;
    (2)Session超时之后没有实现重连机制;
    (3)异常处理繁琐,Zookeeper提供了很多异常,对于开发人员来说可能根本不知道该如何处理这些异常信息;
    (4)只提供了简单的byte[]数组的接口,没有提供针对对象级别的序列化;
    (5)创建节点时如果节点存在抛出异常,需要自行检查节点是否存在;
    (6)删除节点无法实现级联删除;
  2. zkclient
    ZkClient是一个开源客户端,在Zookeeper原生API接口的基础上进行了包装,更便于开发人员使用。内部实现了Session超时重连,Watcher反复注册等功能。像dubbo等框架对其也进行了集成使用。
    虽然ZkClient对原生API进行了封装,但也有它自身的不足之处:
    (1)几乎没有参考文档;
    (2)异常处理简化(抛出RuntimeException);
    (3)重试机制比较难用;
    (4)没有提供各种使用场景的实现;
  3. Curator
    Curator是Netflix公司开源的一套Zookeeper客户端框架,和ZkClient一样,解决了非常底层的细节开发工作,包括连接重连、反复注册Watcher和NodeExistsException异常等。目前已经成为Apache的顶级项目。另外还提供了一套易用性和可读性更强的Fluent风格的客户端API框架。

本文选用zkclient 进行操作

2.代码

在这里插入图片描述

2.1. pom依赖

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>

2.2. zkCilent的序列化

防止数据在客户端呈现乱码,且 能 启动监听机制

  1. package com.jiagouedu.zkclient.watcher;
  2. import org.I0Itec.zkclient.exception.ZkMarshallingError;
  3. import org.I0Itec.zkclient.serialize.ZkSerializer;
  4. import java.io.UnsupportedEncodingException;
  5. /**
  6. * @Classname zk序列化 和 反 序列化 方式
  7. * @Description TODO
  8. * @Date 2019/3/24 3:54
  9. * @Created by 爆裂无球
  10. */
  11. public class MyZkSerializer implements ZkSerializer {
  12. /**
  13. * zk自带的序列化
  14. */
  15. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  16. try {
  17. return new String(bytes, "UTF-8");
  18. } catch (UnsupportedEncodingException e) {
  19. e.printStackTrace();
  20. }
  21. return null;
  22. }
  23. public byte[] serialize(Object obj) throws ZkMarshallingError {
  24. try {
  25. return String.valueOf(obj).getBytes("UTF-8");
  26. } catch (UnsupportedEncodingException e) {
  27. e.printStackTrace();
  28. }
  29. return null;
  30. }
  31. }

2.3 zkCilent的Util

  1. package com.jiagouedu.zkclient.znode;
  2. import com.jiagouedu.zkclient.watcher.MyZkSerializer;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import org.apache.zookeeper.CreateMode;
  5. import org.apache.zookeeper.ZooDefs;
  6. import java.util.List;
  7. /**
  8. * @Classname ZkClientCrud
  9. * @Description TODO
  10. * @Date 2019/3/24 2:54
  11. * @Created by 爆裂无球
  12. */
  13. public class ZkClientCrud<T> {
  14. ZkClient zkClient;
  15. // private String connectString="192.168.0.31:2181,192.168.0.32:2181,192.168.0.33:2181";
  16. private String connectString = "127.0.0.1:2181";
  17. // public ZkClientUtil() {
  18. // this.zkClient = new ZkClient(connectString,5000,5000,new SerializableSerializer()); // zk默认的序列化方式
  19. // }
  20. public ZkClientUtil() {
  21. //定义序列化方式,别忘了
  22. this.zkClient = new ZkClient(connectString, 5000, 5000, new MyZkSerializer());
  23. }
  24. public void createPersistent(String path) {
  25. zkClient.createPersistent(path, true); //创建持久化节点,true表示如果父节点不存在则创建父节点
  26. }
  27. //创建 永久 节点,并设置数据
  28. public void createPersistent(String path, Object data) {
  29. zkClient.createPersistent(path, data);
  30. }
  31. // 创建永久 有序节点
  32. public void createPersistentSequential(String path, Object data) {
  33. zkClient.createEphemeralSequential(path, data);
  34. }
  35. //创建临时节点 会话失效后删除
  36. public void createEphemeral(String path, Object data) {
  37. zkClient.createEphemeral(path, data);
  38. }
  39. //创建 临时节点 有序 节点 会话失效后删除
  40. public void createEphemeralSequential(String path, Object data) {
  41. zkClient.createEphemeralSequential(path, data);
  42. }
  43. //创建alc节点
  44. public void createAcl(String path, Object data, final List<ACL> acl, final CreateMode mode) {
  45. zkClient.create(path, data, acl, mode);
  46. }
  47. //设置acl 属性
  48. public void setAcl(String path, List<ACL> acl) {
  49. zkClient.setAcl(path, acl);
  50. }
  51. //获得acl属性
  52. public Map.Entry<List<ACL>, Stat> getAcl(String path) {
  53. return zkClient.getAcl(path);
  54. }
  55. //读取数据
  56. public T readData(String path) {
  57. // return zkClient.readData(path);
  58. //没有不会抛异常,而是返回null
  59. return zkClient.readData(path, true);
  60. }
  61. /**
  62. * 读取 子节点 只能找 其 子一级 下 所有的
  63. */
  64. public List<String> getChildren(String path) {
  65. return zkClient.getChildren(path);
  66. }
  67. /**
  68. * 递归查找 所有 子节点
  69. */
  70. public void getChilderRecursive(String path) {
  71. System.out.println(path);
  72. if (zkClient.exists(path)) {
  73. List<String> list = zkClient.getChildren(path);
  74. if (list.size() > 0) {
  75. list.stream().forEach(n -> {
  76. getChilderRecursive(path + "/" + n);
  77. });
  78. }
  79. }
  80. }
  81. // 更新内容
  82. public void writeData(String path, Object object) {
  83. zkClient.writeData(path, object);
  84. }
  85. //删除单个节点 即这个节点下不能有子节点
  86. public void delete(String path) {
  87. zkClient.delete(path);
  88. }
  89. //递归删除节点 即删除其节点下 所有子节点 对应rmr 命令
  90. public void deleteRecursive(String path) {
  91. zkClient.deleteRecursive(path);
  92. }
  93. /***
  94. * 支持创建递归方式 但是不能写入数据
  95. * @param path
  96. * @param createParents true,表明需要递归创建父节点
  97. */
  98. public void createPersistentRecursive(String path, boolean createParents) {
  99. zkClient.createPersistent(path, createParents);
  100. }
  101. /**
  102. * 关闭zk
  103. */
  104. public void close() {
  105. zkClient.close();
  106. }
  107. /**
  108. * 监听
  109. */
  110. public void lister(String path) {
  111. //对节点添加监听变化。 当前节点内容修改、节点删除 能监听数据
  112. zkClient.subscribeDataChanges(path, new IZkDataListener() {
  113. @Override
  114. public void handleDataChange(String dataPath, Object data) throws Exception {
  115. System.out.printf(" 变更的节点为:%s,%s", dataPath, data); // 节点变更 变更的节点为:/w,123
  116. }
  117. @Override
  118. public void handleDataDeleted(String dataPath) throws Exception {
  119. System.out.printf(" 删除的节点为:%s", dataPath);
  120. }
  121. });
  122. //对父节点添加监听子节点变化。监听 下面子节点的新增、删除 和 当前节点 不监听数据发生修改和变化。 parentPath: /w,currentChilds:[ww1]
  123. zkClient.subscribeChildChanges(path, new IZkChildListener() {
  124. @Override
  125. public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
  126. System.out.println(" parentPath: " + parentPath + ",currentChilds:" + currentChilds);
  127. }
  128. });
  129. //zeng gai shan
  130. //对父节点添加监听子节点变化。
  131. zkClient.subscribeStateChanges(new IZkStateListener() {
  132. @Override
  133. public void handleStateChanged(Watcher.Event.KeeperState state) throws Exception {
  134. if (state == Watcher.Event.KeeperState.SyncConnected) {
  135. //当我重新启动后start,监听触发
  136. System.out.println("连接成功");
  137. } else if (state == Watcher.Event.KeeperState.Disconnected) {
  138. System.out.println("连接断开");//当我在服务端将zk服务stop时,监听触发
  139. } else
  140. System.out.println("其他状态" + state);
  141. }
  142. @Override
  143. public void handleNewSession() throws Exception {
  144. System.out.println("重建session");
  145. }
  146. @Override
  147. public void handleSessionEstablishmentError(Throwable error) throws Exception {
  148. }
  149. });
  150. }
  151. }

2.4 zkCilent的测试

  1. package com.jiagouedu.zkclient.ZkSerializer;
  2. import com.jiagouedu.zkclient.watcher.MyZkSerializer;
  3. import org.I0Itec.zkclient.IZkChildListener;
  4. import org.I0Itec.zkclient.IZkDataListener;
  5. import org.I0Itec.zkclient.IZkStateListener;
  6. import org.I0Itec.zkclient.ZkClient;
  7. import org.I0Itec.zkclient.serialize.SerializableSerializer;
  8. import org.apache.zookeeper.Watcher;
  9. import java.util.List;
  10. /**
  11. * @Classname 监听
  12. * @Description TODO
  13. * @Date 2019/3/24 3:38
  14. * @Created by 爆裂无球
  15. */
  16. public class ZkClientWatcher<T> {
  17. private ZkClientUtil zkClientUtil = new ZkClientUtil();
  18. ZkClient zkClient;
  19. /**
  20. * 新增 节点(持节+临时) 并读取数据 (不能直接创建 子节点)
  21. */
  22. @Test
  23. public void createPersistent() {
  24. User user = new User();
  25. user.setAge(18);
  26. user.setName("zjq");
  27. zkClientUtil.createPersistent("/ry", user); // 创建永久节点
  28. System.out.println(zkClientUtil.readData("/ry")); // 读取 节点 数据
  29. User user2 = new User();
  30. user2.setAge(23);
  31. user2.setName("lj");
  32. zkClientUtil.createEphemeral("/rh", user2); // 创建临时节点
  33. System.out.println(zkClientUtil.readData("/rh"));
  34. }
  35. /**
  36. * 新增 节点(永久有序,临时有序) 并读取数据 (不能直接创建 子节点)
  37. */
  38. @Test
  39. public void create() {
  40. User user = new User();
  41. user.setAge(1);
  42. user.setName("我擦");
  43. zkClientUtil.createPersistentSequential("/ry", user); // 创建 永久有序节点
  44. User user2 = new User("我擦2", 2);
  45. zkClientUtil.createPersistentSequential("/ry", user2); // 创建 临时有序节点
  46. }
  47. /**
  48. * 创建acl节点
  49. */
  50. @Test
  51. public void createAcl() {
  52. User user = new User("acldata", 3);
  53. User user2 = new User("acldata2", 4);
  54. List<ACL> aclList = new ArrayList<>();
  55. int perm = ZooDefs.Perms.ADMIN | ZooDefs.Perms.READ | ZooDefs.Perms.WRITE; // 或 运算 admin 可以设置节点访问控制列表权限
  56. aclList.add(new ACL(perm, new Id("world", "anyone"))); //设置有所人的权限
  57. // aclList.add(new ACL(ZooDefs.Perms.ALL, new Id("ip", "127.0.0.1"))); //设置所有权限,本机ip
  58. zkClientUtil.createPersistent("/acl", user); // 创建节点
  59. zkClientUtil.setAcl("/acl", aclList); // 给节点设置acl权限
  60. //创建并设置acl节点 ZooDefs.Ids.OPEN_ACL_UNSAFE 表anyone
  61. zkClientUtil.createAcl("/alc2", aclList, ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  62. }
  63. /**
  64. * 获得 acl 属性
  65. */
  66. @Test
  67. public void getAndSetAcl() {
  68. Map.Entry acl = zkClientUtil.getAcl("/acl");
  69. System.out.println(acl.getKey());
  70. System.out.println(acl.getValue());
  71. }
  72. /**
  73. * 递归创建节点(但是不能写入数据) 写入数据,读数据,,再 删除子节点
  74. */
  75. @Test
  76. public void test() {
  77. String path = "/zjq/t1";
  78. zkClientUtil.createPersistentRecursive(path, true);
  79. zkClientUtil.writeData(path, new User("xx", 3));
  80. Object o = zkClientUtil.readData(path);
  81. System.out.println(o);
  82. zkClientUtil.deleteRecursive(path);
  83. }
  84. /**
  85. * 创建子节点,并读取
  86. */
  87. @Test
  88. public void getChildren() {
  89. String path = "/zjq/t1";
  90. String path2 = "/zjq/t2";
  91. String path3 = "/zjq/t3";
  92. zkClientUtil.createPersistentRecursive(path, true);
  93. zkClientUtil.createPersistentRecursive(path2, true);
  94. zkClientUtil.createPersistentRecursive(path3, true);
  95. List list = zkClientUtil.getChildren("/zjq");
  96. list.stream().forEach(n -> {
  97. System.out.println(n);
  98. });
  99. }
  100. /**
  101. * 递归查找 所有 子节点
  102. */
  103. @Test
  104. public void getChilderRecursive() {
  105. String path = "/zjq";
  106. zkClientUtil.getChilderRecursive(path);
  107. }
  108. /**
  109. * 测试监听 并且开启 下面的main方法
  110. */
  111. @Test
  112. public void testListen() throws InterruptedException {
  113. ZkClientUtil zkClientUtil = new ZkClientUtil();
  114. String path = "/wukong/w1";
  115. zkClientUtil.deleteRecursive(path); //先删除
  116. zkClientUtil.lister(path); //添加监听
  117. zkClientUtil.createPersistent(path, "123"); //再创建节点
  118. Thread.sleep(2000);
  119. zkClientUtil.writeData(path, "abc"); //修改数据
  120. Thread.sleep(Integer.MAX_VALUE);
  121. }
  122. public static void main(String[] args) throws InterruptedException {
  123. ZkClientUtil zkClientUtil=new ZkClientUtil();
  124. String path="/wukong/w1";
  125. zkClientUtil.writeData(path,"abc"); //能触发 或者在sh zkCli.sh delete /wukong 也行
  126. }
  127. }

大概内容就这么多,不全的可以查查详细文档

3.结语

Curator好比hibernate,而zkclient就好比mybatis,一般情况下,zkclient可以满足需求~
世上无难事,只怕有心人,每天积累一点点,fighting!!!

发表评论

表情:
评论列表 (有 0 条评论,504人围观)

还没有评论,来说两句吧...

相关阅读