执行器策略推送

This commit is contained in:
xueli.xue 2017-03-12 21:44:31 +08:00
parent 6169ac546f
commit 177ab8d21e
11 changed files with 184 additions and 141 deletions

View File

@ -737,7 +737,7 @@ Tips: 历史版本(V1.3.x)目前已经Release至稳定版本, 进入维护阶段
- 3、CleanCode清理无效的历史参数
- 4、规范系统配置数据通过配置文件统一管理
- 5、执行器支持手动设置执行地址列表提供开关切换使用注册地址还是手动设置的地址
- 6、执行器路由规则第一个、循环、随机、顺序故障(默认)转移;
- 6、执行器路由规则第一个、最后一个、轮询、随机、一致性HASH、最不经常使用、最近最久未使用、故障转移;
- 7、底层扩展数据接口调整
- 8、新建任务默认为非运行状态

View File

@ -154,7 +154,7 @@ CREATE TABLE `XXL_JOB_QRTZ_TRIGGER_INFO` (
`update_time` datetime DEFAULT NULL,
`author` varchar(64) DEFAULT NULL COMMENT '作者',
`alarm_email` varchar(255) DEFAULT NULL COMMENT '报警邮件',
`executor_route_strategy` varchar(20) DEFAULT NULL COMMENT '执行器路由策略',
`executor_route_strategy` varchar(50) DEFAULT NULL COMMENT '执行器路由策略',
`executor_handler` varchar(255) DEFAULT NULL COMMENT '执行器任务handler',
`executor_param` varchar(255) DEFAULT NULL COMMENT '执行器任务参数',
`glue_switch` int(11) DEFAULT '0' COMMENT 'GLUE模式开关0-否1-是',

View File

@ -3,6 +3,7 @@ package com.xxl.job.admin.core.jobbean;
import com.xxl.job.admin.core.model.XxlJobGroup;
import com.xxl.job.admin.core.model.XxlJobInfo;
import com.xxl.job.admin.core.model.XxlJobLog;
import com.xxl.job.admin.core.route.ExecutorRouteStrategyEnum;
import com.xxl.job.admin.core.schedule.XxlJobDynamicScheduler;
import com.xxl.job.admin.core.thread.JobMonitorHelper;
import com.xxl.job.admin.core.thread.JobRegistryHelper;
@ -11,6 +12,7 @@ import com.xxl.job.core.biz.model.ReturnT;
import com.xxl.job.core.biz.model.TriggerParam;
import com.xxl.job.core.registry.RegistHelper;
import com.xxl.job.core.rpc.netcom.NetComClientProxy;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang.StringUtils;
import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException;
@ -19,7 +21,6 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean;
import java.text.MessageFormat;
import java.util.*;
/**
@ -34,26 +35,23 @@ public class RemoteHttpJobBean extends QuartzJobBean {
@Override
protected void executeInternal(JobExecutionContext context)
throws JobExecutionException {
// load job
JobKey jobKey = context.getTrigger().getJobKey();
Integer jobId = Integer.valueOf(jobKey.getName());
XxlJobInfo jobInfo = XxlJobDynamicScheduler.xxlJobInfoDao.loadById(jobId);
// save log
// log part-1
XxlJobLog jobLog = new XxlJobLog();
jobLog.setJobGroup(jobInfo.getJobGroup());
jobLog.setJobId(jobInfo.getId());
XxlJobDynamicScheduler.xxlJobLogDao.save(jobLog);
logger.debug(">>>>>>>>>>> xxl-job trigger start, jobId:{}", jobLog.getId());
// admin address
List<String> adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name());
Set<String> adminAddressSet = new HashSet<String>();
if (adminAddressList!=null) {
adminAddressSet.addAll(adminAddressList);
}
adminAddressSet.add(XxlJobDynamicScheduler.getCallbackAddress());
// update trigger info 1/2
// log part-2 param
//jobLog.setExecutorAddress(executorAddress);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
jobLog.setTriggerTime(new Date());
// trigger request
@ -64,114 +62,152 @@ public class RemoteHttpJobBean extends QuartzJobBean {
triggerParam.setGlueSwitch((jobInfo.getGlueSwitch()==0)?false:true);
triggerParam.setLogId(jobLog.getId());
triggerParam.setLogDateTim(jobLog.getTriggerTime().getTime());
triggerParam.setLogAddress(adminAddressSet);
triggerParam.setLogAddress(findCallbackAddressList()); // callback address list
// parse address
String groupAddressInfo = "注册方式:";
List<String> addressList = new ArrayList<String>();
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(Integer.valueOf(jobInfo.getJobGroup()));
if (group!=null) {
if (group.getAddressType() == 0) {
groupAddressInfo += "自动注册";
addressList = JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName());
} else {
groupAddressInfo += "手动录入";
if (StringUtils.isNotBlank(group.getAddressList())) {
addressList = Arrays.asList(group.getAddressList().split(","));
}
}
groupAddressInfo += ",地址列表:" + addressList.toString();
}
groupAddressInfo += "<br><br>";
// do trigger
ReturnT<String> triggerResult = doTrigger(triggerParam, jobInfo, jobLog);
// failover trigger
ReturnT<String> triggerResult = failoverTrigger(addressList, triggerParam, jobLog);
jobLog.setExecutorHandler(jobInfo.getExecutorHandler());
jobLog.setExecutorParam(jobInfo.getExecutorParam());
logger.info(">>>>>>>>>>> xxl-job failoverTrigger, jobId:{}, triggerResult:{}", jobLog.getId(), triggerResult.toString());
// update trigger info 2/2
// log part-2
jobLog.setTriggerCode(triggerResult.getCode());
jobLog.setTriggerMsg(groupAddressInfo + triggerResult.getMsg());
jobLog.setTriggerMsg(triggerResult.getMsg());
XxlJobDynamicScheduler.xxlJobLogDao.updateTriggerInfo(jobLog);
// monitor triger
JobMonitorHelper.monitor(jobLog.getId());
logger.debug(">>>>>>>>>>> xxl-job trigger end, jobId:{}", jobLog.getId());
}
public ReturnT<String> doTrigger(TriggerParam triggerParam, XxlJobInfo jobInfo, XxlJobLog jobLog){
StringBuffer triggerSb = new StringBuffer();
/**
* failover for trigger remote address
* @return
*/
public ReturnT<String> failoverTrigger(List<String> addressList, TriggerParam triggerParam, XxlJobLog jobLog){
if (addressList==null || addressList.size() < 1) {
return new ReturnT<String>(ReturnT.FAIL_CODE, "Trigger error, <br>>>>[address] is null <br><hr>");
} else if (addressList.size() == 1) {
String address = addressList.get(0);
// store real address
jobLog.setExecutorAddress(address);
// exerutor address list
ArrayList<String> addressList = null;
XxlJobGroup group = XxlJobDynamicScheduler.xxlJobGroupDao.load(jobInfo.getJobGroup());
if (group.getAddressType() == 0) {
triggerSb.append("注册方式:自动注册");
addressList = (ArrayList<String>) JobRegistryHelper.discover(RegistHelper.RegistType.EXECUTOR.name(), group.getAppName());
} else {
triggerSb.append("注册方式:手动录入");
if (StringUtils.isNotBlank(group.getAddressList())) {
addressList = new ArrayList<String>(Arrays.asList(group.getAddressList().split(",")));
}
}
triggerSb.append("<br>地址列表:").append(addressList!=null?addressList.toString():"");
if (CollectionUtils.isEmpty(addressList)) {
triggerSb.append("<hr>调度失败:").append("执行器地址为空");
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
}
// real trigger
ExecutorBiz executorBiz = null;
try {
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
ReturnT<String> runResult = executorBiz.run(triggerParam);
// trigger remote executor
if (addressList.size() == 1) {
String address = addressList.get(0);
jobLog.setExecutorAddress(address);
String failoverMessage = MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[code] : {1}, <br>>>>[msg] : {2} <br><hr>",
address, runResult.getCode(), runResult.getMsg());
runResult.setMsg(runResult.getMsg() + failoverMessage);
return runResult;
} else {
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<hr>").append(runResult.getMsg());
// for ha
Collections.shuffle(addressList);
// for failover
String failoverMessage = "";
for (String address : addressList) {
if (StringUtils.isNotBlank(address)) {
ExecutorBiz executorBiz = null;
try {
executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
} catch (Exception e) {
e.printStackTrace();
return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
}
// beat check
ReturnT<String> beatResult = executorBiz.beat();
failoverMessage += MessageFormat.format("BEAT running, <br>>>>[address] : {0}, <br>>>>[code] : {1}, <br>>>>[msg] : {2} <br><hr>",
address, beatResult.getCode(), beatResult.getMsg());
// beat success, trigger do
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
// store real address
jobLog.setExecutorAddress(address);
// real trigger
ReturnT<String> runResult = executorBiz.run(triggerParam);
failoverMessage += MessageFormat.format("Trigger running, <br>>>>[address] : {0}, <br>>>>[status] : {1}, <br>>>>[msg] : {2} <br><hr>",
address, runResult.getCode(), runResult.getMsg());
runResult.setMsg( runResult.getMsg() + failoverMessage);
return runResult;
}
}
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
} else {
// executor route strategy
ExecutorRouteStrategyEnum executorRouteStrategyEnum = ExecutorRouteStrategyEnum.match(jobInfo.getExecutorRouteStrategy(), null);
triggerSb.append("<br>路由策略:").append(executorRouteStrategyEnum!=null?(executorRouteStrategyEnum.name() + "-" + executorRouteStrategyEnum.getTitle()):null);
if (executorRouteStrategyEnum == null) {
triggerSb.append("<hr>调度失败:").append("执行器路由策略为空");
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
}
return new ReturnT<String>(ReturnT.FAIL_CODE, failoverMessage);
if (executorRouteStrategyEnum != ExecutorRouteStrategyEnum.FAILOVER) {
// get address
String address = executorRouteStrategyEnum.getRouter().route(jobInfo.getId(), addressList);
jobLog.setExecutorAddress(address);
// run
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<hr>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
} else {
for (String address : addressList) {
// beat
ReturnT<String> beatResult = beatExecutor(address);
triggerSb.append("<hr>").append(beatResult.getMsg());
if (beatResult.getCode() == ReturnT.SUCCESS_CODE) {
jobLog.setExecutorAddress(address);
ReturnT<String> runResult = runExecutor(triggerParam, address);
triggerSb.append("<hr>").append(runResult.getMsg());
return new ReturnT<String>(runResult.getCode(), triggerSb.toString());
}
}
return new ReturnT<String>(ReturnT.FAIL_CODE, triggerSb.toString());
}
}
}
/**
* run executor
* @param address
* @return
*/
public ReturnT<String> beatExecutor(String address){
ReturnT<String> beatResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
beatResult = executorBiz.beat();
} catch (Exception e) {
logger.error("", e);
beatResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer sb = new StringBuffer("心跳检测:");
sb.append("<br>address").append(address);
sb.append("<br>code").append(beatResult.getCode());
sb.append("<br>msg").append(beatResult.getMsg());
beatResult.setMsg(sb.toString());
return beatResult;
}
/**
* run executor
* @param triggerParam
* @param address
* @return
*/
public ReturnT<String> runExecutor(TriggerParam triggerParam, String address){
ReturnT<String> runResult = null;
try {
ExecutorBiz executorBiz = (ExecutorBiz) new NetComClientProxy(ExecutorBiz.class, address).getObject();
runResult = executorBiz.run(triggerParam);
} catch (Exception e) {
logger.error("", e);
runResult = new ReturnT<String>(ReturnT.FAIL_CODE, ""+e );
}
StringBuffer sb = new StringBuffer("触发调度:");
sb.append("<br>address").append(address);
sb.append("<br>code").append(runResult.getCode());
sb.append("<br>msg").append(runResult.getMsg());
runResult.setMsg(sb.toString());
return runResult;
}
/**
* find callback address list
* @return
*/
public Set<String> findCallbackAddressList(){
Set<String> adminAddressSet = new HashSet<String>();
adminAddressSet.add(XxlJobDynamicScheduler.getCallbackAddress());
List<String> adminAddressList = JobRegistryHelper.discover(RegistHelper.RegistType.ADMIN.name(), RegistHelper.RegistType.ADMIN.name());
if (adminAddressList!=null) {
adminAddressSet.addAll(adminAddressList);
}
return adminAddressSet;
}
}

View File

@ -14,6 +14,7 @@ public class ExecutorRouteRandom extends ExecutorRouter {
@Override
public String route(int jobId, ArrayList<String> addressList) {
// Collections.shuffle(addressList);
return addressList.get(localRandom.nextInt(addressList.size()));
}

View File

@ -17,7 +17,9 @@ public interface IXxlJobLogDao {
public XxlJobLog load(int id);
public int save(XxlJobLog xxlJobLog);
public int updateTriggerInfo(XxlJobLog xxlJobLog);
public int updateHandleInfo(XxlJobLog xxlJobLog);
public int delete(int jobId);

View File

@ -88,16 +88,10 @@
<insert id="save" parameterType="com.xxl.job.admin.core.model.XxlJobLog" useGeneratedKeys="true" keyProperty="id" >
INSERT INTO XXL_JOB_QRTZ_TRIGGER_LOG (
`job_group`,
`job_id`,
`executor_address`,
`executor_handler`,
`executor_param`
`job_id`
) VALUES (
#{jobGroup},
#{jobId},
#{executorAddress},
#{executorHandler},
#{executorParam}
#{jobId}
);
<selectKey resultType="java.lang.Integer" order="AFTER" keyProperty="id">
SELECT LAST_INSERT_ID()

View File

@ -78,12 +78,12 @@
<th name="id" >id</th>
<th name="jobGroup" >执行器ID</th>
<th name="jobId" >任务ID</th>
<th name="triggerTime" >调度时间</th>
<th name="triggerCode" >调度结果</th>
<th name="triggerMsg" >调度备注</th>
<th name="executorAddress" >执行器地址</th>
<th name="executorHandler" >JobHandler</th>
<th name="executorParam" >任务参数</th>
<th name="triggerTime" >调度时间</th>
<th name="triggerCode" >调度结果</th>
<th name="triggerMsg" >调度备注</th>
<th name="handleTime" >执行时间</th>
<th name="handleCode" >执行结果</th>
<th name="handleMsg" >执行备注</th>

View File

@ -138,7 +138,7 @@ $(function() {
"sProcessing" : "处理中...",
"sLengthMenu" : "每页 _MENU_ 条记录",
"sZeroRecords" : "没有匹配结果",
"sInfo" : "第 _PAGE_ 页 ( 总共 _PAGES_ 页 )",
"sInfo" : "第 _PAGE_ 页 ( 总共 _PAGES_ 页_TOTAL_ 条记录 )",
"sInfoEmpty" : "无记录",
"sInfoFiltered" : "(由 _MAX_ 项结果过滤)",
"sInfoPostFix" : "",

View File

@ -81,6 +81,25 @@ $(function() {
{ "data": 'id', "bSortable": false, "visible" : false},
{ "data": 'jobGroup', "visible" : false},
{ "data": 'jobId', "visible" : false},
{
"data": 'triggerTime',
"render": function ( data, type, row ) {
return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):"";
}
},
{
"data": 'triggerCode',
"render": function ( data, type, row ) {
return (data==200)?'<span style="color: green">成功</span>':(data==500)?'<span style="color: red">失败</span>':(data==0)?'':data;
}
},
{
"data": 'triggerMsg',
"render": function ( data, type, row ) {
return data?'<a class="logTips" href="javascript:;" >查看<span style="display:none;">'+ data +'</span></a>':"无";
}
},
{ "data": 'executorAddress', "visible" : true},
{
"data": 'executorHandler',
@ -90,25 +109,7 @@ $(function() {
}
},
{ "data": 'executorParam', "visible" : true},
{
"data": 'triggerTime',
"render": function ( data, type, row ) {
return data?moment(new Date(data)).format("YYYY-MM-DD HH:mm:ss"):"";
}
},
{
"data": 'triggerCode',
"render": function ( data, type, row ) {
return (data==200)?'<span style="color: green">成功</span>':(data==500)?'<span style="color: red">失败</span>':(data==0)?'':data;
}
},
{
"data": 'triggerMsg',
"render": function ( data, type, row ) {
return data?'<a class="logTips" href="javascript:;" >查看<span style="display:none;">'+ data +'</span></a>':"无";
}
},
{
"data": 'handleTime',
"render": function ( data, type, row ) {
@ -147,7 +148,7 @@ $(function() {
"sProcessing" : "处理中...",
"sLengthMenu" : "每页 _MENU_ 条记录",
"sZeroRecords" : "没有匹配结果",
"sInfo" : "第 _PAGE_ 页 ( 总共 _PAGES_ 页 )",
"sInfo" : "第 _PAGE_ 页 ( 总共 _PAGES_ 页_TOTAL_ 条记录 )",
"sInfoEmpty" : "无记录",
"sInfoFiltered" : "(由 _MAX_ 项结果过滤)",
"sInfoPostFix" : "",

View File

@ -1,5 +1,7 @@
package com.xxl.job.core.util;
import com.xxl.job.core.rpc.codec.RpcResponse;
import com.xxl.job.core.rpc.serialize.HessianSerializer;
import org.apache.http.HttpEntity;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpPost;
@ -8,6 +10,8 @@ import org.apache.http.entity.ContentType;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
@ -18,6 +22,7 @@ import java.io.InputStream;
* @author xuxueli 2015-10-31 19:50:41
*/
public class HttpClientUtil {
private static Logger logger = LoggerFactory.getLogger(HttpClientUtil.class);
/**
* post request
@ -47,7 +52,11 @@ public class HttpClientUtil {
EntityUtils.consume(entity);
}
} catch (Exception e) {
e.printStackTrace();
logger.error("", e);
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setError(e.getMessage());
responseBytes = HessianSerializer.serialize(rpcResponse);
} finally {
httpPost.releaseConnection();
try {