zkclient操作zookeeper

喜欢ヅ旅行 2023-07-20 12:22 105阅读 0赞

目录

1、概述

2、创建zookeeper节点

引入zkclient 包

自定义ZkClient 工具类

zookeeper节点操作类

运行结果

分析

3、写入zookeeper节点数据

序列化类

User类

ZkClient 工具类增加方法

测试类

运行结果

分析

4、zookeeper 监听

监听数据变化

示例代码

运行结果

分析

监听节点变化

示例代码

运行结果

分析


1、概述

Zookeeper 是树状结构的注册中心,每个节点的类型分为持久节点、持久顺序节点、临时节点、临时顺序节点。

  • 持久节点:服务注册后保证节点不会丢失,注册中心重启也会存在。
  • 在持久节点特性的基础上增加了节点先后顺序的能力。
  • 临时节点:服务注册后连接丢失或 session 超时,注册的节点会自动被移除。
  • 临时顺序节点:在临时节点特性的基础上增加了节点先后顺序的能力。

2、创建zookeeper节点

引入zkclient 包

  1. <dependency>
  2. <groupId>com.101tec</groupId>
  3. <artifactId>zkclient</artifactId>
  4. <version>0.10</version>
  5. </dependency>

自定义ZkClient 工具类

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.ZkClient;
  3. import java.util.List;
  4. public class ZkClientUtil {
  5. private static String zkServer = "127.0.0.1:2181";//zookeeper地址
  6. private static ZkClient zkClient;
  7. public static ZkClient connectZkClient() {
  8. if (zkClient != null) return zkClient;
  9. zkClient = new ZkClient(zkServer);//创建zookeeper的java客户端连接
  10. return zkClient;
  11. }
  12. public static void closeZkClient() {
  13. if (zkClient != null) {
  14. zkClient.close();
  15. zkClient = null;
  16. }
  17. }
  18. /**
  19. * 遍历展示目录下的所有节点
  20. * @author LAN
  21. * @date 2018年12月3日
  22. * @param root
  23. */
  24. public static void showZkPath(String root) {
  25. showZkPath(connectZkClient(), root);
  26. }
  27. public static void showZkPath(ZkClient zkClient, String root) {
  28. List<String> children = zkClient.getChildren(root);//获取节点下的所有直接子节点
  29. if (children.isEmpty()) {
  30. return;
  31. }
  32. for (String s : children) {
  33. String childPath = root.endsWith("/") ? (root + s) : (root + "/" + s);
  34. System.err.println(childPath);
  35. showZkPath(zkClient, childPath);//递归获取所有子节点
  36. }
  37. }
  38. }

zookeeper节点操作类

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.ZkClient;
  3. public class ZookeeperOperation {
  4. public static void main(String[] args) {
  5. ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
  6. if(!zkClient.exists("/test")) {
  7. zkClient.createPersistent("/test");
  8. }
  9. System.err.println("***************在/test下分别创建4种节点***************");
  10. if(!zkClient.exists("/test/永久节点1")) {
  11. zkClient.createPersistent("/test/永久节点1");
  12. }
  13. String s1 = zkClient.createPersistentSequential("/test/永久顺序节点", null);
  14. zkClient.createEphemeral("/test/临时节点1");
  15. zkClient.createEphemeralSequential("/test/临时顺序节点1", null);
  16. ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录
  17. System.err.println("***************关闭客户端再创建新的客户端***************");
  18. ZkClientUtil.closeZkClient();
  19. zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
  20. ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录
  21. System.err.println("***************删除节点s1***************");
  22. zkClient.delete(s1);//删除某个节点,如果该节点下有子节点,则会报错
  23. ZkClientUtil.showZkPath(zkClient, "/test");//展示test目录下的所有子目录
  24. zkClient.deleteRecursive("/test");//强制删除某个节点,并且删除节点下的所有子节点
  25. zkClient.close();
  26. }
  27. }

