一、集成
<!-- Maven dependencies used by the samples below. -->
<dependencies>
<!-- Plain ZooKeeper client library. -->
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<!-- Curator core client (CuratorFramework, retry policies, fluent API). -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.4.2</version>
</dependency>
<!-- Curator recipes: locks, atomic counters, barriers, caches, leader election. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.4.2</version>
</dependency>
<!-- In-process test servers (TestingServer / TestingCluster) used in the testing section. -->
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-test</artifactId>
<version>2.4.2</version>
</dependency>
</dependencies>
二、简单应用
2.1、同步
/**
 * Synchronous create / read / update / delete examples against a single znode,
 * using the Curator fluent API.
 *
 * <p>The shared client is built with the {@code base} namespace, so every path
 * used here is resolved underneath {@code /base}.
 */
public class zkCliTest {
    /** Charset used for every byte[] &lt;-&gt; String conversion, instead of the platform default. */
    private static final java.nio.charset.Charset UTF_8 = java.nio.charset.StandardCharsets.UTF_8;

    private static final CuratorFramework client;

    private final String path = "/zk-book/c1";

    static {
        // Retry policy; a custom strategy can be plugged in by implementing RetryPolicy.
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        client = CuratorFrameworkFactory.builder()
                .connectString("127.0.0.1:2181")
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .namespace("base") // all operations below are relative to this namespace
                .build();
        client.start();
    }

    /**
     * Creates the node (and any missing parents) with the payload {@code "init"}.
     * {@code withMode} selects between ephemeral and persistent nodes; this one is persistent.
     *
     * @throws Exception if the create call fails
     */
    public void nodeCreate() throws Exception {
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .forPath(path, "init".getBytes(UTF_8));
    }

    /**
     * Reads the node's data and prints it as text; the node's Stat is fetched alongside the data.
     *
     * @throws Exception if the read fails
     */
    public void nodeRead() throws Exception {
        Stat stat = new Stat();
        byte[] data = client.getData().storingStatIn(stat).forPath(path);
        System.out.println(new String(data, UTF_8));
    }

