本文共 6814 字,大约阅读时间需要 22 分钟。
Zookeeper 官方客户端没有与服务端代码分离,通常会在同一个 JAR 文件中。开发者只需引入正确版本的 Maven 依赖即可,确保与服务端版本一致以避免兼容性问题。以下是示例依赖配置:
org.apache.zookeeper zookeeper 3.5.8
在测试环境中,通常在初始化方法中创建 Zookeeper 实例。以下是一个常见的实现示例:
@Slf4jpublic class ZookeeperClientTest { private static final String ZK_ADDRESS = "192.168.109.200:2181"; private static final int SESSION_TIMEOUT = 5000; private static ZooKeeper zooKeeper; private static final String ZK_NODE = "/zk-node"; @Before public void init() throws IOException, InterruptedException { final CountDownLatch countDownLatch = new CountDownLatch(1); zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, new Watcher() { @Override public void process(Watcher.Event event) { if (event.getState() == Watcher.Event.KeeperState.SyncConnected && event.getType() == Watcher.Event.EventType.None) { countDownLatch.countDown(); log.info("连接成功!"); } } }); log.info("连接中……"); countDownLatch.await(); }}
创建节点的方式如下:
@Testpublic void createTest() throws KeeperException, InterruptedException { String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT); log.info("创建路径: {}", path);}
异步创建节点:
@Testpublic void createAsycTest() throws InterruptedException { zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT, (rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name), TimeUnit.SECONDS.sleep(Integer.MAX_VALUE));}
修改节点数据:
@Testpublic void setTest() throws KeeperException, InterruptedException { Stat stat = new Stat(); byte[] data = zooKeeper.getData(ZK_NODE, false, stat); log.info("修改前: {}", new String(data)); zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion()); byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat); log.info("修改后: {}", new String(dataAfter));}
Curator 是 Netlix 开源的 Zookeeper 客户端框架,是目前最受欢迎的第三方客户端之一。它对 Zookeeper 版本支持最好,且功能强大,推荐使用。Curator 提供了对 Zookeeper 常用功能的封装,如 Leader 选举、分布式计数器、分布式锁等,减少了技术开发的复杂性。
在 Maven 工程中引入 Curator 相关依赖,并配置相关参数。以下是示例配置:
org.apache.curator curator-recipes 5.0.0 org.apache.zookeeper zookeeper org.apache.zookeeper zookeeper 3.5.8 junit junit 4.13 org.project.lombok lombok 1.18.12
Curator 提供了多种方式创建会话。以下是常见的实现方式:
// 传统方式RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);client.start();// 流式方式RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder() .connectString("192.168.128.129:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .namespace("base") .build();client.start();
创建节点:
@Testpublic void testCreate() throws Exception { String path = curatorFramework.create().forPath("/curator-node"); log.info("创建节点: {}", path);}
一次性创建带层级结构的节点:
@Testpublic void testCreateWithParent() throws Exception { String pathWithParent = "/node-parent/sub-node-1"; String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent); log.info("创建节点: {}", path);}
获取数据:
@Testpublic void testGetData() throws Exception { byte[] bytes = curatorFramework.getData().forPath("/curator-node"); log.info("获取数据: {}", new String(bytes));}
更新节点:
@Testpublic void testSetData() throws Exception { curatorFramework.setData().forPath("/curator-node", "changed!".getBytes()); byte[] bytes = curatorFramework.getData().forPath("/curator-node"); log.info("获取修改后的数据: {}", new String(bytes));}
删除节点:
@Testpublic void testDelete() throws Exception { String pathWithParent = "/node-parent"; curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}
Curator 提供了异步接口,通过 BackgroundCallback
处理结果。以下是一个示例:
@Testpublic void testBackground() throws Exception { curatorFramework.getData().inBackground((item1, item2) -> { log.info("背景线程: {}", item2); }).forPath(ZK_NODE); TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}
Curator 提供了 CuratorListener
接口,用于监听事件。以下是一个示例:
public interface CuratorListener { void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;}
Curator 提供了多种缓存类型,如 NodeCache
、PathChildrenCache
和 TreeCache
,用于监听 Zookeeper 节点数据变化。
Zookeeper 集群分为以下角色:
搭建伪集群时,仅需一台机器启动多个 Zookeeper 实例即可。以下是步骤说明:
验证 JDK 版本:
java -version
下载并解压 Zookeeper:
wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gztar -zxvf apache-zookeeper-3.5.8-bin.tar.gzcd apache-zookeeper-3.5.8-bin
重命名配置文件:
cp conf/zoo_sample.cfg conf/zoo-1.cfg
修改配置文件:
vim conf/zoo-1.cfg
添加以下配置:```bashdataDir=/usr/local/data/zookeeper-1clientPort=2181server.1=127.0.0.1:2001:3001:participantserver.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observer
创建文件夹并设置 server ID:
mkdir -p /usr/local/data/zookeeper-1cd /usr/local/data/zookeeper-1echo 1 > myidcd /usr/local/data/zookeeper-2echo 2 > myidcd /usr/local/data/zookeeper-3echo 3 > myidcd /usr/local/data/zookeeper-4echo 4 > myid
启动 Zookeeper 实例:
bin/zkServer.sh start conf/zoo1.cfgbin/zkServer.sh start conf/zoo2.cfgbin/zkServer.sh start conf/zoo3.cfg
验证集群状态:
bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
Zookeeper 3.5.3 及以上版本支持动态配置,需启用超级管理员身份验证模式 ACLs。以下是动态配置示例:
启用超级管理员模式:
echo -n gj:123 | openssl dgst -binary -sha1 | openssl base64
配置动态文件:
vim conf/zoo_replicated1.cfg.dynamic
添加以下内容:
server.1=192.168.109.200:2001:3001:participant;192.168.109.200:2181server.2=192.168.109.200:2002:3002:participant;192.168.109.200:2182server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
启动服务并验证:
./bin/zkServer.sh start conf/zoo1.cfg./bin/zkServer.sh status conf/zoo1.cfg
动态更新集群状态:
./bin/zkCli.sh -addauth digest gj:123./bin/zkCli.sh -reconfig -remove 3./bin/zkCli.sh -reconfig -add server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
通过以上方法,可以实现 Zookeeper 集群的动态扩容和缩容,提升系统的可用性和灵活性。
转载地址:http://preg.baihongyu.com/