From 8658f8d853a3e56dc042f9eb0a3a38857838137a Mon Sep 17 00:00:00 2001 From: xuxueli <931591021@qq.com> Date: Wed, 6 Mar 2019 21:10:52 +0800 Subject: [PATCH] =?UTF-8?q?-=20Quartz=E8=A7=A6=E5=8F=91=E7=BA=BF=E7=A8=8B?= =?UTF-8?q?=E6=B1=A0=E5=BA=9F=E5=BC=83=E5=B9=B6=E6=9B=BF=E6=8D=A2=E4=B8=BA?= =?UTF-8?q?=20"XxlJobThreadPool"=EF=BC=8C=E9=99=8D=E4=BD=8E=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=E5=88=87=E6=8D=A2=E3=80=81=E5=86=85=E5=AD=98=E5=8D=A0?= =?UTF-8?q?=E7=94=A8=E5=B8=A6=E6=9D=A5=E7=9A=84=E6=B6=88=E8=80=97=EF=BC=8C?= =?UTF-8?q?=E6=8F=90=E9=AB=98=E8=B0=83=E5=BA=A6=E6=80=A7=E8=83=BD=EF=BC=9B?= =?UTF-8?q?=20-=20=E8=B0=83=E5=BA=A6=E7=BA=BF=E7=A8=8B=E6=B1=A0=E9=9A=94?= =?UTF-8?q?=E7=A6=BB=EF=BC=8C=E6=8B=86=E5=88=86=E4=B8=BA"Fast"=E5=92=8C"Sl?= =?UTF-8?q?ow"=E4=B8=A4=E4=B8=AA=E7=BA=BF=E7=A8=8B=E6=B1=A0=EF=BC=8C1?= =?UTF-8?q?=E5=88=86=E9=92=9F=E7=AA=97=E5=8F=A3=E6=9C=9F=E5=86=85=E4=BB=BB?= =?UTF-8?q?=E5=8A=A1=E8=80=97=E6=97=B6=E8=BE=BE500ms=E8=B6=85=E8=BF=8710?= =?UTF-8?q?=E6=AC=A1=EF=BC=8C=E8=AF=A5=E7=AA=97=E5=8F=A3=E6=9C=9F=E5=86=85?= =?UTF-8?q?=E5=88=A4=E5=AE=9A=E4=B8=BA=E6=85=A2=E4=BB=BB=E5=8A=A1=EF=BC=8C?= =?UTF-8?q?=E6=85=A2=E4=BB=BB=E5=8A=A1=E8=87=AA=E5=8A=A8=E9=99=8D=E7=BA=A7?= =?UTF-8?q?=E8=BF=9B=E5=85=A5"Slow"=E7=BA=BF=E7=A8=8B=E6=B1=A0=EF=BC=8C?= =?UTF-8?q?=E9=81=BF=E5=85=8D=E8=80=97=E5=B0=BD=E8=B0=83=E5=BA=A6=E7=BA=BF?= =?UTF-8?q?=E7=A8=8B=EF=BC=8C=E6=8F=90=E9=AB=98=E7=B3=BB=E7=BB=9F=E7=A8=B3?= =?UTF-8?q?=E5=AE=9A=E6=80=A7=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 1 + doc/XXL-JOB官方文档.md | 7 +- .../admin/core/quartz/XxlJobThreadPool.java | 58 ++++++++++++++ .../core/thread/JobTriggerPoolHelper.java | 80 ++++++++++++++++--- .../src/main/resources/quartz.properties | 11 ++- 5 files changed, 139 insertions(+), 18 deletions(-) create mode 100644 xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java diff --git a/README.md b/README.md index 9646d17e..8a6b8fbe 100644 --- a/README.md +++ b/README.md @@ -78,6 +78,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台; - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文; - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用; +- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;; ## Development diff --git a/doc/XXL-JOB官方文档.md b/doc/XXL-JOB官方文档.md index 70790885..38d6662d 100644 --- a/doc/XXL-JOB官方文档.md +++ b/doc/XXL-JOB官方文档.md @@ -47,6 +47,7 @@ XXL-JOB是一个轻量级分布式任务调度平台,其核心设计目标是 - 30、跨平台:原生提供通用HTTP任务Handler(Bean任务,"HttpJobHandler");业务方只需要提供HTTP链接即可,不限制语言、平台; - 31、国际化:调度中心支持国际化设置,提供中文、英文两种可选语言,默认为中文; - 32、容器化:提供官方docker镜像,并实时更新推送dockerhub,进一步实现产品开箱即用; +- 33、线程池隔离:调度线程池进行隔离拆分,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性;; ### 1.3 发展 @@ -1442,9 +1443,9 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段 - 19、执行器优雅停机优化; - 20、连接池配置优化,增强连接有效性验证; - 21、JobHandler#msg长度限制,修复异常情况下日志超长导致内存溢出的问题; -- 22、[迭代中]任务线程隔离: - - 执行器测异步响应,不存在阻塞不需要隔离; - - 调度中心共用单一调度线程池,可能导致调度阻塞需要线程隔离;调度线程池拆分为Fast/Slow两个,针对调度较慢的执行器地址请求,降级使用Slow线程池;考虑是否可以任务级隔离线程池; +- 22、Quartz触发线程池废弃并替换为 "XxlJobThreadPool",降低线程切换、内存占用带来的消耗,提高调度性能; +- 23、调度线程池隔离,拆分为"Fast"和"Slow"两个线程池,1分钟窗口期内任务耗时达500ms超过10次,该窗口期内判定为慢任务,慢任务自动降级进入"Slow"线程池,避免耗尽调度线程,提高系统稳定性; + ### TODO LIST - 1、任务分片路由:分片采用一致性Hash算法计算出尽量稳定的分片顺序,即使注册机器存在波动也不会引起分批分片顺序大的波动;目前采用IP自然排序,可以满足需求,待定; diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java new file mode 100644 index 00000000..07d44f1c --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/quartz/XxlJobThreadPool.java @@ -0,0 +1,58 @@ +package com.xxl.job.admin.core.quartz; + +import org.quartz.SchedulerConfigException; +import org.quartz.spi.ThreadPool; + +/** + * single thread pool, for async trigger + * + * @author xuxueli 2019-03-06 + */ +public class XxlJobThreadPool implements ThreadPool { + + @Override + public boolean runInThread(Runnable runnable) { + + // async run + runnable.run(); + return true; + + //return false; + } + + @Override + public int blockForAvailableThreads() { + return 1; + } + + @Override + public void initialize() throws SchedulerConfigException { + + } + + @Override + public void shutdown(boolean waitForJobsToComplete) { + + } + + @Override + public int getPoolSize() { + return 1; + } + + @Override + public void setInstanceId(String schedInstId) { + + } + + @Override + public void setInstanceName(String schedName) { + + } + + // support + public void setThreadCount(int count) { + // + } + +} diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java index 7baef43e..7a3a733c 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobTriggerPoolHelper.java @@ -5,10 +5,9 @@ import com.xxl.job.admin.core.trigger.XxlJobTrigger; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; /** * job trigger thread pool helper @@ -21,32 +20,91 @@ public class JobTriggerPoolHelper { // ---------------------- trigger pool ---------------------- - private ThreadPoolExecutor triggerPool = new ThreadPoolExecutor( - 32, - 256, + // fast/slow thread pool + private ThreadPoolExecutor fastTriggerPool = new ThreadPoolExecutor( + 8, + 200, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue(1000), new ThreadFactory() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-triggerPool-" + r.hashCode()); + return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-fastTriggerPool-" + r.hashCode()); + } + }); + + private ThreadPoolExecutor slowTriggerPool = new ThreadPoolExecutor( + 0, + 100, + 60L, + TimeUnit.SECONDS, + new LinkedBlockingQueue(2000), + new ThreadFactory() { + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "xxl-job, admin JobTriggerPoolHelper-slowTriggerPool-" + r.hashCode()); } }); + // job timeout count + private volatile long minTim = System.currentTimeMillis()/60000; // ms > min + private volatile Map jobTimeoutCountMap = new ConcurrentHashMap<>(); + + + /** + * add trigger + */ public void addTrigger(final int jobId, final TriggerTypeEnum triggerType, final int failRetryCount, final String executorShardingParam, final String executorParam) { - triggerPool.execute(new Runnable() { + + // choose thread pool + ThreadPoolExecutor triggerPool_ = fastTriggerPool; + AtomicInteger jobTimeoutCount = jobTimeoutCountMap.get(jobId); + if (jobTimeoutCount!=null && jobTimeoutCount.get() > 10) { // job-timeout 10 times in 1 min + triggerPool_ = slowTriggerPool; + } + + // trigger + triggerPool_.execute(new Runnable() { @Override public void run() { - XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); + + long start = System.currentTimeMillis(); + + try { + // do trigger + XxlJobTrigger.trigger(jobId, triggerType, failRetryCount, executorShardingParam, executorParam); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } finally { + + // check timeout-count-map + long minTim_now = System.currentTimeMillis()/60000; + if (minTim != minTim_now) { + minTim = minTim_now; + jobTimeoutCountMap.clear(); + } + + // incr timeout-count-map + long cost = System.currentTimeMillis()-start; + if (cost > 500) { // ob-timeout threshold 500ms + AtomicInteger timeoutCount = jobTimeoutCountMap.put(jobId, new AtomicInteger(1)); + if (timeoutCount != null) { + timeoutCount.incrementAndGet(); + } + } + + } + } }); } public void stop() { //triggerPool.shutdown(); - triggerPool.shutdownNow(); + fastTriggerPool.shutdownNow(); + slowTriggerPool.shutdownNow(); logger.info(">>>>>>>>> xxl-job trigger thread pool shutdown success."); } diff --git a/xxl-job-admin/src/main/resources/quartz.properties b/xxl-job-admin/src/main/resources/quartz.properties index 7a2c53d1..ebe1192b 100644 --- a/xxl-job-admin/src/main/resources/quartz.properties +++ b/xxl-job-admin/src/main/resources/quartz.properties @@ -9,16 +9,19 @@ org.quartz.scheduler.rmi.export: false org.quartz.scheduler.rmi.proxy: false org.quartz.scheduler.wrapJobExecutionInUserTransaction: false -org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool -org.quartz.threadPool.threadCount: 50 -org.quartz.threadPool.threadPriority: 5 -org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true +#org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool +#org.quartz.threadPool.threadCount: 5 +#org.quartz.threadPool.threadPriority: 5 +#org.quartz.threadPool.threadsInheritContextClassLoaderOfInitializingThread: true org.quartz.jobStore.misfireThreshold: 60000 org.quartz.jobStore.maxMisfiresToHandleAtATime: 1 #org.quartz.jobStore.class: org.quartz.simpl.RAMJobStore +# for async trigger +org.quartz.threadPool.class: com.xxl.job.admin.core.quartz.XxlJobThreadPool + # for cluster org.quartz.jobStore.tablePrefix: XXL_JOB_QRTZ_ org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX