Zookeeper客户端Apache Curator高级特性


1、分布式锁

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CyclicBarrier;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.locks.InterProcessMutex;
import org.apache.curator.retry.ExponentialBackoffRetry;


public class LockDemo {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "127.0.0.1:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	static SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss|SSS");
	static int count = 10;
	public static void genarNo(){
		try {
			count--;
			System.out.println(count + " - " + sdf.format(new Date()));
		} finally {
		
		}
	}
	
	public static void main(String[] args) throws Exception {
		
		//1 重试策略:初试时间为1s 重试10次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 通过工厂创建连接
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		//3 开启连接
		cf.start();
		
		//4 分布式锁
		final InterProcessMutex lock = new InterProcessMutex(cf, "/super");
		CyclicBarrier barrier = new CyclicBarrier(10) ;
		
		for(int i = 0; i < 10; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						// 让10个线程同时运行,才好看出 加锁 和不加锁的效果
						barrier.await(); 
						//加锁
						lock.acquire();
						//-------------业务处理开始
						genarNo();
						//-------------业务处理结束
					} catch (Exception e) {
						e.printStackTrace();
					} finally {
						try {
							//释放
							lock.release();
						} catch (Exception e) {
							e.printStackTrace();
						}
					}
				}
			},"t" + i).start();
		}
		Thread.sleep(5000);
		 
	}
}

1、直接运行结果,运行多次,效果相同

9 - 20:24:06|665

8 - 20:24:06|683

7 - 20:24:06|694

6 - 20:24:06|700

5 - 20:24:06|704

4 - 20:24:06|709

3 - 20:24:06|716

2 - 20:24:06|719

1 - 20:24:06|723

0 - 20:24:06|725

2、将lock.acquire(); 和 lock.release(); 两行注释,即不加锁

2 - 20:27:21|365

7 - 20:27:21|365

6 - 20:27:21|365

7 - 20:27:21|365

0 - 20:27:21|365

8 - 20:27:21|365

5 - 20:27:21|365

4 - 20:27:21|365

3 - 20:27:21|365

1 - 20:27:21|366

顺序错乱,读取的数据有重复的,时间相同




2、DistributedDoubleBarrier  分布式双屏障

功能类似CycleBarrier  , 进入的时候同时拦截,出去的时候同时拦截

import java.util.Random;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedDoubleBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;
/**
 * DistributedDoubleBarrier  类似 CycleBarrier的功能  , 在所有线程执行方法(await/enter)前,所有线程进入阻塞状态。
 */
public class CuratorBarrier1 {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "127.0.0.1:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.retryPolicy(retryPolicy)
					.build();
		cf.start();
		
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						/*
						 *  创建对象的时候,会生成一个ourPath ,执行enter方法的时候,根据ourPath,生成临时节点;
						 *  因为节点不能重复创建(重复创建节点会抛异常: KeeperErrorCode = NodeExists),
						 *  所以每个线程要有自己的DistributedDoubleBarrier对象
						 */
						DistributedDoubleBarrier barrier = new DistributedDoubleBarrier(cf, "/super", 5);
						
						String name = Thread.currentThread().getName();
						System.out.println(name + " 已经准备");
						
						/*
						 *  enter 在此处进行阻塞  , 会添加5个临时节点(key-value : uuid-ip) , 5个临时节点创建都创建好后,
						 *  会创建一个ready节点,value 也是IP , 一共6个临时节点
						 *  1、监控ready节点是否存在
						 *  2、每个线程enter后都会创建一个临时节点,前4个线程进入后进入wait()状态,等待notify唤醒
						 *  3、第5个线程进入后就创建ready节点,出发ready监控;然后在watcher中唤醒所有wait的线程。
						 */
						barrier.enter(); 
						System.out.println(name + " 开始运行...");
						Thread.sleep(1000 * (new Random()).nextInt(10)); 
						
						/*
						 *  leave 删除6临时节点 ,每个线程删除自己的临时节点和ready节点(
						 *  多删除ready时抛异常不处理,KeeperException.NoNodeException ignore)
						 *  所有节点全部删除完时才能向下走,否则在此等待wait()
						 */
						barrier.leave();
						System.out.println(name + " 同时退出运行...");
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}
		
		Thread.sleep(10000);
		
	}
}

t2 已经准备

t4 已经准备

t0 已经准备

t3 已经准备

t1 已经准备

t3 开始运行...

t0 开始运行...

t4 开始运行...

t2 开始运行...

t1 开始运行...

t4 同时退出运行...

t1 同时退出运行...

t2 同时退出运行...

t3 同时退出运行...

t0 同时退出运行...



3、DistributedBarrier 分布式屏障

DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class CuratorBarrier2 {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "127.0.0.1:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		cf.start();
		
		/*
		 *  DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ;
		 *  设置节点路径
		 */
		DistributedBarrier barrier = new DistributedBarrier(cf, "/super"); 
		
		for(int i = 0; i < 5; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						String name = Thread.currentThread().getName();
						System.out.println(name + " 设置barrier!");
						//设置 , 创建节点,重复创建抛异常不处理
						barrier.setBarrier();	
						
						//等待 , 设置节点监控,节点存在,就wait(),进入线程阻塞; 节点不存在时,代码往下执行;
						barrier.waitOnBarrier();	
						System.out.println(name + "---------开始执行程序----------");
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}
		Thread.sleep(1000);
		barrier.removeBarrier();	//释放 , 删除节点
		
	}
}

