From 04abd8847513bb18f4e5421da4787c5daafae8cb Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Tue, 3 Jul 2018 22:11:12 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E5=BA=A6=E5=85=A8=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E5=A4=84=E7=90=86=EF=BC=9A=E4=BB=BB=E5=8A=A1=E8=A7=A6=E5=8F=91?= =?UTF-8?q?=E4=B9=8B=E5=90=8E=EF=BC=8C=E6=8E=A8=E9=80=81=E5=88=B0=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E9=98=9F=E5=88=97=EF=BC=8C=E5=A4=9A=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E5=B9=B6=E5=8F=91=E5=A4=84=E7=90=86=E8=B0=83=E5=BA=A6=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=EF=BC=8C=E6=8F=90=E9=AB=98=E4=BB=BB=E5=8A=A1=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E9=80=9F=E7=8E=87=E7=9A=84=E5=90=8C=E6=97=B6=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E5=9B=A0=E7=BD=91=E7=BB=9C=E9=97=AE=E9=A2=98?= =?UTF-8?q?=E5=AF=BC=E8=87=B4quartz=E8=B0=83=E5=BA=A6=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E9=98=BB=E5=A1=9E=E7=9A=84=E9=97=AE=E9=A2=98=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/XXL-JOB官方文档.md | 1 + .../admin/core/jobbean/RemoteHttpJobBean.java | 5 +- .../core/schedule/XxlJobDynamicScheduler.java | 5 ++ .../core/thread/JobTriggerPoolHelper.java | 59 +++++++++++++++++++ .../admin/service/impl/XxlJobServiceImpl.java | 10 +++- 5 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index f3999883..7c3f2f95 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -1236,6 +1236,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 17、自研Log组件参数占位符改为"{}",并修复打印有参日志时参数不匹配导致报错的问题; - 18、核心依赖Core内部国际化处理; - 19、默认Quartz线程数调整为50; +- 20、调度全异步处理:任务触发之后,推送到调度队列,多线程并发处理调度请求,提高任务调度速率的同时,避免因网络问题导致quartz调度线程阻塞的问题; ### TODO LIST diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index dd6d3754..1aa4e407 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -1,6 +1,6 @@ package com.xxl.job.admin.core.jobbean; -import com.xxl.job.admin.core.trigger.XxlJobTrigger; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import org.quartz.JobExecutionContext; import org.quartz.JobExecutionException; import org.quartz.JobKey; @@ -26,7 +26,8 @@ public class RemoteHttpJobBean extends QuartzJobBean { Integer jobId = Integer.valueOf(jobKey.getName()); // trigger - XxlJobTrigger.trigger(jobId); + //XxlJobTrigger.trigger(jobId); + JobTriggerPoolHelper.trigger(jobId); } } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java index a6b6c2ef..124c0c3a 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java @@ -4,6 +4,7 @@ import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.thread.JobFailMonitorHelper; import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; @@ -93,6 +94,10 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware { } public void destroy(){ + + // admin trigger pool stop + JobTriggerPoolHelper.toStop(); + // admin registry stop JobRegistryMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java new file mode 100644 index 00000000..be1beff8 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -0,0 +1,59 @@ +package com.xxl.job.admin.core.thread; + +import com.xxl.job.admin.core.trigger.XxlJobTrigger; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +/** + * job trigger thread pool helper + * + * @author xuxueli 2018-07-03 21:08:07 + */ +public class JobTriggerPoolHelper { + private static Logger logger = LoggerFactory.getLogger(JobTriggerPoolHelper.class); + + + // ---------------------- trigger pool ---------------------- + + private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor( + 50, + 500, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(100000), + new ThreadPoolExecutor.CallerRunsPolicy()); + + + public void addTrigger(final int jobId){ + triggerPool.execute(new Runnable() { + @Override + public void run() { + XxlJobTrigger.trigger(jobId); + } + }); + } + + public void stop(){ + //triggerPool.shutdown(); + triggerPool.shutdownNow(); + logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); + } + + // ---------------------- helper ---------------------- + + private static JobTriggerPoolHelper helper = new JobTriggerPoolHelper(); + + + public static void trigger(int jobId) { + helper.addTrigger(jobId); + } + + public static void toStop(){ + helper.stop(); + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 30ce3c46..6d7430eb 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -5,6 +5,7 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; +import com.xxl.job.admin.core.thread.JobTriggerPoolHelper; import com.xxl.job.admin.core.util.I18nUtil; import com.xxl.job.admin.dao.XxlJobGroupDao; import com.xxl.job.admin.dao.XxlJobInfoDao; @@ -275,7 +276,11 @@ public class XxlJobServiceImpl implements XxlJobService { @Override public ReturnT triggerJob(int id) { - XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id); + + JobTriggerPoolHelper.trigger(id); + return ReturnT.SUCCESS; + + /*XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(id); if (xxlJobInfo == null) { return new ReturnT(ReturnT.FAIL_CODE, (I18nUtil.getString("jobinfo_field_id")+I18nUtil.getString("system_unvalid")) ); } @@ -289,7 +294,8 @@ public class XxlJobServiceImpl implements XxlJobService { } catch (SchedulerException e) { logger.error(e.getMessage(), e); return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); - } + }*/ + } @Override