HTTP远程调度交互重构

This commit is contained in:
xueli.xue 2016-03-12 18:04:57 +08:00
parent 1b6b8c2038
commit 53fd312d37
8 changed files with 83 additions and 76 deletions

View File

@ -16,8 +16,8 @@ import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody; import org.springframework.web.bind.annotation.ResponseBody;
import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.constant.Constants.JobGroupEnum; import com.xxl.job.core.constant.Constants.JobGroupEnum;
import com.xxl.job.core.model.ReturnT;
import com.xxl.job.core.model.XxlJobLog; import com.xxl.job.core.model.XxlJobLog;
import com.xxl.job.dao.IXxlJobLogDao; import com.xxl.job.dao.IXxlJobLogDao;
@ -73,16 +73,19 @@ public class JobLogController {
@RequestMapping("/save") @RequestMapping("/save")
@ResponseBody @ResponseBody
public ReturnT<String> triggerLog(int trigger_log_id, String status, String msg) { public RemoteCallBack triggerLog(int trigger_log_id, String status, String msg) {
RemoteCallBack callBack = new RemoteCallBack();
callBack.setStatus(RemoteCallBack.FAIL);
XxlJobLog log = xxlJobLogDao.load(trigger_log_id); XxlJobLog log = xxlJobLogDao.load(trigger_log_id);
if (log!=null) { if (log!=null) {
log.setHandleTime(new Date()); log.setHandleTime(new Date());
log.setHandleStatus(status); log.setHandleStatus(status);
log.setHandleMsg(msg); log.setHandleMsg(msg);
xxlJobLogDao.updateHandleInfo(log); xxlJobLogDao.updateHandleInfo(log);
return ReturnT.SUCCESS; callBack.setStatus(RemoteCallBack.SUCCESS);
return callBack;
} }
return ReturnT.FAIL; return callBack;
} }
} }

View File