    /**
     * Deletes the node and its children, conditional on the version read just before.
     *
     * @throws Exception if the read or the delete fails
     */
    public void nodeDel() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        client.delete()
                .deletingChildrenIfNeeded()
                .withVersion(stat.getVersion())
                .forPath(path);
    }

    /**
     * Demonstrates versioned writes: the first write uses the version just read and
     * prints the new version; the second write reuses that same, now outdated, version
     * and its failure message is printed instead of being propagated.
     *
     * <p>No payload is passed to {@code forPath}, so the written content is whatever
     * Curator supplies by default for a data-less {@code setData}.
     *
     * @throws Exception if the initial read or the first write fails
     */
    public void nodeSet() throws Exception {
        Stat stat = new Stat();
        client.getData().storingStatIn(stat).forPath(path);
        System.out.println("Success set node for : " + path + ", new version: "
                + client.setData().withVersion(stat.getVersion()).forPath(path).getVersion());
        try {
            // Deliberately reuses the version from before the write above.
            client.setData().withVersion(stat.getVersion()).forPath(path);
        } catch (Exception e) {
            System.out.println("Fail set node due to " + e.getMessage());
        }
    }
}
2.2、异步
在Zookeeper中,所有的异步通知事件都由EventThread这个线程处理。EventThread串行处理所有的事件通知,可以保证事件处理的顺序性;但是一旦碰上复杂的处理单元,就会消耗过长的处理时间,从而影响其他事件的处理。因此,Curator允许用户传入Executor实例,这样可以将比较复杂的事件处理放到一个专门的线程池中去。
public class ZkBackTest {
private static String path = "/zk-book/c1";
private static CuratorFramework client = CuratorFrameworkFactory.builder()
.connectString("127.0.0.1:2181")
.sessionTimeoutMs(5000)
.retryPolicy(new ExponentialBackoffRetry(1000, 3))
.build();
public static void main(String[] args) throws Exception {
client.start();
/*
* Main thread: main
event[code: -110, type: CREATE]
Thread of processResult: main-EventThread
event1[code: -110, type: CREATE]
Thread of processResult: pool-3-thread-1
* */
System.out.println("Main thread: " + Thread.currentThread().getName());
final CountDownLatch semaphore = new CountDownLatch(2);
ExecutorService tp = Executors.newFixedThreadPool(2);
/**
* BackgroundCallback接口 用来处理异步接口调用之后服务端返回的结果
* */
// 此处传入了自定义的Executor
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event1[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
System.out.println("Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}, tp).forPath(path, "init".getBytes());
// 此处没有传入自定义的Executor
client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).inBackground(new BackgroundCallback() {
public void processResult(CuratorFramework client, CuratorEvent event) throws Exception {
System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
System.out.println("Thread of processResult: " + Thread.currentThread().getName());
semaphore.countDown();
}
}).forPath(path, "init".getBytes());
semaphore.await();
tp.shutdown();
}
}
2.3、ZKPaths
/**
 * Tour of the {@code ZKPaths} utility: building znode paths, splitting a path into
 * parent and node name, recursively creating directories, listing sorted children
 * and deleting a subtree.
 */
public class ZKPaths_Sample {
    static String path = "/curator_zkpath_sample";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    /**
     * Runs each ZKPaths helper once and prints its result.
     *
     * @param args unused
     * @throws Exception if any ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        client.start();
        try {
            ZooKeeper zookeeper = client.getZookeeperClient().getZooKeeper();

            // Path construction helpers.
            // NOTE(review): fixForNamespace appears to take (namespace, path), so `path`
            // plays the namespace role in this call - confirm against the Curator Javadoc.
            System.out.println(ZKPaths.fixForNamespace(path, "sub"));
            System.out.println(ZKPaths.makePath(path, "sub"));

            // Path decomposition helpers.
            System.out.println(ZKPaths.getNodeFromPath("/curator_zkpath_sample/sub1"));
            PathAndNode pn = ZKPaths.getPathAndNode("/curator_zkpath_sample/sub1");
            System.out.println(pn.getPath());
            System.out.println(pn.getNode());

            // Create two children, list them, then remove the whole subtree
            // (the final `true` also deletes `path` itself).
            String dir1 = path + "/child1";
            String dir2 = path + "/child2";
            ZKPaths.mkdirs(zookeeper, dir1);
            ZKPaths.mkdirs(zookeeper, dir2);
            System.out.println(ZKPaths.getSortedChildren(zookeeper, path));
            ZKPaths.deleteChildren(client.getZookeeperClient().getZooKeeper(), path, true);
        } finally {
            // The original sample never closed the client.
            client.close();
        }
    }
}
2.4、EnsurePath
/**
 * Shows {@code EnsurePath}, a helper that guarantees a znode exists before the
 * application starts operating on it.
 */
public class EnsurePathDemo {
    static String path = "/zk-book/c1";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .sessionTimeoutMs(5000)
            .retryPolicy(new ExponentialBackoffRetry(1000, 3))
            .build();

    /**
     * Ensures {@code /zk-book/c1} exists twice over: once with an absolute
     * EnsurePath and once with a namespace-aware EnsurePath obtained from a
     * {@code zk-book} namespaced view of the client.
     *
     * @param args unused
     * @throws Exception if a ZooKeeper operation fails
     */
    public static void main(String[] args) throws Exception {
        client.start();
        try {
            // usingNamespace does not modify `client`; it returns a namespaced view.
            // The original discarded that return value, so the namespace-aware
            // EnsurePath below was built from the un-namespaced client.
            CuratorFramework namespaced = client.usingNamespace("zk-book");

            EnsurePath ensurePath = new EnsurePath(path);
            ensurePath.ensure(client.getZookeeperClient());
            // Repeated call on the same instance, kept from the original demo.
            ensurePath.ensure(client.getZookeeperClient());

            EnsurePath ensurePath2 = namespaced.newNamespaceAwareEnsurePath("/c1");
            ensurePath2.ensure(namespaced.getZookeeperClient());
        } finally {
            client.close();
        }
    }
}
三、高级应用
3.1、分布式锁
/**
 * Distributed lock example: 30 threads are released at the same instant and each
 * generates a timestamp-based order number while holding an {@code InterProcessMutex}.
 */
public class Recipes_Lock {
    static String lock_path = "/curator_recipes_lock_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("127.0.0.1:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    /**
     * Starts the workers, opens the gate, waits for all of them and closes the client.
     *
     * @param args unused
     * @throws Exception if the main thread is interrupted while waiting
     */
    public static void main(String[] args) throws Exception {
        client.start();
        final InterProcessMutex lock = new InterProcessMutex(client, lock_path);
        final int workers = 30;
        // Start gate: every worker blocks on it until main opens it.
        final CountDownLatch down = new CountDownLatch(1);
        // Completion latch so the client is only closed after every worker is done.
        final CountDownLatch finished = new CountDownLatch(workers);

        for (int i = 0; i < workers; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        down.await(); // block until the gate opens
                        lock.acquire();
                        try {
                            // Critical section: only reached when the lock is actually held.
                            SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
                            String orderNo = sdf.format(new Date());
                            System.out.println("生成的订单号是 : " + orderNo);
                        } finally {
                            lock.release();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        // Previously swallowed; a failed acquire used to fall through
                        // into the critical section without holding the lock.
                        System.err.println("Lock worker failed: " + e);
                    } finally {
                        finished.countDown();
                    }
                }
            }).start();
        }

