ZooKeeper的更新监听

1. ZooKeeper原生监听方式
pom.xml
<dependency>
	<groupId>org.apache.zookeeper</groupId>
	<artifactId>zookeeper</artifactId>
	<version>3.7.0</version>
</dependency>


监听代码:

package net.highersoft.zookeeper;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Demo watcher built on the native ZooKeeper client.
 *
 * <p>The instance registers itself as the default watcher of its own
 * {@link ZooKeeper} session. Callers register {@link Subscriber}s for a znode
 * path; whenever an event for that path arrives, the node's data is re-read
 * (which also re-arms the one-shot watch) and handed to the subscriber.
 *
 * <p>Each instance owns its own client and subscriber list, so several
 * watchers can coexist in one JVM without interfering with each other.
 */
public class ZkWatcher implements Watcher {
	private static final Logger log=Logger.getLogger(ZkWatcher.class);

	/**
	 * Callback registered through {@link ZkWatcher#addSubscriber(Subscriber)}.
	 */
	public static abstract class Subscriber {
		/**
		 * @return the znode path this subscriber wants to be notified about;
		 *         it is compared against the path of every incoming event
		 */
		abstract String target();

		/**
		 * Invoked after an event for {@link #target()} arrived and the node's
		 * data was re-read successfully.
		 *
		 * @param event    the event delivered by ZooKeeper
		 * @param newValue the node data decoded as UTF-8
		 */
		abstract void process(WatchedEvent event,String newValue);
	}

	/** Client owned by this watcher; the watcher itself is its default watcher. */
	private final ZooKeeper zooKeeper;
	/** Registered subscribers; every access is guarded by synchronizing on the list. */
	private final List<Subscriber> subscribers = new ArrayList<>();
	/** Released once the first SyncConnected event has been received. */
	private final CountDownLatch latch = new CountDownLatch(1);

	/**
	 * Opens a ZooKeeper session and registers this object as its default watcher.
	 *
	 * @param url     connect string, e.g. {@code "zk1:2181,zk2:2181"}
	 * @param timeOut session timeout in milliseconds
	 * @throws IOException if the client cannot be created
	 */
	public ZkWatcher(String url, Integer timeOut) throws IOException {
		// process() only dereferences zooKeeper for events that carry a path
		// matching a subscriber, and subscribers can only be added after this
		// constructor has returned, so the early escape of "this" is harmless.
		zooKeeper = new ZooKeeper(url, timeOut, this);
	}

	/**
	 * Handles connection-state events and node events.
	 *
	 * <p>Connection events (no path) only update the connection latch / log.
	 * For node events the data is read once with the watch flag set, which
	 * re-registers the watch, and every subscriber of that path is notified.
	 * When the node was deleted the data cannot be read, so the watch is
	 * re-armed through {@code exists} instead so that a later re-creation is
	 * still observed; subscribers are not notified in that case.
	 */
	@Override
	public void process(WatchedEvent watchedEvent) {
		Event.KeeperState state = watchedEvent.getState();
		if (state == Event.KeeperState.SyncConnected) {
			//init done
			latch.countDown();
		} else if (state == Event.KeeperState.Disconnected) {
			log.info("Disconnected!");
		}

		String path = watchedEvent.getPath();
		if (path == null) {
			// pure connection-state event: nothing to dispatch
			return;
		}

		// Copy under the lock, then do network I/O and callbacks outside of it
		// so a slow subscriber cannot block addSubscriber().
		List<Subscriber> snapshot;
		synchronized (subscribers) {
			snapshot = new ArrayList<>(subscribers);
		}
		List<Subscriber> interested = new ArrayList<>();
		for (Subscriber candidate : snapshot) {
			if (path.equals(candidate.target())) {
				interested.add(candidate);
			}
		}
		if (interested.isEmpty()) {
			return;
		}

		if (watchedEvent.getType() == Event.EventType.NodeDeleted) {
			rearmWatchOnMissingNode(path);
			return;
		}

		String data;
		try {
			//listening again: getData with watch=true re-registers the one-shot watch
			data = new String(zooKeeper.getData(path,true,null),"utf-8");
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error(e.getMessage(),e);
			return;
		} catch (Exception e) {
			log.error(e.getMessage(),e);
			return;
		}

		for (Subscriber subscriber : interested) {
			try {
				subscriber.process(watchedEvent,data);
			} catch (Exception e) {
				// one failing subscriber must not prevent the others from being notified
				log.error(e.getMessage(),e);
			}
		}
	}

	/** Re-registers the default watcher on a path whose node no longer exists. */
	private void rearmWatchOnMissingNode(String path) {
		try {
			zooKeeper.exists(path, true);
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error(e.getMessage(),e);
		} catch (Exception e) {
			log.error(e.getMessage(),e);
		}
	}

	/**
	 * Registers a subscriber and starts watching its target path
	 * (equivalent to the CLI command {@code get path watch}).
	 *
	 * <p>Blocks until the session is connected.
	 *
	 * @param subscriber the callback to register
	 * @return {@code true} if the watch was set; {@code false} if the
	 *         subscriber is {@code null}, the thread was interrupted, or the
	 *         initial read failed (in which case the subscriber is not kept)
	 */
	public boolean addSubscriber(Subscriber subscriber) {
		if (subscriber == null) {
			log.error("subscriber must not be null");
			return false;
		}
		try {
			latch.await();
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error(e.getMessage(),e);
			return false;
		}
		// Register before setting the watch so an event fired right after
		// getData() already finds the subscriber.
		synchronized (subscribers) {
			subscribers.add(subscriber);
		}
		try {
			//start listener
			zooKeeper.getData(subscriber.target(), true, null);
			return true;
		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();
			log.error(e.getMessage(),e);
		} catch (Exception e) {
			log.error(e.getMessage(),e);
		}
		// No watch was set: do not leave a dead subscriber behind.
		synchronized (subscribers) {
			subscribers.remove(subscriber);
		}
		return false;
	}

