TBScheduleManagerStatic.java 11.6 KB
package com.taobao.pamirs.schedule.taskmanager;

import java.util.List;
import java.util.Map;

import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.taobao.pamirs.schedule.ScheduleUtil;
import com.taobao.pamirs.schedule.TaskItemDefine;
import com.taobao.pamirs.schedule.strategy.TBScheduleManagerFactory;

public class TBScheduleManagerStatic extends TBScheduleManager {
	private static transient Logger log = LoggerFactory.getLogger(TBScheduleManagerStatic.class);
    /**
	 * 总的任务数量
	 */
    protected int taskItemCount =0;

    protected long lastFetchVersion = -1;
    
    private final Object NeedReloadTaskItemLock = new Object();

	public TBScheduleManagerStatic(TBScheduleManagerFactory aFactory,
			String baseTaskType, String ownSign,IScheduleDataManager aScheduleCenter) throws Exception {
		super(aFactory, baseTaskType, ownSign, aScheduleCenter);
	}
	public void initialRunningInfo() throws Exception{
		scheduleCenter.clearExpireScheduleServer(this.currenScheduleServer.getTaskType(),this.taskTypeInfo.getJudgeDeadInterval());
		List<String> list = scheduleCenter.loadScheduleServerNames(this.currenScheduleServer.getTaskType());
		if(scheduleCenter.isLeader(this.currenScheduleServer.getUuid(),list)){
	    	//是第一次启动,先清楚所有的垃圾数据
			log.debug(this.currenScheduleServer.getUuid() + ":" + list.size());
	    	this.scheduleCenter.initialRunningInfo4Static(this.currenScheduleServer.getBaseTaskType(), this.currenScheduleServer.getOwnSign(),this.currenScheduleServer.getUuid());
	    }
	 }
	public void initial() throws Exception{
    	new Thread(this.currenScheduleServer.getTaskType()  +"-" + this.currentSerialNumber +"-StartProcess"){
    		@SuppressWarnings("static-access")
			public void run(){
    			try{
    			   log.info("开始获取调度任务队列...... of " + currenScheduleServer.getUuid());
    			   while (isRuntimeInfoInitial == false) {
 				      if(isStopSchedule == true){
				    	  log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
				    	  return;
				      }
 				      //log.error("isRuntimeInfoInitial = " + isRuntimeInfoInitial);
 				      try{
					  initialRunningInfo();
					  isRuntimeInfoInitial = scheduleCenter.isInitialRunningInfoSucuss(
										currenScheduleServer.getBaseTaskType(),
										currenScheduleServer.getOwnSign());
 				      }catch(Throwable e){
 				    	  //忽略初始化的异常
 				    	  log.error(e.getMessage(),e);
 				      }
					  if(isRuntimeInfoInitial == false){
    				      Thread.currentThread().sleep(1000);
					  }
					}
    			   int count =0;
    			   lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
				   while(getCurrentScheduleTaskItemListNow().size() <= 0){
    				      if(isStopSchedule == true){
    				    	  log.debug("外部命令终止调度,退出调度队列获取:" + currenScheduleServer.getUuid());
    				    	  return;
    				      }
    				      Thread.currentThread().sleep(1000);
        			      count = count + 1;
        			     // log.error("尝试获取调度队列,第" + count + "次 ") ;
    			   }
    			   String tmpStr ="TaskItemDefine:";
    			   for(int i=0;i< currentTaskItemList.size();i++){
    				   if(i>0){
    					   tmpStr = tmpStr +",";    					   
    				   }
    				   tmpStr = tmpStr + currentTaskItemList.get(i);
    			   }
    			   log.info("获取到任务处理队列,开始调度:" + tmpStr +"  of  "+ currenScheduleServer.getUuid());
    			   
    		    	//任务总量
    		    	taskItemCount = scheduleCenter.loadAllTaskItem(currenScheduleServer.getTaskType()).size();
    		    	//只有在已经获取到任务处理队列后才开始启动任务处理器    			   
    			   computerStart();
    			}catch(Exception e){
    				log.error(e.getMessage(),e);
    				String str = e.getMessage();
    				if(str.length() > 300){
    					str = str.substring(0,300);
    				}
    				startErrorInfo = "启动处理异常:" + str;
    			}
    		}
    	}.start();
    }
	/**
	 * 定时向数据配置中心更新当前服务器的心跳信息。
	 * 如果发现本次更新的时间如果已经超过了,服务器死亡的心跳周期,则不能在向服务器更新信息。
	 * 而应该当作新的服务器,进行重新注册。
	 * @throws Exception 
	 */
	public void refreshScheduleServerInfo() throws Exception {
	  try{
		rewriteScheduleInfo();
		//如果任务信息没有初始化成功,不做任务相关的处理
		if(this.isRuntimeInfoInitial == false){
			return;
		}
		
        //重新分配任务
        this.assignScheduleTask();
        
        //判断是否需要重新加载任务队列,避免任务处理进程不必要的检查和等待
        boolean tmpBoolean = this.isNeedReLoadTaskItemList();
        if(tmpBoolean != this.isNeedReloadTaskItem){
        	//只要不相同,就设置需要重新装载,因为在心跳异常的时候,做了清理队列的事情,恢复后需要重新装载。
        	synchronized (NeedReloadTaskItemLock) {
        		this.isNeedReloadTaskItem = true;
        	}
        	rewriteScheduleInfo();
        }
        
        if(this.isPauseSchedule  == true || this.processor != null && processor.isSleeping() == true){
            //如果服务已经暂停了,则需要重新定时更新 cur_server 和 req_server
            //如果服务没有暂停,一定不能调用的
               this.getCurrentScheduleTaskItemListNow();
          }
		}catch(Throwable e){
			//清除内存中所有的已经取得的数据和任务队列,避免心跳线程失败时候导致的数据重复
			this.clearMemoInfo();
			if(e instanceof Exception){
				throw (Exception)e;
			}else{
			   throw new Exception(e.getMessage(),e);
			}
		}
	}	
	/**
	 * 在leader重新分配任务,在每个server释放原来占有的任务项时,都会修改这个版本号
	 * @return
	 * @throws Exception
	 */
	public boolean isNeedReLoadTaskItemList() throws Exception{
		return this.lastFetchVersion < this.scheduleCenter.getReloadTaskItemFlag(this.currenScheduleServer.getTaskType());
	}
	
