ZooKeeper 的更新监听
1. ZooKeeper 原生监听方式
pom.xml
文/程忠 浏览次数:0次 2021-04-29 10:00:02
pom.xml
<dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.7.0</version> </dependency>
监听代码:
package net.highersoft.zookeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;

import org.apache.log4j.Logger;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Watches znodes with the native ZooKeeper client and forwards every change
 * to the {@link Subscriber}s registered for the affected path.
 *
 * <p>The instance is the default watcher of its own {@link ZooKeeper} client.
 * ZooKeeper watches are one-shot, so the watch is re-registered each time an
 * event for a subscribed path is delivered.
 */
public class ZkWatcher implements Watcher {

    private static final Logger log = Logger.getLogger(ZkWatcher.class);

    /** Callback registered for a single znode path. */
    public static abstract class Subscriber {

        /** @return the absolute znode path this subscriber is interested in */
        abstract String target();

        /**
         * Invoked on the ZooKeeper event thread after the node changed.
         *
         * @param event    the event that triggered the notification
         * @param newValue current node content decoded as UTF-8; empty when the node holds no data
         */
        abstract void process(WatchedEvent event, String newValue);
    }

    /**
     * Per-instance subscriber list. Copy-on-write so the event thread can
     * iterate without holding a lock while it performs ZooKeeper I/O and
     * runs subscriber callbacks.
     */
    private final List<Subscriber> subscribers = new CopyOnWriteArrayList<>();

    /** Released on the first SyncConnected event. */
    private final CountDownLatch latch = new CountDownLatch(1);

    /**
     * Per-instance client. Volatile because {@code this} is handed to the
     * client (and therefore to its event thread) before the field is assigned.
     */
    private volatile ZooKeeper zooKeeper;

    /**
     * Creates the client and starts connecting in the background.
     *
     * @param url     ZooKeeper connect string, e.g. {@code host1:2181,host2:2181}
     * @param timeOut requested session timeout in milliseconds
     * @throws IOException if the client cannot be created
     */
    public ZkWatcher(String url, Integer timeOut) throws IOException {
        this.zooKeeper = new ZooKeeper(url, timeOut, this);
    }

    /**
     * Handles connection-state events and node events.
     *
     * <p>For a node event whose path matches at least one subscriber, the
     * node data is read once (which also re-arms the watch) and passed to
     * every matching subscriber.
     */
    @Override
    public void process(WatchedEvent watchedEvent) {
        Event.KeeperState state = watchedEvent.getState();
        if (state == Event.KeeperState.SyncConnected) {
            // Initial connection established: unblock addSubscriber().
            latch.countDown();
        } else if (state == Event.KeeperState.Disconnected) {
            log.info("Disconnected!");
        } else if (state == Event.KeeperState.Expired) {
            log.warn("Session expired; this ZooKeeper handle is no longer usable");
        }

        // Pure connection-state events carry no path; nothing to dispatch.
        String path = watchedEvent.getPath();
        if (path == null) {
            return;
        }

        List<Subscriber> matching = new ArrayList<>();
        for (Subscriber subscriber : subscribers) {
            if (path.equals(subscriber.target())) {
                matching.add(subscriber);
            }
        }
        if (matching.isEmpty()) {
            return;
        }

        String data;
        try {
            data = readAndRewatch(path);
        } catch (KeeperException.NoNodeException e) {
            // The node was deleted: a data watch cannot be set on a missing
            // node, so fall back to an exists-watch to catch its re-creation.
            log.warn("Node " + path + " does not exist; waiting for it to be created");
            rewatchMissingNode(path);
            return;
        } catch (KeeperException e) {
            log.error(e.getMessage(), e);
            return;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
            return;
        }

        for (Subscriber subscriber : matching) {
            try {
                subscriber.process(watchedEvent, data);
            } catch (RuntimeException e) {
                // One failing subscriber must not prevent the others from being notified.
                log.error(e.getMessage(), e);
            }
        }
    }

