diff --git a/.gitignore b/.gitignore index aebe6849b2daf46e3b08b2fc703f4de28a8b9065..eedb2bfa41e7b1311e4550f5439846244d1910a4 100644 --- a/.gitignore +++ b/.gitignore @@ -2,3 +2,5 @@ .classpath .project .settings/ +.idea/ +*.iml \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/ConsoleManager.java b/src/main/java/cn/uncode/schedule/ConsoleManager.java index 8de495118dbbd7b60fc974191f536b47eae88377..b260ef637d0b8eeac33a2984bd26f7cb32acdd6a 100644 --- a/src/main/java/cn/uncode/schedule/ConsoleManager.java +++ b/src/main/java/cn/uncode/schedule/ConsoleManager.java @@ -12,7 +12,7 @@ import cn.uncode.schedule.core.TaskDefine; public class ConsoleManager { - protected static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class); + private static transient Logger log = LoggerFactory.getLogger(ConsoleManager.class); // private static Gson GSON = new GsonBuilder().create(); @@ -20,14 +20,16 @@ public class ConsoleManager { public static ZKScheduleManager getScheduleManager() throws Exception { if(null == ConsoleManager.scheduleManager){ - ConsoleManager.scheduleManager = (ZKScheduleManager)ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class); + synchronized(ConsoleManager.class) { + ConsoleManager.scheduleManager = ZKScheduleManager.getApplicationcontext().getBean(ZKScheduleManager.class); + } } return ConsoleManager.scheduleManager; } public static void addScheduleTask(TaskDefine taskDefine) { try { - ConsoleManager.scheduleManager.getScheduleDataManager().addTask(taskDefine); + ConsoleManager.getScheduleManager().getScheduleDataManager().addTask(taskDefine); } catch (Exception e) { log.error(e.getMessage(), e); } diff --git a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java index 8c468f4b95bfbd7220fcbf7f699bb79a9df92091..8bf606ac7a122e219cac7ec542a5fcc160fe2609 100644 --- a/src/main/java/cn/uncode/schedule/DynamicTaskManager.java +++ b/src/main/java/cn/uncode/schedule/DynamicTaskManager.java @@ -14,6 +14,7 @@ import org.springframework.aop.framework.AopProxyUtils; import org.springframework.aop.support.AopUtils; import org.springframework.scheduling.Trigger; import org.springframework.scheduling.support.CronTrigger; +import org.springframework.util.Assert; import org.springframework.util.ReflectionUtils; import cn.uncode.schedule.core.ScheduledMethodRunnable; @@ -104,35 +105,50 @@ public class DynamicTaskManager { * @return */ private static ScheduledMethodRunnable buildScheduledRunnable(String targetBean, String targetMethod, String params){ - Object bean = null; - Method method = null; + Object bean; ScheduledMethodRunnable scheduledMethodRunnable = null; try { - ConsoleManager.getScheduleManager(); bean = ZKScheduleManager.getApplicationcontext().getBean(targetBean); - if(bean != null){ - Class clazz = null; - if(AopUtils.isAopProxy(bean)){ - clazz = AopProxyUtils.ultimateTargetClass(bean); - //method = ReflectionUtils.findMethod(AopProxyUtils.ultimateTargetClass(bean), targetMethod); - }else{ - clazz = bean.getClass(); - //method = ReflectionUtils.findMethod(bean.getClass(), targetMethod); - } - if(params != null){ - method = ReflectionUtils.findMethod(clazz, targetMethod,String.class); - }else{ - method = ReflectionUtils.findMethod(clazz, targetMethod); - } - if(method != null){ - scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method, params); - } - } + scheduledMethodRunnable = _buildScheduledRunnable(bean, targetMethod, params); } catch (Exception e) { LOGGER.debug(e.getLocalizedMessage(), e); } return scheduledMethodRunnable; } - + private static ScheduledMethodRunnable buildScheduledRunnable(Object bean, String targetMethod, String params){ + ScheduledMethodRunnable scheduledMethodRunnable = null; + try { + scheduledMethodRunnable = _buildScheduledRunnable(bean, targetMethod, params); + }catch (Exception e){ + LOGGER.debug(e.getLocalizedMessage(), e); + } + return scheduledMethodRunnable; + } + + + private static ScheduledMethodRunnable _buildScheduledRunnable(Object bean, String targetMethod, String params) throws Exception { + + Assert.notNull(bean, "target object must not be null"); + Assert.hasLength(targetMethod, "Method name must not be empty"); + + Method method; + ScheduledMethodRunnable scheduledMethodRunnable; + + Class clazz; + if (AopUtils.isAopProxy(bean)) { + clazz = AopProxyUtils.ultimateTargetClass(bean); + } else { + clazz = bean.getClass(); + } + if (params != null) { + method = ReflectionUtils.findMethod(clazz, targetMethod, String.class); + } else { + method = ReflectionUtils.findMethod(clazz, targetMethod); + } + + Assert.notNull(method, "can not find method named " + targetMethod); + scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method, params); + return scheduledMethodRunnable; + } } diff --git a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java index c18d173edfcd19aa73ecd3726d9bb6b5e1390aea..8359d2660761878d2e2b7b546488fadd0ea514a2 100644 --- a/src/main/java/cn/uncode/schedule/ZKScheduleManager.java +++ b/src/main/java/cn/uncode/schedule/ZKScheduleManager.java @@ -73,11 +73,11 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic private Map isOwnerMap = new ConcurrentHashMap(); private Timer hearBeatTimer; - protected Lock initLock = new ReentrantLock(); - protected boolean isStopSchedule = false; - protected Lock registerLock = new ReentrantLock(); + private Lock initLock = new ReentrantLock(); + private boolean isStopSchedule = false; + private Lock registerLock = new ReentrantLock(); - volatile String errorMessage = "No config Zookeeper connect infomation"; + private volatile String errorMessage = "No config Zookeeper connect information"; private InitialThread initialThread; public ZKScheduleManager() { @@ -93,7 +93,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } public void reInit(Properties p) throws Exception { - if (this.start == true || this.hearBeatTimer != null) { + if (this.start || this.hearBeatTimer != null) { throw new Exception("调度器有任务处理,不能重新初始化"); } this.init(p); @@ -120,10 +120,10 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } } - public void rewriteScheduleInfo() throws Exception { + private void rewriteScheduleInfo() throws Exception { registerLock.lock(); try { - if (this.isStopSchedule == true) { + if (this.isStopSchedule) { if (LOGGER.isDebugEnabled()) { LOGGER.debug("外部命令终止调度,不在注册调度服务,避免遗留垃圾数据:" + currenScheduleServer.getUuid()); @@ -134,8 +134,8 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic if (errorMessage != null) { this.currenScheduleServer.setDealInfoDesc(errorMessage); } - if (this.scheduleDataManager - .refreshScheduleServer(this.currenScheduleServer) == false) { + if (!this.scheduleDataManager + .refreshScheduleServer(this.currenScheduleServer)) { // 更新信息失败,清除内存数据后重新注册 this.clearMemoInfo(); this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer); @@ -169,8 +169,8 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic public void assignScheduleTask() throws Exception { scheduleDataManager.clearExpireScheduleServer(); List serverList = scheduleDataManager.loadScheduleServerNames(); - if (scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(), - serverList) == false) { + if (!scheduleDataManager.isLeader(this.currenScheduleServer.getUuid(), + serverList)) { if (LOGGER.isDebugEnabled()) { LOGGER.debug(this.currenScheduleServer.getUuid() + ":不是负责任务分配的Leader,直接返回"); @@ -179,8 +179,9 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic } //黑名单 for(String ip:zkManager.getIpBlacklist()){ - if(serverList.contains(ip)){ - serverList.remove(ip); + int index = serverList.indexOf(ip); + if (index > -1){ + serverList.remove(index); } } // 设置初始化成功标准,避免在leader转换的时候,新增的线程组初始化失败 @@ -197,7 +198,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic try { rewriteScheduleInfo(); // 如果任务信息没有初始化成功,不做任务相关的处理 - if (this.isScheduleServerRegister == false) { + if (!this.isScheduleServerRegister) { return; } @@ -229,7 +230,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic public void initialData() throws Exception { this.zkManager.initial(); this.scheduleDataManager = new ScheduleDataManager4ZK(this.zkManager); - if (this.start == true) { + if (this.start) { // 注册调度管理器 this.scheduleDataManager.registerScheduleServer(this.currenScheduleServer); if (hearBeatTimer == null) { @@ -256,7 +257,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic String name = ScheduleUtil.getTaskNameFormBean(beanNames[0], targetMethod.getName()); boolean isOwner = false; try { - if(isScheduleServerRegister == false){ + if(!isScheduleServerRegister){ Thread.sleep(1000); } if(zkManager.checkZookeeperState()){ @@ -317,7 +318,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic sm.initLock.lock(); try { int count = 0; - while (sm.zkManager.checkZookeeperState() == false) { + while (!sm.zkManager.checkZookeeperState()) { count = count + 1; if (count % 50 == 0) { sm.errorMessage = "Zookeeper connecting ......" @@ -326,7 +327,7 @@ public class ZKScheduleManager extends ThreadPoolTaskScheduler implements Applic log.error(sm.errorMessage); } Thread.sleep(20); - if (this.isStop == true) { + if (this.isStop) { return; } } diff --git a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java index f19bbb00299a11282b2dc68026747a31674ad448..4008780cffe788ad9a1ea7db446500098a1ab437 100644 --- a/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java +++ b/src/main/java/cn/uncode/schedule/core/IScheduleDataManager.java @@ -13,7 +13,7 @@ public interface IScheduleDataManager{ /** * 发送心跳信息 - * + * * @param server * @throws Exception */ @@ -29,7 +29,8 @@ public interface IScheduleDataManager{ public boolean isLeader(String uuid,List serverList); - + + public void unRegisterScheduleServer(ScheduleServer server) throws Exception; public void clearExpireScheduleServer() throws Exception; diff --git a/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java b/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java index dd4ea4604a4e18da0b694d04e89d038cd1121147..625f59216a672752b47305842f687d4ce978ae02 100644 --- a/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java +++ b/src/main/java/cn/uncode/schedule/core/ScheduledMethodRunnable.java @@ -4,6 +4,7 @@ import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.lang.reflect.UndeclaredThrowableException; +import cn.uncode.schedule.DynamicTaskManager; import org.springframework.util.ReflectionUtils; public class ScheduledMethodRunnable implements Runnable { diff --git a/src/main/java/cn/uncode/schedule/core/TaskDefine.java b/src/main/java/cn/uncode/schedule/core/TaskDefine.java index 0e64cba1ffdb88fb3463cb0007a844622b146ebc..9bd05e5a6661cc3ffdd42a4b2ef4372e15c4f792 100644 --- a/src/main/java/cn/uncode/schedule/core/TaskDefine.java +++ b/src/main/java/cn/uncode/schedule/core/TaskDefine.java @@ -46,11 +46,8 @@ public class TaskDefine { */ private String type; - public boolean begin(Date sysTime){ - if(null == sysTime){ - return false; - } - return sysTime.after(startTime); + public boolean begin(Date sysTime) { + return null != sysTime && sysTime.after(startTime); } public String getTargetBean() { diff --git a/src/main/java/cn/uncode/schedule/core/Version.java b/src/main/java/cn/uncode/schedule/core/Version.java index 051ca1eaa90c65b5bf374900ca0ce6b7a726e281..e8a1ecafafcfbb4cc97b1e25b4ac388b13e776a4 100644 --- a/src/main/java/cn/uncode/schedule/core/Version.java +++ b/src/main/java/cn/uncode/schedule/core/Version.java @@ -7,17 +7,13 @@ package cn.uncode.schedule.core; */ public class Version { - public final static String version="uncode-schedule-1.0.0"; + private final static String version="uncode-schedule-1.0.0"; public static String getVersion(){ return version; } public static boolean isCompatible(String dataVersion){ - if(version.compareTo(dataVersion)>=0){ - return true; - }else{ - return false; - } + return version.compareTo(dataVersion) >= 0; } } diff --git a/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java b/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java deleted file mode 100644 index 47225a3327b35620fc4e560b56bf6e99b71ccedc..0000000000000000000000000000000000000000 --- a/src/main/java/cn/uncode/schedule/local/DynamicTaskManager.java +++ /dev/null @@ -1,130 +0,0 @@ -package cn.uncode.schedule.local; - -import java.lang.reflect.Method; -import java.util.Date; -import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledFuture; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.aop.framework.AopProxyUtils; -import org.springframework.aop.support.AopUtils; -import org.springframework.scheduling.Trigger; -import org.springframework.scheduling.support.CronTrigger; -import org.springframework.scheduling.support.ScheduledMethodRunnable; -import org.springframework.util.ReflectionUtils; - -import cn.uncode.schedule.ConsoleManager; -import cn.uncode.schedule.zk.TaskDefine; - - - -public class DynamicTaskManager { - - private static final transient Logger LOGGER = LoggerFactory.getLogger(DynamicTaskManager.class); - - - private static final Map> SCHEDULE_FUTURES = new ConcurrentHashMap>(); - - - /** - * 启动定时任务 - * @param taskDefine - * @param currentTime - */ - public static void scheduleTask(TaskDefine taskDefine, Date currentTime){ - scheduleTask(taskDefine.getTargetBean(), taskDefine.getTargetMethod(), - taskDefine.getCronExpression(), taskDefine.getStartTime(), taskDefine.getPeriod()); - } - - public static void clearLocalTask(List existsTaskName){ - for(String name:SCHEDULE_FUTURES.keySet()){ - if(!existsTaskName.contains(name)){ - SCHEDULE_FUTURES.get(name).cancel(true); - SCHEDULE_FUTURES.remove(name); - } - } - } - - /** - * 启动定时任务 - * 支持: - * 1 cron时间表达式,立即执行 - * 2 startTime + period,指定时间,定时进行 - * 3 period,定时进行,立即开始 - * 4 startTime,指定时间执行 - * - * @param targetBean - * @param targetMethod - * @param cronExpression - * @param startTime - * @param period - */ - public static void scheduleTask(String targetBean, String targetMethod, String cronExpression, Date startTime, long period){ - String scheduleKey = buildScheduleKey(targetBean, targetMethod); - try { - ScheduledFuture scheduledFuture = null; - ScheduledMethodRunnable scheduledMethodRunnable = buildScheduledRunnable(targetBean, targetMethod); - if(scheduledMethodRunnable != null){ - if (!SCHEDULE_FUTURES.containsKey(scheduleKey)) { - if(StringUtils.isNotEmpty(cronExpression)){ - Trigger trigger = new CronTrigger(cronExpression); - scheduledFuture = ConsoleManager.getScheduleManager().schedule(scheduledMethodRunnable, trigger); - }else if(startTime != null){ - if(period > 0){ - scheduledFuture = ConsoleManager.getScheduleManager().scheduleAtFixedRate(scheduledMethodRunnable, startTime, period); - }else{ - scheduledFuture = ConsoleManager.getScheduleManager().schedule(scheduledMethodRunnable, startTime); - } - }else if(period > 0){ - scheduledFuture = ConsoleManager.getScheduleManager().scheduleAtFixedRate(scheduledMethodRunnable, period); - } - SCHEDULE_FUTURES.put(scheduleKey, scheduledFuture); - LOGGER.debug("Building new schedule task, target bean "+ targetBean + " target method " + targetMethod + "."); - } - }else{ - LOGGER.debug("Bean name is not exists."); - } - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - } - } - - - private static String buildScheduleKey(String targetBean, String targetMethod){ - return targetBean + "#" + targetMethod; - } - - /** - * 封装任务对象 - * @param targetBean - * @param targetMethod - * @return - */ - private static ScheduledMethodRunnable buildScheduledRunnable(String targetBean, String targetMethod){ - Object bean = null; - Method method = null; - ScheduledMethodRunnable scheduledMethodRunnable = null; - try { - bean = ConsoleManager.getScheduleManager().getApplicationcontext().getBean(targetBean); - if(bean != null){ - if(AopUtils.isAopProxy(bean)){ - method = ReflectionUtils.findMethod(AopProxyUtils.ultimateTargetClass(bean), targetMethod); - }else{ - method = ReflectionUtils.findMethod(bean.getClass(), targetMethod); - } - if(method != null){ - scheduledMethodRunnable = new ScheduledMethodRunnable(bean, method); - } - } - } catch (Exception e) { - LOGGER.debug(e.getLocalizedMessage(), e); - } - return scheduledMethodRunnable; - } - - -} diff --git a/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java b/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java index d43fbfae1e3b4664f39f129d9a94c9e906a2c763..6f82cbd96850174c7e8b5eecd9575372b698844d 100644 --- a/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java +++ b/src/main/java/cn/uncode/schedule/util/ScheduleUtil.java @@ -56,13 +56,13 @@ public class ScheduleUtil { return FORMAT.parse(d); } public static String getTaskTypeByBaseAndOwnSign(String baseType,String ownSign){ - if(ownSign.equals(OWN_SIGN_BASE) == true){ + if(ownSign.equals(OWN_SIGN_BASE)){ return baseType; } return baseType+"$" + ownSign; } public static String splitBaseTaskTypeFromTaskType(String taskType){ - if(taskType.indexOf("$") >=0){ + if(taskType.contains("$")){ return taskType.substring(0,taskType.indexOf("$")); }else{ return taskType; @@ -70,7 +70,7 @@ public class ScheduleUtil { } public static String splitOwnsignFromTaskType(String taskType){ - if(taskType.indexOf("$") >=0){ + if(taskType.contains("$")){ return taskType.substring(taskType.indexOf("$")+1); }else{ return OWN_SIGN_BASE; diff --git a/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java b/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java deleted file mode 100644 index 377e4a6642723310f3f08cbccdb8c932778475de..0000000000000000000000000000000000000000 --- a/src/main/java/cn/uncode/schedule/zk/IScheduleDataManager.java +++ /dev/null @@ -1,53 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.util.List; - - -/** - * 调度配置中心客户端接口,可以有基于数据库的实现,可以有基于ConfigServer的实现 - * - * @author juny.ye - * - */ -public interface IScheduleDataManager{ - - /** - * 发送心跳信息 - * - * @param server - * @throws Exception - */ - public boolean refreshScheduleServer(ScheduleServer server) throws Exception; - - /** - * 注册服务器 - * - * @param server - * @throws Exception - */ - public void registerScheduleServer(ScheduleServer server) throws Exception; - - - public boolean isLeader(String uuid,List serverList); - - - public void clearExpireScheduleServer() throws Exception; - - - public List loadScheduleServerNames() throws Exception; - - public void assignTask(String currentUuid, List taskServerList) throws Exception; - - public boolean isOwner(String name, String uuid)throws Exception; - - public void addTask(TaskDefine taskDefine)throws Exception; - - public void delTask(String targetBean, String targetMethod)throws Exception; - - public List selectTask()throws Exception; - - public boolean checkLocalTask(String currentUuid)throws Exception; - - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java index 16b4e089c22976fcc12d3093c690d9fd75d03b10..e461b1d008dff38ee2c54f270b2c94eab58132f7 100644 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java +++ b/src/main/java/cn/uncode/schedule/zk/ScheduleDataManager4ZK.java @@ -103,11 +103,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public void registerScheduleServer(ScheduleServer server) throws Exception { - if(server.isRegister() == true){ + if(server.isRegister()){ throw new Exception(server.getUuid() + " 被重复注册"); } //clearExpireScheduleServer(); - String realPath = null; + String realPath; //此处必须增加UUID作为唯一性保障 StringBuffer id = new StringBuffer(); id.append(server.getIp()).append("$") @@ -149,20 +149,38 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { } } } - - public List loadScheduleServerNames(String taskType)throws Exception { - String zkPath = this.pathServer; - if (this.getZooKeeper().exists(zkPath, false) == null) { - return new ArrayList(); - } - List serverList = this.getZooKeeper().getChildren(zkPath, false); - Collections.sort(serverList, new Comparator() { - public int compare(String u1, String u2) { - return u1.substring(u1.lastIndexOf("$") + 1).compareTo( - u2.substring(u2.lastIndexOf("$") + 1)); + + + @Override + public void unRegisterScheduleServer(ScheduleServer server) throws Exception { + List serverList = this.loadScheduleServerNames(); + + if(server.isRegister() && this.isLeader(server.getUuid(), serverList)){ + //delete task + String zkPath = this.pathTask; + String serverPath = this.pathServer; + + if(this.getZooKeeper().exists(zkPath,false)== null){ + this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); } - }); - return serverList; + + //get all task + List children = this.getZooKeeper().getChildren(zkPath, false); + if(null != children && children.size() > 0){ + for (String taskName : children) { + String taskPath = zkPath + "/" + taskName; + if (this.getZooKeeper().exists(taskPath, false) != null) { + ZKTools.deleteTree(this.getZooKeeper(), taskPath + "/" + server.getUuid()); + } + } + } + + //删除 + if (this.getZooKeeper().exists(this.pathServer, false) == null) { + ZKTools.deleteTree(this.getZooKeeper(), serverPath + serverPath + "/" + server.getUuid()); + } + server.setRegister(false); + } } public List loadScheduleServerNames() throws Exception { @@ -180,11 +198,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { }); return serverList; } - + @Override public void assignTask(String currentUuid, List taskServerList) throws Exception { - if(this.isLeader(currentUuid,taskServerList)==false){ + if(!this.isLeader(currentUuid, taskServerList)){ if(LOG.isDebugEnabled()){ LOG.debug(currentUuid +":不是负责任务分配的Leader,直接返回"); } @@ -204,30 +222,29 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { } List children = this.getZooKeeper().getChildren(zkPath, false); if(null != children && children.size() > 0){ - for(int i = 0; i < children.size(); i++){ - String taskName = children.get(i); + for (String taskName : children) { String taskPath = zkPath + "/" + taskName; - if(this.getZooKeeper().exists(taskPath, false) == null){ - this.getZooKeeper().create(taskPath, null, this.zkManager.getAcl(),CreateMode.PERSISTENT); + if (this.getZooKeeper().exists(taskPath, false) == null) { + this.getZooKeeper().create(taskPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); } - + List taskServerIds = this.getZooKeeper().getChildren(taskPath, false); - if(null == taskServerIds || taskServerIds.size() == 0){ + if (null == taskServerIds || taskServerIds.size() == 0) { assignServer2Task(taskServerList, taskPath); - }else{ + } else { boolean hasAssignSuccess = false; - for(String serverId:taskServerIds){ - if(taskServerList.contains(serverId)){ + for (String serverId : taskServerIds) { + if (taskServerList.contains(serverId)) { hasAssignSuccess = true; continue; } ZKTools.deleteTree(this.getZooKeeper(), taskPath + "/" + serverId); } - if(hasAssignSuccess == false){ + if (!hasAssignSuccess) { assignServer2Task(taskServerList, taskPath); } } - + } }else{ if(LOG.isDebugEnabled()){ @@ -243,9 +260,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { String serverId = taskServerList.get(index); this.getZooKeeper().create(taskPath + "/" + serverId, null, this.zkManager.getAcl(),CreateMode.PERSISTENT); if(LOG.isDebugEnabled()){ - StringBuffer buffer = new StringBuffer(); - buffer.append("Assign server [").append(serverId).append("]").append(" to task [").append(taskPath).append("]"); - LOG.debug(buffer.toString()); + LOG.debug("Assign server [" + serverId + "]" + " to task [" + taskPath + "]"); } } @@ -253,7 +268,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { return uuid.equals(getLeader(serverList)); } - public String getLeader(List serverList){ + private String getLeader(List serverList){ if(serverList == null || serverList.size() ==0){ return ""; } @@ -270,11 +285,11 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { return leader; } - public long getSystemTime(){ + private long getSystemTime(){ return this.zkBaseTime + ( System.currentTimeMillis() - this.loclaBaseTime); } - class TimestampTypeAdapter implements JsonSerializer, JsonDeserializer{ + private class TimestampTypeAdapter implements JsonSerializer, JsonDeserializer{ public JsonElement serialize(Timestamp src, Type arg1, JsonSerializationContext arg2) { DateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String dateFormatAsString = format.format(new Date(src.getTime())); @@ -320,18 +335,12 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { @Override public boolean isExistsTask(TaskDefine taskDefine) throws Exception{ String zkPath = this.pathTask+ "/" + taskDefine.stringKey(); - if(this.getZooKeeper().exists(zkPath, false) != null){ - return true; - } - return false; + return this.getZooKeeper().exists(zkPath, false) != null; } @Override public void addTask(TaskDefine taskDefine) throws Exception { String zkPath = this.pathTask; - if(this.getZooKeeper().exists(zkPath,false)== null){ - this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); - } zkPath = zkPath + "/" + taskDefine.stringKey(); if(this.getZooKeeper().exists(zkPath, false) == null){ this.getZooKeeper().create(zkPath, null, this.zkManager.getAcl(), CreateMode.PERSISTENT); @@ -368,8 +377,8 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { String zkPath = this.pathTask; List taskDefines = new ArrayList(); if(this.getZooKeeper().exists(zkPath,false) != null){ - List childrens = this.getZooKeeper().getChildren(zkPath, false); - for(String child:childrens){ + List childes = this.getZooKeeper().getChildren(zkPath, false); + for(String child:childes){ byte[] data = this.getZooKeeper().getData(zkPath+"/"+child, null, null); TaskDefine taskDefine = null; if (null != data) { @@ -378,15 +387,16 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { taskDefine.setType("uncode task"); }else{ String[] names = child.split("#"); - if(names != null && StringUtils.isNotEmpty(names[0])){ + if(StringUtils.isNotEmpty(names[0])){ taskDefine = new TaskDefine(); taskDefine.setTargetBean(names[0]); taskDefine.setTargetMethod(names[1]); taskDefine.setType("quartz/spring task"); } } + List sers = this.getZooKeeper().getChildren(zkPath+"/"+child, false); - if(sers != null && sers.size() > 0){ + if(taskDefine != null && sers != null && sers.size() > 0){ taskDefine.setCurrentServer(sers.get(0)); } taskDefines.add(taskDefine); @@ -402,8 +412,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { List children = this.getZooKeeper().getChildren(zkPath, false); List ownerTask = new ArrayList(); if(null != children && children.size() > 0){ - for(int i = 0; i < children.size(); i++){ - String taskName = children.get(i); + for (String taskName : children) { if (isOwner(taskName, currentUuid)) { String taskPath = zkPath + "/" + taskName; byte[] data = this.getZooKeeper().getData(taskPath, null, null); @@ -413,7 +422,7 @@ public class ScheduleDataManager4ZK implements IScheduleDataManager { ownerTask.add(taskName); DynamicTaskManager.scheduleTask(taskDefine, new Date(getSystemTime())); } - } + } } } DynamicTaskManager.clearLocalTask(ownerTask); diff --git a/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java b/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java deleted file mode 100644 index 7ec1ea669a556d76f30bfa944fe4b7454d304572..0000000000000000000000000000000000000000 --- a/src/main/java/cn/uncode/schedule/zk/ScheduleServer.java +++ /dev/null @@ -1,195 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.sql.Timestamp; -import java.util.UUID; - -import cn.uncode.schedule.util.ScheduleUtil; - - -/** - * 调度服务器信息定义 - * - * @author juny.ye - * - */ -public class ScheduleServer { - /** - * 全局唯一编号 - */ - private String uuid; - - - private String ownSign; - /** - * 机器IP地址 - */ - private String ip; - - /** - * 机器名称 - */ - private String hostName; - - /** - * 服务开始时间 - */ - private Timestamp registerTime; - /** - * 最后一次心跳通知时间 - */ - private Timestamp heartBeatTime; - /** - * 最后一次取数据时间 - */ - private Timestamp lastFetchDataTime; - /** - * 处理描述信息,例如读取的任务数量,处理成功的任务数量,处理失败的数量,处理耗时 - * FetchDataCount=4430,FetcheDataNum=438570,DealDataSucess=438570,DealDataFail=0,DealSpendTime=651066 - */ - private String dealInfoDesc; - - private String nextRunStartTime; - - private String nextRunEndTime; - /** - * 配置中心的当前时间 - */ - private Timestamp centerServerTime; - - /** - * 数据版本号 - */ - private long version; - - private boolean isRegister; - - public ScheduleServer() { - - } - - public static ScheduleServer createScheduleServer(String aOwnSign){ - long currentTime = System.currentTimeMillis(); - ScheduleServer result = new ScheduleServer(); - result.ownSign = aOwnSign; - result.ip = ScheduleUtil.getLocalIP(); - result.hostName = ScheduleUtil.getLocalHostName(); - result.registerTime = new Timestamp(currentTime); - result.heartBeatTime = null; - result.dealInfoDesc = "调度初始化"; - result.version = 0; - result.uuid = result.ip - + "$" - + (UUID.randomUUID().toString().replaceAll("-", "") - .toUpperCase()); - return result; - } - - public String getUuid() { - return uuid; - } - - public void setUuid(String uuid) { - this.uuid = uuid; - } - - public long getVersion() { - return version; - } - - public void setVersion(long version) { - this.version = version; - } - - - public Timestamp getRegisterTime() { - return registerTime; - } - - public void setRegisterTime(Timestamp registerTime) { - this.registerTime = registerTime; - } - - public Timestamp getHeartBeatTime() { - return heartBeatTime; - } - - public void setHeartBeatTime(Timestamp heartBeatTime) { - this.heartBeatTime = heartBeatTime; - } - - public Timestamp getLastFetchDataTime() { - return lastFetchDataTime; - } - - public void setLastFetchDataTime(Timestamp lastFetchDataTime) { - this.lastFetchDataTime = lastFetchDataTime; - } - - public String getDealInfoDesc() { - return dealInfoDesc; - } - - public void setDealInfoDesc(String dealInfoDesc) { - this.dealInfoDesc = dealInfoDesc; - } - - public String getIp() { - return ip; - } - - public void setIp(String ip) { - this.ip = ip; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - - public Timestamp getCenterServerTime() { - return centerServerTime; - } - - public void setCenterServerTime(Timestamp centerServerTime) { - this.centerServerTime = centerServerTime; - } - - public String getNextRunStartTime() { - return nextRunStartTime; - } - - public void setNextRunStartTime(String nextRunStartTime) { - this.nextRunStartTime = nextRunStartTime; - } - - public String getNextRunEndTime() { - return nextRunEndTime; - } - - public void setNextRunEndTime(String nextRunEndTime) { - this.nextRunEndTime = nextRunEndTime; - } - - public String getOwnSign() { - return ownSign; - } - - public void setOwnSign(String ownSign) { - this.ownSign = ownSign; - } - - - public void setRegister(boolean isRegister) { - this.isRegister = isRegister; - } - - public boolean isRegister() { - return isRegister; - } - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/TaskDefine.java b/src/main/java/cn/uncode/schedule/zk/TaskDefine.java deleted file mode 100644 index 5cb3a00bab632f71b478f620897102821433e3d5..0000000000000000000000000000000000000000 --- a/src/main/java/cn/uncode/schedule/zk/TaskDefine.java +++ /dev/null @@ -1,101 +0,0 @@ -package cn.uncode.schedule.zk; - -import java.util.Date; - -/** - * 任务定义,提供关键信息给使用者 - * @author juny.ye - * - */ -public class TaskDefine { - - /** - * 目标bean - */ - private String targetBean; - - /** - * 目标方法 - */ - private String targetMethod; - - /** - * cron表达式 - */ - private String cronExpression; - - /** - * 开始时间 - */ - private Date startTime; - - /** - * 周期(秒) - */ - private long period; - - private String currentServer; - - - - public boolean begin(Date sysTime){ - if(null == sysTime){ - return false; - } - return sysTime.after(startTime); - } - - public String getTargetBean() { - return targetBean; - } - - public void setTargetBean(String targetBean) { - this.targetBean = targetBean; - } - - public String getTargetMethod() { - return targetMethod; - } - - public void setTargetMethod(String targetMethod) { - this.targetMethod = targetMethod; - } - - public String getCronExpression() { - return cronExpression; - } - - public void setCronExpression(String cronExpression) { - this.cronExpression = cronExpression; - } - - public Date getStartTime() { - return startTime; - } - - public void setStartTime(Date startTime) { - this.startTime = startTime; - } - - public long getPeriod() { - return period; - } - - public void setPeriod(long period) { - this.period = period; - } - - public String getCurrentServer() { - return currentServer; - } - - public void setCurrentServer(String currentServer) { - this.currentServer = currentServer; - } - - - - - - -} \ No newline at end of file diff --git a/src/main/java/cn/uncode/schedule/zk/Version.java b/src/main/java/cn/uncode/schedule/zk/Version.java deleted file mode 100644 index 1c0663abb085b5f402833eabe74fe137fccdcdbc..0000000000000000000000000000000000000000 --- a/src/main/java/cn/uncode/schedule/zk/Version.java +++ /dev/null @@ -1,23 +0,0 @@ -package cn.uncode.schedule.zk; - -/** - * - * @author juny.ye - * - */ -public class Version { - - public final static String version="uncode-schedule-1.0.0"; - - public static String getVersion(){ - return version; - } - public static boolean isCompatible(String dataVersion){ - if(version.compareTo(dataVersion)>=0){ - return true; - }else{ - return false; - } - } - -} diff --git a/src/main/java/cn/uncode/schedule/zk/ZKManager.java b/src/main/java/cn/uncode/schedule/zk/ZKManager.java index 4dbcad7adee0af7df573b1aad951822beacb773e..b06e19b3de9985b0d977e05291f3422ce8fedbb9 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKManager.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKManager.java @@ -35,8 +35,8 @@ public class ZKManager{ private ZooKeeper zk; private List acl = new ArrayList(); private Properties properties; - private boolean isCheckParentPath = true; - public enum keys { + + private enum keys { zkConnectString, rootPath, userName, password, zkSessionTimeout, autoRegisterTask, ipBlacklist } @@ -49,7 +49,7 @@ public class ZKManager{ * 重连zookeeper * @throws Exception */ - public synchronized void reConnection() throws Exception{ + private synchronized void reConnection() throws Exception{ if (this.zk != null) { this.zk.close(); this.zk = null; @@ -100,9 +100,10 @@ public class ZKManager{ this.zk.close(); } - public String getRootPath(){ + String getRootPath(){ return this.properties.getProperty(keys.rootPath.toString()); } + public List getIpBlacklist(){ List ips = new ArrayList(); String list = this.properties.getProperty(keys.ipBlacklist.toString()); @@ -114,67 +115,63 @@ public class ZKManager{ public String getConnectStr(){ return this.properties.getProperty(keys.zkConnectString.toString()); } - public boolean isAutoRegisterTask(){ + + boolean isAutoRegisterTask(){ String autoRegisterTask = this.properties.getProperty(keys.autoRegisterTask.toString()); if(StringUtils.isNotEmpty(autoRegisterTask)){ return Boolean.valueOf(autoRegisterTask); } return true; } + public boolean checkZookeeperState() throws Exception{ return zk != null && zk.getState() == States.CONNECTED; } + public void initial() throws Exception { //当zk状态正常后才能调用 + checkParent(zk,this.getRootPath()); if(zk.exists(this.getRootPath(), false) == null){ ZKTools.createPath(zk, this.getRootPath(), CreateMode.PERSISTENT, acl); - if(isCheckParentPath == true){ - checkParent(zk,this.getRootPath()); - } //设置版本信息 zk.setData(this.getRootPath(),Version.getVersion().getBytes(),-1); }else{ //先校验父亲节点,本身是否已经是schedule的目录 - if(isCheckParentPath == true){ - checkParent(zk,this.getRootPath()); - } byte[] value = zk.getData(this.getRootPath(), false, null); if(value == null){ zk.setData(this.getRootPath(),Version.getVersion().getBytes(),-1); }else{ String dataVersion = new String(value); - if(Version.isCompatible(dataVersion)==false){ + if(!Version.isCompatible(dataVersion)){ throw new Exception("TBSchedule程序版本 "+ Version.getVersion() +" 不兼容Zookeeper中的数据版本 " + dataVersion ); } log.info("当前的程序版本:" + Version.getVersion() + " 数据版本: " + dataVersion); } } } - public static void checkParent(ZooKeeper zk, String path) throws Exception { + private static void checkParent(ZooKeeper zk, String path) throws Exception { String[] list = path.split("/"); String zkPath = ""; for (int i =0;i< list.length -1;i++){ String str = list[i]; - if (str.equals("") == false) { + if (StringUtils.isNotEmpty(str)) { zkPath = zkPath + "/" + str; if (zk.exists(zkPath, false) != null) { byte[] value = zk.getData(zkPath, false, null); - if(value != null){ - String tmpVersion = new String(value); - if(tmpVersion.indexOf("uncode-schedule-") >=0){ + if(value != null && new String(value).contains("uncode-schedule-")){ throw new Exception("\"" + zkPath +"\" is already a schedule instance's root directory, its any subdirectory cannot as the root directory of others"); } } } - } } } - public List getAcl() { + List getAcl() { return acl; } - public ZooKeeper getZooKeeper() throws Exception { - if(this.checkZookeeperState()==false){ + + ZooKeeper getZooKeeper() throws Exception { + if(!this.checkZookeeperState()){ reConnection(); } return this.zk; diff --git a/src/main/java/cn/uncode/schedule/zk/ZKTools.java b/src/main/java/cn/uncode/schedule/zk/ZKTools.java index 63044262b2cc66f2f67e6dd037617a21142ad92a..8695fa9425b8d9b26f56b6aaf87a6632c0db0b12 100644 --- a/src/main/java/cn/uncode/schedule/zk/ZKTools.java +++ b/src/main/java/cn/uncode/schedule/zk/ZKTools.java @@ -5,6 +5,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; +import org.apache.commons.lang3.StringUtils; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.data.ACL; @@ -17,11 +18,11 @@ import org.apache.zookeeper.data.Stat; * */ public class ZKTools { - public static void createPath(ZooKeeper zk, String path,CreateMode createMode, List acl) throws Exception { + static void createPath(ZooKeeper zk, String path, CreateMode createMode, List acl) throws Exception { String[] list = path.split("/"); String zkPath = ""; for (String str : list) { - if (str.equals("") == false) { + if (StringUtils.isNotEmpty(str)) { zkPath = zkPath + "/" + str; if (zk.exists(zkPath, false) == null) { zk.create(zkPath, null, acl, createMode); @@ -49,7 +50,7 @@ public class ZKTools { } } - public static String[] getTree(ZooKeeper zk,String path) throws Exception{ + private static String[] getTree(ZooKeeper zk, String path) throws Exception{ if(zk.exists(path, false) == null){ return new String[0]; } @@ -59,15 +60,15 @@ public class ZKTools { while(index < dealList.size()){ String tempPath = dealList.get(index); List children = zk.getChildren(tempPath, false); - if(tempPath.equalsIgnoreCase("/") == false){ + if(!tempPath.equalsIgnoreCase("/")){ tempPath = tempPath +"/"; } Collections.sort(children); - for(int i = children.size() -1;i>=0;i--){ - dealList.add(index+1, tempPath + children.get(i)); + for (int i = children.size() - 1; i >= 0; i--) { + dealList.add(index + 1, tempPath + children.get(i)); } index++; } - return (String[])dealList.toArray(new String[0]); + return dealList.toArray(new String[0]); } } diff --git a/src/test/java/cn/uncode/schedule/ZookeeperTest.java b/src/test/java/cn/uncode/schedule/ZookeeperTest.java index 08453297e9aff857bbe1b8ca4c29d8d7ea9db4e4..eeed379afa2d0799c4c47390b3aaf23ef2d73bdb 100644 --- a/src/test/java/cn/uncode/schedule/ZookeeperTest.java +++ b/src/test/java/cn/uncode/schedule/ZookeeperTest.java @@ -15,7 +15,7 @@ import org.apache.zookeeper.server.auth.DigestAuthenticationProvider; import org.junit.Test; import org.springframework.context.support.ClassPathXmlApplicationContext; -import cn.uncode.schedule.zk.TaskDefine; +import cn.uncode.schedule.core.TaskDefine; import cn.uncode.schedule.zk.ZKTools; @@ -31,8 +31,7 @@ public class ZookeeperTest { try { StringWriter writer = new StringWriter(); ZKTools.printTree(zk, "/uncode/schedule", writer, ""); - System.out - .println(i++ + "----" + writer.getBuffer().toString()); + System.out.println(i++ + "----" + writer.toString()); Thread.sleep(2000); } catch (Exception e) { System.out.println(e.getMessage()); @@ -45,7 +44,7 @@ public class ZookeeperTest { ZooKeeper zk = new ZooKeeper("localhost:2181", 3000, null); StringWriter writer = new StringWriter(); ZKTools.printTree(zk, "/", writer, "\n"); - System.out.println(writer.getBuffer().toString()); + System.out.println(writer.toString()); } @Test @@ -65,8 +64,7 @@ public class ZookeeperTest { List acls = new ArrayList(); zk.addAuthInfo("digest", "ScheduleAdmin:password".getBytes()); acls.add(new ACL(ZooDefs.Perms.ALL, new Id("digest", - DigestAuthenticationProvider - .generateDigest("ScheduleAdmin:password")))); + DigestAuthenticationProvider.generateDigest("ScheduleAdmin:password")))); acls.add(new ACL(ZooDefs.Perms.READ, Ids.ANYONE_ID_UNSAFE)); zk.create("/uncode/schedule/task/taskObj#print", new byte[0], acls, CreateMode.PERSISTENT); zk.getData("/uncode/schedule/task/taskObj#print", false, null);