@ -11,7 +11,7 @@ import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.model.XxlJobInfo; import com.xxl.job.core.model.XxlJobInfo;
import com.xxl.job.core.model.XxlJobLog; import com.xxl.job.core.model.XxlJobLog;
import com.xxl.job.core.util.DynamicSchedulerUtil; import com.xxl.job.core.util.DynamicSchedulerUtil;
@ -40,13 +40,13 @@ public class JobMonitorHelper {
if (jobLogId != null && jobLogId > 0) { if (jobLogId != null && jobLogId > 0) {
XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId); XxlJobLog log = DynamicSchedulerUtil.xxlJobLogDao.load(jobLogId);
if (log!=null) { if (log!=null) {
if (HttpUtil.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) { if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && StringUtils.isBlank(log.getHandleStatus())) {
JobMonitorHelper.monitor(jobLogId); JobMonitorHelper.monitor(jobLogId);
} }
if (HttpUtil.SUCCESS.equals(log.getTriggerStatus()) && HttpUtil.SUCCESS.equals(log.getHandleStatus())) { if (RemoteCallBack.SUCCESS.equals(log.getTriggerStatus()) && RemoteCallBack.SUCCESS.equals(log.getHandleStatus())) {
// pass // pass
} }
if (HttpUtil.FAIL.equals(log.getTriggerStatus()) || HttpUtil.FAIL.equals(log.getHandleStatus())) { if (RemoteCallBack.FAIL.equals(log.getTriggerStatus()) || RemoteCallBack.FAIL.equals(log.getHandleStatus())) {
String monotorKey = log.getJobGroup().concat("_").concat(log.getJobName()); String monotorKey = log.getJobGroup().concat("_").concat(log.getJobName());
Integer count = countMap.get(monotorKey); Integer count = countMap.get(monotorKey);
if (count == null) { if (count == null) {

View File

@ -15,7 +15,7 @@ import org.slf4j.LoggerFactory;
import org.springframework.scheduling.quartz.QuartzJobBean; import org.springframework.scheduling.quartz.QuartzJobBean;
import com.xxl.job.client.handler.HandlerRepository; import com.xxl.job.client.handler.HandlerRepository;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.client.util.JacksonUtil; import com.xxl.job.client.util.JacksonUtil;
import com.xxl.job.core.model.XxlJobInfo; import com.xxl.job.core.model.XxlJobInfo;
import com.xxl.job.core.model.XxlJobLog; import com.xxl.job.core.model.XxlJobLog;
@ -60,14 +60,14 @@ public abstract class LocalNomalJobBean extends QuartzJobBean {
} }
jobLog.setTriggerTime(new Date()); jobLog.setTriggerTime(new Date());
jobLog.setTriggerStatus(HttpUtil.SUCCESS); jobLog.setTriggerStatus(RemoteCallBack.SUCCESS);
jobLog.setTriggerMsg(null); jobLog.setTriggerMsg(null);
try { try {
Object responseMsg = this.handle(handlerParams); Object responseMsg = this.handle(handlerParams);
jobLog.setHandleTime(new Date()); jobLog.setHandleTime(new Date());
jobLog.setHandleStatus(HttpUtil.SUCCESS); jobLog.setHandleStatus(RemoteCallBack.SUCCESS);
jobLog.setHandleMsg(JacksonUtil.writeValueAsString(responseMsg)); jobLog.setHandleMsg(JacksonUtil.writeValueAsString(responseMsg));
} catch (Exception e) { } catch (Exception e) {
logger.info("HandlerThread Exception:", e); logger.info("HandlerThread Exception:", e);
@ -75,7 +75,7 @@ public abstract class LocalNomalJobBean extends QuartzJobBean {
e.printStackTrace(new PrintWriter(out)); e.printStackTrace(new PrintWriter(out));
jobLog.setHandleTime(new Date()); jobLog.setHandleTime(new Date());
jobLog.setHandleStatus(HttpUtil.FAIL); jobLog.setHandleStatus(RemoteCallBack.FAIL);
jobLog.setHandleMsg(out.toString()); jobLog.setHandleMsg(out.toString());
} }

View File

@ -4,7 +4,6 @@ import java.util.Date;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.quartz.DisallowConcurrentExecution; import org.quartz.DisallowConcurrentExecution;
import org.quartz.JobExecutionContext; import org.quartz.JobExecutionContext;
import org.quartz.JobExecutionException; import org.quartz.JobExecutionException;
@ -15,6 +14,7 @@ import org.springframework.scheduling.quartz.QuartzJobBean;
import com.xxl.job.client.handler.HandlerRepository; import com.xxl.job.client.handler.HandlerRepository;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil;
import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.client.util.JacksonUtil; import com.xxl.job.client.util.JacksonUtil;
import com.xxl.job.core.model.XxlJobInfo; import com.xxl.job.core.model.XxlJobInfo;
import com.xxl.job.core.model.XxlJobLog; import com.xxl.job.core.model.XxlJobLog;
@ -60,35 +60,25 @@ public class RemoteHttpJobBean extends QuartzJobBean {
params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME)); params.put(HandlerRepository.HANDLER_NAME, jobDataMap.get(HandlerRepository.HANDLER_NAME));
params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS)); params.put(HandlerRepository.HANDLER_PARAMS, jobDataMap.get(HandlerRepository.HANDLER_PARAMS));
// handler address, netty or servlet // handler address, jetty or servlet
String handler_address = jobDataMap.get(HandlerRepository.HANDLER_ADDRESS); String handler_address = jobDataMap.get(HandlerRepository.HANDLER_ADDRESS);
if (!handler_address.startsWith("http")){ if (!handler_address.startsWith("http")){
handler_address = "http://" + handler_address + "/"; handler_address = "http://" + handler_address + "/";
} }
String[] postResp = HttpUtil.post(handler_address, params); RemoteCallBack callback = HttpUtil.post(handler_address, params);
logger.info(">>>>>>>>>>> xxl-job trigger http response, jobLog.id:{}, jobLog:{}", jobLog.getId(), jobLog); logger.info(">>>>>>>>>>> xxl-job trigger http response, jobLog.id:{}, jobLog:{}, callback:{}", jobLog.getId(), jobLog, callback);
// parse trigger response
String responseMsg = postResp[0];
String exceptionMsg = postResp[1];
jobLog.setTriggerTime(new Date());
jobLog.setTriggerStatus(HttpUtil.FAIL);
jobLog.setTriggerMsg("[responseMsg]:"+responseMsg+"<br>[exceptionMsg]:"+exceptionMsg);
if (StringUtils.isNotBlank(responseMsg) && responseMsg.indexOf("{")>-1 ) {
Map<String, String> responseMap = JacksonUtil.readValue(responseMsg, Map.class);
if (responseMap!=null && StringUtils.isNotBlank(responseMap.get(HttpUtil.status))) {
jobLog.setTriggerStatus(responseMap.get(HttpUtil.status));
jobLog.setTriggerMsg(responseMap.get(HttpUtil.msg));
}
}
// update trigger info // update trigger info
jobLog.setTriggerTime(new Date());
jobLog.setTriggerStatus(callback.getStatus());
jobLog.setTriggerMsg(callback.getMsg());
DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog); DynamicSchedulerUtil.xxlJobLogDao.updateTriggerInfo(jobLog);
JobMonitorHelper.monitor(jobLog.getId());
logger.info(">>>>>>>>>>> xxl-job trigger end, jobLog.id:{}, jobLog:{}", jobLog.getId(), jobLog);
// monitor triger
JobMonitorHelper.monitor(jobLog.getId());
logger.info(">>>>>>>>>>> xxl-job trigger end, jobLog.id:{}, jobLog:{}", jobLog.getId(), jobLog);
} }
} }

