diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java index a4dfe133..5be779ca 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java @@ -69,7 +69,7 @@ public class AdminBizImpl implements AdminBiz { handleMsg.append(log.getHandleMsg()).append("
"); } if (handleCallbackParam.getExecuteResult().getMsg() != null) { - handleMsg.append("执行备注:").append(handleCallbackParam.getExecuteResult().getMsg()); + handleMsg.append(handleCallbackParam.getExecuteResult().getMsg()); } if (childTriggerMsg !=null) { handleMsg.append("
子任务触发备注:").append(childTriggerMsg); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index c10f0582..0b408b29 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -33,12 +33,8 @@ public class ExecutorBizImpl implements ExecutorBiz { public ReturnT kill(int jobId) { // kill handlerThread, and create new one JobThread jobThread = XxlJobExecutor.loadJobThread(jobId); - if (jobThread != null) { - IJobHandler handler = jobThread.getHandler(); - jobThread.toStop("人工手动终止"); - jobThread.interrupt(); - XxlJobExecutor.removeJobThread(jobId); + XxlJobExecutor.removeJobThread(jobId, "人工手动终止"); return ReturnT.SUCCESS; } @@ -56,81 +52,103 @@ public class ExecutorBizImpl implements ExecutorBiz { @Override public ReturnT run(TriggerParam triggerParam) { - // load old thread + // load old:jobHandler + jobThread JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId()); + IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null; + String removeOldReason = null; + // valid:jobHandler + jobThread if (GlueTypeEnum.BEAN==GlueTypeEnum.match(triggerParam.getGlueType())) { - // valid handler - IJobHandler jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); - if (jobHandler==null) { - return new ReturnT(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); - } + // valid old jobThread + if (jobThread != null && jobHandler!=null && jobThread.getHandler() != jobHandler) { + // change handler, need kill old thread + removeOldReason = "更新JobHandler或更换任务模式,终止旧任务线程"; - // valid exists job thread:change handler, need kill old thread - if (jobThread != null && jobThread.getHandler() != jobHandler) { - // kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - XxlJobExecutor.removeJobThread(triggerParam.getJobId()); jobThread = null; + jobHandler = null; } - // make thread: new or exists invalid - if (jobThread == null) { - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler); + // valid handler + if (jobHandler == null) { + jobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler()); + if (jobHandler == null) { + return new ReturnT(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found."); + } } } else if (GlueTypeEnum.GLUE_GROOVY==GlueTypeEnum.match(triggerParam.getGlueType())) { - // valid exists job thread:change handler or gluesource updated, need kill old thread + // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof GlueJobHandler && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { - // change glue model or gluesource updated, kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - XxlJobExecutor.removeJobThread(triggerParam.getJobId()); + // change handler or gluesource updated, need kill old thread + removeOldReason = "更新任务逻辑或更换任务模式,终止旧任务线程"; + jobThread = null; + jobHandler = null; } - // make thread: new or exists invalid - if (jobThread == null) { - IJobHandler jobHandler = null; + // valid handler + if (jobHandler == null) { try { - jobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); + IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource()); + jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime()); } catch (Exception e) { logger.error("", e); return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); } - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), new GlueJobHandler(jobHandler, triggerParam.getGlueUpdatetime())); } } else if (GlueTypeEnum.GLUE_SHELL==GlueTypeEnum.match(triggerParam.getGlueType()) || GlueTypeEnum.GLUE_PYTHON==GlueTypeEnum.match(triggerParam.getGlueType()) ) { - // valid exists job thread:change script or gluesource updated, need kill old thread + // valid old jobThread if (jobThread != null && !(jobThread.getHandler() instanceof ScriptJobHandler && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) { - // change glue model or gluesource updated, kill old job thread - jobThread.toStop("更换任务模式或JobHandler,终止旧任务线程"); - jobThread.interrupt(); - XxlJobExecutor.removeJobThread(triggerParam.getJobId()); + // change script or gluesource updated, need kill old thread + removeOldReason = "更新任务逻辑或更换任务模式,终止旧任务线程"; + jobThread = null; + jobHandler = null; } - // make thread: new or exists invalid - if (jobThread == null) { - ScriptJobHandler scriptJobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); - jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), scriptJobHandler); + // valid handler + if (jobHandler == null) { + jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType())); } } else { return new ReturnT(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid."); } + // executor block strategy + if (jobThread != null) { + ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); + if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + // discard when running + if (jobThread.isRunningOrHasQueue()) { + return new ReturnT(ReturnT.FAIL_CODE, "阻塞处理策略-生效:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); + } + } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + // kill running jobThread + if (jobThread.isRunningOrHasQueue()) { + removeOldReason = "阻塞处理策略-生效:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle(); + + jobThread = null; + } + } else { + // just queue trigger + } + } + + // replace thread (new or exists invalid) + if (jobThread == null) { + jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason); + } + // push data to queue - ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); - ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam, blockStrategy); + ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam); return pushResult; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index a5cb447e..1c095442 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -121,19 +121,29 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe // ---------------------------------- job thread repository private static ConcurrentHashMap JobThreadRepository = new ConcurrentHashMap(); - public static JobThread registJobThread(int jobId, IJobHandler handler){ - JobThread jobThread = new JobThread(handler); - jobThread.start(); + public static JobThread registJobThread(int jobId, IJobHandler handler, String removeOldReason){ + JobThread newJobThread = new JobThread(handler); + newJobThread.start(); logger.info(">>>>>>>>>>> xxl-job regist JobThread success, jobId:{}, handler:{}", new Object[]{jobId, handler}); - JobThreadRepository.put(jobId, jobThread); // putIfAbsent | oh my god, map's put method return the old value!!! - return jobThread; + + JobThread oldJobThread = JobThreadRepository.put(jobId, newJobThread); // putIfAbsent | oh my god, map's put method return the old value!!! + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } + + return newJobThread; + } + public static void removeJobThread(int jobId, String removeOldReason){ + JobThread oldJobThread = JobThreadRepository.remove(jobId); + if (oldJobThread != null) { + oldJobThread.toStop(removeOldReason); + oldJobThread.interrupt(); + } } public static JobThread loadJobThread(int jobId){ JobThread jobThread = JobThreadRepository.get(jobId); return jobThread; } - public static void removeJobThread(int jobId){ - JobThreadRepository.remove(jobId); - } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index eaf85f7c..0cf88862 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -3,7 +3,6 @@ package com.xxl.job.core.thread; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; -import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; @@ -32,7 +31,7 @@ public class JobThread extends Thread{ private boolean toStop = false; private String stopReason; - private boolean running = false; + private boolean running = false; // if running job public JobThread(IJobHandler handler) { @@ -44,35 +43,29 @@ public class JobThread extends Thread{ return handler; } - public ReturnT pushTriggerQueue(TriggerParam triggerParam, ExecutorBlockStrategyEnum blockStrategy) { + /** + * new trigger to queue + * + * @param triggerParam + * @return + */ + public ReturnT pushTriggerQueue(TriggerParam triggerParam) { // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId()); return new ReturnT(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); } - // block strategy - if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { - // discard when running - if (running) { - return new ReturnT(ReturnT.FAIL_CODE, "任务阻塞:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); - } - } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { - // kill running old and clear queue - if (running) { - this.interrupt(); - } - triggerQueue.clear(); - triggerLogIdSet.clear(); - } else { - // just add to queue - } - triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); return ReturnT.SUCCESS; } + /** + * kill job thread + * + * @param stopReason + */ public void toStop(String stopReason) { /** * Thread.interrupt只支持终止线程的阻塞状态(wait、join、sleep), @@ -83,8 +76,15 @@ public class JobThread extends Thread{ this.stopReason = stopReason; } + /** + * is running job + * @return + */ + public boolean isRunningOrHasQueue() { + return running || triggerQueue.size()>0; + } - @Override + @Override public void run() { while(!toStop){ running = false;