        down.countDown(); // count reaches 0: all workers are released together
        try {
            finished.await();
        } finally {
            client.close();
        }
    }
}
3.2、分布式计数器
/**
 * Distributed counter example: adds 8 to a {@code DistributedAtomicInteger}
 * stored at a fixed znode and reports whether the update succeeded.
 */
public class Recipes_DistAtomicInt {
    static String distatomicint_path = "/curator_recipes_distatomicint_path";

    static CuratorFramework client = CuratorFrameworkFactory.builder()
            .connectString("domain1.book.zookeeper:2181")
            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();

    /**
     * Performs a single add and prints the success flag.
     *
     * @param args unused
     * @throws Exception if the counter operation fails
     */
    public static void main(String[] args) throws Exception {
        client.start();
        try {
            // The counter gets its own retry policy, separate from the client's.
            DistributedAtomicInteger atomicInteger =
                    new DistributedAtomicInteger(client, distatomicint_path, new RetryNTimes(3, 1000));
            AtomicValue<Integer> rc = atomicInteger.add(8);
            System.out.println("Result: " + rc.succeeded());
        } finally {
            // The original sample never closed the client.
            client.close();
        }
    }
}
3.3、分布式Barrier
/**
 * Distributed barrier example: five workers, each with its own client, set the
 * barrier and block on it; the main thread removes the barrier to release them.
 */
public class Recipes_Barrier {
    static String barrier_path = "/curator_recipes_barrier_path";

    /**
     * Barrier handle used by the main thread to release the workers. It is assigned
     * once in {@code main} before any worker starts; the original let all five
     * workers overwrite it concurrently and main could read it while still null.
     */
    static DistributedBarrier barrier;

    /**
     * Starts the workers, waits until each has attempted to set the barrier, pauses
     * two seconds and then removes the barrier.
     *
     * @param args unused
     * @throws Exception if the barrier cannot be removed or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        final int workers = 5;
        // Counted down by each worker once its setBarrier attempt has finished, so the
        // removal below cannot happen before the workers have set the barrier.
        final java.util.concurrent.CountDownLatch barrierSet =
                new java.util.concurrent.CountDownLatch(workers);

        CuratorFramework controlClient = newClient();
        controlClient.start();
        try {
            barrier = new DistributedBarrier(controlClient, barrier_path);
            for (int i = 0; i < workers; i++) {
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        waitAtBarrier(barrierSet);
                    }
                }).start();
            }
            barrierSet.await();
            Thread.sleep(2000);
            barrier.removeBarrier();
        } finally {
            controlClient.close();
        }
    }

    /** Builds an unstarted client with the connection settings used by this sample. */
    private static CuratorFramework newClient() {
        return CuratorFrameworkFactory.builder()
                .connectString("domain1.book.zookeeper:2181")
                .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
    }

    /** Worker body: sets the barrier with a private client, then blocks until it is removed. */
    private static void waitAtBarrier(java.util.concurrent.CountDownLatch barrierSet) {
        CuratorFramework client = newClient();
        try {
            DistributedBarrier workerBarrier;
            try {
                client.start();
                workerBarrier = new DistributedBarrier(client, barrier_path);
                System.out.println(Thread.currentThread().getName() + "号barrier设置");
                workerBarrier.setBarrier();
            } finally {
                // Always signal main, even on failure, so it never waits forever.
                barrierSet.countDown();
            }
            workerBarrier.waitOnBarrier();
            System.err.println("启动...");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (Exception e) {
            // Previously swallowed silently.
            System.err.println("Barrier worker failed: " + e);
        } finally {
            client.close();
        }
    }
}
/**
 * Double-barrier example: five workers, each with its own client, enter a
 * {@code DistributedDoubleBarrier} sized for 5 members, do some simulated work
 * and then leave it.
 */
public class Recipes_Barrier2 {
    static String barrier_path = "/curator_recipes_barrier_path";

