From 8a6d462d3b6c118378938863e79aaf3f3d9aad41 Mon Sep 17 00:00:00 2001 From: "xueli.xue" Date: Wed, 10 May 2017 23:09:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=89=A7=E8=A1=8C=E5=99=A8=E4=B8=8E=E6=95=B0?= =?UTF-8?q?=E6=8D=AE=E5=BA=93=E5=BD=BB=E5=BA=95=E8=A7=A3=E8=80=A6=EF=BC=8C?= =?UTF-8?q?=E4=BD=86=E6=98=AF=E6=89=A7=E8=A1=8C=E5=99=A8=E9=9C=80=E8=A6=81?= =?UTF-8?q?=E9=85=8D=E7=BD=AE=E8=B0=83=E5=BA=A6=E4=B8=AD=E5=BF=83=E9=9B=86?= =?UTF-8?q?=E7=BE=A4=E5=9C=B0=E5=9D=80=E3=80=82=E8=B0=83=E5=BA=A6=E4=B8=AD?= =?UTF-8?q?=E5=BF=83=E6=8F=90=E4=BE=9BAPI=E4=BE=9B=E6=89=A7=E8=A1=8C?= =?UTF-8?q?=E5=99=A8=E5=9B=9E=E8=B0=83=E5=92=8C=E5=BF=83=E8=B7=B3=E6=B3=A8?= =?UTF-8?q?=E5=86=8C=E6=9C=8D=E5=8A=A1=EF=BC=8C=E5=8F=96=E6=B6=88=E8=B0=83?= =?UTF-8?q?=E5=BA=A6=E4=B8=AD=E5=BF=83=E5=86=85=E9=83=A8jetty=EF=BC=8C?= =?UTF-8?q?=E5=BF=83=E8=B7=B3=E5=91=A8=E6=9C=9F=E8=B0=83=E6=95=B4=E4=B8=BA?= =?UTF-8?q?30s=EF=BC=8C=E5=BF=83=E8=B7=B3=E5=A4=B1=E6=95=88=E4=B8=BA?= =?UTF-8?q?=E4=B8=89=E5=80=8D=E5=BF=83=E8=B7=B3=EF=BC=9B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../JobApiController.java} | 59 +++++++-- .../admin/controller/JobGroupController.java | 10 +- .../admin/core/jobbean/RemoteHttpJobBean.java | 22 +--- .../core/schedule/XxlJobDynamicScheduler.java | 37 +----- ...per.java => JobRegistryMonitorHelper.java} | 22 ++-- .../admin/service/impl/XxlJobServiceImpl.java | 11 +- .../applicationcontext-xxl-job-admin.xml | 4 - .../resources/spring/springmvc-context.xml | 2 +- .../main/resources/xxl-job-admin.properties | 4 - .../template/jobgroup/jobgroup.index.ftl | 2 - .../com/xxl/job/dao/impl/AdminApiTest.java | 23 ++++ .../java/com/xxl/job/core/biz/AdminBiz.java | 13 -- .../core/biz/model/HandleCallbackParam.java | 23 ++-- .../xxl/job/core/biz/model/RegistryParam.java | 54 ++++++++ .../com/xxl/job/core/biz/model/ReturnT.java | 3 +- .../xxl/job/core/biz/model/TriggerParam.java | 22 ++-- .../xxl/job/core/enums/RegistryConfig.java | 13 ++ .../xxl/job/core/executor/XxlJobExecutor.java | 9 +- .../xxl/job/core/registry/RegistHelper.java | 13 -- .../core/registry/impl/DbRegistHelper.java | 28 ----- .../core/rpc/netcom/NetComServerFactory.java | 5 +- .../rpc/netcom/jetty/server/JettyServer.java | 5 +- .../core/thread/ExecutorRegistryThread.java | 43 ++++--- .../com/xxl/job/core/thread/JobThread.java | 6 +- .../core/thread/TriggerCallbackThread.java | 22 ++-- .../com/xxl/job/core/util/AdminApiUtil.java | 116 ++++++++++++++++++ .../com/xxl/job/core/util/HttpClientUtil.java | 2 +- .../com/xxl/job/core/util/JacksonUtil.java | 93 ++++++++++++++ .../resources/applicationcontext-xxl-job.xml | 29 +---- .../resources/xxl-job-executor.properties | 7 +- 30 files changed, 451 insertions(+), 251 deletions(-) rename xxl-job-admin/src/main/java/com/xxl/job/admin/{core/biz/AdminBizImpl.java => controller/JobApiController.java} (60%) rename xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/{JobRegistryHelper.java => JobRegistryMonitorHelper.java} (72%) create mode 100644 xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java delete mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/registry/impl/DbRegistHelper.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java create mode 100644 xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java similarity index 60% rename from xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java rename to xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java index 5be779ca..e2fe2674 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/biz/AdminBizImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobApiController.java @@ -1,30 +1,53 @@ -package com.xxl.job.admin.core.biz; +package com.xxl.job.admin.controller; +import com.xxl.job.admin.controller.annotation.PermessionLimit; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.core.biz.AdminBiz; +import com.xxl.job.admin.dao.IXxlJobInfoDao; +import com.xxl.job.admin.dao.IXxlJobLogDao; +import com.xxl.job.admin.dao.IXxlJobRegistryDao; import com.xxl.job.core.biz.model.HandleCallbackParam; +import com.xxl.job.core.biz.model.RegistryParam; import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.util.AdminApiUtil; import org.apache.commons.lang.StringUtils; import org.quartz.SchedulerException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Controller; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RequestMethod; +import org.springframework.web.bind.annotation.ResponseBody; +import javax.annotation.Resource; import java.text.MessageFormat; import java.util.Date; /** - * Created by xuxueli on 17/3/1. + * Created by xuxueli on 17/5/10. */ -public class AdminBizImpl implements AdminBiz { - private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class); +@Controller +public class JobApiController { + private static Logger logger = LoggerFactory.getLogger(JobApiController.class); + + @Resource + public IXxlJobLogDao xxlJobLogDao; + @Resource + private IXxlJobInfoDao xxlJobInfoDao; + @Resource + private IXxlJobRegistryDao xxlJobRegistryDao; + + + @RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json") + @ResponseBody + @PermessionLimit(limit=false) + public ReturnT callback(@RequestBody HandleCallbackParam handleCallbackParam){ - @Override - public ReturnT callback(HandleCallbackParam handleCallbackParam) { // valid log item - XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(handleCallbackParam.getLogId()); + XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId()); if (log == null) { return new ReturnT(ReturnT.FAIL_CODE, "log item not found."); } @@ -32,14 +55,14 @@ public class AdminBizImpl implements AdminBiz { // trigger success, to trigger child job, and avoid repeat trigger child job String childTriggerMsg = null; if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) { - XxlJobInfo xxlJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId()); + XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId()); if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) { childTriggerMsg = "
"; String[] childJobKeys = xxlJobInfo.getChildJobKey().split(","); for (int i = 0; i < childJobKeys.length; i++) { String[] jobKeyArr = childJobKeys[i].split("_"); if (jobKeyArr!=null && jobKeyArr.length==2) { - XxlJobInfo childJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1])); + XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1])); if (childJobInfo!=null) { try { boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup())); @@ -79,9 +102,21 @@ public class AdminBizImpl implements AdminBiz { log.setHandleTime(new Date()); log.setHandleCode(handleCallbackParam.getExecuteResult().getCode()); log.setHandleMsg(handleMsg.toString()); - XxlJobDynamicScheduler.xxlJobLogDao.updateHandleInfo(log); + xxlJobLogDao.updateHandleInfo(log); - return new ReturnT(ReturnT.SUCCESS_CODE, null); + return ReturnT.SUCCESS; + } + + + @RequestMapping(value=AdminApiUtil.REGISTRY, method = RequestMethod.POST, consumes = "application/json") + @ResponseBody + @PermessionLimit(limit=false) + public ReturnT registry(@RequestBody RegistryParam registryParam){ + int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); + if (ret < 1) { + xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue()); + } + return ReturnT.SUCCESS; } } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java index b5b3524b..7b6ec968 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/controller/JobGroupController.java @@ -1,11 +1,11 @@ package com.xxl.job.admin.controller; import com.xxl.job.admin.core.model.XxlJobGroup; -import com.xxl.job.admin.core.thread.JobRegistryHelper; +import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.enums.RegistryConfig; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.springframework.stereotype.Controller; @@ -33,9 +33,6 @@ public class JobGroupController { @RequestMapping public String index(Model model) { - // job admin - List adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name()); - // job group (executor) List list = xxlJobGroupDao.findAll(); @@ -43,7 +40,7 @@ public class JobGroupController { for (XxlJobGroup group: list) { List registryList = null; if (group.getAddressType() == 0) { - registryList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName()); + registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName()); } else { if (StringUtils.isNotBlank(group.getAddressList())) { registryList = Arrays.asList(group.getAddressList().split(",")); @@ -53,7 +50,6 @@ public class JobGroupController { } } - model.addAttribute("adminAddressList", adminAddressList); model.addAttribute("list", list); return "jobgroup/jobgroup.index"; } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java index a4d90b10..0c170357 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/jobbean/RemoteHttpJobBean.java @@ -7,12 +7,12 @@ import com.xxl.job.admin.core.model.XxlJobLog; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; import com.xxl.job.admin.core.thread.JobMonitorHelper; -import com.xxl.job.admin.core.thread.JobRegistryHelper; +import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.biz.model.TriggerParam; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; -import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.rpc.netcom.NetComClientProxy; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; @@ -68,7 +68,6 @@ public class RemoteHttpJobBean extends QuartzJobBean { triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime()); triggerParam.setLogId(jobLog.getId()); triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime()); - triggerParam.setLogAddress(findCallbackAddressList()); // callback address list // do trigger ReturnT triggerResult = doTrigger(triggerParam, jobInfo, jobLog); @@ -100,7 +99,7 @@ public class RemoteHttpJobBean extends QuartzJobBean { XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup()); if (group.getAddressType() == 0) { triggerSb.append("注册方式:自动注册"); - addressList = (ArrayList) JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName()); + addressList = (ArrayList) JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName()); } else { triggerSb.append("注册方式:手动录入"); if (StringUtils.isNotBlank(group.getAddressList())) { @@ -212,19 +211,4 @@ public class RemoteHttpJobBean extends QuartzJobBean { return runResult; } - /** - * find callback address list - * @return - */ - public Set findCallbackAddressList(){ - Set adminAddressSet = new HashSet(); - adminAddressSet.add(XxlJobDynamicScheduler.getCallbackAddress()); - - List adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name()); - if (adminAddressList!=null) { - adminAddressSet.addAll(adminAddressList); - } - return adminAddressSet; - } - } \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java index d81d7866..69b2a91f 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/schedule/XxlJobDynamicScheduler.java @@ -3,15 +3,12 @@ package com.xxl.job.admin.core.schedule; import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.thread.JobMonitorHelper; -import com.xxl.job.admin.core.thread.JobRegistryHelper; +import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; import com.xxl.job.admin.dao.IXxlJobGroupDao; import com.xxl.job.admin.dao.IXxlJobInfoDao; import com.xxl.job.admin.dao.IXxlJobLogDao; import com.xxl.job.admin.dao.IXxlJobRegistryDao; -import com.xxl.job.core.biz.AdminBiz; -import com.xxl.job.admin.core.biz.AdminBizImpl; import com.xxl.job.core.rpc.netcom.NetComServerFactory; -import com.xxl.job.core.util.IpUtil; import org.quartz.*; import org.quartz.Trigger.TriggerState; import org.quartz.impl.matchers.GroupMatcher; @@ -39,37 +36,11 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In XxlJobDynamicScheduler.scheduler = scheduler; } - // trigger callback address - private String callBackIp; - private int callBackPort = 8888; - private static String callbackAddress; - - public void setCallBackIp(String callBackIp) { - this.callBackIp = callBackIp; - } - public void setCallBackPort(int callBackPort) { - this.callBackPort = callBackPort; - } - public static String getCallbackAddress(){ - return callbackAddress; - } - // init private NetComServerFactory serverFactory = new NetComServerFactory(); public void init() throws Exception { - // server - NetComServerFactory.putService(AdminBiz.class, new AdminBizImpl()); - serverFactory.start(callBackPort, callBackIp, null, null); - - // init callbackAddress - if (callBackIp!=null && callBackIp.trim().length()>0) { - callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort)); - } else { - callbackAddress = IpUtil.getIpPort(callBackPort);; - } - - // admin registry run - JobRegistryHelper.getInstance().start(); + // admin registry monitor run + JobRegistryMonitorHelper.getInstance().start(); // admin monitor run JobMonitorHelper.getInstance().start(); @@ -78,7 +49,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In // destroy public void destroy(){ // admin registry stop - JobRegistryHelper.getInstance().toStop(); + JobRegistryMonitorHelper.getInstance().toStop(); // admin monitor stop JobMonitorHelper.getInstance().toStop(); diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java similarity index 72% rename from xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java rename to xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java index 564f434b..c609edb9 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryHelper.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/core/thread/JobRegistryMonitorHelper.java @@ -2,7 +2,7 @@ package com.xxl.job.admin.core.thread; import com.xxl.job.admin.core.model.XxlJobRegistry; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.enums.RegistryConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -15,11 +15,11 @@ import java.util.concurrent.TimeUnit; * job registry instance * @author xuxueli 2016-10-02 19:10:24 */ -public class JobRegistryHelper { - private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class); +public class JobRegistryMonitorHelper { + private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class); - private static JobRegistryHelper instance = new JobRegistryHelper(); - public static JobRegistryHelper getInstance(){ + private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper(); + public static JobRegistryMonitorHelper getInstance(){ return instance; } @@ -33,18 +33,12 @@ public class JobRegistryHelper { public void run() { while (!toStop) { try { - // registry admin - int ret = XxlJobDynamicScheduler.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress()); - if (ret < 1) { - XxlJobDynamicScheduler.xxlJobRegistryDao.registrySave(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress()); - } - // remove dead admin/executor - XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistHelper.TIMEOUT*2); + XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT); // fresh registry map ConcurrentHashMap> temp = new ConcurrentHashMap>(); - List list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistHelper.TIMEOUT*2); + List list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT); if (list != null) { for (XxlJobRegistry item: list) { String groupKey = makeGroupKey(item.getRegistryGroup(), item.getRegistryKey()); @@ -61,7 +55,7 @@ public class JobRegistryHelper { logger.error("job registry instance error:{}", e); } try { - TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); } catch (InterruptedException e) { logger.error("job registry instance error:{}", e); } diff --git a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java index 1f606472..5f1c86f2 100644 --- a/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java +++ b/xxl-job-admin/src/main/java/com/xxl/job/admin/service/impl/XxlJobServiceImpl.java @@ -5,13 +5,16 @@ import com.xxl.job.admin.core.model.XxlJobGroup; import com.xxl.job.admin.core.model.XxlJobInfo; import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum; import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler; -import com.xxl.job.admin.core.thread.JobRegistryHelper; -import com.xxl.job.admin.dao.*; +import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper; +import com.xxl.job.admin.dao.IXxlJobGroupDao; +import com.xxl.job.admin.dao.IXxlJobInfoDao; +import com.xxl.job.admin.dao.IXxlJobLogDao; +import com.xxl.job.admin.dao.IXxlJobLogGlueDao; import com.xxl.job.admin.service.IXxlJobService; import com.xxl.job.core.biz.model.ReturnT; import com.xxl.job.core.enums.ExecutorBlockStrategyEnum; +import com.xxl.job.core.enums.RegistryConfig; import com.xxl.job.core.glue.GlueTypeEnum; -import com.xxl.job.core.registry.RegistHelper; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.time.DateUtils; @@ -289,7 +292,7 @@ public class XxlJobServiceImpl implements IXxlJobService { for (XxlJobGroup group: groupList) { List registryList = null; if (group.getAddressType() == 0) { - registryList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName()); + registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName()); } else { if (StringUtils.isNotBlank(group.getAddressList())) { registryList = Arrays.asList(group.getAddressList().split(",")); diff --git a/xxl-job-admin/src/main/resources/spring/applicationcontext-xxl-job-admin.xml b/xxl-job-admin/src/main/resources/spring/applicationcontext-xxl-job-admin.xml index 83589dca..5cb24368 100644 --- a/xxl-job-admin/src/main/resources/spring/applicationcontext-xxl-job-admin.xml +++ b/xxl-job-admin/src/main/resources/spring/applicationcontext-xxl-job-admin.xml @@ -84,10 +84,6 @@ - - - - \ No newline at end of file diff --git a/xxl-job-admin/src/main/resources/spring/springmvc-context.xml b/xxl-job-admin/src/main/resources/spring/springmvc-context.xml index 566cafd2..b1f11c89 100644 --- a/xxl-job-admin/src/main/resources/spring/springmvc-context.xml +++ b/xxl-job-admin/src/main/resources/spring/springmvc-context.xml @@ -10,7 +10,7 @@ http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd"> - + diff --git a/xxl-job-admin/src/main/resources/xxl-job-admin.properties b/xxl-job-admin/src/main/resources/xxl-job-admin.properties index 40f4c236..74134e3a 100644 --- a/xxl-job-admin/src/main/resources/xxl-job-admin.properties +++ b/xxl-job-admin/src/main/resources/xxl-job-admin.properties @@ -4,10 +4,6 @@ xxl.job.db.url=jdbc:mysql://localhost:3306/xxl-job?useUnicode=true&characterEnco xxl.job.db.user=root xxl.job.db.password=root_pwd -### xxl-job callback address -xxl.job.callBackIp= -xxl.job.callBackPort=8888 - ### xxl-job email xxl.job.mail.host=smtp.163.com xxl.job.mail.port=25 diff --git a/xxl-job-admin/src/main/webapp/WEB-INF/template/jobgroup/jobgroup.index.ftl b/xxl-job-admin/src/main/webapp/WEB-INF/template/jobgroup/jobgroup.index.ftl index e3fe878d..4bd8f44d 100644 --- a/xxl-job-admin/src/main/webapp/WEB-INF/template/jobgroup/jobgroup.index.ftl +++ b/xxl-job-admin/src/main/webapp/WEB-INF/template/jobgroup/jobgroup.index.ftl @@ -30,8 +30,6 @@

执行器列表

   -      - 调度中心OnLine机器:<#if adminAddressList?exists><#list adminAddressList as item>${item}
diff --git a/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java new file mode 100644 index 00000000..fcb6ecbd --- /dev/null +++ b/xxl-job-admin/src/test/java/com/xxl/job/dao/impl/AdminApiTest.java @@ -0,0 +1,23 @@ +package com.xxl.job.dao.impl; + +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.AdminApiUtil; + +/** + * Created by xuxueli on 17/5/10. + */ +public class AdminApiTest { + + public static void main(String[] args) { + try { + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "aaa", "112312312312"); + ReturnT registryResult = AdminApiUtil.callApi("http://localhost:8080/xxl-job-admin"+AdminApiUtil.REGISTRY, registryParam); + System.out.println(registryResult); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java deleted file mode 100644 index cb814efc..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/AdminBiz.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.xxl.job.core.biz; - -import com.xxl.job.core.biz.model.HandleCallbackParam; -import com.xxl.job.core.biz.model.ReturnT; - -/** - * Created by xuxueli on 17/3/1. - */ -public interface AdminBiz { - - public ReturnT callback(HandleCallbackParam handleCallbackParam); - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java index 1c4233c5..39ec4db1 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/HandleCallbackParam.java @@ -1,7 +1,6 @@ package com.xxl.job.core.biz.model; import java.io.Serializable; -import java.util.Set; /** * Created by xuxueli on 17/3/2. @@ -10,13 +9,11 @@ public class HandleCallbackParam implements Serializable { private static final long serialVersionUID = 42L; private int logId; - private Set logAddress; - private ReturnT executeResult; - public HandleCallbackParam(int logId, Set logAddress, ReturnT executeResult) { + public HandleCallbackParam(){} + public HandleCallbackParam(int logId, ReturnT executeResult) { this.logId = logId; - this.logAddress = logAddress; this.executeResult = executeResult; } @@ -28,14 +25,6 @@ public class HandleCallbackParam implements Serializable { this.logId = logId; } - public Set getLogAddress() { - return logAddress; - } - - public void setLogAddress(Set logAddress) { - this.logAddress = logAddress; - } - public ReturnT getExecuteResult() { return executeResult; } @@ -43,4 +32,12 @@ public class HandleCallbackParam implements Serializable { public void setExecuteResult(ReturnT executeResult) { this.executeResult = executeResult; } + + @Override + public String toString() { + return "HandleCallbackParam{" + + "logId=" + logId + + ", executeResult=" + executeResult + + '}'; + } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java new file mode 100644 index 00000000..8747a9bd --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/RegistryParam.java @@ -0,0 +1,54 @@ +package com.xxl.job.core.biz.model; + +import java.io.Serializable; + +/** + * Created by xuxueli on 2017-05-10 20:22:42 + */ +public class RegistryParam implements Serializable { + private static final long serialVersionUID = 42L; + + private String registGroup; + private String registryKey; + private String registryValue; + + public RegistryParam(){} + public RegistryParam(String registGroup, String registryKey, String registryValue) { + this.registGroup = registGroup; + this.registryKey = registryKey; + this.registryValue = registryValue; + } + + public String getRegistGroup() { + return registGroup; + } + + public void setRegistGroup(String registGroup) { + this.registGroup = registGroup; + } + + public String getRegistryKey() { + return registryKey; + } + + public void setRegistryKey(String registryKey) { + this.registryKey = registryKey; + } + + public String getRegistryValue() { + return registryValue; + } + + public void setRegistryValue(String registryValue) { + this.registryValue = registryValue; + } + + @Override + public String toString() { + return "RegistryParam{" + + "registGroup='" + registGroup + '\'' + + ", registryKey='" + registryKey + '\'' + + ", registryValue='" + registryValue + '\'' + + '}'; + } +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java index 6eca8cba..112385ee 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/ReturnT.java @@ -18,7 +18,8 @@ public class ReturnT implements Serializable { private int code; private String msg; private T content; - + + public ReturnT(){} public ReturnT(int code, String msg) { this.code = code; this.msg = msg; diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java index 48bf0569..7a408ffd 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/biz/model/TriggerParam.java @@ -1,7 +1,6 @@ package com.xxl.job.core.biz.model; import java.io.Serializable; -import java.util.Set; /** * Created by xuxueli on 16/7/22. @@ -22,8 +21,6 @@ public class TriggerParam implements Serializable{ private int logId; private long logDateTim; - private Set logAddress; - public int getJobId() { return jobId; } @@ -96,11 +93,18 @@ public class TriggerParam implements Serializable{ this.logDateTim = logDateTim; } - public Set getLogAddress() { - return logAddress; - } - - public void setLogAddress(Set logAddress) { - this.logAddress = logAddress; + @Override + public String toString() { + return "TriggerParam{" + + "jobId=" + jobId + + ", executorHandler='" + executorHandler + '\'' + + ", executorParams='" + executorParams + '\'' + + ", executorBlockStrategy='" + executorBlockStrategy + '\'' + + ", glueType='" + glueType + '\'' + + ", glueSource='" + glueSource + '\'' + + ", glueUpdatetime=" + glueUpdatetime + + ", logId=" + logId + + ", logDateTim=" + logDateTim + + '}'; } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java b/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java new file mode 100644 index 00000000..798beaef --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/enums/RegistryConfig.java @@ -0,0 +1,13 @@ +package com.xxl.job.core.enums; + +/** + * Created by xuxueli on 17/5/10. + */ +public class RegistryConfig { + + public static final int BEAT_TIMEOUT = 30; + public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3; + + public enum RegistType{ EXECUTOR, ADMIN } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java index 1c095442..8d94554c 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/executor/XxlJobExecutor.java @@ -4,7 +4,6 @@ import com.xxl.job.core.biz.ExecutorBiz; import com.xxl.job.core.biz.impl.ExecutorBizImpl; import com.xxl.job.core.handler.IJobHandler; import com.xxl.job.core.handler.annotation.JobHander; -import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.rpc.netcom.NetComServerFactory; import com.xxl.job.core.thread.ExecutorRegistryThread; import com.xxl.job.core.thread.JobThread; @@ -30,7 +29,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe private String ip; private int port = 9999; private String appName; - private RegistHelper registHelper; + public static String adminAddresses; public static String logPath; public void setIp(String ip) { @@ -42,8 +41,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe public void setAppName(String appName) { this.appName = appName; } - public void setRegistHelper(RegistHelper registHelper) { - this.registHelper = registHelper; + public void setAdminAddresses(String adminAddresses) { + this.adminAddresses = adminAddresses; } public void setLogPath(String logPath) { this.logPath = logPath; @@ -54,7 +53,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe public void start() throws Exception { // executor start NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl()); - serverFactory.start(port, ip, appName, registHelper); + serverFactory.start(port, ip, appName); // trigger callback thread start TriggerCallbackThread.getInstance().start(); diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java deleted file mode 100644 index 117fc213..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/registry/RegistHelper.java +++ /dev/null @@ -1,13 +0,0 @@ -package com.xxl.job.core.registry; - -/** - * Created by xuxueli on 16/9/30. - */ -public interface RegistHelper { - - public static final int TIMEOUT = 15; - public enum RegistType{ EXECUTOR, ADMIN } - - public int registry(String registGroup, String registryKey, String registryValue); - -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/registry/impl/DbRegistHelper.java b/xxl-job-core/src/main/java/com/xxl/job/core/registry/impl/DbRegistHelper.java deleted file mode 100644 index 55a9e9b5..00000000 --- a/xxl-job-core/src/main/java/com/xxl/job/core/registry/impl/DbRegistHelper.java +++ /dev/null @@ -1,28 +0,0 @@ -package com.xxl.job.core.registry.impl; - -import com.xxl.job.core.registry.RegistHelper; -import com.xxl.job.core.util.DBUtil; - -import javax.sql.DataSource; - -/** - * Created by xuxueli on 16/9/30. - */ -public class DbRegistHelper implements RegistHelper { - - private DataSource dataSource; - public void setDataSource(DataSource dataSource) { - this.dataSource = dataSource; - } - - @Override - public int registry(String registGroup, String registryKey, String registryValue) { - String updateSql = "UPDATE XXL_JOB_QRTZ_TRIGGER_REGISTRY SET `update_time` = NOW() WHERE `registry_group` = ? AND `registry_key` = ? AND `registry_value` = ?"; - String insertSql = "INSERT INTO XXL_JOB_QRTZ_TRIGGER_REGISTRY( `registry_group` , `registry_key` , `registry_value`, `update_time`) VALUES(? , ? , ?, NOW())"; - int ret = DBUtil.update(dataSource, updateSql, new Object[]{registGroup, registryKey, registryValue}); - if (ret<1) { - ret = DBUtil.update(dataSource, insertSql, new Object[]{registGroup, registryKey, registryValue}); - } - return ret; - } -} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java index 5bedde5b..7fc44aca 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/NetComServerFactory.java @@ -1,7 +1,6 @@ package com.xxl.job.core.rpc.netcom; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.rpc.codec.RpcRequest; import com.xxl.job.core.rpc.codec.RpcResponse; import com.xxl.job.core.rpc.netcom.jetty.server.JettyServer; @@ -22,8 +21,8 @@ public class NetComServerFactory { // ---------------------- server start ---------------------- JettyServer server = new JettyServer(); - public void start(int port, String ip, String appName, RegistHelper registHelper) throws Exception { - server.start(port, ip, appName, registHelper); + public void start(int port, String ip, String appName) throws Exception { + server.start(port, ip, appName); } // ---------------------- server destroy ---------------------- diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java index f53949fa..ae0c6e56 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/rpc/netcom/jetty/server/JettyServer.java @@ -1,6 +1,5 @@ package com.xxl.job.core.rpc.netcom.jetty.server; -import com.xxl.job.core.registry.RegistHelper; import com.xxl.job.core.thread.ExecutorRegistryThread; import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.Handler; @@ -20,7 +19,7 @@ public class JettyServer { private Server server; private Thread thread; - public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception { + public void start(final int port, final String ip, final String appName) throws Exception { thread = new Thread(new Runnable() { @Override public void run() { @@ -42,7 +41,7 @@ public class JettyServer { // Start the server server.start(); logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port); - ExecutorRegistryThread.getInstance().start(port, ip, appName, registHelper); + ExecutorRegistryThread.getInstance().start(port, ip, appName); server.join(); // block until thread stopped logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port); } catch (Exception e) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java index 485163ef..8ea6dba3 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/ExecutorRegistryThread.java @@ -1,7 +1,12 @@ package com.xxl.job.core.thread; -import com.xxl.job.core.registry.RegistHelper; +import com.xxl.job.core.biz.model.RegistryParam; +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.enums.RegistryConfig; +import com.xxl.job.core.util.AdminApiUtil; import com.xxl.job.core.util.IpUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; @@ -9,6 +14,7 @@ import java.util.concurrent.TimeUnit; * Created by xuxueli on 17/3/2. */ public class ExecutorRegistryThread extends Thread { + private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class); private static ExecutorRegistryThread instance = new ExecutorRegistryThread(); public static ExecutorRegistryThread getInstance(){ @@ -17,27 +23,36 @@ public class ExecutorRegistryThread extends Thread { private Thread registryThread; private boolean toStop = false; - public void start(final int port, final String ip, final String appName, final RegistHelper registHelper){ - if (registHelper==null && appName==null || appName.trim().length()==0) { + public void start(final int port, final String ip, final String appName){ + if (appName==null || appName.trim().length()==0) { + logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail"); return; } + + // executor address (generate addredd = ip:port) + final String executorAddress; + if (ip != null && ip.trim().length()>0) { + executorAddress = ip.trim().concat(":").concat(String.valueOf(port)); + } else { + executorAddress = IpUtil.getIpPort(port); + } + registryThread = new Thread(new Runnable() { @Override public void run() { while (!toStop) { try { - // generate addredd = ip:port - String address = null; - if (ip != null && ip.trim().length()>0) { - address = ip.trim().concat(":").concat(String.valueOf(port)); - } else { - address = IpUtil.getIpPort(port); - } - - registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address); - TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT); + RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress); + ReturnT registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam); + logger.info(">>>>>>>>>>> xxl-job registry, RegistryParam:{}, registryResult:{}", new Object[]{registryParam.toString(), registryResult.toString()}); } catch (Exception e) { - e.printStackTrace(); + logger.error(">>>>>>>>>>> xxl-job ExecutorRegistryThread Exception:", e); + } + + try { + TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); } } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index 0cf88862..b410fe38 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -131,11 +131,11 @@ public class JobThread extends Thread{ // callback handler info if (!toStop) { // commonm - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), executeResult)); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); } else { // is killed ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult)); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); } } } catch (Exception e) { @@ -156,7 +156,7 @@ public class JobThread extends Thread{ if (triggerParam!=null) { // is killed ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult)); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); } } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java index 3d30d2bf..8fdecada 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/TriggerCallbackThread.java @@ -1,9 +1,8 @@ package com.xxl.job.core.thread; -import com.xxl.job.core.biz.AdminBiz; import com.xxl.job.core.biz.model.HandleCallbackParam; import com.xxl.job.core.biz.model.ReturnT; -import com.xxl.job.core.rpc.netcom.NetComClientProxy; +import com.xxl.job.core.util.AdminApiUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -33,19 +32,12 @@ public class TriggerCallbackThread { try { HandleCallbackParam callback = getInstance().callBackQueue.take(); if (callback != null) { - for (String address : callback.getLogAddress()) { - try { - // callback - AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, address).getObject(); - ReturnT callbackResult = adminBiz.callback(callback); - - logger.info(">>>>>>>>>>> xxl-job callback , CallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()}); - if (ReturnT.SUCCESS_CODE == callbackResult.getCode()) { - break; - } - } catch (Exception e) { - logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e); - } + // callback + try { + ReturnT callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callback); + logger.info(">>>>>>>>>>> xxl-job callback, HandleCallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()}); + } catch (Exception e) { + logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e); } } } catch (Exception e) { diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java new file mode 100644 index 00000000..4298f3bc --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/AdminApiUtil.java @@ -0,0 +1,116 @@ +package com.xxl.job.core.util; + +import com.xxl.job.core.biz.model.ReturnT; +import com.xxl.job.core.executor.XxlJobExecutor; +import org.apache.http.HttpEntity; +import org.apache.http.HttpResponse; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.entity.StringEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +/** + * @author xuxueli 2017-05-10 21:28:15 + */ +public class AdminApiUtil { + private static Logger logger = LoggerFactory.getLogger(AdminApiUtil.class); + + public static final String CALLBACK = "/api/callback"; + public static final String REGISTRY = "/api/registry"; + + public static ReturnT callApiFailover(String subUrl, Object requestObj) throws Exception { + + // admin assress list + List adminAddressList = new ArrayList(); + if (XxlJobExecutor.adminAddresses != null) { + for (String adminAddressItem: XxlJobExecutor.adminAddresses.split(",")) { + if (adminAddressItem.trim().length()>0 && !adminAddressList.contains(adminAddressItem)) { + adminAddressList.add(adminAddressItem); + } + } + } + if (adminAddressList==null || adminAddressList.size()==0) { + return ReturnT.FAIL; + } + + for (String adminAddress: adminAddressList) { + ReturnT registryResult = null; + try { + String apiUrl = adminAddress.concat(subUrl); + registryResult = callApi(apiUrl, requestObj); + } catch (Exception e) { + logger.error(e.getMessage(), e); + } + if (registryResult!=null && registryResult.getCode()==ReturnT.SUCCESS_CODE) { + return ReturnT.SUCCESS; + } + } + return ReturnT.FAIL; + } + + public static ReturnT callApi(String finalUrl, Object requestObj) throws Exception { + HttpPost httpPost = new HttpPost(finalUrl); + CloseableHttpClient httpClient = HttpClients.createDefault(); + try { + + // timeout + RequestConfig requestConfig = RequestConfig.custom() + .setConnectionRequestTimeout(10000) + .setSocketTimeout(10000) + .setConnectTimeout(10000) + .build(); + + httpPost.setConfig(requestConfig); + + // data + if (requestObj != null) { + String json = JacksonUtil.writeValueAsString(requestObj); + + StringEntity entity = new StringEntity(json, "utf-8"); + entity.setContentEncoding("UTF-8"); + entity.setContentType("application/json"); + + httpPost.setEntity(entity); + } + + // do post + HttpResponse response = httpClient.execute(httpPost); + HttpEntity entity = response.getEntity(); + if (null != entity) { + if (response.getStatusLine().getStatusCode() != 200) { + EntityUtils.consume(entity); + return ReturnT.FAIL; + } + + String responseMsg = EntityUtils.toString(entity, "UTF-8"); + EntityUtils.consume(entity); + if (responseMsg!=null && responseMsg.startsWith("{")) { + ReturnT result = JacksonUtil.readValue(responseMsg, ReturnT.class); + return result; + } + } + return ReturnT.FAIL; + } catch (Exception e) { + logger.error("", e); + return new ReturnT(ReturnT.FAIL_CODE, e.getMessage()); + } finally { + if (httpPost!=null) { + httpPost.releaseConnection(); + } + try { + httpClient.close(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } + +} diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java index 95750c88..da941a33 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/HttpClientUtil.java @@ -105,5 +105,5 @@ public class HttpClientUtil { } return new byte[] {}; } - + } diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java b/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java new file mode 100644 index 00000000..5fde5c02 --- /dev/null +++ b/xxl-job-core/src/main/java/com/xxl/job/core/util/JacksonUtil.java @@ -0,0 +1,93 @@ +package com.xxl.job.core.util; + + +import org.codehaus.jackson.JsonGenerationException; +import org.codehaus.jackson.JsonParseException; +import org.codehaus.jackson.map.JsonMappingException; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +/** + * Jackson util + * + * 1、obj need private and set/get; + * 2、do not support inner class; + * + * @author xuxueli 2015-9-25 18:02:56 + */ +public class JacksonUtil { + private final static ObjectMapper objectMapper = new ObjectMapper(); + public static ObjectMapper getInstance() { + return objectMapper; + } + + /** + * bean、array、List、Map --> json + * + * @param obj + * @return json string + * @throws Exception + */ + public static String writeValueAsString(Object obj) { + try { + return getInstance().writeValueAsString(obj); + } catch (JsonGenerationException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + /** + * string --> bean、Map、List(array) + * + * @param jsonStr + * @param clazz + * @return obj + * @throws Exception + */ + public static T readValue(String jsonStr, Class clazz) { + try { + return getInstance().readValue(jsonStr, clazz); + } catch (JsonParseException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + public static T readValueRefer(String jsonStr, Class clazz) { + try { + return getInstance().readValue(jsonStr, new TypeReference() { }); + } catch (JsonParseException e) { + e.printStackTrace(); + } catch (JsonMappingException e) { + e.printStackTrace(); + } catch (IOException e) { + e.printStackTrace(); + } + return null; + } + + public static void main(String[] args) { + try { + Map map = new HashMap(); + map.put("aaa", "111"); + map.put("bbb", "222"); + String json = writeValueAsString(map); + System.out.println(json); + System.out.println(readValue(json, Map.class)); + } catch (Exception e) { + e.printStackTrace(); + } + } +} diff --git a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml index 07c3c843..42237b58 100644 --- a/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml +++ b/xxl-job-executor-example/src/main/resources/applicationcontext-xxl-job.xml @@ -27,34 +27,13 @@ + - - - - - - - - + + + - - - - - - - - - - - - - - - - - \ No newline at end of file diff --git a/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties b/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties index a950b06c..e1c342d6 100644 --- a/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties +++ b/xxl-job-executor-example/src/main/resources/xxl-job-executor.properties @@ -1,8 +1,5 @@ -### xxl-job db -xxl.job.db.driverClass=com.mysql.jdbc.Driver -xxl.job.db.url=jdbc:mysql://localhost:3306/xxl-job?useUnicode=true&characterEncoding=UTF-8 -xxl.job.db.user=root -xxl.job.db.password=root_pwd +### xxl-job admin address, such as "http://host01:port01/project,http://host02:port02/project" +xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin ### xxl-job executor address xxl.job.executor.appname=xxl-job-executor-example