A ZooKeeper Heartbeat System for Detecting Any Single Device Going Online or Offline in a Large-Scale Distributed Cluster (Part 1)
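In a distributed cluster, a common requirement is a heartbeat detection system that lets the whole system know whether any individual device is alive.
The conventional approach is simply to keep a long-lived socket connection open, or to poll over short-lived HTTP connections. Hand-rolled implementations like these usually scale poorly, run into many problems, and are expensive to maintain.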
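To make that maintenance burden concrete, here is a minimal sketch of the bookkeeping at the core of a hand-rolled heartbeat system: a table of last-heartbeat timestamps swept against a timeout. The class and method names (`HeartbeatTable`, `heartbeat`, `leave`, `expire`) are hypothetical, time is passed in as milliseconds to keep the logic deterministic, and the transport (socket or HTTP) is left out entirely.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical hand-rolled heartbeat table; this is NOT part of ZooKeeper.
class HeartbeatTable {
    private final long timeoutMs;
    // device id -> time (ms) of the last heartbeat received from that device
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatTable(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    // Called whenever a heartbeat arrives (over a socket, an HTTP request, etc.).
    void heartbeat(String deviceId, long nowMs) {
        lastSeen.put(deviceId, nowMs);
    }

    // A clean, explicit disconnect removes the device immediately.
    void leave(String deviceId) {
        lastSeen.remove(deviceId);
    }

    // Sweep: remove and return every device silent for longer than the timeout.
    Set<String> expire(long nowMs) {
        Set<String> expired = new TreeSet<>();
        lastSeen.entrySet().removeIf(e -> {
            boolean silent = nowMs - e.getValue() > timeoutMs;
            if (silent) {
                expired.add(e.getKey());
            }
            return silent;
        });
        return expired;
    }

    // Devices currently considered online, in sorted order.
    Set<String> alive() {
        return new TreeSet<>(lastSeen.keySet());
    }
}
```

For example, with a 10,000 ms timeout, a device last heard from at 0 ms is still considered alive at 9,000 ms and is reported by `expire(10_001)`. A real system would also have to run the sweep periodically, survive restarts of the monitor itself, and share the table across nodes, which is where the maintenance cost comes from.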
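ZooKeeper, by contrast, is an ideal framework-level solution for heartbeat detection across a large-scale distributed cluster: each device registers an ephemeral node that ZooKeeper deletes automatically when the device's session ends, and any client can set a watch to be notified when the set of registered nodes changes. Suppose a client (call it program A) needs to monitor, in real time, devices in the cluster coming online, going offline, or dropping off. It can do the following: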
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.TimeUnit;

public class Main {
    // Parent node under which every online device registers a child node.
    private static final String DEVICE_PATH = "/devices";
    private ZooKeeper zk;

    public static void main(String[] args) {
        Main main = new Main();
        try {
            main.test();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void test() throws Exception {
        int sessionTimeoutSeconds = 10;
        // MyWatcher becomes the default watcher for this session.
        zk = new ZooKeeper(getAddress(), sessionTimeoutSeconds * 1000, new MyWatcher());
        System.out.println("Connecting...");
        while (!zk.getState().isConnected()) {
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("Connected");
        // The /devices node can also be created by hand; it does not have to be done in code.
        if (!checkNodeExist(DEVICE_PATH)) {
            String path = zk.create(DEVICE_PATH, "server".getBytes(StandardCharsets.UTF_8),
                    ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            System.out.println("Created: " + path);
        }
        // Park the main thread; all further work happens in MyWatcher callbacks.
        synchronized (this) {
            wait();
        }
    }

    private class MyWatcher implements Watcher {
        @Override
        public void process(WatchedEvent event) {
            if (!checkNodeExist(DEVICE_PATH)) {
                return;
            }
            System.out.println("---------");
            try {
                // Watches are one-shot: passing true re-arms the default watcher, so the
                // next child added to or removed from /devices calls process() again.
                List<String> children = zk.getChildren(DEVICE_PATH, true);
                for (String c : children) {
                    String path = DEVICE_PATH + "/" + c;
                    byte[] data = zk.getData(path, false, null);
                    System.out.println("Device online: " + path + " @ " + new String(data, StandardCharsets.UTF_8));
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // A ZooKeeper ensemble of three servers on localhost.
    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }

    // Returns true if the node exists; passing true also sets a watch on it.
    private boolean checkNodeExist(String path) {
        try {
            Stat stat = zk.exists(path, true);
            return stat != null;
        } catch (Exception e) {
            e.printStackTrace();
            return false;
        }
    }
}
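Note that program A should not be thought of as a server. It can run on any device in the distributed cluster; its job is to detect any device joining or leaving. When a new device joins the cluster that program A belongs to, MyWatcher is notified; likewise, whenever a device disconnects or leaves the cluster, MyWatcher is notified too. Because ZooKeeper watches fire only once, MyWatcher re-registers itself on every call by passing true to getChildren, and it prints the full list of devices currently online, so a device that has gone offline shows up as missing from the next list.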
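As written, MyWatcher reports only the devices currently online; a device that went offline is visible only as a line missing from the next listing. One way to report offline devices explicitly is to keep the previous `getChildren` result and diff it against the new one. A minimal sketch, assuming a hypothetical `DeviceDiff` helper that is not part of ZooKeeper:

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper: compares two getChildren("/devices", ...) snapshots.
final class DeviceDiff {
    final Set<String> online;   // present now, absent in the previous snapshot
    final Set<String> offline;  // present in the previous snapshot, absent now

    private DeviceDiff(Set<String> online, Set<String> offline) {
        this.online = online;
        this.offline = offline;
    }

    static DeviceDiff between(List<String> previous, List<String> current) {
        Set<String> online = new TreeSet<>(current);
        online.removeAll(previous);
        Set<String> offline = new TreeSet<>(previous);
        offline.removeAll(current);
        return new DeviceDiff(online, offline);
    }
}
```

Inside MyWatcher, one could store the last list in a field, call `DeviceDiff.between(last, children)` after each `getChildren`, and print both sets. Diffing snapshots, rather than counting events, also covers changes that happen before the watch is re-armed, since those do not produce separate notifications.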
Next, program B simulates a new device joining the distributed cluster:
import org.apache.zookeeper.*;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.TimeUnit;

public class Main {
    private static final String DEVICE_PATH = "/devices";
    private ZooKeeper zooKeeper;

    public static void main(String[] args) {
        // Initialize log4j; otherwise ZooKeeper complains at startup that logging is not configured.
        //org.apache.log4j.BasicConfigurator.configure();
        try {
            Main app = new Main();
            app.zk();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private void zk() throws Exception {
        // Program B only registers itself, so its watcher ignores events.
        zooKeeper = new ZooKeeper(getAddress(), 10 * 1000, new Watcher() {
            @Override
            public void process(WatchedEvent event) {
                //System.out.println(event.toString());
            }
        });
        System.out.println("Connecting...");
        while (!zooKeeper.getState().isConnected()) {
            TimeUnit.SECONDS.sleep(1);
        }
        System.out.println("Connected");
        login();
        // Keep the process, and therefore the session and its ephemeral node, alive.
        synchronized (this) {
            wait();
        }
    }

    private void login() throws Exception {
        // Create an ephemeral sequential node such as /devices/client0000000000.
        // /devices must already exist (program A creates it), otherwise create() throws
        // KeeperException.NoNodeException. ZooKeeper deletes the node when this session ends.
        String path = zooKeeper.create(DEVICE_PATH + "/client", "ip:127.0.0.1".getBytes(StandardCharsets.UTF_8),
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        byte[] data = zooKeeper.getData(path, false, null);
        System.out.println(path + " => " + new String(data, StandardCharsets.UTF_8));
        System.out.println("Device registered");
    }

    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }
}
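Running program B is equivalent to a new device joining the distributed cluster, and program A detects it. Program B registers by creating an EPHEMERAL_SEQUENTIAL node under /devices. If program B leaves or loses its connection to the cluster, ZooKeeper deletes that node once B's session ends (immediately on a clean close, or after the session timeout, requested as 10 seconds here, if B crashes or the network drops), so program A detects the departure as well.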
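The two ZooKeeper behaviors this relies on, ephemeral nodes disappearing with their session and watches firing once and then needing to be re-armed, can be modeled without a ZooKeeper server. The sketch below is a hypothetical single-threaded simulation; `ToyRegistry` and `ToyMonitor` are invented names, not ZooKeeper classes. `createEphemeral` stands in for program B's `create(..., EPHEMERAL_SEQUENTIAL)`, `expireSession` for the end of B's session, and `ToyMonitor.run` for MyWatcher re-reading the children with a fresh watch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of ephemeral nodes + one-shot child watches (NOT the ZooKeeper API).
class ToyRegistry {
    // child node name -> id of the session that created it
    private final Map<String, Long> children = new TreeMap<>();
    // pending one-shot watches; each is discarded as soon as it fires
    private List<Runnable> childWatches = new ArrayList<>();

    // Mirrors getChildren(path, watch): returns a snapshot and optionally arms a watch.
    List<String> getChildren(Runnable watch) {
        if (watch != null) {
            childWatches.add(watch);
        }
        return new ArrayList<>(children.keySet());
    }

    // Mirrors an ephemeral create: the node is owned by sessionId.
    void createEphemeral(String name, long sessionId) {
        children.put(name, sessionId);
        fireChildWatches();
    }

    // Mirrors session close/expiry: every node owned by that session disappears.
    void expireSession(long sessionId) {
        if (children.values().removeIf(owner -> owner == sessionId)) {
            fireChildWatches();
        }
    }

    private void fireChildWatches() {
        List<Runnable> toFire = childWatches;
        childWatches = new ArrayList<>(); // one-shot: clear before firing
        toFire.forEach(Runnable::run);
    }
}

// Plays the role of MyWatcher: on each notification, re-read the children and re-arm.
class ToyMonitor implements Runnable {
    private final ToyRegistry registry;
    final List<List<String>> snapshots = new ArrayList<>();

    ToyMonitor(ToyRegistry registry) {
        this.registry = registry;
    }

    @Override
    public void run() {
        snapshots.add(registry.getChildren(this));
    }
}
```

Calling `m.run()` once, then creating `client0000000000` (session 1) and `client0000000001` (session 2), then expiring session 1 records the snapshots `[]`, `[client0000000000]`, `[client0000000000, client0000000001]` and `[client0000000001]`. If `ToyMonitor.run` passed `null` instead of `this`, only the first change would ever be seen, which is why MyWatcher passes `true` to `getChildren`.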
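In fact, if every device both registers an ephemeral node under /devices (as program B does) and runs program A's detection code (essentially its MyWatcher), the devices can all detect one another, which gives a system in which devices in a distributed environment mutually track each other's heartbeat and online/offline status.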
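When a device both registers its own node and watches `/devices`, its own entry appears in every `getChildren` result. `create` returns the full path (for example `/devices/client0000000003`), while `getChildren` returns bare child names, so the device has to strip the parent path before filtering itself out. A small sketch with a hypothetical `Peers.othersOnline` helper:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for a device that both registers itself and watches /devices.
final class Peers {
    private Peers() {
    }

    // ownPath: full path returned by create(), e.g. "/devices/client0000000003".
    // children: bare child names as returned by getChildren("/devices", ...).
    static List<String> othersOnline(String ownPath, List<String> children) {
        String ownName = ownPath.substring(ownPath.lastIndexOf('/') + 1);
        List<String> others = new ArrayList<>();
        for (String c : children) {
            if (!c.equals(ownName)) {
                others.add(c);
            }
        }
        return others;
    }
}
```

A combined device would keep the path returned by its own `create` call and pass it, together with each fresh `getChildren` result, to `othersOnline` before printing its peers.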