    /**
     * Starts the five workers; each one manages its own client lifecycle.
     *
     * @param args unused
     * @throws Exception declared for parity with the other samples
     */
    public static void main(String[] args) throws Exception {
        for (int i = 0; i < 5; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    CuratorFramework client = CuratorFrameworkFactory.builder()
                            .connectString("domain1.book.zookeeper:2181")
                            .retryPolicy(new ExponentialBackoffRetry(1000, 3)).build();
                    try {
                        client.start();
                        DistributedDoubleBarrier barrier =
                                new DistributedDoubleBarrier(client, barrier_path, 5);
                        // Random delay (up to 3 s) so workers arrive at different times.
                        Thread.sleep(Math.round(Math.random() * 3000));
                        System.out.println(Thread.currentThread().getName() + "号进入barrier");
                        barrier.enter();
                        System.out.println("启动...");
                        // Simulated work before leaving.
                        Thread.sleep(Math.round(Math.random() * 3000));
                        barrier.leave();
                        System.out.println("退出...");
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } catch (Exception e) {
                        // Previously swallowed silently.
                        System.err.println("Double-barrier worker failed: " + e);
                    } finally {
                        // The original sample never closed the per-thread client.
                        client.close();
                    }
                }
            }).start();
        }
    }
}
/**
 * Single-JVM counterpart to the distributed barrier samples: three runners wait on
 * a JDK {@code CyclicBarrier} and start together once all of them are ready.
 */
public class Recipes_CyclicBarrier {
    /** Barrier shared by all runners; trips when 3 parties are waiting. */
    public static CyclicBarrier barrier = new CyclicBarrier(3);

    /**
     * Submits the three runners to a pool of three threads and requests shutdown;
     * it does not wait for the runners to finish.
     *
     * @param args unused
     * @throws IOException never thrown; kept so the signature stays unchanged
     * @throws InterruptedException never thrown; kept so the signature stays unchanged
     */
    public static void main(String[] args) throws IOException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(3);
        // Runner is already a Runnable; wrapping it in a Thread before submit() was redundant.
        executor.submit(new Runner("1号选手"));
        executor.submit(new Runner("2号选手"));
        executor.submit(new Runner("3号选手"));
        executor.shutdown();
    }
}

/** A participant that announces it is ready, waits on the shared barrier, then starts. */
class Runner implements Runnable {
    private final String name;

    /**
     * @param name label printed in this runner's messages
     */
    public Runner(String name) {
        this.name = name;
    }

    /**
     * Prints the ready message, waits for the other parties and prints the start
     * message. If the wait is interrupted or the barrier is broken, the start
     * message is not printed (the original swallowed the failure and printed it anyway).
     */
    @Override
    public void run() {
        System.out.println(name + " 准备好了.");
        try {
            Recipes_CyclicBarrier.barrier.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        } catch (java.util.concurrent.BrokenBarrierException e) {
            System.err.println(name + " could not start: " + e);
            return;
        }
        System.out.println(name + " 起跑!");
    }
}
四、节点管理
4.1、监听
// NodeCache: watches one znode and prints its content whenever it changes.
// Third constructor argument (false): node data is not treated as compressed.
final NodeCache cache = new NodeCache(client, path, false);
// Register the listener before starting so no change notification can be missed.
cache.getListenable().addListener(new NodeCacheListener() {
    @Override
    public void nodeChanged() throws Exception {
        // getCurrentData() is null once the node no longer exists; the original
        // dereferenced it unconditionally.
        org.apache.curator.framework.recipes.cache.ChildData current = cache.getCurrentData();
        if (current == null || current.getData() == null) {
            System.out.println("Node data update, node removed or empty");
            return;
        }
        System.out.println("Node data update, new data: "
                + new String(current.getData(), java.nio.charset.StandardCharsets.UTF_8));
    }
});
// true: load the node's current state before start() returns.
cache.start(true);
// PathChildrenCache: watches the direct children of `path` and prints add / update /
// remove events. Third constructor argument (true): child data is cached as well.
PathChildrenCache cache = new PathChildrenCache(client, path, true);
// Register the listener before starting: with POST_INITIALIZED_EVENT the initial
// load happens in the background, so a listener added after start() can miss events.
cache.getListenable().addListener(new PathChildrenCacheListener() {
    @Override
    public void childEvent(CuratorFramework client,
                           PathChildrenCacheEvent event) throws Exception {
        switch (event.getType()) {
            case CHILD_ADDED:
                System.out.println("CHILD_ADDED," + event.getData().getPath());
                break;
            case CHILD_UPDATED:
                System.out.println("CHILD_UPDATED," + event.getData().getPath());
                break;
            case CHILD_REMOVED:
                System.out.println("CHILD_REMOVED," + event.getData().getPath());
                break;
            default:
                // Any other event type is ignored.
                break;
        }
    }
});
cache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
4.2、选举
// Leader election: whichever participant wins runs takeLeadership(); leadership is
// given up as soon as that method returns.
String master_path = "/curator_recipes_master_path";
LeaderSelectorListenerAdapter masterListener = new LeaderSelectorListenerAdapter() {
    public void takeLeadership(CuratorFramework client) throws Exception {
        System.out.println("成为Master角色");
        // Simulated master work.
        Thread.sleep(3000);
        System.out.println("完成Master操作,释放Master权利");
    }
};
LeaderSelector selector = new LeaderSelector(client, master_path, masterListener);
// Re-enter the election automatically after each term ends.
selector.autoRequeue();
selector.start();
// Keep the calling thread parked so the election keeps cycling.
Thread.sleep(Integer.MAX_VALUE);
五、测试管理
5.1、单机模拟
/**
 * Starts an in-process ZooKeeper {@code TestingServer}, connects a Curator client
 * to it and lists the children of {@code /zookeeper}.
 */
