基于Curator更新写入zookeeper分布式集群节点数据
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
public class Main {
public static void main(String[] args) {
//初始化log4j,zookeeper否则报错。
org.apache.log4j.BasicConfigurator.configure();
try {
Main m = new Main();
} catch (Exception e) {
e.printStackTrace();
}
}
public Main() throws Exception {
String ip = "localhost";
String addrs = ip + ":2181," + ip + ":2182," + ip + ":2183";
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
String node = "test_data";
CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString(addrs)
.sessionTimeoutMs(10 * 1000)
.connectionTimeoutMs(20 * 1000)
.namespace(node) //命名空间。
.retryPolicy(retryPolicy)
.build(); // 建立连接通道。
client.start();
String path = "/my_path";
//检测是否存在该路径。
Stat stat = client.checkExists().forPath(path);
//如果不存在这个路径,stat为null,创建新的节点路径。
if (stat == null) {
client.create()
.withMode(CreateMode.PERSISTENT)
.forPath(path);
}
client.getData().usingWatcher(new MyCuratorWatcher()).forPath(path);
//更新数据。
client.setData()
//.withVersion(0)
.forPath(path, "zhang phil".getBytes("UTF-8"));
synchronized (Main.class) {
Main.class.wait();
}
}
private class MyCuratorWatcher implements CuratorWatcher {
public void process(WatchedEvent event) throws Exception {
System.out.println("路径:" + event.getPath());
System.out.println("事件:" + event.toString());
}
}
}