import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.CyclicBarrier; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.locks.InterProcessMutex; import org.apache.curator.retry.ExponentialBackoffRetry; public class LockDemo { /** zookeeper地址 */ static final String CONNECT_ADDR = "127.0.0.1:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS"); static int count = 10; public static void genarNo(){ try { count--; System.out.println(count + " - " + sdf.format(new Date())); } finally { } } public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 开启连接 cf.start(); //4 分布式锁 final InterProcessMutex lock = new InterProcessMutex(cf, "/super"); CyclicBarrier barrier = new CyclicBarrier(10) ; for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { // 让10个线程同时运行,才好看出 加锁 和不加锁的效果 barrier.await(); //加锁 lock.acquire(); //-------------业务处理开始 genarNo(); //-------------业务处理结束 } catch (Exception e) { e.printStackTrace(); } finally { try { //释放 lock.release(); } catch (Exception e) { e.printStackTrace(); } } } },"t" + i).start(); } Thread.sleep(5000); } }
1、直接运行结果,运行多次,效果相同
9 - 20:24:06|665
8 - 20:24:06|683
7 - 20:24:06|694
6 - 20:24:06|700
5 - 20:24:06|704
4 - 20:24:06|709
3 - 20:24:06|716
2 - 20:24:06|719
1 - 20:24:06|723
0 - 20:24:06|725
2、将lock.acquire(); 和 lock.release(); 两行注释,即不加锁
2 - 20:27:21|365
7 - 20:27:21|365
6 - 20:27:21|365
7 - 20:27:21|365
0 - 20:27:21|365
8 - 20:27:21|365
5 - 20:27:21|365
4 - 20:27:21|365
3 - 20:27:21|365
1 - 20:27:21|366
顺序错乱,读取的数据有重复的,时间相同
功能类似CycleBarrier , 进入的时候同时拦截,出去的时候同时拦截
import java.util.Random; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; /** * DistributedDoubleBarrier 类似 CycleBarrier的功能 , 在所有线程执行方法(await/enter)前,所有线程进入阻塞状态。 */ public class CuratorBarrier1 { /** zookeeper地址 */ static final String CONNECT_ADDR = "127.0.0.1:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .retryPolicy(retryPolicy) .build(); cf.start(); for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { /* * 创建对象的时候,会生成一个ourPath ,执行enter方法的时候,根据ourPath,生成临时节点; * 因为节点不能重复创建(重复创建节点会抛异常: KeeperErrorCode = NodeExists), * 所以每个线程要有自己的DistributedDoubleBarrier对象 */ DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5); String name = Thread.currentThread().getName(); System.out.println(name + " 已经准备"); /* * enter 在此处进行阻塞 , 会添加5个临时节点(key-value : uuid-ip) , 5个临时节点创建都创建好后, * 会创建一个ready节点,value 也是IP , 一共6个临时节点 * 1、监控ready节点是否存在 * 2、每个线程enter后都会创建一个临时节点,前4个线程进入后进入wait()状态,等待notify唤醒 * 3、第5个线程进入后就创建ready节点,出发ready监控;然后在watcher中唤醒所有wait的线程。 */ barrier.enter(); System.out.println(name + " 开始运行..."); Thread.sleep(1000 * (new Random()).nextInt(10)); /* * leave 删除6临时节点 ,每个线程删除自己的临时节点和ready节点( * 多删除ready时抛异常不处理,KeeperException.NoNodeException ignore) * 所有节点全部删除完时才能向下走,否则在此等待wait() */ barrier.leave(); System.out.println(name + " 同时退出运行..."); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } Thread.sleep(10000); } }
t2 已经准备
t4 已经准备
t0 已经准备
t3 已经准备
t1 已经准备
t3 开始运行...
t0 开始运行...
t4 开始运行...
t2 开始运行...
t1 开始运行...
t4 同时退出运行...
t1 同时退出运行...
t2 同时退出运行...
t3 同时退出运行...
t0 同时退出运行...
DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ;
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorBarrier2 { /** zookeeper地址 */ static final String CONNECT_ADDR = "127.0.0.1:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); cf.start(); /* * DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ; * 设置节点路径 */ DistributedBarrier barrier = new DistributedBarrier(cf, "/super"); for(int i = 0; i < 5; i++){ new Thread(new Runnable() { @Override public void run() { try { String name = Thread.currentThread().getName(); System.out.println(name + " 设置barrier!"); //设置 , 创建节点,重复创建抛异常不处理 barrier.setBarrier(); //等待 , 设置节点监控,节点存在,就wait(),进入线程阻塞; 节点不存在时,代码往下执行; barrier.waitOnBarrier(); System.out.println(name + "---------开始执行程序----------"); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } Thread.sleep(1000); barrier.removeBarrier(); //释放 , 删除节点 } }
t1 设置barrier!
t3 设置barrier!
t4 设置barrier!
t0 设置barrier!
t2 设置barrier!
t1---------开始执行程序----------
t0---------开始执行程序----------
t4---------开始执行程序----------
t2---------开始执行程序----------
t3---------开始执行程序----------
import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.barriers.DistributedBarrier; import org.apache.curator.retry.ExponentialBackoffRetry; public class CuratorBarrier3 { /** zookeeper地址 */ static final String CONNECT_ADDR = "127.0.0.1:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;// ms public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy).build(); cf.start(); /* * DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ; 设置节点路径 */ DistributedBarrier barrier = new DistributedBarrier(cf, "/super"); // 设置 , 创建节点,重复创建抛异常不处理 barrier.setBarrier(); new Thread(new Runnable() { @Override public void run() { try { System.out.println("Thread.sleep(5000)"); Thread.sleep(5000); barrier.removeBarrier(); // 释放 , 删除节点 } catch (Exception e) { e.printStackTrace(); } } }, "t1").start(); // 等待 , 设置节点监控,节点存在,就wait(),进入线程阻塞; 节点不存在时,代码往下执行; System.out.println("waitOnBarrier"); barrier.waitOnBarrier(); System.out.println("---------开始执行程序----------"); } }
waitOnBarrier
Thread.sleep(5000)
---------开始执行程序----------
import java.util.concurrent.CyclicBarrier; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.framework.recipes.atomic.AtomicValue; import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger; import org.apache.curator.retry.ExponentialBackoffRetry; import org.apache.curator.retry.RetryNTimes; public class CuratorAtomicInteger { /** zookeeper地址 */ static final String CONNECT_ADDR = "127.0.0.1:2181"; /** session超时时间 */ static final int SESSION_OUTTIME = 5000;//ms public static void main(String[] args) throws Exception { //1 重试策略:初试时间为1s 重试10次 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10); //2 通过工厂创建连接 CuratorFramework cf = CuratorFrameworkFactory.builder() .connectString(CONNECT_ADDR) .sessionTimeoutMs(SESSION_OUTTIME) .retryPolicy(retryPolicy) .build(); //3 开启连接 cf.start(); /* * 4 使用DistributedAtomicInteger , new RetryNTimes(3, 1000) 重试3次,每次间隔10ms * 重试次数最好结合并发量进行配置,如果重试次数小,并发量高的话,一些线程就获取 不到数据。 * 本地单机测试,重试次数和线程数配置相同即可。 */ DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(10, 10)); CyclicBarrier barrier = new CyclicBarrier(10) ; for(int i = 0; i < 10; i++){ new Thread(new Runnable() { @Override public void run() { try { String name = Thread.currentThread().getName(); barrier.await() ; AtomicValue<Integer> value = atomicIntger.increment(); /* * value.preValue() 旧值 * value.postValue() 最新值 * value.succeeded() true为成功,false则最新值为0 */ System.out.println(name + " : " + value.succeeded() + " - " + value.postValue() + " - " + value.preValue()); } catch (Exception e) { e.printStackTrace(); } } },"t" + i).start(); } Thread.sleep(1000); } }
t6 : true - 170 - 169
t8 : true - 171 - 170
t1 : true - 172 - 171
t4 : true - 173 - 172
t3 : true - 174 - 173
t0 : true - 175 - 174
t2 : true - 176 - 175
t9 : true - 177 - 176
t7 : true - 178 - 177
t5 : true - 179 - 178