diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index f3999883..7c3f2f95 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1236,6 +1236,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 17、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题; - 18、核心依赖Core内部国际化处理; - 19、默认Quartz线程数调整为50; +- 20、调度全异步处理:任务触发之后,推送到调度队列,多线程并发处理调度请求,提高任务调度速率的同时,避免因网络问题导致quartz调度线程阻塞的问题; ### TODO LIST diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index dd6d3754..1aa4e407 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -1,6 +1,6 @@ package com.xxl.job.admin.core.jobbean; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; @@ -26,7 +26,8 @@ public class RemoteHttpJobBean extends QuartzJobBean { Integer jobId = Integer.valueOf(jobKey.getName()); // trigger - XxlJobTrigger.trigger(jobId); + //XxlJobTrigger.trigger(jobId); + JobTriggerPoolHelper.trigger(jobId); } } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java index a6b6c2ef..124c0c3a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java @@ -4,6 +4,7 @@ import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.thread.JobFailMonitorHelper; import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; @@ -93,6 +94,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware { } public void destroy(){ + + // admin trigger pool stop + JobTriggerPoolHelper.toStop(); + // admin registry stop JobRegistryMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java new file mode 100644 index 00000000..be1beff8 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -0,0 +1,59 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.trigger.XxlJobTrigger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * job trigger thread pool helper + * + * @author xuxueli 2018-07-03 21:08:07 + */ +public class JobTriggerPoolHelper { + private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); + + + // ---------------------- trigger pool ---------------------- + + private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor( + 50, + 500, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(100000), + new ThreadPoolExecutor.CallerRunsPolicy()); + + + public void addTrigger(final int jobId){ + triggerPool.execute(new Runnable() { + @Override + public void run() { + XxlJobTrigger.trigger(jobId); + } + }); + } + + public void stop(){ + //triggerPool.shutdown(); + triggerPool.shutdownNow(); + logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); + } + + // ---------------------- helper ---------------------- + + private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); + + + public static void trigger(int jobId) { + helper.addTrigger(jobId); + } + + public static void toStop(){ + helper.stop(); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 30ce3c46..6d7430eb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; @@ -275,7 +276,11 @@ public class XxlJobServiceImpl implements XxlJobService { @Override public ReturnT triggerJob(int id) { - XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id); + + JobTriggerPoolHelper.trigger(id); + return ReturnT.SUCCESS; + + /*XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id); if (xxlJobInfo == null) { return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_id")+I18nUtil.getString("system_unvalid")) ); } @@ -289,7 +294,8 @@ public class XxlJobServiceImpl implements XxlJobService { } catch (SchedulerException e) { logger.error(e.getMessage(), e); return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); - } + }*/ + } @Override