diff --git a/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBean.java b/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBean.java new file mode 100644 index 00000000..62077260 --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBean.java @@ -0,0 +1,50 @@ +package com.xxl.job.service.job; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.quartz.DisallowConcurrentExecution; +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.quartz.QuartzJobBean; + +/** + * http job bean + * @author xuxueli 2015-12-17 18:20:34 + */ + +@DisallowConcurrentExecution // 串行;线程数要多配置几个,否则不生效; +public class LocalJobBean extends QuartzJobBean { + private static Logger logger = LoggerFactory.getLogger(LocalJobBean.class); + + @Override + protected void executeInternal(JobExecutionContext context) + throws JobExecutionException { + + String triggerKey = context.getTrigger().getKey().getName(); + String triggerGroup = context.getTrigger().getKey().getGroup(); + Map jobDataMap = context.getMergedJobDataMap().getWrappedMap(); + + // jobDataMap 2 params + Map params = new HashMap(); + if (jobDataMap!=null && jobDataMap.size()>0) { + for (Entry item : jobDataMap.entrySet()) { + params.put(item.getKey(), String.valueOf(item.getValue())); + } + } + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + logger.info(">>>>>>>>>>> xxl-job run :jobId:{}, group:{}, jobDataMap:{}", + new Object[]{triggerKey, triggerGroup, jobDataMap}); + } + +} \ No newline at end of file diff --git a/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBeanB.java b/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBeanB.java new file mode 100644 index 00000000..5868e71d --- /dev/null +++ b/xxl-job-admin/src/main/java/com/xxl/job/service/job/LocalJobBeanB.java @@ -0,0 +1,48 @@ +package com.xxl.job.service.job; + +import java.util.HashMap; +import java.util.Map; +import java.util.Map.Entry; +import java.util.concurrent.TimeUnit; + +import org.quartz.JobExecutionContext; +import org.quartz.JobExecutionException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.scheduling.quartz.QuartzJobBean; + +/** + * http job bean + * @author xuxueli 2015-12-17 18:20:34 + */ + +public class LocalJobBeanB extends QuartzJobBean { + private static Logger logger = LoggerFactory.getLogger(LocalJobBeanB.class); + + @Override + protected void executeInternal(JobExecutionContext context) + throws JobExecutionException { + + String triggerKey = context.getTrigger().getKey().getName(); + String triggerGroup = context.getTrigger().getKey().getGroup(); + Map jobDataMap = context.getMergedJobDataMap().getWrappedMap(); + + // jobDataMap 2 params + Map params = new HashMap(); + if (jobDataMap!=null && jobDataMap.size()>0) { + for (Entry item : jobDataMap.entrySet()) { + params.put(item.getKey(), String.valueOf(item.getValue())); + } + } + + try { + TimeUnit.SECONDS.sleep(5); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + logger.info(">>>>>>>>>>> xxl-job run :jobId:{}, group:{}, jobDataMap:{}", + new Object[]{triggerKey, triggerGroup, jobDataMap}); + } + +} \ No newline at end of file diff --git a/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java new file mode 100644 index 00000000..b21713be --- /dev/null +++ b/xxl-job-client/src/main/java/com/xxl/job/client/handler/HandlerThread.java @@ -0,0 +1,94 @@ +package com.xxl.job.client.handler; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.xxl.job.client.handler.IJobHandler.JobHandleStatus; +import com.xxl.job.client.util.HttpUtil; + +/** + * handler thread + * @author xuxueli 2016-1-16 19:52:47 + */ +public class HandlerThread extends Thread{ + private static Logger logger = LoggerFactory.getLogger(HandlerThread.class); + + private IJobHandler handler; + private LinkedBlockingQueue> handlerDataQueue; + + public HandlerThread(IJobHandler handler) { + this.handler = handler; + handlerDataQueue = new LinkedBlockingQueue>(); + } + + public void pushData(Map param) { + handlerDataQueue.offer(param); + } + + int i = 1; + @Override + public void run() { + try { + i++; + Map handlerData = handlerDataQueue.poll(); + if (handlerData!=null) { + String trigger_log_url = handlerData.get(HandlerRepository.TRIGGER_LOG_URL); + String trigger_log_id = handlerData.get(HandlerRepository.TRIGGER_LOG_ID); + String handler_params = handlerData.get(HandlerRepository.HANDLER_PARAMS); + + // parse param + String[] handlerParams = null; + if (handler_params!=null && handler_params.trim().length()>0) { + handlerParams = handler_params.split(","); + } else { + handlerParams = new String[0]; + } + + // handle job + JobHandleStatus _status = JobHandleStatus.FAIL; + String _msg = null; + try { + _status = handler.handle(handlerParams); + } catch (Exception e) { + logger.info("HandlerThread Exception:", e); + StringWriter out = new StringWriter(); + e.printStackTrace(new PrintWriter(out)); + _msg = out.toString(); + } + + // callback handler info + String callback_response[] = null; + try { + + HashMap params = new HashMap(); + params.put(HandlerRepository.TRIGGER_LOG_ID, trigger_log_id); + params.put(HttpUtil.status, _status.name()); + params.put(HttpUtil.msg, _msg); + callback_response = HttpUtil.post(trigger_log_url, params); + } catch (Exception e) { + logger.info("HandlerThread Exception:", e); + } + logger.info("<<<<<<<<<<< xxl-job thread handle, handlerData:{}, callback_status:{}, callback_msg:{}, callback_response:{}, thread:{}", + new Object[]{handlerData, _status, _msg, callback_response, this}); + } else { + try { + TimeUnit.MILLISECONDS.sleep(i * 100); + } catch (InterruptedException e) { + e.printStackTrace(); + } + if (i>5) { + i= 0; + } + } + } catch (Exception e) { + logger.info("HandlerThread Exception:", e); + } + } +}