ScheduleWatcher.java
2.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
package com.taobao.pamirs.schedule.zk;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ScheduleWatcher implements Watcher {
private static transient Logger log = LoggerFactory.getLogger(ScheduleWatcher.class);
private Map<String,Watcher> route = new ConcurrentHashMap<String,Watcher>();
private ZKManager manager;
public ScheduleWatcher(ZKManager aManager){
this.manager = aManager;
}
public void registerChildrenChanged(String path,Watcher watcher) throws Exception{
manager.getZooKeeper().getChildren(path, true);
route.put(path,watcher);
}
public void process(WatchedEvent event) {
if(log.isInfoEnabled()){
log.info("已经触发了" + event.getType() + ":"+ event.getState() + "事件!" + event.getPath());
}
if(event.getType() == Event.EventType.NodeChildrenChanged){
String path = event.getPath();
Watcher watcher = route.get(path);
if( watcher != null ){
try{
watcher.process(event);
}finally{
try{
if(manager.getZooKeeper().exists(path,null) != null){
manager.getZooKeeper().getChildren(path, true);
}
}catch(Exception e){
log.error(path +":" + e.getMessage(),e);
}
}
}else{
log.info("已经触发了" + event.getType() + ":"+ event.getState() + "事件!" + event.getPath());
}
}else if(event.getState()== KeeperState.AuthFailed){
log.info("tb_hj_schedule zk status =KeeperState.AuthFailed!");
}else if(event.getState()== KeeperState.ConnectedReadOnly){
log.info("tb_hj_schedule zk status =KeeperState.ConnectedReadOnly!");
}else if(event.getState()== KeeperState.Disconnected){
log.info("tb_hj_schedule zk status =KeeperState.Disconnected!");
try {
manager.reConnection();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}else if(event.getState()== KeeperState.NoSyncConnected){
log.info("tb_hj_schedule zk status =KeeperState.NoSyncConnected!等待重新建立ZK连接.. ");
try {
manager.reConnection();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}else if (event.getState()== KeeperState.SaslAuthenticated){
log.info("tb_hj_schedule zk status =KeeperState.SaslAuthenticated!");
}else if(event.getState() == KeeperState.Unknown){
log.info("tb_hj_schedule zk status =KeeperState.Unknown!");
}else if (event.getState() == KeeperState.SyncConnected) {
log.info("收到ZK连接成功事件!");
} else if (event.getState() == KeeperState.Expired) {
log.error("会话超时,等待重新建立ZK连接...");
try {
manager.reConnection();
} catch (Exception e) {
log.error(e.getMessage(),e);
}
}
}
}