diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index 34d789dd..6778034a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -61,6 +61,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { triggerParam.setJobId(jobInfo.getId()); triggerParam.setExecutorHandler(jobInfo.getExecutorHandler()); triggerParam.setExecutorParams(jobInfo.getExecutorParam()); + triggerParam.setExecutorBlockStrategy(jobInfo.getExecutorBlockStrategy()); triggerParam.setGlueType(jobInfo.getGlueType()); triggerParam.setGlueSource(jobInfo.getGlueSource()); triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java index 2776a832..c10f0582 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/impl/ExecutorBizImpl.java @@ -4,6 +4,7 @@ import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.LogResult; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.executor.XxlJobExecutor; import com.xxl.job.core.glue.GlueFactory; import com.xxl.job.core.glue.GlueTypeEnum; @@ -128,8 +129,9 @@ public class ExecutorBizImpl implements ExecutorBiz { } // push data to queue - jobThread.pushTriggerQueue(triggerParam); - return ReturnT.SUCCESS; + ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null); + ReturnT pushResult = jobThread.pushTriggerQueue(triggerParam, blockStrategy); + return pushResult; } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index abf24db3..48bf0569 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -13,6 +13,7 @@ public class TriggerParam implements Serializable{ private String executorHandler; private String executorParams; + private String executorBlockStrategy; private String glueType; private String glueSource; @@ -47,6 +48,14 @@ public class TriggerParam implements Serializable{ this.executorParams = executorParams; } + public String getExecutorBlockStrategy() { + return executorBlockStrategy; + } + + public void setExecutorBlockStrategy(String executorBlockStrategy) { + this.executorBlockStrategy = executorBlockStrategy; + } + public String getGlueType() { return glueType; } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index 2fc4d3d0..eaf85f7c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -3,6 +3,7 @@ package com.xxl.job.core.thread; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; +import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.log.XxlJobFileAppender; import com.xxl.job.core.log.XxlJobLogger; @@ -31,6 +32,9 @@ public class JobThread extends Thread{ private boolean toStop = false; private String stopReason; + private boolean running = false; + + public JobThread(IJobHandler handler) { this.handler = handler; triggerQueue = new LinkedBlockingQueue(); @@ -40,14 +44,33 @@ public class JobThread extends Thread{ return handler; } - public void pushTriggerQueue(TriggerParam triggerParam) { + public ReturnT pushTriggerQueue(TriggerParam triggerParam, ExecutorBlockStrategyEnum blockStrategy) { + // avoid repeat if (triggerLogIdSet.contains(triggerParam.getLogId())) { logger.debug("repeate trigger job, logId:{}", triggerParam.getLogId()); - return; + return new ReturnT(ReturnT.FAIL_CODE, "repeate trigger job, logId:" + triggerParam.getLogId()); + } + + // block strategy + if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) { + // discard when running + if (running) { + return new ReturnT(ReturnT.FAIL_CODE, "任务阻塞:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle()); + } + } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) { + // kill running old and clear queue + if (running) { + this.interrupt(); + } + triggerQueue.clear(); + triggerLogIdSet.clear(); + } else { + // just add to queue } triggerLogIdSet.add(triggerParam.getLogId()); triggerQueue.add(triggerParam); + return ReturnT.SUCCESS; } public void toStop(String stopReason) { @@ -59,15 +82,17 @@ public class JobThread extends Thread{ this.toStop = true; this.stopReason = stopReason; } - - int i = 1; + + @Override public void run() { while(!toStop){ + running = false; try { // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); if (triggerParam!=null) { + running = true; triggerLogIdSet.remove(triggerParam.getLogId()); // parse param