博客
关于我
二:Zookeeper客户端使用与集群特性
阅读量:369 次
发布时间:2019-03-05

本文共 6814 字,大约阅读时间需要 22 分钟。

Zookeeper Java 客户端与集群动态配置

Zookeeper Java 客户端

项目构建

Zookeeper 官方客户端没有与服务端代码分离,通常会在同一个 JAR 文件中。开发者只需引入正确版本的 Maven 依赖即可,确保与服务端版本一致以避免兼容性问题。以下是示例依赖配置:

org.apache.zookeeper
zookeeper
3.5.8

客户端实例创建

在测试环境中,通常在初始化方法中创建 Zookeeper 实例。以下是一个常见的实现示例:

@Slf4jpublic class ZookeeperClientTest {    private static final String ZK_ADDRESS = "192.168.109.200:2181";    private static final int SESSION_TIMEOUT = 5000;    private static ZooKeeper zooKeeper;    private static final String ZK_NODE = "/zk-node";    @Before    public void init() throws IOException, InterruptedException {        final CountDownLatch countDownLatch = new CountDownLatch(1);        zooKeeper = new ZooKeeper(ZK_ADDRESS, SESSION_TIMEOUT, new Watcher() {            @Override            public void process(Watcher.Event event) {                if (event.getState() == Watcher.Event.KeeperState.SyncConnected &&                        event.getType() == Watcher.Event.EventType.None) {                    countDownLatch.countDown();                    log.info("连接成功!");                }            }        });        log.info("连接中……");        countDownLatch.await();    }}

创建节点

创建节点的方式如下:

@Testpublic void createTest() throws KeeperException, InterruptedException {    String path = zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);    log.info("创建路径: {}", path);}

异步创建节点:

@Testpublic void createAsycTest() throws InterruptedException {    zooKeeper.create(ZK_NODE, "data".getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE,             CreateMode.PERSISTENT,             (rc, path, ctx, name) -> log.info("rc {}, path {}, ctx {}, name {}", rc, path, ctx, name),            TimeUnit.SECONDS.sleep(Integer.MAX_VALUE));}

节点数据操作

修改节点数据:

@Testpublic void setTest() throws KeeperException, InterruptedException {    Stat stat = new Stat();    byte[] data = zooKeeper.getData(ZK_NODE, false, stat);    log.info("修改前: {}", new String(data));    zooKeeper.setData(ZK_NODE, "changed!".getBytes(), stat.getVersion());    byte[] dataAfter = zooKeeper.getData(ZK_NODE, false, stat);    log.info("修改后: {}", new String(dataAfter));}

Apache Curator 开源客户端

Curator 介绍

Curator 是 Netlix 开源的 Zookeeper 客户端框架,是目前最受欢迎的第三方客户端之一。它对 Zookeeper 版本支持最好,且功能强大,推荐使用。Curator 提供了对 Zookeeper 常用功能的封装,如 Leader 选举、分布式计数器、分布式锁等,减少了技术开发的复杂性。

Curator 实战

引入依赖

在 Maven 工程中引入 Curator 相关依赖,并配置相关参数。以下是示例配置:

org.apache.curator
curator-recipes
5.0.0
org.apache.zookeeper
zookeeper
org.apache.zookeeper
zookeeper
3.5.8
junit
junit
4.13
org.project.lombok
lombok
1.18.12

Curator 会话创建

Curator 提供了多种方式创建会话。以下是常见的实现方式:

// 传统方式RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.newClient(zookeeperConnectionString, retryPolicy);client.start();// 流式方式RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);CuratorFramework client = CuratorFrameworkFactory.builder()    .connectString("192.168.128.129:2181")    .sessionTimeoutMs(5000)    .connectionTimeoutMs(5000)    .retryPolicy(retryPolicy)    .namespace("base")    .build();client.start();

节点创建与数据操作

创建节点:

@Testpublic void testCreate() throws Exception {    String path = curatorFramework.create().forPath("/curator-node");    log.info("创建节点: {}", path);}

一次性创建带层级结构的节点:

@Testpublic void testCreateWithParent() throws Exception {    String pathWithParent = "/node-parent/sub-node-1";    String path = curatorFramework.create().creatingParentsIfNeeded().forPath(pathWithParent);    log.info("创建节点: {}", path);}

获取数据:

@Testpublic void testGetData() throws Exception {    byte[] bytes = curatorFramework.getData().forPath("/curator-node");    log.info("获取数据: {}", new String(bytes));}

更新节点:

@Testpublic void testSetData() throws Exception {    curatorFramework.setData().forPath("/curator-node", "changed!".getBytes());    byte[] bytes = curatorFramework.getData().forPath("/curator-node");    log.info("获取修改后的数据: {}", new String(bytes));}

删除节点:

@Testpublic void testDelete() throws Exception {    String pathWithParent = "/node-parent";    curatorFramework.delete().guaranteed().deletingChildrenIfNeeded().forPath(pathWithParent);}

Curator 异步接口