	/**判断某个任务对应的线程组是否处于僵尸状态。
	 * true 表示有线程组处于僵尸状态。需要告警。
	 * @param type
	 * @param statMap
	 * @return
	 * @throws Exception
	 */
	private boolean isExistZombieServ(String type,Map<String,Stat> statMap) throws Exception{
		boolean exist =false;
		for(String key:statMap.keySet()){
			Stat s  = statMap.get(key);
			if(this.scheduleCenter.getSystemTime() -s.getMtime()>  this.taskTypeInfo.getHeartBeatRate() * 40)
			{
				log.error("zombie serverList exists! serv="+key+" ,type="+type +"超过40次心跳周期未更新");
				exist=true;
			}
		}
		return exist;
		 
	}
	/**
	 * 根据当前调度服务器的信息,重新计算分配所有的调度任务
	 * 任务的分配是需要加锁,避免数据分配错误。为了避免数据锁带来的负面作用,通过版本号来达到锁的目的
	 * 
	 * 1、获取任务状态的版本号
	 * 2、获取所有的服务器注册信息和任务队列信息
	 * 3、清除已经超过心跳周期的服务器注册信息
	 * 3、重新计算任务分配
	 * 4、更新任务状态的版本号【乐观锁】
	 * 5、根系任务队列的分配信息
	 * @throws Exception 
	 */
	public void assignScheduleTask() throws Exception {
		scheduleCenter.clearExpireScheduleServer(this.currenScheduleServer.getTaskType(),this.taskTypeInfo.getJudgeDeadInterval());
		List<String> serverList = scheduleCenter
				.loadScheduleServerNames(this.currenScheduleServer.getTaskType());
		
		if(scheduleCenter.isLeader(this.currenScheduleServer.getUuid(), serverList)==false){
			if(log.isDebugEnabled()){
			   log.debug(this.currenScheduleServer.getUuid() +":不是负责任务分配的Leader,直接返回");
			}
			return;
		}
		//设置初始化成功标准,避免在leader转换的时候,新增的线程组初始化失败
		scheduleCenter.setInitialRunningInfoSucuss(this.currenScheduleServer.getBaseTaskType(), this.currenScheduleServer.getTaskType(), this.currenScheduleServer.getUuid());
		scheduleCenter.clearTaskItem(this.currenScheduleServer.getTaskType(), serverList);
		scheduleCenter.assignTaskItem(this.currenScheduleServer.getTaskType(),this.currenScheduleServer.getUuid(),this.taskTypeInfo.getMaxTaskItemsOfOneThreadGroup(),serverList);
	}	
	/**
	 * 重新加载当前服务器的任务队列
	 * 1、释放当前服务器持有,但有其它服务器进行申请的任务队列
	 * 2、重新获取当前服务器的处理队列
	 * 
	 * 为了避免此操作的过度,阻塞真正的数据处理能力。系统设置一个重新装载的频率。例如1分钟
	 * 
	 * 特别注意:
	 *   此方法的调用必须是在当前所有任务都处理完毕后才能调用,否则是否任务队列后可能数据被重复处理
	 */
	
