diff --git a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java index ae385f3f..3114d933 100644 --- a/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java +++ b/xxl-job-core/src/main/java/com/xxl/job/core/thread/JobThread.java @@ -91,86 +91,94 @@ public class JobThread extends Thread{ @Override public void run() { - while(!toStop){ - running = false; - idleTimes++; - try { - // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) - TriggerParam triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); - if (triggerParam!=null) { - running = true; - idleTimes = 0; - triggerLogIdSet.remove(triggerParam.getLogId()); - - // parse param - String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) - ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(",")).toArray()) : null; - - // handle job - ReturnT executeResult = null; - try { - // log filename: yyyy-MM-dd/9999.log - String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); - XxlJobFileAppender.contextHolder.set(logFileName); - ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); - XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); + while(!toStop){ + running = false; + idleTimes++; + // handle job + ReturnT executeResult = null; + TriggerParam triggerParam = null; + try { + // to check toStop signal, we need cycle, so wo cannot use queue.take(), instand of poll(timeout) + triggerParam = triggerQueue.poll(3L, TimeUnit.SECONDS); + if (triggerParam!=null) { + running = true; + idleTimes = 0; + triggerLogIdSet.remove(triggerParam.getLogId()); - executeResult = handler.execute(handlerParams); - if (executeResult == null) { - executeResult = ReturnT.FAIL; - } + // parse param + String[] handlerParams = (triggerParam.getExecutorParams()!=null && triggerParam.getExecutorParams().trim().length()>0) + ? (String[])(Arrays.asList(triggerParam.getExecutorParams().split(" ")).toArray()) : null; - XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); - } catch (Exception e) { - if (toStop) { - XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); - } - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + try { + // log filename: yyyy-MM-dd/9999.log + String logFileName = XxlJobFileAppender.makeLogFileName(new Date(triggerParam.getLogDateTim()), triggerParam.getLogId()); - XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); - } - - // callback handler info - if (!toStop) { - // commonm - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); - } else { - // is killed - ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); - } - } else { - if (idleTimes > 30) { - XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); - } - } - } catch (Throwable e) { - if (toStop) { - XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); - } + XxlJobFileAppender.contextHolder.set(logFileName); + ShardingUtil.setShardingVo(new ShardingUtil.ShardingVO(triggerParam.getBroadcastIndex(), triggerParam.getBroadcastTotal())); + XxlJobLogger.log("
----------- xxl-job job execute start -----------
----------- Params:" + Arrays.toString(handlerParams)); - StringWriter stringWriter = new StringWriter(); - e.printStackTrace(new PrintWriter(stringWriter)); - String errorMsg = stringWriter.toString(); - XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); - } - } - - // callback trigger request in queue - while(triggerQueue !=null && triggerQueue.size()>0){ - TriggerParam triggerParam = triggerQueue.poll(); - if (triggerParam!=null) { - // is killed - ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); - TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); - } - } - - logger.info(">>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); - } + executeResult = handler.execute(handlerParams); + if (executeResult == null) { + executeResult = ReturnT.FAIL; + } + + XxlJobLogger.log("
----------- xxl-job job execute end(finish) -----------
----------- ReturnT:" + executeResult); + } catch (Exception e) { + if (toStop) { + XxlJobLogger.log("
----------- JobThread toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("
----------- JobThread Exception:" + errorMsg + "
----------- xxl-job job execute end(error) -----------"); + } + + } else { + if (idleTimes > 30) { + XxlJobExecutor.removeJobThread(jobId, "excutor idel times over limit."); + } + } + } catch (Throwable e) { + if (toStop) { + XxlJobLogger.log("
----------- xxl-job toStop, stopReason:" + stopReason); + } + + StringWriter stringWriter = new StringWriter(); + e.printStackTrace(new PrintWriter(stringWriter)); + String errorMsg = stringWriter.toString(); + executeResult = new ReturnT(ReturnT.FAIL_CODE, errorMsg); + + XxlJobLogger.log("----------- xxl-job JobThread Exception:" + errorMsg); + } finally { + if(triggerParam != null) { + // callback handler info + if (!toStop) { + // commonm + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), executeResult)); + } else { + // is killed + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [业务运行中,被强制终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); + } + } + } + } + + // callback trigger request in queue + while(triggerQueue !=null && triggerQueue.size()>0){ + TriggerParam triggerParam = triggerQueue.poll(); + if (triggerParam!=null) { + // is killed + ReturnT stopResult = new ReturnT(ReturnT.FAIL_CODE, stopReason + " [任务尚未执行,在调度队列中被终止]"); + TriggerCallbackThread.pushCallBack(new HandleCallbackParam(triggerParam.getLogId(), stopResult)); + } + } + + logger.info(">>>>>>>>>>>> xxl-job JobThread stoped, hashCode:{}", Thread.currentThread()); + } }