ZooKeeper 分布式集群:Curator 分布式锁 InterProcessMutex
import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.data.Stat;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Demonstrates Curator's {@link InterProcessMutex}: several simulated clients, each
 * with its own ZooKeeper connection, compete for one distributed lock and hold it
 * for a random number of seconds.
 */
public class Main {
    /** Number of simulated clients; each gets its own worker thread and Curator client. */
    private static final int COUNT = 3;
    /** Maximum time, in seconds, to wait for all workers to finish. */
    private static final int TIME = 50;
    /** ZooKeeper path on which every client builds its mutex. */
    private static final String LOCK_PATH = "/path/lock";

    public static void main(String[] args) {
        // If log4j complains about missing configuration, initialise it first, e.g.
        // with org.apache.log4j.BasicConfigurator.configure().
        try {
            Main app = new Main();
            app.zk();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Acquires the lock, sleeps for a random 0-9 seconds while holding it, then
     * releases it.
     *
     * @param lock the mutex to acquire
     * @param id   client number, used only in the console output
     * @throws Exception if acquiring or releasing the lock fails, or the sleep is interrupted
     */
    private void doJob(InterProcessMutex lock, int id) throws Exception {
        lock.acquire();
        try {
            System.out.println("客户端" + id + "获得锁");
            long l = (long) (Math.random() * 10);
            System.out.println("客户端" + id + "休眠" + l + "秒");
            TimeUnit.SECONDS.sleep(l);
        } finally {
            // Release in finally so a failure or interrupt while holding the lock
            // cannot leave it held by this client.
            System.out.println("客户端" + id + "释放锁");
            lock.release();
        }
    }

    /**
     * Starts {@link #COUNT} workers, each with its own client, that contend for the
     * lock at {@link #LOCK_PATH}, and waits up to {@link #TIME} seconds for them.
     *
     * @throws Exception if a client cannot be initialised or the wait is interrupted
     */
    private void zk() throws Exception {
        // Node that initClient makes sure exists; the lock itself lives at LOCK_PATH.
        String path = "/path/count";
        ExecutorService service = Executors.newFixedThreadPool(COUNT);
        try {
            for (int i = 0; i < COUNT; i++) {
                CuratorFramework client = initClient(path);
                int id = i;
                service.submit(() -> {
                    try {
                        try {
                            doJob(new InterProcessMutex(client, LOCK_PATH), id);
                        } finally {
                            // The worker owns its client; close it once the job is over.
                            client.close();
                        }
                    } catch (InterruptedException e) {
                        // Restore the flag only after the client has been closed.
                        Thread.currentThread().interrupt();
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                });
            }
        } finally {
            // Always stop accepting work, even if creating a client failed mid-loop;
            // otherwise idle pool threads would keep the JVM alive.
            service.shutdown();
        }
        if (!service.awaitTermination(TIME, TimeUnit.SECONDS)) {
            System.err.println("等待超时,强制终止剩余任务");
            // Interrupt stragglers; their finally blocks release locks and close clients.
            service.shutdownNow();
        }
    }

    /**
     * Builds and starts a client, creating {@code path} as a persistent node
     * (with parents) when it does not exist yet.
     *
     * @param path node to ensure exists
     * @return the started client; the caller is responsible for closing it
     * @throws Exception if the existence check or the creation fails; the client
     *                   is closed before the exception propagates
     */
    private CuratorFramework initClient(String path) throws Exception {
        CuratorFramework client = makeClient();
        client.start();
        try {
            if (isPathExist(client, path)) {
                System.out.println("已存在:" + path + ",不需重复创建");
            } else {
                String s = client.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(path);
                System.out.println("创建 " + s);
            }
            return client;
        } catch (Exception e) {
            // Do not leak a started client when initialisation fails.
            client.close();
            throw e;
        }
    }

    /**
     * Checks whether {@code path} exists.
     *
     * @return {@code true} when {@code checkExists()} yields a non-null {@link Stat}
     * @throws Exception if the check itself fails; propagated rather than being
     *                   reported as "does not exist"
     */
    private boolean isPathExist(CuratorFramework client, String path) throws Exception {
        Stat stat = client.checkExists().forPath(path);
        return stat != null;
    }

    /** Builds (but does not start) a client for the address from {@link #getAddress()}. */
    private CuratorFramework makeClient() {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 5);
        return CuratorFrameworkFactory.builder()
                .connectString(getAddress())
                .sessionTimeoutMs(10 * 1000)
                .connectionTimeoutMs(20 * 1000)
                .retryPolicy(retryPolicy)
                .build();
    }

    /** Returns the connect string: localhost on ports 2181, 2182 and 2183. */
    private String getAddress() {
        String ip = "127.0.0.1";
        return ip + ":2181," + ip + ":2182," + ip + ":2183";
    }
}
输出:
创建 /path/count
已存在:/path/count,不需重复创建
已存在:/path/count,不需重复创建
客户端2获得锁
客户端2休眠3秒
客户端2释放锁
客户端0获得锁
客户端0休眠8秒
客户端0释放锁
客户端1获得锁
客户端1休眠1秒
客户端1释放锁