	public List<TaskItemDefine> getCurrentScheduleTaskItemList() {
		try{
		if (this.isNeedReloadTaskItem == true) {			
			//特别注意:需要判断数据队列是否已经空了,否则可能在队列切换的时候导致数据重复处理
			//主要是在线程不休眠就加载数据的时候一定需要这个判断
			if (this.processor != null) {
					while (this.processor.isDealFinishAllData() == false) {
						Thread.sleep(50);
					}
			}
			//真正开始处理数据
			synchronized (NeedReloadTaskItemLock) {
				this.getCurrentScheduleTaskItemListNow();
				this.isNeedReloadTaskItem = false;
			}
		}
		this.lastReloadTaskItemListTime = this.scheduleCenter.getSystemTime();		
		return this.currentTaskItemList;		
		}catch(Exception e){
			throw new RuntimeException(e);
		}
	}
	//由于上面在数据执行时有使用到synchronized ,但是心跳线程并没有对应加锁。
	//所以在此方法上加一下synchronized。20151015
	protected synchronized List<TaskItemDefine> getCurrentScheduleTaskItemListNow() throws Exception {
		//如果已经稳定了,理论上不需要加载去扫描所有的叶子结点
		//20151019 by kongxuan.zlj
		try{
			Map<String, Stat> statMap= this.scheduleCenter.getCurrentServerStatList(this.currenScheduleServer.getTaskType());
			//server下面的机器节点的运行时环境是否在刷新,如果
			isExistZombieServ(this.currenScheduleServer.getTaskType(), statMap);
		}catch(Exception e ){
			log.error("zombie serverList exists, Exception:",e);
		}
//		
		
		//获取最新的版本号
		this.lastFetchVersion = this.scheduleCenter.getReloadTaskItemFlag(this.currenScheduleServer.getTaskType());
		log.debug(" this.currenScheduleServer.getTaskType()="+this.currenScheduleServer.getTaskType()+",  need reload="+ isNeedReloadTaskItem);
		try{
			//是否被人申请的队列
			this.scheduleCenter.releaseDealTaskItem(this.currenScheduleServer.getTaskType(), this.currenScheduleServer.getUuid());
			//重新查询当前服务器能够处理的队列
			//为了避免在休眠切换的过程中出现队列瞬间的不一致,先清除内存中的队列
			this.currentTaskItemList.clear();
			this.currentTaskItemList = this.scheduleCenter.reloadDealTaskItem(
					this.currenScheduleServer.getTaskType(), this.currenScheduleServer.getUuid());
			
			//如果超过10个心跳周期还没有获取到调度队列,则报警
			if(this.currentTaskItemList.size() ==0 && 
					scheduleCenter.getSystemTime() - this.lastReloadTaskItemListTime
					> this.taskTypeInfo.getHeartBeatRate() * 20){
				StringBuffer buf =new StringBuffer();
				buf.append("调度服务器");
				buf.append( this.currenScheduleServer.getUuid());
				buf.append("[TASK_TYPE=");
				buf.append( this.currenScheduleServer.getTaskType() );
				buf.append( "]自启动以来,超过20个心跳周期,还 没有获取到分配的任务队列;");
				buf.append("  currentTaskItemList.size() ="+currentTaskItemList.size());
				buf.append(" ,scheduleCenter.getSystemTime()="+scheduleCenter.getSystemTime());
				buf.append(" ,lastReloadTaskItemListTime="+lastReloadTaskItemListTime);
				buf.append(" ,taskTypeInfo.getHeartBeatRate()="+taskTypeInfo.getHeartBeatRate()*10);
				log.warn(buf.toString());
			}
			
			if(this.currentTaskItemList.size() >0){
				 //更新时间戳
				 this.lastReloadTaskItemListTime = scheduleCenter.getSystemTime();
			}
			
			return this.currentTaskItemList;
		}catch(Throwable e){
			this.lastFetchVersion = -1; //必须把把版本号设置小,避免任务加载失败
			if(e instanceof Exception ){
				throw (Exception)e;
			}else{
				throw new Exception(e);
			}
		}
	}	
	public int getTaskItemCount(){
		 return this.taskItemCount;
	}

}