运行结果

  1. ***************在/test下分别创建4种节点***************
  2. /test/永久顺序节点0000000004
  3. /test/临时顺序节点10000000006
  4. /test/永久节点1
  5. /test/临时节点1
  6. /test/永久顺序节点0000000001
  7. ***************关闭客户端再创建新的客户端***************
  8. [01/04/20 02:16:24:024 CST] ZkClient-EventThread-11-127.0.0.1:2181 INFO zkclient.ZkEventThread: Terminate ZkClient event thread.
  9. [01/04/20 02:16:24:024 CST] main INFO zookeeper.ZooKeeper: Session: 0x10001204ed20003 closed
  10. [01/04/20 02:16:24:024 CST] main INFO zookeeper.ZooKeeper: Initiating client connection, connectString=127.0.0.1:2181 sessionTimeout=30000 watcher=org.I0Itec.zkclient.ZkClient@12f40c25
  11. [01/04/20 02:16:24:024 CST] main-EventThread INFO zookeeper.ClientCnxn: EventThread shut down for session: 0x10001204ed20003
  12. [01/04/20 02:16:24:024 CST] main INFO zkclient.ZkClient: Waiting for keeper state SyncConnected
  13. [01/04/20 02:16:24:024 CST] ZkClient-EventThread-14-127.0.0.1:2181 INFO zkclient.ZkEventThread: Starting ZkClient event thread.
  14. [01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Opening socket connection to server 127.0.0.1/127.0.0.1:2181. Will not attempt to authenticate using SASL (unknown error)
  15. [01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Socket connection established to 127.0.0.1/127.0.0.1:2181, initiating session
  16. [01/04/20 02:16:24:024 CST] main-SendThread(127.0.0.1:2181) INFO zookeeper.ClientCnxn: Session establishment complete on server 127.0.0.1/127.0.0.1:2181, sessionid = 0x10001204ed20004, negotiated timeout = 30000
  17. [01/04/20 02:16:24:024 CST] main-EventThread INFO zkclient.ZkClient: zookeeper state changed (SyncConnected)
  18. /test/永久顺序节点0000000004
  19. /test/永久节点1
  20. /test/永久顺序节点0000000001
  21. ***************删除节点s1***************
  22. /test/永久节点1
  23. /test/永久顺序节点0000000001

分析

首先创建四种节点,当断开与zookeeper连接后,临时节点自动删除,再次连接时永久节点依然存在。

3、写入zookeeper节点数据

序列化类

要在zookeeper节点上写入数据以及读取数据,需要对数据进行序列化及反序列化。

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.exception.ZkMarshallingError;
  3. import org.I0Itec.zkclient.serialize.ZkSerializer;
  4. import java.io.*;
  5. public class ZkSerialize implements ZkSerializer {
  6. /**
  7. * 序列化
  8. */
  9. @Override
  10. public byte[] serialize(Object data) throws ZkMarshallingError {
  11. ObjectOutputStream oos = null;
  12. ByteArrayOutputStream baos = null;
  13. try {
  14. // 序列化
  15. baos = new ByteArrayOutputStream();
  16. oos = new ObjectOutputStream(baos);
  17. oos.writeObject(data);
  18. byte[] bytes = baos.toByteArray();
  19. return bytes;
  20. } catch (Exception e) {
  21. e.printStackTrace();
  22. }finally {
  23. try {
  24. if(baos!=null) baos.close();
  25. if(oos!=null) oos.close();
  26. } catch (IOException e) {
  27. e.printStackTrace();
  28. }
  29. }
  30. return null;
  31. }
  32. /**
  33. * 反序列化
  34. */
  35. @Override
  36. public Object deserialize(byte[] bytes) throws ZkMarshallingError {
  37. ByteArrayInputStream bais = null;
  38. ObjectInputStream ois = null;
  39. try {
  40. // 反序列化
  41. bais = new ByteArrayInputStream(bytes);
  42. ois = new ObjectInputStream(bais);
  43. return ois.readObject();
  44. } catch (Exception e) {
  45. e.printStackTrace();
  46. } finally {
  47. try {
  48. bais.close();
  49. ois.close();
  50. } catch (IOException e) {
  51. e.printStackTrace();
  52. }
  53. }
  54. return null;
  55. }
  56. }

User类

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import java.io.Serializable;
  3. public class User implements Serializable {
  4. private String name;
  5. public String getName() {
  6. return name;
  7. }
  8. public void setName(String name) {
  9. this.name = name;
  10. }
  11. @Override
  12. public String toString() {
  13. return "[User:name=" + name + "]";
  14. }
  15. }

ZkClient 工具类增加方法

  1. /**
  2. * 遍历展示目录下的所有节点
  3. * @author test
  4. * @date 2018年12月3日
  5. * @param root
  6. */
  7. public static void showZkPathData(String root, ZkSerialize serializer) {
  8. showZkPathData(connectZkClient(), root, serializer);
  9. }
  10. public static void showZkPathData(ZkClient zkClient, String root, ZkSerialize serializer) {
  11. zkClient.setZkSerializer(serializer);
  12. List<String> children = zkClient.getChildren(root);
  13. if(children.isEmpty()){
  14. return;
  15. }
  16. for(String s:children){
  17. String childPath = root.endsWith("/")?(root+s):(root+"/"+s);
  18. Object data = zkClient.readData(childPath, true);
  19. if(data!=null) {
  20. System.err.println(data.getClass());
  21. }
  22. System.err.println(childPath+"("+data+")");
  23. showZkPathData(zkClient, childPath, serializer);
  24. }
  25. }

测试类

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.ZkClient;
  3. public class ZookeeperDataOperation {
  4. public static void main(String[] args) {
  5. ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
  6. if(!zkClient.exists("/test")) {
  7. zkClient.createPersistent("/test");
  8. }
  9. User user = new User();
  10. User user2 = new User();
  11. user.setName("阮浩");
  12. user2.setName("张三");
  13. zkClient.setZkSerializer(new ZkSerialize());//这里先设置好序列化工具再写入数据
  14. zkClient.createEphemeral("/test/ruanhao", user);
  15. zkClient.createEphemeral("/test/zhangsan");
  16. zkClient.writeData("/test/zhangsan", user2);
  17. ZkClientUtil.showZkPathData(zkClient, "/test", new ZkSerialize());//展示test目录下的所有子目录
  18. ZkClientUtil.closeZkClient();
  19. }
  20. }

运行结果

  1. class com.alibaba.dubbo.samples.zkclient.User
  2. /test/zhangsan([User:name=张三])
  3. class com.alibaba.dubbo.samples.zkclient.User
  4. /test/ruanhao([User:name=阮浩])

分析

可以在创建路径时存入数据,也可以调用 writeData 存入数据。

4、zookeeper 监听

通过注册监听器,当zookeeper节点发生变化时,zookeeper会主动通知客户端,从而实现一些功能。

监听节点的数据变化事件包括:1、节点被创建; 2、节点上写入数据; 3、节点数据变化; 4、节点数据被删除; 5、节点本身被删除。

监听数据变化

示例代码

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.IZkDataListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. /**
  5. * 监听测试类
  6. */
  7. public class ZookeeperSubscribeListener {
  8. public static void main(String[] args) {
  9. new ZookeeperSubscribeListener().subscribeDataChanges();
  10. }
  11. private void subscribeDataChanges() {
  12. ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
  13. if(!zkClient.exists("/test")) {
  14. zkClient.createPersistent("/test");
  15. }
  16. //注册监听事件
  17. zkClient.subscribeDataChanges("/test/Node", new IZkDataListener() {
  18. @Override
  19. public void handleDataDeleted(String dataPath) throws Exception {
  20. System.out.println("DataDeleted:"+dataPath);
  21. }
  22. @Override
  23. public void handleDataChange(String dataPath, Object data) throws Exception {
  24. System.out.println("DataChange:"+dataPath+",data:"+data);
  25. }
  26. });
  27. System.out.println("****************************************");
  28. zkClient.createPersistent("/test/Node");
  29. for (int i = 0; i< 5; i++) {
  30. sleep(100);
  31. zkClient.writeData("/test/Node", i);
  32. }
  33. zkClient.delete("/test/Node");
  34. sleep(2000);
  35. zkClient.unsubscribeAll();
  36. zkClient.close();
  37. }
  38. private static void sleep(int ms) {
  39. try {
  40. Thread.sleep(ms);
  41. } catch (InterruptedException e) {
  42. e.printStackTrace();
  43. }
  44. }
  45. }

运行结果

  1. ****************************************
  2. DataChange:/test/Node,data:null
  3. DataChange:/test/Node,data:0
  4. DataChange:/test/Node,data:1
  5. DataChange:/test/Node,data:2
  6. DataChange:/test/Node,data:3
  7. DataDeleted:/test/Node

分析

数据改变,触发监听器事件。

监听节点变化

示例代码

  1. package com.alibaba.dubbo.samples.zkclient;
  2. import org.I0Itec.zkclient.IZkChildListener;
  3. import org.I0Itec.zkclient.ZkClient;
  4. import java.util.List;
  5. /**
  6. * 监听节点变化测试类
  7. */
  8. public class ZookeeperSubscribeNodeListener {
  9. public static void main(String[] args) {
  10. new ZookeeperSubscribeNodeListener().subscribeDataChanges();
  11. }
  12. private void subscribeDataChanges() {
  13. ZkClient zkClient = ZkClientUtil.connectZkClient();//创建zookeeper的java客户端连接
  14. if (!zkClient.exists("/test")) {
  15. zkClient.createPersistent("/test");
  16. }
  17. zkClient.subscribeChildChanges("/test/Node", new IZkChildListener() {
  18. @Override
  19. public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
  20. String childs = "";
  21. if (currentChilds != null && currentChilds.size() > 0) {
  22. childs += "[";
  23. for (String s : currentChilds) {
  24. childs += s + ",";
  25. }
  26. childs += "]";
  27. }
  28. System.out.println("ChildChange:" + parentPath + ",childs:" + childs);
  29. }
  30. });
  31. zkClient.createPersistent("/test/Node");
  32. sleep(100);
  33. zkClient.createPersistent("/test/Node/n1");
  34. sleep(100);
  35. zkClient.createPersistent("/test/Node/n2");
  36. sleep(100);
  37. zkClient.createPersistent("/test/Node/n3");
  38. sleep(100);
  39. zkClient.delete("/test/Node/n1");
  40. sleep(100);
  41. zkClient.delete("/test/Node/n2");
  42. sleep(100);
  43. zkClient.delete("/test/Node/n3");
  44. sleep(3000);
  45. System.out.println("****");
  46. zkClient.deleteRecursive("/test/Node");
  47. sleep(3000);
  48. zkClient.close();
  49. }
  50. private static void sleep(int ms) {
  51. try {
  52. Thread.sleep(ms);
  53. } catch (InterruptedException e) {
  54. e.printStackTrace();
  55. }
  56. }
  57. }

运行结果

  1. ChildChange:/test/Node,childs:
  2. ChildChange:/test/Node,childs:[n1,]
  3. ChildChange:/test/Node,childs:[n1,n2,]
  4. ChildChange:/test/Node,childs:[n1,n2,n3,]
  5. ChildChange:/test/Node,childs:[n2,n3,]
  6. ChildChange:/test/Node,childs:[n3,]
  7. ChildChange:/test/Node,childs:
  8. ****
  9. ChildChange:/test/Node,childs:

分析

创建节点,监听该节点的监听器就会打印。

发表评论

表情:
评论列表 (有 0 条评论,105人围观)

还没有评论,来说两句吧...

相关阅读