public class TestingServer_Sample {
    static String path = "/zookeeper";

    /**
     * Runs the sample; both the client and the server are closed on exit.
     *
     * @param args unused
     * @throws Exception if the server cannot start or the query fails
     */
    public static void main(String[] args) throws Exception {
        // Port 2181, with data stored under the given directory.
        TestingServer server = new TestingServer(2181, new File("/home/admin/zk-book-data"));
        try {
            CuratorFramework client = CuratorFrameworkFactory.builder()
                    .connectString(server.getConnectString())
                    .sessionTimeoutMs(5000)
                    .retryPolicy(new ExponentialBackoffRetry(1000, 3))
                    .build();
            client.start();
            try {
                System.out.println(client.getChildren().forPath(path));
            } finally {
                // The original never closed the client.
                client.close();
            }
        } finally {
            // Close the server even when the query throws.
            server.close();
        }
    }
}
5.2、集群模拟
/**
 * Starts a three-node in-process {@code TestingCluster}, prints each server's state,
 * kills the current leader and prints the states again.
 */
public class TestingCluster_Sample {
    /**
     * Runs the sample; the cluster is stopped on exit even if a step fails.
     *
     * @param args unused
     * @throws IllegalStateException if no server reports the "leading" state after the wait
     * @throws Exception if the cluster cannot be started, queried or stopped
     */
    public static void main(String[] args) throws Exception {
        TestingCluster cluster = new TestingCluster(3);
        try {
            cluster.start();
            // Give the ensemble time to elect a leader.
            Thread.sleep(2000);

            printServerStates(cluster);
            TestingZooKeeperServer leader = findLeader(cluster);
            if (leader == null) {
                // The original dereferenced a null leader here.
                throw new IllegalStateException("No server reached the leading state within 2000 ms");
            }

            leader.kill();
            System.out.println("--After leader kill:");
            printServerStates(cluster);
        } finally {
            cluster.stop();
        }
    }

    /** Prints "id-state-dataDirectory" for every server in the cluster. */
    private static void printServerStates(TestingCluster cluster) throws Exception {
        for (TestingZooKeeperServer zs : cluster.getServers()) {
            System.out.print(zs.getInstanceSpec().getServerId() + "-");
            System.out.print(zs.getQuorumPeer().getServerState() + "-");
            System.out.println(zs.getInstanceSpec().getDataDirectory().getAbsolutePath());
        }
    }

    /** Returns the last server whose state is "leading", or null when there is none. */
    private static TestingZooKeeperServer findLeader(TestingCluster cluster) throws Exception {
        TestingZooKeeperServer leader = null;
        for (TestingZooKeeperServer zs : cluster.getServers()) {
            if ("leading".equals(zs.getQuorumPeer().getServerState())) {
                leader = zs;
            }
        }
        return leader;
    }
}