需求
某分布式系统中,主节点可以有多台,可以动态上下线,任意一台客户端都能实时感知到主节点服务器的上下线。
思路
思路
1. 创建客户端与服务端
2.启动client端 监听
3. 启动server端 注册
4.当server端 发生上下线
5.client端都能感知到
实现步骤
1.在shell命令客户端创建 servers (方便连接)
create /servers "servers"
创建两个自定义类
自定义DistrbuteServer(服务器端代码块)
package com.zookeeper.servers;
import org.apache.xerces.dom.ParentNode;
import org.apache.zookeeper.*;
import java.io.IOException;
import java.util.Scanner;
/**
* @version v1.0
* @Author: huang*
* @Date: 2020/12/16*
*/
public class DistrbuteServer {
private static String connectString="bigdata01:2181,bigdata02:2181,bigdata03:2181";
private static int sessionTimeout=2000;
private ZooKeeper zk=null;
private String partentNode ="/servers";
//创建zk的客户端连接
public void getConnect() throws IOException {
zk = new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState() == Event.KeeperState.SyncConnected) {
System.out.println("servers端连接成功");
}
}
});
}
//注册服务器(创建一个临时序列节点)
//
public void registServer(String hostname) throws Exception{
String create = zk.create(partentNode + "/server", hostname.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
System.out.println(hostname+"is online"+ create);
}
//业务功能
public void business(String hostname) throws Exception{
System.out.println(hostname+"is working,,,,");
//保证线程不结束
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
System.out.println("请输入服务器的名字");
String hostname = new Scanner(System.in).next();
//1.获取zk连接
DistrbuteServer server = new DistrbuteServer();
server.getConnect();
//2.利用zk连接注册服务器信息
server.registServer(hostname);
//3.启动业务功能
server.business(hostname);
}
}
自定义 DistributeClient(客户端代码)
package com.zookeeper.servers;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.ipc.Client;
import org.apache.hadoop.mapreduce.tools.CLI;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import java.util.ArrayList;
import java.util.List;
/**
* @version v1.0
* @Author: huang*
* @Date: 2020/12/16*
*/
public class DistributeClient {
private static String connectString="bigdata01:2181,bigdata02:2181,bigdata03:2181";
private static int sessionTimeout=20000;
private ZooKeeper zk=null;
private String parentNode="/servers";
//创建zk客户端连接
public void getConnect()throws Exception{
zk=new ZooKeeper(connectString, sessionTimeout, new Watcher() {
@Override
public void process(WatchedEvent watchedEvent) {
if (watchedEvent.getState()==Event.KeeperState.SyncConnected) {
System.out.println("client 客户端连接成功!");
}
//再一次启动监听
try {
getServerList();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
//获取服务器列表信息
public void getServerList() throws KeeperException, InterruptedException {
//1.获取服务器节点信息,并且对父节点进行监听
List<String> children = zk.getChildren(parentNode, true);
//2.存储服务器信息列表
ArrayList<String> servers = new ArrayList<>();
//3.遍历所有节点,获取节点中的主机名称信息
for (String child : children) {
byte[] data = zk.getData(parentNode + "/" + child, false, null);
servers.add(new String(data));
//4.打印服务器列表信息
System.out.println(servers);
}
//业务功能
}
public void business() throws InterruptedException {
System.out.println("client is working,,,");
Thread.sleep(Long.MAX_VALUE);
}
public static void main(String[] args) throws Exception {
DistributeClient client
= new DistributeClient();
client.getConnect();
client.getServerList();
client.business();
}
}
测试
shell命令行创建节点