    /**
     * Registers a subscriber and sets a data watch on its target node.
     * Blocks until the client has connected for the first time.
     *
     * @param subscriber the subscriber to register
     * @return {@code true} if the watch was set; {@code false} if the calling
     *         thread was interrupted or the watch could not be set (in which
     *         case the subscriber is not kept)
     */
    public boolean addSubscriber(Subscriber subscriber) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
            return false;
        }

        // Register before setting the watch so an event that fires
        // immediately afterwards already finds the subscriber.
        subscribers.add(subscriber);
        try {
            zooKeeper.getData(subscriber.target(), true, null);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        } catch (KeeperException | RuntimeException e) {
            log.error(e.getMessage(), e);
        }
        // The watch was not set: do not leave a subscriber behind that the
        // caller was told had failed to register.
        subscribers.remove(subscriber);
        return false;
    }

    /** Reads the node content as UTF-8 and re-arms the default watcher on it. */
    private String readAndRewatch(String path) throws KeeperException, InterruptedException {
        byte[] bytes = zooKeeper.getData(path, true, null);
        // A znode may hold no data at all, in which case the client returns null.
        return bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
    }

    /** Sets an exists-watch on a currently missing node so its creation is reported. */
    private void rewatchMissingNode(String path) {
        try {
            zooKeeper.exists(path, true);
        } catch (KeeperException e) {
            log.error(e.getMessage(), e);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error(e.getMessage(), e);
        }
    }

    public static void main(String[] args) throws IOException, InterruptedException {
        // 5 s session timeout: the client divides this value by the number of
        // hosts to obtain its per-host connect timeout, so a few hundred
        // milliseconds is too short for a three-node ensemble.
        ZkWatcher demo = new ZkWatcher("zk1:2181,zk2:2181,zk3:2181", 5000);
        boolean res = demo.addSubscriber(new Subscriber() {
            @Override
            String target() {
                return "/rec/web/pmml";
            }

            @Override
            void process(WatchedEvent event, String newValue) {
                System.out.println(target() + ", the new value is : " + newValue);
            }
        });
        System.out.println(res);

        // Keep the JVM alive so change events can be received.
        while (true) {
            Thread.sleep(2000);
        }
    }
}
2. Curator 监听
pom.xml:
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <!-- 封装了一些高级特性,如:Cache事件监听、选举、分布式锁、分布式Barrier --> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency>
监听代码:
package net.highersoft.zookeeper;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.apache.curator.RetryPolicy;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.cache.ChildData;
import org.apache.curator.framework.recipes.cache.NodeCache;
import org.apache.curator.framework.recipes.cache.NodeCacheListener;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.log4j.Logger;

/**
 * Demo entry point: connects to ZooKeeper through Curator and starts a
 * {@link WatchThread} that reports changes of a single znode.
 */
public class CuratorWatcher {

    private static final Logger log = Logger.getLogger(CuratorWatcher.class);

    public static void main(String[] args) throws Exception {
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
        // The connect string must be a real variable; in the original listing
        // its declaration was commented out, so the code did not compile.
        String zkhost = "zk1:2181,zk2:2181,zk3:2181";
        CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
                .connectString(zkhost)
                .connectionTimeoutMs(5000)
                .sessionTimeoutMs(5000)
                .retryPolicy(retryPolicy);
        CuratorFramework client = builder.build();
        client.start();

        WatchThread thread = new WatchThread(client);
        thread.start();
    }
}

/**
 * Watches a single znode for changes through a Curator {@link NodeCache}
 * and keeps running until it is interrupted.
 */
class WatchThread extends Thread {

    private static final Logger log = Logger.getLogger(WatchThread.class);

    /** Path of the watched configuration node. */
    private static final String WATCHED_PATH = "/rec/web/pmml";

    private final CuratorFramework client;

    /**
     * @param client an already started Curator client
     */
    public WatchThread(CuratorFramework client) {
        this.client = client;
    }

    /**
     * Starts the node cache, registers the change listener and then blocks
     * until the thread is interrupted. The cache and the listener executor
     * are released on exit. If the cache cannot be started the thread ends
     * instead of sleeping forever without an active watch.
     */
    @Override
    public void run() {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        final NodeCache nodeCache = new NodeCache(client, WATCHED_PATH, false);
        try {
            // true: load the current node content before start() returns.
            nodeCache.start(true);
            nodeCache.getListenable().addListener(new NodeCacheListener() {
                @Override
                public void nodeChanged() throws Exception {
                    // getCurrentData() returns null once the node has been deleted.
                    ChildData current = nodeCache.getCurrentData();
                    if (current == null) {
                        log.warn("Node " + WATCHED_PATH + " no longer exists");
                        return;
                    }
                    byte[] bytes = current.getData();
                    String configFile = bytes == null ? "" : new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("Config file is changed, new data:+" + configFile);
                }
            }, pool);

            // Park this thread; all work happens on the listener executor.
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            // Restore the flag so whoever interrupted us can observe it.
            Thread.currentThread().interrupt();
            log.info("Watch thread interrupted, shutting down");
        } catch (Exception e) {
            log.error("Failed to watch " + WATCHED_PATH, e);
        } finally {
            try {
                nodeCache.close();
            } catch (IOException e) {
                log.error("Failed to close node cache for " + WATCHED_PATH, e);
            }
            pool.shutdown();
        }
    }
}
相关阅读
评论:
↓ 广告开始-头部带绿为生活 ↓
↑ 广告结束-尾部支持多点击 ↑