Zookeeper节点操作

╰+哭是因爲堅強的太久メ 2023-10-12 12:33 157阅读 0赞

f83dfacc72056fa6f41275f4e6064385.png

ZooKeeper的节点操作

ZooKeeper的节点类型

ZooKeeper其实也是一个分布式集群,其中维护了一个目录树结构,在这个目录树中,组成的部分是一个个的节点。ZooKeeper的节点可以大致分为两种类型: 短暂类型持久类型

  • 短暂类型ephemeral: 客户端和服务器断开后,创建的节点自己删除。
  • 持久类型persistent: 客户端和服务器断开后,创建的节点不删除(默认情况)。

























节点类型 描述信息
EPHEMERAL 临时节点,在会话结束后自动被删除。
EPHEMERAL_SEQUENTIAL 临时顺序节点,在会话结束后会自动被删除。会在给定的path节点名称后添加一个序列号。
PERSISTENT 永久节点,在会话结束后不会被自动删除。
PERSISTENT_SEQUENTIAL 永久顺序节点,在会话结束后不会被自动删除。会在给定的path节点名称后添加一个序列号。

ZooKeeper的Shell操作

打开Shell客户端

  1. 连接到当前节点的Server服务

    1. [root@qianfeng01 ~]# zkCli.sh
    2. 复制代码
  2. 连接到其他节点的Server服务

    1. [root@qianfeng01 ~]# zkCli.sh -server qianfeng02:2181
    2. 复制代码

Shell操作

ls

  1. 作用: 查看某个节点下的子节点
  2. 选项:
  3. -s 查看具体信息,包括timeversion等信息
  4. 注意事项: 需要使用绝对路径查看
  5. 示例:
  6. ls /
  7. ls /zookeeper
  8. ls -s /zookeeper/config
  9. 复制代码

create

  1. 作用: 创建一个节点,可以设置节点的初始内容
  2. 选项:
  3. -e: 设置短暂类型节点
  4. -s: 设置顺序节点
  5. 示例:
  6. create /test
  7. create /test2 "content message"
  8. create -e /test3 "content message"
  9. create -e -s /test "content message"
  10. 复制代码

get

  1. 作用: 获取节点存储的值
  2. 选项:
  3. -s: 同时获取版本描述信息,例如: timeversion
  4. 示例:
  5. get /zookeeper/config
  6. get -s /zookeeper/config
  7. 复制代码

hehe 节点数据信息
cZxid = 0x800000002 节点创建时的zxid
ctime = Thu May 09 03:41:15 CST 2019 节点创建的时间
mZxid = 0x800000002 对应节点最近一次修改的时间,与子节点无关
mtime = Thu May 09 03:41:15 CST 2019 节点最近一次更新的时间
pZxid = 0x800000002 对应节点与子节点(或者子节点)的修改的时间,与孙子节点无关
cversion = 0 子节点数据更新次数
dataVersion = 0 本节点数据更新次数
aclVersion = 0 节点授权信息(ACL)的更新次数
ephemeralOwner = 0x0 如果该节点为临时节点,ephemeralOwner值表示与该节点绑定的session id. 如果该节点不是临时节点,ephemeralOwner值为0
dataLength = 4 节点的数据长度
numChildren = 0 子节点的个数

set

  1. 作用: 设置节点存储的值
  2. 示例:
  3. set /test "content message"
  4. 复制代码

delete

  1. 作用: 删除节点,只能删除空节点,即没有子节点的节点
  2. 示例:
  3. delete /test
  4. 复制代码

deleteAll

  1. 作用: 删除节点,可以递归删除所有的子节点
  2. 示例:
  3. deleteAll /test
  4. 复制代码

addWatch

  1. 作用: 监听节点,当这个节点发生变化(内容、创建、删除)会得到通知
  2. 示例:
  3. addWatch /test
  4. 复制代码

removewatches

  1. 作用: 移除对节点的监听
  2. 示例:
  3. removewatches /test
  4. 复制代码

quit

  1. 作用: 退出客户端
  2. 复制代码

IDEA操作ZooKeeper的API操作

pom依赖

  1. <dependency>
  2. <groupId>junit</groupId>
  3. <artifactId>junit</artifactId>
  4. <version>4.11</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>org.apache.zookeeper</groupId>
  8. <artifactId>zookeeper</artifactId>
  9. <version>3.6.3</version>
  10. </dependency>
  11. 复制代码

