执行器与数据库彻底解耦,但是执行器需要配置调度中心集群地址。调度中心提供API供执行器回调和心跳注册服务,取消调度中心内部jetty,心跳周期调整为30s,心跳失效为三倍心跳;

This commit is contained in:
xueli.xue 2017-05-10 23:09:16 +08:00
parent 66de2818ba
commit 8a6d462d3b
30 changed files with 451 additions and 251 deletions

View File

@ -1,30 +1,53 @@
package com.xxl.job.admin.core.biz;
package com.xxl.job.admin.controller;
import com.xxl.job.admin.controller.annotation.PermessionLimit;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.util.AdminApiUtil;
import org.apache.commons.lang.StringUtils;
import org.quartz.SchedulerException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
import javax.annotation.Resource;
import java.text.MessageFormat;
import java.util.Date;
/**
* Created by xuxueli on 17/3/1.
* Created by xuxueli on 17/5/10.
*/
public class AdminBizImpl implements AdminBiz {
private static Logger logger = LoggerFactory.getLogger(AdminBizImpl.class);
@Controller
public class JobApiController {
private static Logger logger = LoggerFactory.getLogger(JobApiController.class);
@Resource
public IXxlJobLogDao xxlJobLogDao;
@Resource
private IXxlJobInfoDao xxlJobInfoDao;
@Resource
private IXxlJobRegistryDao xxlJobRegistryDao;
@RequestMapping(value= AdminApiUtil.CALLBACK, method = RequestMethod.POST, consumes = "application/json")
@ResponseBody
@PermessionLimit(limit=false)
public ReturnT<String> callback(@RequestBody HandleCallbackParam handleCallbackParam){
@Override
public ReturnT<String> callback(HandleCallbackParam handleCallbackParam) {
// valid log item
XxlJobLog log = XxlJobDynamicScheduler.xxlJobLogDao.load(handleCallbackParam.getLogId());
XxlJobLog log = xxlJobLogDao.load(handleCallbackParam.getLogId());
if (log == null) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "log item not found.");
}
@ -32,14 +55,14 @@ public class AdminBizImpl implements AdminBiz {
// trigger success, to trigger child job, and avoid repeat trigger child job
String childTriggerMsg = null;
if (ReturnT.SUCCESS_CODE==handleCallbackParam.getExecuteResult().getCode() && ReturnT.SUCCESS_CODE!=log.getHandleCode()) {
XxlJobInfo xxlJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(log.getJobId());
XxlJobInfo xxlJobInfo = xxlJobInfoDao.loadById(log.getJobId());
if (xxlJobInfo!=null && StringUtils.isNotBlank(xxlJobInfo.getChildJobKey())) {
childTriggerMsg = "<hr>";
String[] childJobKeys = xxlJobInfo.getChildJobKey().split(",");
for (int i = 0; i < childJobKeys.length; i++) {
String[] jobKeyArr = childJobKeys[i].split("_");
if (jobKeyArr!=null && jobKeyArr.length==2) {
XxlJobInfo childJobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1]));
XxlJobInfo childJobInfo = xxlJobInfoDao.loadById(Integer.valueOf(jobKeyArr[1]));
if (childJobInfo!=null) {
try {
boolean ret = XxlJobDynamicScheduler.triggerJob(String.valueOf(childJobInfo.getId()), String.valueOf(childJobInfo.getJobGroup()));
@ -79,9 +102,21 @@ public class AdminBizImpl implements AdminBiz {
log.setHandleTime(new Date());
log.setHandleCode(handleCallbackParam.getExecuteResult().getCode());
log.setHandleMsg(handleMsg.toString());
XxlJobDynamicScheduler.xxlJobLogDao.updateHandleInfo(log);
xxlJobLogDao.updateHandleInfo(log);
return new ReturnT<String>(ReturnT.SUCCESS_CODE, null);
return ReturnT.SUCCESS;
}
@RequestMapping(value=AdminApiUtil.REGISTRY, method = RequestMethod.POST, consumes = "application/json")
@ResponseBody
@PermessionLimit(limit=false)
public ReturnT<String> registry(@RequestBody RegistryParam registryParam){
int ret = xxlJobRegistryDao.registryUpdate(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
if (ret < 1) {
xxlJobRegistryDao.registrySave(registryParam.getRegistGroup(), registryParam.getRegistryKey(), registryParam.getRegistryValue());
}
return ReturnT.SUCCESS;
}
}

View File

@ -1,11 +1,11 @@
package com.xxl.job.admin.controller;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.enums.RegistryConfig;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.springframework.stereotype.Controller;
@ -33,9 +33,6 @@ public class JobGroupController {
@RequestMapping
public String index(Model model) {
// job admin
List<String> adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name());
// job group (executor)
List<XxlJobGroup> list = xxlJobGroupDao.findAll();
@ -43,7 +40,7 @@ public class JobGroupController {
for (XxlJobGroup group: list) {
List<String> registryList = null;
if (group.getAddressType() == 0) {
registryList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName());
registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
if (StringUtils.isNotBlank(group.getAddressList())) {
registryList = Arrays.asList(group.getAddressList().split(","));
@ -53,7 +50,6 @@ public class JobGroupController {
}
}
model.addAttribute("adminAddressList", adminAddressList);
model.addAttribute("list", list);
return "jobgroup/jobgroup.index";
}

View File

@ -7,12 +7,12 @@ import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
@ -68,7 +68,6 @@ public class RemoteHttpJobBean extends QuartzJobBean {
triggerParam.setGlueUpdatetime(jobInfo.getGlueUpdatetime().getTime());
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setLogAddress(findCallbackAddressList()); // callback address list
// do trigger
ReturnT<String> triggerResult = doTrigger(triggerParam, jobInfo, jobLog);
@ -100,7 +99,7 @@ public class RemoteHttpJobBean extends QuartzJobBean {
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
if (group.getAddressType() == 0) {
triggerSb.append("注册方式:自动注册");
addressList = (ArrayList<String>) JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName());
addressList = (ArrayList<String>) JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
triggerSb.append("注册方式:手动录入");
if (StringUtils.isNotBlank(group.getAddressList())) {
@ -212,19 +211,4 @@ public class RemoteHttpJobBean extends QuartzJobBean {
return runResult;
}
/**
* find callback address list
* @return
*/
public Set<String> findCallbackAddressList(){
Set<String> adminAddressSet = new HashSet<String>();
adminAddressSet.add(XxlJobDynamicScheduler.getCallbackAddress());
List<String> adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name());
if (adminAddressList!=null) {
adminAddressSet.addAll(adminAddressList);
}
return adminAddressSet;
}
}

View File

@ -3,15 +3,12 @@ package com.xxl.job.admin.core.schedule;
import com.xxl.job.admin.core.jobbean.RemoteHttpJobBean;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobRegistryDao;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.admin.core.biz.AdminBizImpl;
import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.util.IpUtil;
import org.quartz.*;
import org.quartz.Trigger.TriggerState;
import org.quartz.impl.matchers.GroupMatcher;
@ -39,37 +36,11 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
XxlJobDynamicScheduler.scheduler = scheduler;
}
// trigger callback address
private String callBackIp;
private int callBackPort = 8888;
private static String callbackAddress;
public void setCallBackIp(String callBackIp) {
this.callBackIp = callBackIp;
}
public void setCallBackPort(int callBackPort) {
this.callBackPort = callBackPort;
}
public static String getCallbackAddress(){
return callbackAddress;
}
// init
private NetComServerFactory serverFactory = new NetComServerFactory();
public void init() throws Exception {
// server
NetComServerFactory.putService(AdminBiz.class, new AdminBizImpl());
serverFactory.start(callBackPort, callBackIp, null, null);
// init callbackAddress
if (callBackIp!=null && callBackIp.trim().length()>0) {
callbackAddress = callBackIp.trim().concat(":").concat(String.valueOf(callBackPort));
} else {
callbackAddress = IpUtil.getIpPort(callBackPort);;
}
// admin registry run
JobRegistryHelper.getInstance().start();
// admin registry monitor run
JobRegistryMonitorHelper.getInstance().start();
// admin monitor run
JobMonitorHelper.getInstance().start();
@ -78,7 +49,7 @@ public final class XxlJobDynamicScheduler implements ApplicationContextAware, In
// destroy
public void destroy(){
// admin registry stop
JobRegistryHelper.getInstance().toStop();
JobRegistryMonitorHelper.getInstance().toStop();
// admin monitor stop
JobMonitorHelper.getInstance().toStop();

View File

@ -2,7 +2,7 @@ package com.xxl.job.admin.core.thread;
import com.xxl.job.admin.core.model.XxlJobRegistry;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.enums.RegistryConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -15,11 +15,11 @@ import java.util.concurrent.TimeUnit;
* job registry instance
* @author xuxueli 2016-10-02 19:10:24
*/
public class JobRegistryHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryHelper.class);
public class JobRegistryMonitorHelper {
private static Logger logger = LoggerFactory.getLogger(JobRegistryMonitorHelper.class);
private static JobRegistryHelper instance = new JobRegistryHelper();
public static JobRegistryHelper getInstance(){
private static JobRegistryMonitorHelper instance = new JobRegistryMonitorHelper();
public static JobRegistryMonitorHelper getInstance(){
return instance;
}
@ -33,18 +33,12 @@ public class JobRegistryHelper {
public void run() {
while (!toStop) {
try {
// registry admin
int ret = XxlJobDynamicScheduler.xxlJobRegistryDao.registryUpdate(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress());
if (ret < 1) {
XxlJobDynamicScheduler.xxlJobRegistryDao.registrySave(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name(), XxlJobDynamicScheduler.getCallbackAddress());
}
// remove dead admin/executor
XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistHelper.TIMEOUT*2);
XxlJobDynamicScheduler.xxlJobRegistryDao.removeDead(RegistryConfig.DEAD_TIMEOUT);
// fresh registry map
ConcurrentHashMap<String, List<String>> temp = new ConcurrentHashMap<String, List<String>>();
List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistHelper.TIMEOUT*2);
List<XxlJobRegistry> list = XxlJobDynamicScheduler.xxlJobRegistryDao.findAll(RegistryConfig.DEAD_TIMEOUT);
if (list != null) {
for (XxlJobRegistry item: list) {
String groupKey = makeGroupKey(item.getRegistryGroup(), item.getRegistryKey());
@ -61,7 +55,7 @@ public class JobRegistryHelper {
logger.error("job registry instance error:{}", e);
}
try {
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
logger.error("job registry instance error:{}", e);
}

View File

@ -5,13 +5,16 @@ import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
import com.xxl.job.admin.dao.*;
import com.xxl.job.admin.core.thread.JobRegistryMonitorHelper;
import com.xxl.job.admin.dao.IXxlJobGroupDao;
import com.xxl.job.admin.dao.IXxlJobInfoDao;
import com.xxl.job.admin.dao.IXxlJobLogDao;
import com.xxl.job.admin.dao.IXxlJobLogGlueDao;
import com.xxl.job.admin.service.IXxlJobService;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.ExecutorBlockStrategyEnum;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.glue.GlueTypeEnum;
import com.xxl.job.core.registry.RegistHelper;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateUtils;
@ -289,7 +292,7 @@ public class XxlJobServiceImpl implements IXxlJobService {
for (XxlJobGroup group: groupList) {
List<String> registryList = null;
if (group.getAddressType() == 0) {
registryList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName());
registryList = JobRegistryMonitorHelper.discover(RegistryConfig.RegistType.EXECUTOR.name(), group.getAppName());
} else {
if (StringUtils.isNotBlank(group.getAddressList())) {
registryList = Arrays.asList(group.getAddressList().split(","));

View File

@ -84,10 +84,6 @@
<bean id="xxlJobDynamicScheduler" class="com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler" init-method="init" destroy-method="destroy" >
<!-- (轻易不要变更“调度器名称”, 任务创建时会绑定该“调度器名称”) -->
<property name="scheduler" ref="quartzScheduler"/>
<!-- 调度中心回调IP[选填],为空则自动获取 -->
<property name="callBackIp" value="${xxl.job.callBackIp}"/>
<!-- 调度中心回调端口号 -->
<property name="callBackPort" value="${xxl.job.callBackPort}"/>
</bean>
</beans>

View File

@ -10,7 +10,7 @@
http://www.springframework.org/schema/mvc
http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd">
<mvc:annotation-driven />
<mvc:annotation-driven />
<context:component-scan base-package="com.xxl.job.admin.controller" />
<mvc:resources mapping="/favicon.ico" location="/favicon.ico" />

View File

@ -4,10 +4,6 @@ xxl.job.db.url=jdbc:mysql://localhost:3306/xxl-job?useUnicode=true&characterEnco
xxl.job.db.user=root
xxl.job.db.password=root_pwd
### xxl-job callback address
xxl.job.callBackIp=
xxl.job.callBackPort=8888
### xxl-job email
xxl.job.mail.host=smtp.163.com
xxl.job.mail.port=25

View File

@ -30,8 +30,6 @@
<div class="box-header">
<h3 class="box-title">执行器列表</h3>&nbsp;&nbsp;
<button class="btn btn-info btn-xs pull-left2 add" >+新增执行器</button>
&nbsp;&nbsp;&nbsp;&nbsp;
调度中心OnLine机器<#if adminAddressList?exists><#list adminAddressList as item><span class="badge bg-green">${item}</span></#list></#if>
</div>
<div class="box-body">
<table id="joblog_list" class="table table-bordered table-striped display" width="100%" >

View File

@ -0,0 +1,23 @@
package com.xxl.job.dao.impl;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.util.AdminApiUtil;
/**
* Created by xuxueli on 17/5/10.
*/
public class AdminApiTest {
public static void main(String[] args) {
try {
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), "aaa", "112312312312");
ReturnT<String> registryResult = AdminApiUtil.callApi("http://localhost:8080/xxl-job-admin"+AdminApiUtil.REGISTRY, registryParam);
System.out.println(registryResult);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -1,13 +0,0 @@
package com.xxl.job.core.biz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
/**
* Created by xuxueli on 17/3/1.
*/
public interface AdminBiz {
public ReturnT<String> callback(HandleCallbackParam handleCallbackParam);
}

View File

@ -1,7 +1,6 @@
package com.xxl.job.core.biz.model;
import java.io.Serializable;
import java.util.Set;
/**
* Created by xuxueli on 17/3/2.
@ -10,13 +9,11 @@ public class HandleCallbackParam implements Serializable {
private static final long serialVersionUID = 42L;
private int logId;
private Set<String> logAddress;
private ReturnT<String> executeResult;
public HandleCallbackParam(int logId, Set<String> logAddress, ReturnT<String> executeResult) {
public HandleCallbackParam(){}
public HandleCallbackParam(int logId, ReturnT<String> executeResult) {
this.logId = logId;
this.logAddress = logAddress;
this.executeResult = executeResult;
}
@ -28,14 +25,6 @@ public class HandleCallbackParam implements Serializable {
this.logId = logId;
}
public Set<String> getLogAddress() {
return logAddress;
}
public void setLogAddress(Set<String> logAddress) {
this.logAddress = logAddress;
}
public ReturnT<String> getExecuteResult() {
return executeResult;
}
@ -43,4 +32,12 @@ public class HandleCallbackParam implements Serializable {
public void setExecuteResult(ReturnT<String> executeResult) {
this.executeResult = executeResult;
}
@Override
public String toString() {
return "HandleCallbackParam{" +
"logId=" + logId +
", executeResult=" + executeResult +
'}';
}
}

View File

@ -0,0 +1,54 @@
package com.xxl.job.core.biz.model;
import java.io.Serializable;
/**
* Created by xuxueli on 2017-05-10 20:22:42
*/
public class RegistryParam implements Serializable {
private static final long serialVersionUID = 42L;
private String registGroup;
private String registryKey;
private String registryValue;
public RegistryParam(){}
public RegistryParam(String registGroup, String registryKey, String registryValue) {
this.registGroup = registGroup;
this.registryKey = registryKey;
this.registryValue = registryValue;
}
public String getRegistGroup() {
return registGroup;
}
public void setRegistGroup(String registGroup) {
this.registGroup = registGroup;
}
public String getRegistryKey() {
return registryKey;
}
public void setRegistryKey(String registryKey) {
this.registryKey = registryKey;
}
public String getRegistryValue() {
return registryValue;
}
public void setRegistryValue(String registryValue) {
this.registryValue = registryValue;
}
@Override
public String toString() {
return "RegistryParam{" +
"registGroup='" + registGroup + '\'' +
", registryKey='" + registryKey + '\'' +
", registryValue='" + registryValue + '\'' +
'}';
}
}

View File

@ -18,7 +18,8 @@ public class ReturnT<T> implements Serializable {
private int code;
private String msg;
private T content;
public ReturnT(){}
public ReturnT(int code, String msg) {
this.code = code;
this.msg = msg;

View File

@ -1,7 +1,6 @@
package com.xxl.job.core.biz.model;
import java.io.Serializable;
import java.util.Set;
/**
* Created by xuxueli on 16/7/22.
@ -22,8 +21,6 @@ public class TriggerParam implements Serializable{
private int logId;
private long logDateTim;
private Set<String> logAddress;
public int getJobId() {
return jobId;
}
@ -96,11 +93,18 @@ public class TriggerParam implements Serializable{
this.logDateTim = logDateTim;
}
public Set<String> getLogAddress() {
return logAddress;
}
public void setLogAddress(Set<String> logAddress) {
this.logAddress = logAddress;
@Override
public String toString() {
return "TriggerParam{" +
"jobId=" + jobId +
", executorHandler='" + executorHandler + '\'' +
", executorParams='" + executorParams + '\'' +
", executorBlockStrategy='" + executorBlockStrategy + '\'' +
", glueType='" + glueType + '\'' +
", glueSource='" + glueSource + '\'' +
", glueUpdatetime=" + glueUpdatetime +
", logId=" + logId +
", logDateTim=" + logDateTim +
'}';
}
}

View File

@ -0,0 +1,13 @@
package com.xxl.job.core.enums;
/**
* Created by xuxueli on 17/5/10.
*/
public class RegistryConfig {
public static final int BEAT_TIMEOUT = 30;
public static final int DEAD_TIMEOUT = BEAT_TIMEOUT * 3;
public enum RegistType{ EXECUTOR, ADMIN }
}

View File

@ -4,7 +4,6 @@ import com.xxl.job.core.biz.ExecutorBiz;
import com.xxl.job.core.biz.impl.ExecutorBizImpl;
import com.xxl.job.core.handler.IJobHandler;
import com.xxl.job.core.handler.annotation.JobHander;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.rpc.netcom.NetComServerFactory;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import com.xxl.job.core.thread.JobThread;
@ -30,7 +29,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
private String ip;
private int port = 9999;
private String appName;
private RegistHelper registHelper;
public static String adminAddresses;
public static String logPath;
public void setIp(String ip) {
@ -42,8 +41,8 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
public void setAppName(String appName) {
this.appName = appName;
}
public void setRegistHelper(RegistHelper registHelper) {
this.registHelper = registHelper;
public void setAdminAddresses(String adminAddresses) {
this.adminAddresses = adminAddresses;
}
public void setLogPath(String logPath) {
this.logPath = logPath;
@ -54,7 +53,7 @@ public class XxlJobExecutor implements ApplicationContextAware, ApplicationListe
public void start() throws Exception {
// executor start
NetComServerFactory.putService(ExecutorBiz.class, new ExecutorBizImpl());
serverFactory.start(port, ip, appName, registHelper);
serverFactory.start(port, ip, appName);
// trigger callback thread start
TriggerCallbackThread.getInstance().start();

View File

@ -1,13 +0,0 @@
package com.xxl.job.core.registry;
/**
* Created by xuxueli on 16/9/30.
*/
public interface RegistHelper {
public static final int TIMEOUT = 15;
public enum RegistType{ EXECUTOR, ADMIN }
public int registry(String registGroup, String registryKey, String registryValue);
}

View File

@ -1,28 +0,0 @@
package com.xxl.job.core.registry.impl;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.util.DBUtil;
import javax.sql.DataSource;
/**
* Created by xuxueli on 16/9/30.
*/
public class DbRegistHelper implements RegistHelper {
private DataSource dataSource;
public void setDataSource(DataSource dataSource) {
this.dataSource = dataSource;
}
@Override
public int registry(String registGroup, String registryKey, String registryValue) {
String updateSql = "UPDATE XXL_JOB_QRTZ_TRIGGER_REGISTRY SET `update_time` = NOW() WHERE `registry_group` = ? AND `registry_key` = ? AND `registry_value` = ?";
String insertSql = "INSERT INTO XXL_JOB_QRTZ_TRIGGER_REGISTRY( `registry_group` , `registry_key` , `registry_value`, `update_time`) VALUES(? , ? , ?, NOW())";
int ret = DBUtil.update(dataSource, updateSql, new Object[]{registGroup, registryKey, registryValue});
if (ret<1) {
ret = DBUtil.update(dataSource, insertSql, new Object[]{registGroup, registryKey, registryValue});
}
return ret;
}
}

View File

@ -1,7 +1,6 @@
package com.xxl.job.core.rpc.netcom;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.rpc.codec.RpcRequest;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.netcom.jetty.server.JettyServer;
@ -22,8 +21,8 @@ public class NetComServerFactory {
// ---------------------- server start ----------------------
JettyServer server = new JettyServer();
public void start(int port, String ip, String appName, RegistHelper registHelper) throws Exception {
server.start(port, ip, appName, registHelper);
public void start(int port, String ip, String appName) throws Exception {
server.start(port, ip, appName);
}
// ---------------------- server destroy ----------------------

View File

@ -1,6 +1,5 @@
package com.xxl.job.core.rpc.netcom.jetty.server;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.thread.ExecutorRegistryThread;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Handler;
@ -20,7 +19,7 @@ public class JettyServer {
private Server server;
private Thread thread;
public void start(final int port, final String ip, final String appName, final RegistHelper registHelper) throws Exception {
public void start(final int port, final String ip, final String appName) throws Exception {
thread = new Thread(new Runnable() {
@Override
public void run() {
@ -42,7 +41,7 @@ public class JettyServer {
// Start the server
server.start();
logger.info(">>>>>>>>>>>> xxl-job jetty server start success at port:{}.", port);
ExecutorRegistryThread.getInstance().start(port, ip, appName, registHelper);
ExecutorRegistryThread.getInstance().start(port, ip, appName);
server.join(); // block until thread stopped
logger.info(">>>>>>>>>>> xxl-rpc server join success, netcon={}, port={}", JettyServer.class.getName(), port);
} catch (Exception e) {

View File

@ -1,7 +1,12 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.biz.model.RegistryParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.enums.RegistryConfig;
import com.xxl.job.core.util.AdminApiUtil;
import com.xxl.job.core.util.IpUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit;
@ -9,6 +14,7 @@ import java.util.concurrent.TimeUnit;
* Created by xuxueli on 17/3/2.
*/
public class ExecutorRegistryThread extends Thread {
private static Logger logger = LoggerFactory.getLogger(ExecutorRegistryThread.class);
private static ExecutorRegistryThread instance = new ExecutorRegistryThread();
public static ExecutorRegistryThread getInstance(){
@ -17,27 +23,36 @@ public class ExecutorRegistryThread extends Thread {
private Thread registryThread;
private boolean toStop = false;
public void start(final int port, final String ip, final String appName, final RegistHelper registHelper){
if (registHelper==null && appName==null || appName.trim().length()==0) {
public void start(final int port, final String ip, final String appName){
if (appName==null || appName.trim().length()==0) {
logger.warn(">>>>>>>>>>>> xxl-job, executor registry config fail");
return;
}
// executor address (generate addredd = ip:port)
final String executorAddress;
if (ip != null && ip.trim().length()>0) {
executorAddress = ip.trim().concat(":").concat(String.valueOf(port));
} else {
executorAddress = IpUtil.getIpPort(port);
}
registryThread = new Thread(new Runnable() {
@Override
public void run() {
while (!toStop) {
try {
// generate addredd = ip:port
String address = null;
if (ip != null && ip.trim().length()>0) {
address = ip.trim().concat(":").concat(String.valueOf(port));
} else {
address = IpUtil.getIpPort(port);
}
registHelper.registry(RegistHelper.RegistType.EXECUTOR.name(), appName, address);
TimeUnit.SECONDS.sleep(RegistHelper.TIMEOUT);
RegistryParam registryParam = new RegistryParam(RegistryConfig.RegistType.EXECUTOR.name(), appName, executorAddress);
ReturnT<String> registryResult = AdminApiUtil.callApiFailover(AdminApiUtil.REGISTRY, registryParam);
logger.info(">>>>>>>>>>> xxl-job registry, RegistryParam:{}, registryResult:{}", new Object[]{registryParam.toString(), registryResult.toString()});
} catch (Exception e) {
e.printStackTrace();
logger.error(">>>>>>>>>>> xxl-job ExecutorRegistryThread Exception:", e);
}
try {
TimeUnit.SECONDS.sleep(RegistryConfig.BEAT_TIMEOUT);
} catch (InterruptedException e) {
logger.error(e.getMessage(), e);
}
}
}

View File

@ -131,11 +131,11 @@ public class JobThread extends Thread{
// callback handler info
if (!toStop) {
// commonm
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), executeResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult));
} else {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}
} catch (Exception e) {
@ -156,7 +156,7 @@ public class JobThread extends Thread{
if (triggerParam!=null) {
// is killed
ReturnT<String> stopResult = new ReturnT<String>(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]");
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), triggerParam.getLogAddress(), stopResult));
TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult));
}
}

View File

@ -1,9 +1,8 @@
package com.xxl.job.core.thread;
import com.xxl.job.core.biz.AdminBiz;
import com.xxl.job.core.biz.model.HandleCallbackParam;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import com.xxl.job.core.util.AdminApiUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -33,19 +32,12 @@ public class TriggerCallbackThread {
try {
HandleCallbackParam callback = getInstance().callBackQueue.take();
if (callback != null) {
for (String address : callback.getLogAddress()) {
try {
// callback
AdminBiz adminBiz = (AdminBiz) new NetComClientProxy(AdminBiz.class, address).getObject();
ReturnT<String> callbackResult = adminBiz.callback(callback);
logger.info(">>>>>>>>>>> xxl-job callback , CallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()});
if (ReturnT.SUCCESS_CODE == callbackResult.getCode()) {
break;
}
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e);
}
// callback
try {
ReturnT<String> callbackResult = AdminApiUtil.callApiFailover(AdminApiUtil.CALLBACK, callback);
logger.info(">>>>>>>>>>> xxl-job callback, HandleCallbackParam:{}, callbackResult:{}", new Object[]{callback.toString(), callbackResult.toString()});
} catch (Exception e) {
logger.error(">>>>>>>>>>> xxl-job TriggerCallbackThread Exception:", e);
}
}
} catch (Exception e) {

View File

@ -0,0 +1,116 @@
package com.xxl.job.core.util;
import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.executor.XxlJobExecutor;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* @author xuxueli 2017-05-10 21:28:15
*/
public class AdminApiUtil {
private static Logger logger = LoggerFactory.getLogger(AdminApiUtil.class);
public static final String CALLBACK = "/api/callback";
public static final String REGISTRY = "/api/registry";
public static ReturnT<String> callApiFailover(String subUrl, Object requestObj) throws Exception {
// admin assress list
List<String> adminAddressList = new ArrayList<String>();
if (XxlJobExecutor.adminAddresses != null) {
for (String adminAddressItem: XxlJobExecutor.adminAddresses.split(",")) {
if (adminAddressItem.trim().length()>0 && !adminAddressList.contains(adminAddressItem)) {
adminAddressList.add(adminAddressItem);
}
}
}
if (adminAddressList==null || adminAddressList.size()==0) {
return ReturnT.FAIL;
}
for (String adminAddress: adminAddressList) {
ReturnT<String> registryResult = null;
try {
String apiUrl = adminAddress.concat(subUrl);
registryResult = callApi(apiUrl, requestObj);
} catch (Exception e) {
logger.error(e.getMessage(), e);
}
if (registryResult!=null && registryResult.getCode()==ReturnT.SUCCESS_CODE) {
return ReturnT.SUCCESS;
}
}
return ReturnT.FAIL;
}
public static ReturnT<String> callApi(String finalUrl, Object requestObj) throws Exception {
HttpPost httpPost = new HttpPost(finalUrl);
CloseableHttpClient httpClient = HttpClients.createDefault();
try {
// timeout
RequestConfig requestConfig = RequestConfig.custom()
.setConnectionRequestTimeout(10000)
.setSocketTimeout(10000)
.setConnectTimeout(10000)
.build();
httpPost.setConfig(requestConfig);
// data
if (requestObj != null) {
String json = JacksonUtil.writeValueAsString(requestObj);
StringEntity entity = new StringEntity(json, "utf-8");
entity.setContentEncoding("UTF-8");
entity.setContentType("application/json");
httpPost.setEntity(entity);
}
// do post
HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity();
if (null != entity) {
if (response.getStatusLine().getStatusCode() != 200) {
EntityUtils.consume(entity);
return ReturnT.FAIL;
}
String responseMsg = EntityUtils.toString(entity, "UTF-8");
EntityUtils.consume(entity);
if (responseMsg!=null && responseMsg.startsWith("{")) {
ReturnT<String> result = JacksonUtil.readValue(responseMsg, ReturnT.class);
return result;
}
}
return ReturnT.FAIL;
} catch (Exception e) {
logger.error("", e);
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
} finally {
if (httpPost!=null) {
httpPost.releaseConnection();
}
try {
httpClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}

View File

@ -105,5 +105,5 @@ public class HttpClientUtil {
}
return new byte[] {};
}
}

View File

@ -0,0 +1,93 @@
package com.xxl.job.core.util;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
/**
* Jackson util
*
* 1obj need private and set/get
* 2do not support inner class
*
* @author xuxueli 2015-9-25 18:02:56
*/
public class JacksonUtil {
private final static ObjectMapper objectMapper = new ObjectMapper();
public static ObjectMapper getInstance() {
return objectMapper;
}
/**
* beanarrayListMap --> json
*
* @param obj
* @return json string
* @throws Exception
*/
public static String writeValueAsString(Object obj) {
try {
return getInstance().writeValueAsString(obj);
} catch (JsonGenerationException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* string --> beanMapList(array)
*
* @param jsonStr
* @param clazz
* @return obj
* @throws Exception
*/
public static <T> T readValue(String jsonStr, Class<T> clazz) {
try {
return getInstance().readValue(jsonStr, clazz);
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static <T> T readValueRefer(String jsonStr, Class<T> clazz) {
try {
return getInstance().readValue(jsonStr, new TypeReference<T>() { });
} catch (JsonParseException e) {
e.printStackTrace();
} catch (JsonMappingException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
public static void main(String[] args) {
try {
Map<String, String> map = new HashMap<String, String>();
map.put("aaa", "111");
map.put("bbb", "222");
String json = writeValueAsString(map);
System.out.println(json);
System.out.println(readValue(json, Map.class));
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -27,34 +27,13 @@
<property name="ip" value="${xxl.job.executor.ip}" />
<!-- 执行器端口号 -->
<property name="port" value="${xxl.job.executor.port}" />
<!-- 执行器AppName为空则关闭自动注册 -->
<property name="appName" value="${xxl.job.executor.appname}" />
<!-- 执行器注册器 -->
<property name="registHelper" >
<!-- 执行器 "DbRegistHelper" 依赖 "XXL-JOB公共数据源" 推荐将其抽象为RPC远程服务, 可取消对JDBC的依赖如不启用执行自动注册功能也可忽略JDBC配置; -->
<bean class="com.xxl.job.core.registry.impl.DbRegistHelper" >
<!-- XXL-JOB公共数据源 -->
<property name="dataSource" ref="xxlJobDataSource" />
</bean>
</property>
<!-- 执行器注册中心地址,为空则关闭自动注册 -->
<property name="adminAddresses" value="${xxl.job.admin.addresses}" />
<!-- 执行器日志路径 -->
<property name="logPath" value="${xxl.job.executor.logpath}" />
</bean>
<!-- ********************************* "XXL-JOB公共数据源" 配置, 仅在启动 "DbRegistHelper" 时才需要, 否则可删除 ********************************* -->
<!-- 配置03、XXL-JOB公共数据源 -->
<bean id="xxlJobDataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" destroy-method="close">
<property name="driverClass" value="${xxl.job.db.driverClass}" />
<property name="jdbcUrl" value="${xxl.job.db.url}" />
<property name="user" value="${xxl.job.db.user}" />
<property name="password" value="${xxl.job.db.password}" />
<property name="initialPoolSize" value="3" />
<property name="minPoolSize" value="2" />
<property name="maxPoolSize" value="10" />
<property name="maxIdleTime" value="60" />
<property name="acquireRetryDelay" value="1000" />
<property name="acquireRetryAttempts" value="10" />
<property name="preferredTestQuery" value="SELECT 1" />
</bean>
</beans>

View File

@ -1,8 +1,5 @@
### xxl-job db
xxl.job.db.driverClass=com.mysql.jdbc.Driver
xxl.job.db.url=jdbc:mysql://localhost:3306/xxl-job?useUnicode=true&characterEncoding=UTF-8
xxl.job.db.user=root
xxl.job.db.password=root_pwd
### xxl-job admin address, such as "http://host01:port01/project,http://host02:port02/project"
xxl.job.admin.addresses=http://127.0.0.1:8080/xxl-job-admin
### xxl-job executor address
xxl.job.executor.appname=xxl-job-executor-example