View File

@ -11,7 +11,7 @@ import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import com.xxl.job.client.handler.IJobHandler; import com.xxl.job.client.handler.IJobHandler;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.core.model.XxlJobLog; import com.xxl.job.core.model.XxlJobLog;
import com.xxl.job.dao.IXxlJobLogDao; import com.xxl.job.dao.IXxlJobLogDao;
@ -41,7 +41,7 @@ public class XxlJobLogTest {
public void updateTriggerInfo(){ public void updateTriggerInfo(){
XxlJobLog xxlJobLog = xxlJobLogDao.load(29); XxlJobLog xxlJobLog = xxlJobLogDao.load(29);
xxlJobLog.setTriggerTime(new Date()); xxlJobLog.setTriggerTime(new Date());
xxlJobLog.setTriggerStatus(HttpUtil.SUCCESS); xxlJobLog.setTriggerStatus(RemoteCallBack.SUCCESS);
xxlJobLog.setTriggerMsg("trigger msg"); xxlJobLog.setTriggerMsg("trigger msg");
xxlJobLogDao.updateTriggerInfo(xxlJobLog); xxlJobLogDao.updateTriggerInfo(xxlJobLog);
} }

View File

@ -1,13 +1,12 @@
package com.xxl.job.client.handler; package com.xxl.job.client.handler;
import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
import com.xxl.job.client.util.JacksonUtil; import com.xxl.job.client.util.JacksonUtil;
/** /**
@ -38,9 +37,9 @@ public class HandlerRepository {
public static String pushHandleQueue(Map<String, String> _param) { public static String pushHandleQueue(Map<String, String> _param) {
logger.info(">>>>>>>>>>> xxl-job pushHandleQueue start, _param:{}", new Object[]{_param}); logger.info(">>>>>>>>>>> xxl-job pushHandleQueue start, _param:{}", new Object[]{_param});
// result // callback
String _status = HttpUtil.FAIL; RemoteCallBack callback = new RemoteCallBack();
String _msg = ""; callback.setStatus(RemoteCallBack.FAIL);
// push data to queue // push data to queue
String handler_name = _param.get(HandlerRepository.HANDLER_NAME); String handler_name = _param.get(HandlerRepository.HANDLER_NAME);
@ -48,22 +47,16 @@ public class HandlerRepository {
HandlerThread handlerThread = handlerTreadMap.get(handler_name); HandlerThread handlerThread = handlerTreadMap.get(handler_name);
if (handlerThread != null) { if (handlerThread != null) {
handlerThread.pushData(_param); handlerThread.pushData(_param);
_status = HttpUtil.SUCCESS; callback.setStatus(RemoteCallBack.SUCCESS);
} else { } else {
_msg = "handler not found."; callback.setMsg("handler[" + handler_name + "] not found.");
} }
}else{ }else{
_msg = "param[HANDLER_NAME] not exists."; callback.setMsg("param[HANDLER_NAME] can not be null.");
} }
logger.info(">>>>>>>>>>> xxl-job pushHandleQueue end, triggerData:{}", new Object[]{callback});
HashMap<String, String> triggerData = new HashMap<String, String>(); return JacksonUtil.writeValueAsString(callback);
triggerData.put(HandlerRepository.TRIGGER_LOG_ID, _param.get(HandlerRepository.TRIGGER_LOG_ID));
triggerData.put(HttpUtil.status, _status);
triggerData.put(HttpUtil.msg, _msg);
logger.info(">>>>>>>>>>> xxl-job pushHandleQueue end, triggerData:{}", new Object[]{triggerData});
return JacksonUtil.writeValueAsString(triggerData);
} }
} }

View File

@ -12,6 +12,7 @@ import org.slf4j.LoggerFactory;
import com.xxl.job.client.handler.IJobHandler.JobHandleStatus; import com.xxl.job.client.handler.IJobHandler.JobHandleStatus;
import com.xxl.job.client.util.HttpUtil; import com.xxl.job.client.util.HttpUtil;
import com.xxl.job.client.util.HttpUtil.RemoteCallBack;
/** /**
* handler thread * handler thread
@ -65,19 +66,19 @@ public class HandlerThread extends Thread{
} }
// callback handler info // callback handler info
String callback_response[] = null; RemoteCallBack callback = null;
try { try {
HashMap<String, String> params = new HashMap<String, String>(); HashMap<String, String> params = new HashMap<String, String>();
params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id); params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id);
params.put(HttpUtil.status, _status.name()); params.put("status", _status.name());
params.put(HttpUtil.msg, _msg); params.put("msg", _msg);
callback_response = HttpUtil.post(trigger_log_url, params); callback = HttpUtil.post(trigger_log_url, params);
} catch (Exception e) { } catch (Exception e) {
logger.info("HandlerThread Exception:", e); logger.info("HandlerThread Exception:", e);
} }
logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback_response:{}, thread:{}", logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback:{}, thread:{}",
new Object[]{handlerData, _status, _msg, callback_response, this}); new Object[]{handlerData, _status, _msg, callback, this});
} else { } else {
try { try {
TimeUnit.MILLISECONDS.sleep(i * 100); TimeUnit.MILLISECONDS.sleep(i * 100);

View File

@ -25,12 +25,28 @@ import org.apache.http.util.EntityUtils;
*/ */
public class HttpUtil { public class HttpUtil {
// response param /**
public static final String status = "status"; * http remote callback
public static final String msg = "msg"; */
// response status enum public static class RemoteCallBack{
public static final String SUCCESS = "SUCCESS"; public static final String SUCCESS = "SUCCESS";
public static final String FAIL = "FAIL"; public static final String FAIL = "FAIL";
private String status;
private String msg;
public void setStatus(String status) {
this.status = status;
}
public String getStatus() {
return status;
}
public void setMsg(String msg) {
this.msg = msg;
}
public String getMsg() {
return msg;
}
}
/** /**
* http post request * http post request
@ -38,9 +54,9 @@ public class HttpUtil {
* @param params * @param params
* @return [0]=responseMsg, [1]=exceptionMsg * @return [0]=responseMsg, [1]=exceptionMsg
*/ */
public static String[] post(String reqURL, Map<String, String> params){ public static RemoteCallBack post(String reqURL, Map<String, String> params){
String responseMsg = null; RemoteCallBack callback = new RemoteCallBack();
String exceptionMsg = null; callback.setStatus(RemoteCallBack.FAIL);
// do post // do post
HttpPost httpPost = null; HttpPost httpPost = null;
@ -60,18 +76,25 @@ public class HttpUtil {
HttpResponse response = httpClient.execute(httpPost); HttpResponse response = httpClient.execute(httpPost);
HttpEntity entity = response.getEntity(); HttpEntity entity = response.getEntity();
if (null != entity) { if (response.getStatusLine().getStatusCode() == 200) {
responseMsg = EntityUtils.toString(entity, "UTF-8"); if (null != entity) {
EntityUtils.consume(entity); String responseMsg = EntityUtils.toString(entity, "UTF-8");
} callback = JacksonUtil.readValue(responseMsg, RemoteCallBack.class);
if (response.getStatusLine().getStatusCode() != 200) { if (callback == null) {
exceptionMsg = "response.getStatusLine().getStatusCode() = " + response.getStatusLine().getStatusCode(); callback = new RemoteCallBack();
callback.setStatus(RemoteCallBack.FAIL);
callback.setMsg("responseMsg parse json fail, responseMsg:" + responseMsg);
}
EntityUtils.consume(entity);
}
} else {
callback.setMsg("http statusCode error, statusCode:" + response.getStatusLine().getStatusCode());
} }
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
StringWriter out = new StringWriter(); StringWriter out = new StringWriter();
e.printStackTrace(new PrintWriter(out)); e.printStackTrace(new PrintWriter(out));
exceptionMsg = out.toString(); callback.setMsg(out.toString());
} finally{ } finally{
if (httpPost!=null) { if (httpPost!=null) {
httpPost.releaseConnection(); httpPost.releaseConnection();
@ -85,9 +108,6 @@ public class HttpUtil {
} }
} }
String[] result = new String[2]; return callback;
result[0] = responseMsg;
result[1] = exceptionMsg;
return result;
} }
} }