初始化ZooKeeper客户端对象

  1. package com.qianfeng.zk;
  2. import org.apache.zookeeper.*;
  3. import org.apache.zookeeper.data.Stat;
  4. import org.junit.Before;
  5. import org.junit.Test;
  6. import java.io.IOException;
  7. import java.util.List;
  8. /**
  9. * ZooKeeper的API操作
  10. *
  11. * @author 千锋大数据教研院
  12. */
  13. public class ZkAPI {
  14. // Zookeeper客户端对象
  15. private ZooKeeper zkCli;
  16. @Before
  17. public void init() throws IOException, InterruptedException {
  18. // 连接到ZooKeeper的Server端
  19. String connectString = "qianfeng01:2181,qianfeng02:2181,qianfeng03:2181";
  20. // 连接超时时间
  21. int sessionTimeout = 5000;
  22. // 初始化一个ZooKeeper客户端实例,需要参数: 服务端、连接超时时间、观察者做回调
  23. zkCli = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
  24. @Override
  25. public void process(WatchedEvent watchedEvent) {
  26. // 暂不做任何处理
  27. }
  28. });
  29. }
  30. }
  31. 复制代码

创建节点(同步)

注意: 以下的API操作,需要使用到zkCli对象。在上述的初始化部分已经完成了对zkCli对象的初始化。后面的所有操作,只需要将方法粘贴到ZkAPI类中即可。需要导入的包,也在上方的初始化部分导入完成了。

  1. // ACL权限类型:
  2. // OPEN_ACL_UNSAFE: 完全开放的ACL,任何连接的客户端都可以操作该节点
  3. // CREATOR_ALL_ACL: 只有创建者才有ACL权限
  4. // READ_ACL_UNSAFE: 只能读取ACL
  5. //
  6. // CreateMode:
  7. // EPHEMERAL: 临时型
  8. // EPHEMERAL_SEQUENTIAL: 临时顺序型
  9. // PERSISTENT: 永久型
  10. // PERSISTENT_SEQUENTIAL: 永久顺序型
  11. //
  12. // 同步创建节点,遇到不正常的情况直接抛出异常。
  13. @Test
  14. public void createNode() throws InterruptedException, KeeperException {
  15. zkCli.create("/ApiNode1", "ApiContent1".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
  16. }
  17. 复制代码

文末扫码有福利!

创建节点(异步)

  1. @Test
  2. public void createNodeAsync() throws InterruptedException {
  3. zkCli.create("/ApiNode2", "ApiContent2".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, new CreateNodeCallBack(), "");
  4. // 因为是异步操作,如果不等待一下,操作直接就结束了,来不及等到回调事件的触发
  5. Thread.sleep(Integer.MAX_VALUE);
  6. }
  7. private static class CreateNodeCallBack implements AsyncCallback.StringCallback {
  8. /**
  9. * 异步创建节点回调方法
  10. * @param i 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
  11. * @param s 创建的节点路径
  12. * @param o 创建节点时传递的ctx
  13. * @param s1 在ZK上实际创建的节点名(针对顺序节点)
  14. */
  15. @Override
  16. public void processResult(int i, String s, Object o, String s1) {
  17. System.out.println("i = " + i + ", s = " + s + ", o = " + o + ", s1 = " + s1);
  18. switch (i) {
  19. case 0:
  20. System.out.println("节点创建成功");
  21. break;
  22. case -4:
  23. System.out.println("客户端与服务端连接已断开");
  24. break;
  25. case -110:
  26. System.out.println("指定节点已存在");
  27. break;
  28. case -112:
  29. System.out.println("会话已过期");
  30. break;
  31. }
  32. }
  33. }
  34. 复制代码

删除节点(同步)

  1. // 同步删除,遇到无法正常删除的情况,直接异常
  2. @Test
  3. public void deleteNode() throws Exception {
  4. zkCli.delete("/ApiNode1", -1);
  5. }
  6. 复制代码

删除节点(异步)

  1. @Test
  2. public void deleteNodeAsync() throws Exception {
  3. zkCli.delete("/ApiNode1", -1, new DeleteNodeCallBack(), "");
  4. Thread.sleep(Integer.MAX_VALUE);
  5. }
  6. /**
  7. * 删除节点回调
  8. */
  9. private static class DeleteNodeCallBack implements AsyncCallback.VoidCallback {
  10. /**
  11. * 异步删除节点回调方法
  12. * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
  13. * @param path 删除的节点路径
  14. * @param ctx 删除节点时传递的ctx
  15. */
  16. @Override
  17. public void processResult(int rc, String path, Object ctx) {
  18. System.out.println("删除结果:rc=" + rc + ",path=" + path + ",ctx=" + ctx);
  19. switch (rc) {
  20. case 0:
  21. System.out.println("节点删除成功");
  22. break;
  23. case -4:
  24. System.out.println("客户端与服务端连接已断开");
  25. break;
  26. case -112:
  27. System.out.println("会话已过期");
  28. break;
  29. default:
  30. System.out.println("服务端响应码" + rc + "未知");
  31. break;
  32. }
  33. }
  34. }
  35. 复制代码

修改节点内容(同步)

  1. @Test
  2. public void setNode() throws InterruptedException, KeeperException {
  3. zkCli.setData("/ApiNode1", "hello".getBytes(), -1);
  4. }
  5. 复制代码

修改节点内容(异步)

  1. @Test
  2. public void setNodeAsync() throws Exception {
  3. zkCli.setData("/ApiNode2", "hello".getBytes(), -1, new SetNodeCallBack(), "");
  4. Thread.sleep(Integer.MAX_VALUE);
  5. }
  6. private static class SetNodeCallBack implements AsyncCallback.StatCallback {
  7. /**
  8. * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
  9. * @param path 修改的节点路径
  10. * @param ctx 修改节点时传递的ctx
  11. * @param stat 节点状态,由服务器端响应的新stat替换
  12. */
  13. @Override
  14. public void processResult(int rc, String path, Object ctx, Stat stat) {
  15. switch (rc) {
  16. case 0:
  17. System.out.println("节点数据设置成功");
  18. break;
  19. case -4:
  20. System.out.println("客户端与服务端连接已断开");
  21. break;
  22. case -112:
  23. System.out.println("会话已过期");
  24. break;
  25. default:
  26. System.out.println("服务端响应码" + rc + "未知");
  27. break;
  28. }
  29. }
  30. }
  31. 复制代码

获取节点内容(同步)

  1. @Test
  2. public void getNode() throws InterruptedException, KeeperException {
  3. // 实例化对象,用于记录节点的状态信息
  4. Stat stat = new Stat();
  5. // 获取数据
  6. byte[] data = zkCli.getData("/ApiNode1", true, stat);
  7. // 打印数据
  8. System.out.println(new String(data));
  9. // 打印节点状态信息
  10. System.out.println(stat);
  11. System.out.println(stat.getVersion());
  12. System.out.println(stat.getCtime());
  13. }
  14. 复制代码

获取节点内容(异步)

  1. @Test
  2. public void getNodeAsync() throws InterruptedException {
  3. zkCli.getData("/ApiNode1", true, new GetNodeCallBack(), "");
  4. Thread.sleep(Integer.MAX_VALUE);
  5. }
  6. private static class GetNodeCallBack implements AsyncCallback.DataCallback {
  7. /**
  8. * 获取数据的回调
  9. * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
  10. * @param path 获取数据的节点路径
  11. * @param ctx 获取数据时传递的ctx
  12. * @param data 获取到的节点数据
  13. * @param stat 获取到的节点状态
  14. */
  15. @Override
  16. public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) {
  17. switch (rc) {
  18. case 0:
  19. System.out.println("节点数据获取成功: " + new String(data));
  20. break;
  21. case -4:
  22. System.out.println("客户端与服务端连接已断开");
  23. break;
  24. case -112:
  25. System.out.println("会话已过期");
  26. break;
  27. default:
  28. System.out.println("服务端响应码" + rc + "未知");
  29. break;
  30. }
  31. }
  32. }
  33. 复制代码

