博客
关于我
二:Zookeeper客户端使用与集群特性
阅读量:378 次
发布时间:2019-03-05

本文共 6919 字,大约阅读时间需要 23 分钟。

Zookeeper Java 客户端与集群动态配置

Zookeeper Java 客户端

项目构建

Zookeeper 官方客户端没有与服务端代码分离,通常会在同一个 JAR 文件中。开发者只需引入正确版本的 Maven 依赖即可,确保与服务端版本一致以避免兼容性问题。以下是示例依赖配置:

org.apache.zookeeper
zookeeper
3.5.8

客户端实例创建

在测试环境中,通常在初始化方法中创建 Zookeeper 实例。以下是一个常见的实现示例:

@Slf4j
public class ZookeeperClientTest {
private static final String ZK_ADDRESS = "192.168.109.200:2181";
private static final int SESSION_TIMEOUT = 5000;
private static ZooKeeper zooKeeper;
private static final String ZK_NODE = "/zk-node";
@Before
public void init() throws IOException, InterruptedException {
final CountDownLatch countDownLatch = new CountDownLatch(1);
zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, new Watcher() {
@Override
public void process(Watcher.Event event) {
if (event.getState() == Watcher.Event.KeeperState.SyncConnected &&
event.getType() == Watcher.Event.EventType.None) {
countDownLatch.countDown();
log.info("连接成功!");
}
}
});
log.info("连接中……");
countDownLatch.await();
}
}

创建节点

创建节点的方式如下:

@Test
public void createTest() throws KeeperException, InterruptedException {
String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
log.info("创建路径: {}", path);
}

异步创建节点:

@Test
public void createAsycTest() throws InterruptedException {
zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT,
(rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name),
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE));
}

节点数据操作

修改节点数据:

@Test
public void setTest() throws KeeperException, InterruptedException {
Stat stat = new Stat();
byte[] data = zooKeeper.getData(ZK_NODE, false, stat);
log.info("修改前: {}", new String(data));
zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());
byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);
log.info("修改后: {}", new String(dataAfter));
}

Apache Curator 开源客户端

Curator 介绍

Curator 是 Netlix 开源的 Zookeeper 客户端框架,是目前最受欢迎的第三方客户端之一。它对 Zookeeper 版本支持最好,且功能强大,推荐使用。Curator 提供了对 Zookeeper 常用功能的封装,如 Leader 选举、分布式计数器、分布式锁等,减少了技术开发的复杂性。

Curator 实战

引入依赖

在 Maven 工程中引入 Curator 相关依赖,并配置相关参数。以下是示例配置:

org.apache.curator
curator-recipes
5.0.0
org.apache.zookeeper
zookeeper
org.apache.zookeeper
zookeeper
3.5.8
junit
junit
4.13
org.project.lombok
lombok
1.18.12

Curator 会话创建

Curator 提供了多种方式创建会话。以下是常见的实现方式:

// 传统方式
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);
client.start();
// 流式方式
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("192.168.128.129:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.namespace("base")
.build();
client.start();

节点创建与数据操作

创建节点:

@Test
public void testCreate() throws Exception {
String path = curatorFramework.create().forPath("/curator-node");
log.info("创建节点: {}", path);
}

一次性创建带层级结构的节点:

@Test
public void testCreateWithParent() throws Exception {
String pathWithParent = "/node-parent/sub-node-1";
String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);
log.info("创建节点: {}", path);
}

获取数据:

@Test
public void testGetData() throws Exception {
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("获取数据: {}", new String(bytes));
}

更新节点:

@Test
public void testSetData() throws Exception {
curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());
byte[] bytes = curatorFramework.getData().forPath("/curator-node");
log.info("获取修改后的数据: {}", new String(bytes));
}

删除节点:

@Test
public void testDelete() throws Exception {
String pathWithParent = "/node-parent";
curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);
}

Curator 异步接口

Curator 提供了异步接口,通过 BackgroundCallback 处理结果。以下是一个示例:

@Test
public void testBackground() throws Exception {
curatorFramework.getData().inBackground((item1, item2) -> {
log.info("背景线程: {}", item2);
}).forPath(ZK_NODE);
TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);
}

Curator 监听器

Curator 提供了 CuratorListener 接口,用于监听事件。以下是一个示例:

public interface CuratorListener {
void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;
}

Curator Caches

Curator 提供了多种缓存类型,如 NodeCachePathChildrenCacheTreeCache,用于监听 Zookeeper 节点数据变化。

Zookeeper 集群与动态扩容

集群模式

Zookeeper 集群分为以下角色:

  • Leader: 处理所有写请求,集群中唯一一个 Leader。
  • Follower: 只处理读请求,作为 Leader 的候选节点。
  • Observer: 只处理读请求,不参与选举。

集群安装

搭建伪集群时,仅需一台机器启动多个 Zookeeper 实例即可。以下是步骤说明:

  • 验证 JDK 版本

    java -version
  • 下载并解压 Zookeeper

    wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gz
    tar -zxvf apache-zookeeper-3.5.8-bin.tar.gz
    cd apache-zookeeper-3.5.8-bin
  • 重命名配置文件

    cp conf/zoo_sample.cfg conf/zoo-1.cfg
  • 修改配置文件

     
  • vim conf/zoo-1.cfg

    添加以下配置:
    ```bash
    dataDir=/usr/local/data/zookeeper-1
    clientPort=2181
    server.1=127.0.0.1:2001:3001:participant
    server.2=127.0.0.1:2002:3002:participant
    server.3=127.0.0.1:2003:3003:participant
    server.4=127.0.0.1:2004:3004:observer
    1. 创建文件夹并设置 server ID

      mkdir -p /usr/local/data/zookeeper-1
      cd /usr/local/data/zookeeper-1
      echo 1 > myid
      cd /usr/local/data/zookeeper-2
      echo 2 > myid
      cd /usr/local/data/zookeeper-3
      echo 3 > myid
      cd /usr/local/data/zookeeper-4
      echo 4 > myid
    2. 启动 Zookeeper 实例

      bin/zkServer.sh start conf/zoo1.cfg
      bin/zkServer.sh start conf/zoo2.cfg
      bin/zkServer.sh start conf/zoo3.cfg
    3. 验证集群状态

      bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
    4. 动态配置

      Zookeeper 3.5.3 及以上版本支持动态配置,需启用超级管理员身份验证模式 ACLs。以下是动态配置示例:

    5. 启用超级管理员模式

      echo -n gj:123 | openssl dgst -binary -sha1 | openssl base64
    6. 配置动态文件

      vim conf/zoo_replicated1.cfg.dynamic

      添加以下内容:

      server.1=192.168.109.200:2001:3001:participant;192.168.109.200:2181
      server.2=192.168.109.200:2002:3002:participant;192.168.109.200:2182
      server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
    7. 启动服务并验证

      ./bin/zkServer.sh start conf/zoo1.cfg
      ./bin/zkServer.sh status conf/zoo1.cfg
    8. 动态更新集群状态

      ./bin/zkCli.sh -addauth digest gj:123
      ./bin/zkCli.sh -reconfig -remove 3
      ./bin/zkCli.sh -reconfig -add server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
    9. 通过以上方法,可以实现 Zookeeper 集群的动态扩容和缩容,提升系统的可用性和灵活性。

    转载地址:http://preg.baihongyu.com/

    你可能感兴趣的文章
    Objective-C实现articulation-points(关键点)(割点)算法(附完整源码)
    查看>>
    Objective-C实现atoi函数功能(附完整源码)
    查看>>
    Objective-C实现average absolute deviation平均绝对偏差算法(附完整源码)
    查看>>
    Objective-C实现average mean平均数算法(附完整源码)
    查看>>
    Objective-C实现average median平均中位数算法(附完整源码)
    查看>>
    Objective-C实现average mode平均模式算法(附完整源码)
    查看>>
    Objective-C实现avl 树算法(附完整源码)
    查看>>
    Objective-C实现AvlTree树算法(附完整源码)
    查看>>
    Objective-C实现backtracking Jump Game回溯跳跃游戏算法(附完整源码)
    查看>>
    Objective-C实现BACKTRACKING 方法查找集合的幂集算法(附完整源码)
    查看>>
    Objective-C实现bailey borwein plouffe算法(附完整源码)
    查看>>
    Objective-C实现balanced parentheses平衡括号表达式算法(附完整源码)
    查看>>