	/**
	 * Demo entry point: watches {@code /rec/web/pmml} and prints every new value.
	 */
	public static void main(String[] args) throws IOException, InterruptedException {
		ZkWatcher demo = new ZkWatcher("zk1:2181,zk2:2181,zk3:2181", 500);
		boolean res = demo.addSubscriber(new Subscriber() {
			@Override
			String target() {
				return "/rec/web/pmml";
			}

			@Override
			void process(WatchedEvent event, String newValue) {
				System.out.println(target() + ", the new value is : " + newValue);
			}
		});
		System.out.println(res);
		// keep the JVM alive so watch events can be delivered
		while (true) {
			//set("/config/aaa/bbb",System.currentTimeMillis() + "",-1);
			Thread.sleep(2000);
		}
	}
}

2. Curator监听

pom.xml:

<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-framework</artifactId>
    <version>2.12.0</version>
</dependency>
<!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier -->
<dependency>
    <groupId>org.apache.curator</groupId>
    <artifactId>curator-recipes</artifactId>
    <version>2.12.0</version>
</dependency>

监听代码:

package net.highersoft.zookeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;

/**
 * Demo entry point that connects to ZooKeeper through Curator and starts a
 * {@link WatchThread} that listens for changes of a single node.
 */
public class CuratorWatcher {
	private static final Logger log=Logger.getLogger(CuratorWatcher.class);

	/**
	 * Builds and starts the Curator client, then hands it to the watcher thread.
	 *
	 * @param args unused
	 * @throws Exception if the client cannot be built or started
	 */
	public static void main(String[] args) throws Exception {
		// retry up to 3 times, starting with a 1000 ms base sleep
		RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
		// This declaration was commented out in the original listing, which
		// left the connectString(zkhost) call below without a variable.
		String zkhost="zk1:2181,zk2:2181,zk3:2181";

		CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder().connectString(zkhost)
				.connectionTimeoutMs(5000)
				.sessionTimeoutMs(5000)
				.retryPolicy(retryPolicy);

		CuratorFramework client = builder.build();

		client.start();
		// One-off read, kept for reference:
		//String txt=new String(client.getData().forPath("/rec/web/pmml"));
		//System.out.println("txt:"+txt);

		WatchThread thread=new WatchThread(client);

		thread.start();
	}

}
/**
 * Thread that watches the node {@code /rec/web/pmml} for changes through a
 * Curator {@link NodeCache} and prints the new content on every change.
 *
 * <p>The thread blocks until it is interrupted; on exit it closes the cache
 * and shuts down the listener thread pool.
 */
class WatchThread extends Thread{
	private static final Logger log=Logger.getLogger(WatchThread.class);
	private final CuratorFramework client;

	/**
	 * @param client Curator client used to create the node cache; the caller
	 *               is expected to have started it (as {@code CuratorWatcher.main} does)
	 */
	public WatchThread(CuratorFramework client){
		this.client=client;
	}

	/**
	 * Registers the change listener, starts the cache and then parks the
	 * thread until it is interrupted. If the cache cannot be started the error
	 * is logged and the thread ends instead of sleeping forever on a watch
	 * that was never established.
	 */
	@Override
	public void run() {
		// listener callbacks are executed on this pool
		ExecutorService pool = Executors.newFixedThreadPool(5);
		final NodeCache nodeCache = new NodeCache(client, "/rec/web/pmml", false);

		try {
			// Register the listener before starting the cache so that no
			// change between start() and addListener() can be missed.
			nodeCache.getListenable().addListener(new NodeCacheListener() {
				@Override
				public void nodeChanged() throws Exception {
					// getCurrentData() is null when the node does not exist (e.g. it was deleted)
					ChildData current = nodeCache.getCurrentData();
					if (current == null || current.getData() == null) {
						log.info("Node /rec/web/pmml was deleted or has no data");
						return;
					}
					String configFile = new String(current.getData(), StandardCharsets.UTF_8);
					System.out.println("Config file is changed, new data:+" + configFile);
				}
			}, pool);
			// true: build the initial cache content before returning
			nodeCache.start(true);

			// keep the thread (and thereby the watch) alive until interrupted
			Thread.sleep(Long.MAX_VALUE);
		} catch (InterruptedException e) {
			// restore the flag so code further up the stack can see the interruption
			Thread.currentThread().interrupt();
		} catch (Exception e) {
			log.error(e.getMessage(), e);
		} finally {
			try {
				nodeCache.close();
			} catch (IOException e) {
				log.error(e.getMessage(), e);
			}
			pool.shutdown();
		}
	}
}

文/程忠 浏览次数:0次   2021-04-29 10:00:02

相关阅读


评论:
点击刷新

↓ 广告开始-头部带绿为生活 ↓
↑ 广告结束-尾部支持多点击 ↑