t1 设置barrier!

t3 设置barrier!

t4 设置barrier!

t0 设置barrier!

t2 设置barrier!

t1---------开始执行程序----------

t0---------开始执行程序----------

t4---------开始执行程序----------

t2---------开始执行程序----------

t3---------开始执行程序----------


import org.apache.curator.RetryPolicy;    
    import org.apache.curator.framework.CuratorFramework;
    import org.apache.curator.framework.CuratorFrameworkFactory;
    import org.apache.curator.framework.recipes.barriers.DistributedBarrier;
    import org.apache.curator.retry.ExponentialBackoffRetry;
    
    public class CuratorBarrier3 {
    
    	/** zookeeper地址 */
    	static final String CONNECT_ADDR = "127.0.0.1:2181";
    	/** session超时时间 */
    	static final int SESSION_OUTTIME = 5000;// ms
    
    	public static void main(String[] args) throws Exception {
    
    		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
    		CuratorFramework cf = CuratorFrameworkFactory.builder().connectString(CONNECT_ADDR)
    				.sessionTimeoutMs(SESSION_OUTTIME).retryPolicy(retryPolicy).build();
    		cf.start();
    
    		/*
    		 * DistributedBarrier 功能类似 CountDownLatch count = new CountDownLatch(1) ; 设置节点路径
    		 */
    		DistributedBarrier barrier = new DistributedBarrier(cf, "/super");
    		// 设置 , 创建节点,重复创建抛异常不处理
    		barrier.setBarrier();
    		
    		new Thread(new Runnable() {
    			@Override
    			public void run() {
    				try {
    				        System.out.println("Thread.sleep(5000)");
    					Thread.sleep(5000);
    					barrier.removeBarrier(); // 释放 , 删除节点
    					
    				} catch (Exception e) {
    					e.printStackTrace();
    				}
    			}
    		}, "t1").start();
    		// 等待 , 设置节点监控,节点存在,就wait(),进入线程阻塞; 节点不存在时,代码往下执行;
    		System.out.println("waitOnBarrier");
    		barrier.waitOnBarrier();
    		System.out.println("---------开始执行程序----------");
    	}
    }

waitOnBarrier

Thread.sleep(5000)

---------开始执行程序----------



4、DistributedAtomicInteger 原子操作

import java.util.concurrent.CyclicBarrier;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.atomic.AtomicValue;
import org.apache.curator.framework.recipes.atomic.DistributedAtomicInteger;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.curator.retry.RetryNTimes;

public class CuratorAtomicInteger {

	/** zookeeper地址 */
	static final String CONNECT_ADDR = "127.0.0.1:2181";
	/** session超时时间 */
	static final int SESSION_OUTTIME = 5000;//ms 
	
	public static void main(String[] args) throws Exception {
		
		//1 重试策略:初试时间为1s 重试10次
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 10);
		//2 通过工厂创建连接
		CuratorFramework cf = CuratorFrameworkFactory.builder()
					.connectString(CONNECT_ADDR)
					.sessionTimeoutMs(SESSION_OUTTIME)
					.retryPolicy(retryPolicy)
					.build();
		//3 开启连接
		cf.start();

		/*
		 * 4 使用DistributedAtomicInteger , new RetryNTimes(3, 1000)  重试3次,每次间隔10ms
		 * 重试次数最好结合并发量进行配置,如果重试次数小,并发量高的话,一些线程就获取 不到数据。
		 * 本地单机测试,重试次数和线程数配置相同即可。
		 */
		DistributedAtomicInteger atomicIntger = new DistributedAtomicInteger(cf, "/super", new RetryNTimes(10, 10));
		
		CyclicBarrier barrier = new CyclicBarrier(10) ;
		
		for(int i = 0; i < 10; i++){
			new Thread(new Runnable() {
				@Override
				public void run() {
					try {
						String name = Thread.currentThread().getName();
						barrier.await() ;
						AtomicValue<Integer> value = atomicIntger.increment();
						/*	
						 * value.preValue()  旧值
						 * value.postValue() 最新值
						 *  value.succeeded() true为成功,false则最新值为0
						 */
						System.out.println(name + " : " + value.succeeded() + " - " + value.postValue() + " - " + value.preValue());	
					} catch (Exception e) {
						e.printStackTrace();
					}
				}
			},"t" + i).start();
		}
		Thread.sleep(1000);
		
	}
}

t6 : true - 170 - 169

t8 : true - 171 - 170

t1 : true - 172 - 171

t4 : true - 173 - 172

t3 : true - 174 - 173

t0 : true - 175 - 174

t2 : true - 176 - 175

t9 : true - 177 - 176

t7 : true - 178 - 177

t5 : true - 179 - 178



zookeeper Curator

2020.11.18 00:21

https://www.meihaocloud.com.com/324.html , 欢迎转载,请在文章页标出原文连接 !


Copyright © 2020 千夕网 联系站长

粤公网安备 44030302001408号 粤ICP备19099833号-1