zookeeper分布式集群中基于Curator实现单点设备上线下线心跳感知系统(二)
假设设备A处于分布式集群中,并且需要实时感知集群中其他设备的上线、下线以及存活状态:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.*;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
/**
 * Device A: watches the {@code /devices} subtree of a ZooKeeper ensemble through a
 * Curator {@link TreeCache} and prints every change it is notified about, so that
 * nodes appearing (device online) and disappearing (device offline) can be observed.
 */
public class Main {
    /** Parent znode whose subtree is watched. */
    private static final String DEVICE_PATH = "/devices";

    /**
     * Program entry point. Constructing {@link Main} connects to ZooKeeper and then
     * blocks, so this method normally does not return.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Initialise log4j here if ZooKeeper complains about missing logging configuration.
        //org.apache.log4j.BasicConfigurator.configure();
        try {
            new Main();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Builds the ZooKeeper connect string.
     *
     * @return comma-separated {@code host:port} list for ports 2181-2183 on 127.0.0.1
     */
    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }

    /**
     * Connects to ZooKeeper, makes sure {@link #DEVICE_PATH} exists, registers a tree
     * listener on it and then blocks the calling thread in {@code wait()}.
     * The cache and the client are closed when the wait ends for any reason.
     *
     * @throws Exception if any Curator/ZooKeeper operation fails or the wait is interrupted
     */
    public Main() throws Exception {
        // Retry with exponential back-off: 1000 ms base sleep, at most 5 retries.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString(getAddress())
                .sessionTimeoutMs(20 * 1000)
                .connectionTimeoutMs(20 * 1000)
                .retryPolicy(retryPolicy)
                .build();
        client.start();

        try {
            // checkExists() yields null when the path is absent; create it in that case.
            Stat stat = client.checkExists().forPath(DEVICE_PATH);
            if (stat == null) {
                String s = client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(DEVICE_PATH);
                System.out.println("已创建:" + s);
            } else {
                System.out.println("已存在:" + DEVICE_PATH);
            }

            TreeCache treeCache = new TreeCache(client, DEVICE_PATH);
            try {
                TreeCacheListener treeCacheListener = new TreeCacheListener() {
                    @Override
                    public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception {
                        System.out.println("-----");
                        // The type tells added / updated / removed nodes and connection events apart.
                        System.out.println("事件:" + event.getType());

                        // Events that are not about a node (e.g. INITIALIZED, CONNECTION_*)
                        // carry no ChildData; dereferencing it would throw a NullPointerException.
                        ChildData node = event.getData();
                        if (node == null) {
                            return;
                        }

                        System.out.println("路径:" + node.getPath());
                        // A znode may have no payload at all.
                        byte[] payload = node.getData();
                        System.out.println("数据:" + (payload == null ? "" : new String(payload, "UTF-8")));
                        System.out.println("ZooKeeper:" + client.getZookeeperClient().getZooKeeper().toString());
                    }
                };
                treeCache.getListenable().addListener(treeCacheListener);
                treeCache.start();

                // Park this thread so the process stays alive while the listener receives events.
                synchronized (this) {
                    wait();
                }
            } finally {
                treeCache.close();
            }
        } finally {
            client.close();
        }
    }
}
下面这个设备B模拟一个普通设备接入分布式集群中:
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
/**
 * Device B: simulates an ordinary device joining the cluster by creating an
 * ephemeral sequential znode under {@code /devices} and then keeping its
 * ZooKeeper session open.
 */
public class Main {
    /** Parent znode under which this device registers itself. */
    private String DEVICE_PATH = "/devices";

    /**
     * Program entry point. Constructing {@link Main} registers the device and then
     * blocks, so this method normally does not return.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        // Initialise log4j here if ZooKeeper complains about missing logging configuration.
        //org.apache.log4j.BasicConfigurator.configure();
        try {
            new Main();
        } catch (Exception failure) {
            failure.printStackTrace();
        }
    }

    /**
     * Builds the ZooKeeper connect string.
     *
     * @return comma-separated {@code host:port} list for ports 2181-2183 on 127.0.0.1
     */
    private String getAddress() {
        final String host = "127.0.0.1";
        return host + ":2181," + host + ":2182," + host + ":2183";
    }

    /**
     * Creates and starts a Curator client for the address from {@link #getAddress()},
     * using 20 s session and connection timeouts.
     *
     * @return the started client
     */
    private CuratorFramework connect() {
        // Exponential back-off: 1000 ms base sleep, at most 5 retries.
        final RetryPolicy backoff = new ExponentialBackoffRetry(1000, 5);
        final CuratorFramework curator = CuratorFrameworkFactory.builder()
                .connectString(getAddress())
                .sessionTimeoutMs(20 * 1000)
                .connectionTimeoutMs(20 * 1000)
                .retryPolicy(backoff)
                .build();
        curator.start();
        return curator;
    }

    /**
     * Connects to ZooKeeper, creates an {@code EPHEMERAL_SEQUENTIAL} node named
     * {@code client} (plus the sequence suffix) under {@link #DEVICE_PATH}, prints the
     * resulting path and then blocks the calling thread in {@code wait()}.
     *
     * @throws Exception if any Curator/ZooKeeper operation fails or the wait is interrupted
     */
    public Main() throws Exception {
        final CuratorFramework curator = connect();

        final String registeredPath = curator.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
                .forPath(DEVICE_PATH + "/client");
        System.out.println("已创建:" + registeredPath);

        // Park this thread so the session, and with it the ephemeral node, stays alive.
        synchronized (this) {
            this.wait();
        }
    }
}
设备A和设备B均以/devices作为节点路径(node path):设备A监听该路径下的节点变化,设备B则在该路径下创建临时顺序节点。