获取所有子节点(同步)

  1. @Test
  2. public void getChildren() throws InterruptedException, KeeperException {
  3. List<String> children = zkCli.getChildren("/", true);
  4. System.out.println(children);
  5. }
  6. 复制代码

获取所有子节点(异步)

  1. @Test
  2. public void getChildrenAsync() throws InterruptedException {
  3. zkCli.getChildren("/", true, new GetChildrenCallBack(), "");
  4. Thread.sleep(Integer.MAX_VALUE);
  5. }
  6. private static class GetChildrenCallBack implements AsyncCallback.ChildrenCallback {
  7. /**
  8. * 获取到所有的子节点的回调
  9. * @param rc 服务端响应代码: 0 => 成功,-4 => 客户端与服务端连接断开,-110 => 指定节点已存在,-112 => 会话已过期
  10. * @param path 获取子节点的节点路径
  11. * @param ctx 调用方法传递的ctx
  12. * @param children 所有的子节点
  13. */
  14. @Override
  15. public void processResult(int rc, String path, Object ctx, List<String> children) {
  16. switch (rc) {
  17. case 0:
  18. System.out.println("子节点获取成功: " + children);
  19. break;
  20. case -4:
  21. System.out.println("客户端与服务端连接已断开");
  22. break;
  23. case -112:
  24. System.out.println("会话已过期");
  25. break;
  26. default:
  27. System.out.println("服务端响应码" + rc + "未知");
  28. break;
  29. }
  30. }
  31. }

更多大数据精彩内容欢迎B站搜索“千锋教育”或者扫码领取大数据学习全套资料!

发表评论

表情:
评论列表 (有 0 条评论,157人围观)

还没有评论,来说两句吧...

相关阅读

    相关 ZooKeeper 节点类型

    ZooKeeper 节点是有生命周期的,这取决于节点的类型。在 ZooKeeper 中,节点类型可以分为持久节点(PERSISTENT )、临时节点(EPHEMERAL),以及