Curator 提供了异步接口,通过 BackgroundCallback 处理结果。以下是一个示例:

@Testpublic void testBackground() throws Exception {    curatorFramework.getData().inBackground((item1, item2) -> {        log.info("背景线程: {}", item2);    }).forPath(ZK_NODE);    TimeUnit.SECONDS.sleep(Integer.MAX_VALUE);}

Curator 监听器

Curator 提供了 CuratorListener 接口,用于监听事件。以下是一个示例:

public interface CuratorListener {    void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception;}

Curator Caches

Curator 提供了多种缓存类型,如 NodeCachePathChildrenCacheTreeCache,用于监听 Zookeeper 节点数据变化。

Zookeeper 集群与动态扩容

集群模式

Zookeeper 集群分为以下角色:

  • Leader: 处理所有写请求,集群中唯一一个 Leader。
  • Follower: 只处理读请求,作为 Leader 的候选节点。
  • Observer: 只处理读请求,不参与选举。

集群安装

搭建伪集群时,仅需一台机器启动多个 Zookeeper 实例即可。以下是步骤说明:

  • 验证 JDK 版本

    java -version
  • 下载并解压 Zookeeper

    wget https://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.8/apache-zookeeper-3.5.8-bin.tar.gztar -zxvf apache-zookeeper-3.5.8-bin.tar.gzcd apache-zookeeper-3.5.8-bin
  • 重命名配置文件

    cp conf/zoo_sample.cfg conf/zoo-1.cfg
  • 修改配置文件

  • vim conf/zoo-1.cfg

    添加以下配置:```bashdataDir=/usr/local/data/zookeeper-1clientPort=2181server.1=127.0.0.1:2001:3001:participantserver.2=127.0.0.1:2002:3002:participantserver.3=127.0.0.1:2003:3003:participantserver.4=127.0.0.1:2004:3004:observer
    1. 创建文件夹并设置 server ID

      mkdir -p /usr/local/data/zookeeper-1cd /usr/local/data/zookeeper-1echo 1 > myidcd /usr/local/data/zookeeper-2echo 2 > myidcd /usr/local/data/zookeeper-3echo 3 > myidcd /usr/local/data/zookeeper-4echo 4 > myid
    2. 启动 Zookeeper 实例

      bin/zkServer.sh start conf/zoo1.cfgbin/zkServer.sh start conf/zoo2.cfgbin/zkServer.sh start conf/zoo3.cfg
    3. 验证集群状态

      bin/zkCli.sh -server ip1:port1,ip2:port2,ip3:port3
    4. 动态配置

      Zookeeper 3.5.3 及以上版本支持动态配置,需启用超级管理员身份验证模式 ACLs。以下是动态配置示例:

    5. 启用超级管理员模式

      echo -n gj:123 | openssl dgst -binary -sha1 | openssl base64
    6. 配置动态文件

      vim conf/zoo_replicated1.cfg.dynamic

      添加以下内容:

      server.1=192.168.109.200:2001:3001:participant;192.168.109.200:2181server.2=192.168.109.200:2002:3002:participant;192.168.109.200:2182server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
    7. 启动服务并验证

      ./bin/zkServer.sh start conf/zoo1.cfg./bin/zkServer.sh status conf/zoo1.cfg
    8. 动态更新集群状态

      ./bin/zkCli.sh -addauth digest gj:123./bin/zkCli.sh -reconfig -remove 3./bin/zkCli.sh -reconfig -add server.3=192.168.109.200:2003:3003:participant;192.168.109.200:2183
    9. 通过以上方法,可以实现 Zookeeper 集群的动态扩容和缩容,提升系统的可用性和灵活性。

    转载地址:http://preg.baihongyu.com/

    你可能感兴趣的文章
    (四十四)c#Winform自定义控件-水波-HZHControls
    查看>>
    c#winform主题实现的一个方法
    查看>>
    asp.net打印网页后自动关闭网页【无需插件】
    查看>>
    一个人开发的html整站源码分享网站就这么上线了
    查看>>
    SQLServer 查看耗时较多的SQL语句(转)
    查看>>
    【计算机网络】应用层
    查看>>
    【Markdown】公式指导手册
    查看>>
    【Maven】POM基本概念
    查看>>
    【Java思考】Java 中的实参与形参之间的传递到底是值传递还是引用传递呢?
    查看>>
    【设计模式】单例模式
    查看>>
    【SpringCloud】Hystrix熔断器
    查看>>
    【SpringCloud】Gateway新一代网关
    查看>>
    【Linux】2.3 Linux目录结构
    查看>>
    java.util.Optional学习笔记
    查看>>
    远程触发Jenkins的Pipeline任务的并发问题处理
    查看>>
    CoProcessFunction实战三部曲之二:状态处理
    查看>>
    jackson学习之七:常用Field注解
    查看>>
    jackson学习之八:常用方法注解
    查看>>
    Web应用程序并发问题处理的一点小经验
    查看>>
    asp.net core的授权过滤器中获取action上的Attribute
    查看>>