uRick's PKM uRick's PKM
首页
导航
  • Java
  • 数据库
书单
  • 优质好文
  • 有趣的工具
  • 分类
  • 标签
  • 归档
关于
首页
导航
  • Java
  • 数据库
书单
  • 优质好文
  • 有趣的工具
  • 分类
  • 标签
  • 归档
关于
  • Mybatis基础实战
  • Mybatis的运作机制探索
  • 深入探索XXL-Job
    • 1. 业务执行器
    • 2. 任务调度中心
      • 2.1. 任务调度流程
      • 2.2. 时间轮算法
      • 2.3. 路由策略
      • 2.3.1. 一致性Hash策略
      • 2.3.2. 最近不经常使用策略
      • 2.3.3. 最近最少使用策略
    • 3. 总结
    • 4. 参考
  • 深入Nacos
  • freemwork
uRick
2021-07-25
目录

深入探索XXL-Job

随着业务不断拓展,系统对任务调度需求也逐渐增多,而当前任务调度都分散在各个子模块、子系统,对任务调度维护成本过高,也不利于系统资源管理,为了提高研发能效;调研市面开源软件,通过调研发现 XXL-JOB 比较符合业务需求,而且该分布式任务调度系统在市面非常流行,也比较成熟,而且学习成本低、开箱即用,易扩展;本文主要深入源码,探索 XXL-JOB 的运行机制,以便于更好的运用到项目中。

flowchart

XXL-JOB 采用执行器、任务调度器分离架构,执行器主要负责处理业务,一般都集成在具体业务系统中;任务调度器负责维护任务,处理任务调度,通过RPC触发执行器。

主要特点

  • 调度中心基于DB实现,部署简单,成本低;
  • 支持动态修改状态,即时生效;
  • 丰富的任务触发策略,包括:Cron、固定间隔、固定延时、API等触发;
  • 支持任务超时主动中断任务,任务失败重试,发送警告信息;
  • 支持任务动态分片、故障转移;
  • 提供友好的可视化监控、日志查询等功能。

# 1. 业务执行器

由于任务执行与任务调度分离,在业务开发中,接入 XXL-JOB 分离解耦非常简单,只需引入核心组件依赖,实现 IJobHandler 即可;当然在核心包中已经提供几种实现,现在仅需 @XxlJob 定义任务即可。

@XxlJob(value = "demoJobHandler", init = "init", destroy = "destroy")
public void demoJobHandler2() throws Exception {
    XxlJobHelper.log("XXL-JOB, Hello World.");
}
public void init(){
    logger.info("init");
}
public void destroy(){
    logger.info("destroy");
}
1
2
3
4
5
6
7
8
9
10

在任务执行端,实现非常简单。在系统工程启动后,通过配置信息初始化 XxlJobExecutor ,接着在内部统一配置服务,执行一系列初始化服务流程。主要核心流程包括启动日志线程、添加调度中心列表、启动化执行回调线程、启动化回调重试线程、启动执行器 Server 服务。

应用启动后,执行自动向任务调度中心注册服务,对于执行器服务的续约由单独的线程定时处理(30s/次);启动 Server 其实就是一个随机端口(默认9999)的 Netty 服务,然后把监听的端口、IP 发送给调度中心。同过阅读源码发现,任务调度中心是提供 HTTP 的 RESTful API 来完成消息通讯的,因此也可以自定义任务执行器,整个任务触发核心流程如下:

Executor-JobHander-Flow

在服务启动时创建许多维护线程,管理和处理执行器相关资源,如下表归纳相关线程功能与作用。

线程类 线程名称 功能
JobLogFileCleanThread xxl-job, executor JobLogFileCleanThread 定时清理日志,当 logRetentionDays > 3时启动线程,且一天清理一次
TriggerCallbackThread(回调) xxl-job, executor TriggerCallbackThread 处理任务执行回调信息,内部维护了一个 LinkedBlockingQueued 队列,当任务执行结束后,
会将执行结果通过回调线程通知调度中心
TriggerCallbackThread(回调重试) 未命名 为了保证回调可靠性,避免回调调度中心时发生网络异常等情况无法回调成功的一种补偿措施,每30s检测一次是否有回调失败的数据
EmbedServer#Thread 未命名 执行器 Netty 监听服务,监听调度中心下大的命令
ExecutorRegistryThread xxl-job, executor ExecutorRegistryThread 执行器注册线程,每 30s 执行一次注册,当执行器下线时,向调度中心发起下线操作
JobThread 未命名 每个具体执行任务线程封装,内部维护了一个触发队列,用于处理任务执行,相同的任务只会有一个 JobThread
new Thread(futureTask) 未命名 调度任务 IJobHandler#execute 异步化线程服务,当任务触发参数 executorTimeout>0 时,提供任务执行超时处理策略

业务执行器端实现相对比较简单,启动几个维护线程处理相关资源,并开放服务监听端口,用于接收调度中心发起的指令;其业务处理核心在 com.xxl.job.core.biz.impl.ExecutorBizImpl 中实现,它实现 ExecutorBiz 接口,内部定义 5 个方法。

public interface ExecutorBiz {
    //心跳检测
    public ReturnT<String> beat();
    //任务存活检测,检测任务是否运行中
    public ReturnT<String> idleBeat(IdleBeatParam idleBeatParam);
    //任务调度核心:根据任务类型,触发相关任务
    public ReturnT<String> run(TriggerParam triggerParam);
    //移除任务,通过interrupt方法中断线程
    public ReturnT<String> kill(KillParam killParam);
    //获取任务日志
    public ReturnT<LogResult> log(LogParam logParam);
}
1
2
3
4
5
6
7
8
9
10
11
12

接着看一下任务监听执行流程,其实无论什么类型的任务都会构造一个 JobThread,每一个任务都会有一个 JobThread,且内部维护一个触发队列,按序触发任务;

//ExecutorBizImpl#run
public ReturnT<String> run(TriggerParam triggerParam) {
    // 加载已有的任务线程JobThread
    JobThread jobThread = XxlJobExecutor.loadJobThread(triggerParam.getJobId());
    IJobHandler jobHandler = jobThread!=null?jobThread.getHandler():null;
    String removeOldReason = null;
    // 获取任务类型
    GlueTypeEnum glueTypeEnum = GlueTypeEnum.match(triggerParam.getGlueType());
    if (GlueTypeEnum.BEAN == glueTypeEnum) {
        // 获取注册的业务Handler
        IJobHandler newJobHandler = XxlJobExecutor.loadJobHandler(triggerParam.getExecutorHandler());
        // 验证Handler是否有变动
        if (jobThread!=null && jobHandler != newJobHandler) {
            // change handler, need kill old thread
            removeOldReason = "change jobhandler or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }
        // valid handler
        if (jobHandler == null) {
            jobHandler = newJobHandler;
            if (jobHandler == null) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "job handler [" + triggerParam.getExecutorHandler() + "] not found.");
            }
        }

    } else if (GlueTypeEnum.GLUE_GROOVY == glueTypeEnum) {
        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof GlueJobHandler
                    && ((GlueJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change handler or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }
        // valid handler
        if (jobHandler == null) {
            try {
                IJobHandler originJobHandler = GlueFactory.getInstance().loadNewInstance(triggerParam.getGlueSource());
                jobHandler = new GlueJobHandler(originJobHandler, triggerParam.getGlueUpdatetime());
            } catch (Exception e) {
                logger.error(e.getMessage(), e);
                return new ReturnT<String>(ReturnT.FAIL_CODE, e.getMessage());
            }
        }
    } else if (glueTypeEnum!=null && glueTypeEnum.isScript()) {
        // valid old jobThread
        if (jobThread != null &&
                !(jobThread.getHandler() instanceof ScriptJobHandler
                        && ((ScriptJobHandler) jobThread.getHandler()).getGlueUpdatetime()==triggerParam.getGlueUpdatetime() )) {
            // change script or gluesource updated, need kill old thread
            removeOldReason = "change job source or glue type, and terminate the old job thread.";
            jobThread = null;
            jobHandler = null;
        }
        // valid handler
        if (jobHandler == null) {
            jobHandler = new ScriptJobHandler(triggerParam.getJobId(), triggerParam.getGlueUpdatetime(), triggerParam.getGlueSource(), GlueTypeEnum.match(triggerParam.getGlueType()));
        }
    } else {
        return new ReturnT<String>(ReturnT.FAIL_CODE, "glueType[" + triggerParam.getGlueType() + "] is not valid.");
    }
    // executor block strategy
    if (jobThread != null) {
        ExecutorBlockStrategyEnum blockStrategy = ExecutorBlockStrategyEnum.match(triggerParam.getExecutorBlockStrategy(), null);
        if (ExecutorBlockStrategyEnum.DISCARD_LATER == blockStrategy) {
            // discard when running
            if (jobThread.isRunningOrHasQueue()) {
                return new ReturnT<String>(ReturnT.FAIL_CODE, "block strategy effect:"+ExecutorBlockStrategyEnum.DISCARD_LATER.getTitle());
            }
        } else if (ExecutorBlockStrategyEnum.COVER_EARLY == blockStrategy) {
            // kill running jobThread
            if (jobThread.isRunningOrHasQueue()) {
                removeOldReason = "block strategy effect:" + ExecutorBlockStrategyEnum.COVER_EARLY.getTitle();
                jobThread = null;
            }
        } else {
            // just queue trigger
        }
    }
    // 若jobThread 不存在则创建,并启动线程
    if (jobThread == null) {
        jobThread = XxlJobExecutor.registJobThread(triggerParam.getJobId(), jobHandler, removeOldReason);
    }
    // 将调度命令,推送到触发队列;任务执行中,不允许推送到队列,避免重复执行
    ReturnT<String> pushResult = jobThread.pushTriggerQueue(triggerParam);
    return pushResult;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89

通过走查上述源码发现,在 JobThread 内定义 JobHandler 类型的成员变量,与具体执行的业务 Handler 绑定,当 JobThread 线程运行时,触发 Handler 执行;XXL-Job 已经内置了 3 种 Handler 实现。

JobHandler

  • MethodJobHandler: 通过 @XxJob 注解定义的方法级别 Handler,可在应用启动时自动扫描注册;
  • GlueJobHandler: 通过在线编辑的 Java 源码处理 Handler,若是通过 Spring 集成,则支持 @Resource、@Autowired、@Qualifier 注入属性;
  • ScriptJobHandler: 支持Shell、Python、PHP、Nodejs、PowerShell脚本类型 Handler 实现。

除了上述内置 Handler 实现,还可以根据需求实现接口 IJobHanler 自定义任务类型,并注册到执行器中即可。

业务处理也不复杂,任务触发调度是由处理器调度的,通过获取 CPU 执行时间片来运行,当一个 JobThread 创建后,从参数队列 TriggerQueue 获取触发参数并执行任务,当队列没有触发参数时,线程重试 30 次依然没有数据,则自动中断线程(节约CPU资源),任务执行核心流程如下:

jobHandler-flow

# 2. 任务调度中心

任务调度中心实现稍微复杂一些,主要完成任务监控、任务动态调度管理、任务调度、分布式部署能力等;经过对业务执行器端源码走查,深入理解内部机制之后,很容器看出调度中心也有调度监听服务以及提供的 RESTful API 接口。

调度中心负责任务调度的核心逻辑都在 XxlJobScheduler 类中维护管理,在初始化时触发相关调度维护线程,维护任务核心调度。

public class XxlJobScheduler  {
    public void init() throws Exception {
        // admin trigger pool start,任务触发线程池初始化
        JobTriggerPoolHelper.toStart();
        // admin registry monitor run,监控执行器注册,维护执行器列表上下线处理
        JobRegistryHelper.getInstance().start();
        // admin fail-monitor run,监控任务失败,发起任务警告通知(默认邮件实现)
        JobFailMonitorHelper.getInstance().start();
        // admin lose-monitor run ( depend on JobTriggerPoolHelper )
        // 任务执行状态丢失,结果接收回调处理线程池
        JobCompleteHelper.getInstance().start();
        // admin log report start,自动生成调度日志报表(xxl_job_log_report)或自动清理日志(需要配置)
        JobLogReportHelper.getInstance().start();
        // start-schedule  ( depend on JobTriggerPoolHelper )
        //开启调度线程
        JobScheduleHelper.getInstance().start();
    }
    //依次关闭线程服务资源
    public void destroy() throws Exception {
        // stop-schedule
        JobScheduleHelper.getInstance().toStop();
        // admin log report stop
        JobLogReportHelper.getInstance().toStop();
        // admin lose-monitor stop
        JobCompleteHelper.getInstance().toStop();
        // admin fail-monitor stop
        JobFailMonitorHelper.getInstance().toStop();
        // admin registry stop
        JobRegistryHelper.getInstance().toStop();
        // admin trigger pool stop
        JobTriggerPoolHelper.toStop();
    }
    // 执行器对应API调用端服务
    private static ConcurrentMap<String, ExecutorBiz> executorBizRepository = new ConcurrentHashMap<String, ExecutorBiz>();
    public static ExecutorBiz getExecutorBiz(String address) throws Exception {
        if (address==null || address.trim().length()==0) { return null;}  // valid
        // load-cache
        address = address.trim();
        ExecutorBiz executorBiz = executorBizRepository.get(address);
        if (executorBiz != null) {return executorBiz;}
        // set-cache
        executorBiz = new ExecutorBizClient(address, XxlJobAdminConfig.getAdminConfig().getAccessToken());
        executorBizRepository.put(address, executorBiz);
        return executorBiz;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46

在调度中心服务启动时创建许多维护线程,管理和处理调度任务相关资源,如下表归纳线程的功能与作用。

线程类 线程命名 功能
JobTriggerPoolHelper#fastTriggerPool xxl-job, admin JobTriggerPoolHelper-fastTriggerPool- 执行触发响应快的任务调度
JobTriggerPoolHelper#slowTriggerPool xxl-job, admin JobTriggerPoolHelper-slowTriggerPool- 执行触发响应慢的任务调度,触发任务响应大于 500ms 且超时次数大于 10 的就会放到慢线程池中运行
JobRegistryHelper#registryOrRemoveThreadPool xxl-job, admin JobRegistryMonitorHelper-registryOrRemoveThreadPool- 接收执行器调用接口做注册、注销异步操作
JobRegistryHelper#registryMonitorThread xxl-job, admin JobRegistryMonitorHelper-registryMonitorThread 监控执行器注册表,维护注册的执行器状态,不可用时及时下线
JobFailMonitorHelper#monitorThread xxl-job, admin JobFailMonitorHelper 监控任务失败,发起任务警告通知(默认邮件实现)和失败重试触发
JobCompleteHelper#callbackThreadPool xxl-job, admin JobLosedMonitorHelper-callbackThreadPool- 接收执行器回调执行结果参数处理
JobCompleteHelper#monitorThread xxl-job, admin JobLosedMonitorHelper 任务结果丢失处理:调度记录停留在 "运行中" 状态超过 10min,且对应执行器心跳注册失败不在线,标记任务调度失败
JobLogReportHelper#logrThread xxl-job, admin JobLogReportHelper 自动生成调度日志报表(xxl_job_log_report)或自动清理日志(需要配置)
JobScheduleHelper#scheduleThread xxl-job, admin JobScheduleHelper#scheduleThread 计算触发时间,触发任务调度处理
JobScheduleHelper#ringThread xxl-job, admin JobScheduleHelper#ringThread 任务调度时间轮缓存区任务线程,用于通过 schedule_lock 获取5s内的调度任务,计算达到触发时间的任务,放入时间轮缓冲区

# 2.1. 任务调度流程

调度中心的主要责任监控管理任务之外,还负责任务核心调度;除了一些维护管理,任务调度处理核心都在 XxlJobScheduler 工具类,与执行器端一样的逻辑,初始化启动一系列维护线程池,大量采用异步处理任务调度资源,提高数据处理效率。参与执行器端交互和任务触发逻辑是在 JobTriggerPoolHelper 工具类,定义了快慢响应处理 2 种线程池,在任务触发时,1 分钟内任务触发 10 次超时,为了避免触发任务太多导致过度拥挤,由慢响应触发线程池来调度触发;在添加触发任务时,根据超时记录自动切换快慢线程池。

因为调度中心支持集群部署,在获取任务信息时,通过数据库显示排他锁来获取需要调度的任务列表,多个集群节点不能同时进行调度处理。还需要注意的是,任务调度受系统时钟影响的,所以一定要保证系统时间是准确的。在获取待触发任务列表时,根据配置的快慢线程池最大线程数量从数据库提取;在获取的任务信息中,不满足触发条件的任务都会推送到时间轮集合中,然后单独由的线程计算触发任务,关于任务调度中心任务触发核心流程如下:

xxl-job-schedule

  1. JobScheduleHelper

接下来分析 JobScheduleHelper 源码,了解内部具体的处理细节。

public class JobScheduleHelper {
    private static Logger logger = LoggerFactory.getLogger(JobScheduleHelper.class);
    private static JobScheduleHelper instance = new JobScheduleHelper();
    public static JobScheduleHelper getInstance(){return instance;}
    public static final long PRE_READ_MS = 5000;    // pre read
    private Thread scheduleThread;
    private Thread ringThread;
    private volatile boolean scheduleThreadToStop = false;
    private volatile boolean ringThreadToStop = false;
    private volatile static Map<Integer, List<Integer>> ringData = new ConcurrentHashMap<>();

    public void start(){
        // schedule thread
        scheduleThread = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    TimeUnit.MILLISECONDS.sleep(5000 - System.currentTimeMillis()%1000 );
                } catch (InterruptedException e) {
                    if (!scheduleThreadToStop) {
                        logger.error(e.getMessage(), e);
                    }
                }
                // pre-read count: treadpool-size * trigger-qps (each trigger cost 50ms, qps = 1000/50 = 20)
                int preReadCount = (XxlJobAdminConfig.getAdminConfig().getTriggerPoolFastMax() + XxlJobAdminConfig.getAdminConfig().getTriggerPoolSlowMax()) * 20;
                while (!scheduleThreadToStop) {
                    // Scan Job
                    long start = System.currentTimeMillis();
                    Connection conn = null;
                    Boolean connAutoCommit = null;
                    PreparedStatement preparedStatement = null;
                    boolean preReadSuc = true;
                    try {
                        conn = XxlJobAdminConfig.getAdminConfig().getDataSource().getConnection();
                        connAutoCommit = conn.getAutoCommit();
                        conn.setAutoCommit(false);
                        //因为调度端也支持,基于数据库集群部署,在获取任务时,通过排他锁获取任务调度
                        preparedStatement = conn.prepareStatement(  "select * from xxl_job_lock where lock_name = 'schedule_lock' for update" );
                        preparedStatement.execute();

                        // tx start

                        // 1、pre read
                        long nowTime = System.currentTimeMillis();
                        // 获取trigger_next_time< now+ 5000ms内的任务
                        List<XxlJobInfo> scheduleList = XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleJobQuery(nowTime + PRE_READ_MS, preReadCount);
                        if (scheduleList!=null && scheduleList.size()>0) {
                            // 2、push time-ring
                            for (XxlJobInfo jobInfo: scheduleList) {
                                // time-ring jump,getTriggerNextTime+5000,还是小于nowTime说明错过触发
                                if (nowTime > jobInfo.getTriggerNextTime() + PRE_READ_MS) {//这里是错过触发了
                                    // 根据错过触发的策略进行触发,当前支持忽略和立即执行一次策略
                                    MisfireStrategyEnum misfireStrategyEnum = MisfireStrategyEnum.match(jobInfo.getMisfireStrategy(), MisfireStrategyEnum.DO_NOTHING);
                                    if (MisfireStrategyEnum.FIRE_ONCE_NOW == misfireStrategyEnum) {
                                        JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.MISFIRE, -1, null, null, null);
                                    }
                                     //刷新下一次执行时间
                                    refreshNextValidTime(jobInfo, new Date());
                                } 
                                //达到触发时间,5s以内的任务,直接发起触发
                                else if (nowTime > jobInfo.getTriggerNextTime()) {
                                    JobTriggerPoolHelper.trigger(jobInfo.getId(), TriggerTypeEnum.CRON, -1, null, null, null);
                                     //刷新下一次执行时间
                                    refreshNextValidTime(jobInfo, new Date());
                                    
                                    //下一次触发时间,在5s内,则推送到时间轮中
                                    if (jobInfo.getTriggerStatus()==1 && nowTime + PRE_READ_MS > jobInfo.getTriggerNextTime()) {
                                        // 1、make ring second
                                        int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                        // 2、push time ring
                                        pushTimeRing(ringSecond, jobInfo.getId());
                                        //刷新下一次执行时间
                                        refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                    }
                                } else {//放入时间轮中,由相关线程处理,nowTime< triggerTime < triggerTime+5s
                                    int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
                                    pushTimeRing(ringSecond, jobInfo.getId());
                                    //刷新下一次执行时间
                                    refreshNextValidTime(jobInfo, new Date(jobInfo.getTriggerNextTime()));
                                }
                            }
                            // 3、update trigger info
                            for (XxlJobInfo jobInfo: scheduleList) {
                                XxlJobAdminConfig.getAdminConfig().getXxlJobInfoDao().scheduleUpdate(jobInfo);
                            }
                        } else {
                            preReadSuc = false;
                        }
                        // tx stop
                    } catch (Exception e) {
                        if (!scheduleThreadToStop) {logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread error:{}", e);}
                    } finally {
                        // commit
                        if (conn != null) {
                            try {conn.commit();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}
                            try {conn.setAutoCommit(connAutoCommit);} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}
                            try {conn.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}
                            }
                        }
                        // close PreparedStatement
                        if (null != preparedStatement) {
                            try {preparedStatement.close();} catch (SQLException e) {if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}}
                        }
                    }
                    long cost = System.currentTimeMillis()-start;
                    // Wait seconds, align second
                    if (cost < 1000) {  // scan-overtime, not wait,没有拿到数据,就多睡一下(最近工作比较闲,就能多开店小差,哈哈)
                        try {
                            // pre-read period: success > scan each second; fail > skip this period;
                            TimeUnit.MILLISECONDS.sleep((preReadSuc?1000:PRE_READ_MS) - System.currentTimeMillis()%1000);
                        } catch (InterruptedException e) {
                            if (!scheduleThreadToStop) {logger.error(e.getMessage(), e);}
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#scheduleThread stop");
            }
        });
        scheduleThread.setDaemon(true);
        scheduleThread.setName("xxl-job, admin JobScheduleHelper#scheduleThread");
        scheduleThread.start();
        // ring thread
        ringThread = new Thread(new Runnable() {
            @Override
            public void run() {
                while (!ringThreadToStop) {
                    // 根据系统时钟来决定sleep时间
                    try {
                        TimeUnit.MILLISECONDS.sleep(1000 - System.currentTimeMillis() % 1000);
                    } catch (InterruptedException e) {
                        if (!ringThreadToStop) {
                            logger.error(e.getMessage(), e);
                        }
                    }
                    try {
                        // second data
                        List<Integer> ringItemData = new ArrayList<>();
                        int nowSecond = Calendar.getInstance().get(Calendar.SECOND);
                        for (int i = 0; i < 2; i++) {
                           // 避免处理耗时太长,跨过刻度,向前校验一个刻度;
                            List<Integer> tmpData = ringData.remove( (nowSecond+60-i)%60 );
                            if (tmpData != null) {
                                ringItemData.addAll(tmpData);
                            }
                        }
                        // 当前刻钟存在可调度任务,则推入触发器中
                        if (ringItemData.size() > 0) {
                            for (int jobId: ringItemData) {
                                JobTriggerPoolHelper.trigger(jobId, TriggerTypeEnum.CRON, -1, null, null, null);
                            }
                            ringItemData.clear();
                        }
                    } catch (Exception e) {
                        if (!ringThreadToStop) {
                            logger.error(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread error:{}", e);
                        }
                    }
                }
                logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper#ringThread stop");
            }
        });
        ringThread.setDaemon(true);
        ringThread.setName("xxl-job, admin JobScheduleHelper#ringThread");
        ringThread.start();
    }
    private void refreshNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        Date nextValidTime = generateNextValidTime(jobInfo, fromTime);
        if (nextValidTime != null) {
            jobInfo.setTriggerLastTime(jobInfo.getTriggerNextTime());
            jobInfo.setTriggerNextTime(nextValidTime.getTime());
        } else {
            jobInfo.setTriggerStatus(0);
            jobInfo.setTriggerLastTime(0);
            jobInfo.setTriggerNextTime(0);
            logger.warn(">>>>>>>>>>> xxl-job, refreshNextValidTime fail for job: jobId={}, scheduleType={}, scheduleConf={}",jobInfo.getId(), jobInfo.getScheduleType(), jobInfo.getScheduleConf());
        }
    }

    private void pushTimeRing(int ringSecond, int jobId){
        // push async ring
        List<Integer> ringItemData = ringData.get(ringSecond);
        if (ringItemData == null) {
            ringItemData = new ArrayList<Integer>();
            ringData.put(ringSecond, ringItemData);
        }
        ringItemData.add(jobId);
        logger.debug(">>>>>>>>>>> xxl-job, schedule push time-ring : " + ringSecond + " = " + Arrays.asList(ringItemData) );
    }

    public void toStop(){
        // 1、stop schedule
        scheduleThreadToStop = true;
        try {
            TimeUnit.SECONDS.sleep(1);  // wait
        } catch (InterruptedException e) {
            logger.error(e.getMessage(), e);
        }
        if (scheduleThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait,通过new Thread 常规处理策略
            scheduleThread.interrupt();//设置中断
            try {
                scheduleThread.join();//等待线程中断
            } catch (InterruptedException e) {
                logger.error(e.getMessage(), e);
            }
        }
        // if has ring data
        boolean hasRingData = false;
        if (!ringData.isEmpty()) {
            for (int second : ringData.keySet()) {
                List<Integer> tmpData = ringData.get(second);
                if (tmpData!=null && tmpData.size()>0) {
                    hasRingData = true;
                    break;
                }
            }
        }
        //缓冲环中还有数据,在等一会,等它处理完//缓冲环中还有数据,在等一会,等它处理完
        if (hasRingData) {
            try {TimeUnit.SECONDS.sleep(8);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
        }
        // stop ring (wait job-in-memory stop)
        ringThreadToStop = true;
        try {TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {logger.error(e.getMessage(), e);}
        if (ringThread.getState() != Thread.State.TERMINATED){
            // interrupt and wait
            ringThread.interrupt();
            try { ringThread.join(); } catch (InterruptedException e) {logger.error(e.getMessage(), e);}
        }
        logger.info(">>>>>>>>>>> xxl-job, JobScheduleHelper stop");
    }
    // ---------------------- tools ----------------------
    public static Date generateNextValidTime(XxlJobInfo jobInfo, Date fromTime) throws Exception {
        ScheduleTypeEnum scheduleTypeEnum = ScheduleTypeEnum.match(jobInfo.getScheduleType(), null);
        if (ScheduleTypeEnum.CRON == scheduleTypeEnum) {//cron触发方式,计算下一次触发时间
            Date nextValidTime = new CronExpression(jobInfo.getScheduleConf()).getNextValidTimeAfter(fromTime);
            return nextValidTime;
        } else if (ScheduleTypeEnum.FIX_RATE == scheduleTypeEnum /*|| ScheduleTypeEnum.FIX_DELAY == scheduleTypeEnum*/) {
            return new Date(fromTime.getTime() + Integer.valueOf(jobInfo.getScheduleConf())*1000 );
        }
        return null;
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243

JobScheduleHelper 负责计算触发时间,发起任务触发,具体的触发策略,触发逻辑由 XxlJobTrigger#trigger 实现的,trigger 其实就是负责把任务信息拿到,然后根据路由策略获取到不同执行器集群机器,调用远程执行器的过程,逻辑并不复杂。

  1. XxlJobTrigger

XxlJobTrigger 内部封装了任务触发的细节,计算分片参数,拼装任务触发参数,然后根据路由策略算法从执行器集群中选出触发的服务地址,最后触发接口调用请求,处理流程如下。

trigger-flow

# 2.2. 时间轮算法

对于时间轮任务调度策略应用比较广泛,在许多有定时任务的场景会看到它的身影,它主要解决大量任务调度过程中做到秒级触发能力,一般需要调度任务量非常多,为了任务作业触发效率的一种策略,避免任务堆积,无法精准调度。

一般时间轮利用循环数组来实现,实现一个 60 个容量的数组,每个容量表示 1 秒,比如一个循环数组长度 60,可以表示 60 秒;把调度任务按照时间取摸 triggerTime % 60 放入循环数组,然后通过调度器从环形数组中取出来触发。

还有一些实现比较复杂的,多层时间轮的实现机制,如 60 秒以后执行的任务怎么放进去? 只要模以 60,用得到的余数,放到对应的格子就 OK 了。比如 90 % 60 = 30,它放在第30 个格子。这里就有了轮次的概念,第 90 秒的任务是第 2 轮的时候才执行。我们可以进一步对这个时间轮做改造,构造一个多层的时间轮。如:最内层 60 个格子,每个格子 1秒;外层 60 个格子,每个格子代表 1 分;再外层 24 个格子,每个格子代表 1 小时。最内层走一圈,外层走一格。其实本质就是一个钟表规则,随着时间流动,任务会降级,外层的任务会慢慢地向内层移动。时间轮任务插入和删除时间复杂度都为 0(1),它是一种高效的批量调度模型。而且应用范围非常广泛,比如Dubbo、Netty、 Kafka 都有时间轮的实现。

在xxl-job中时间轮比较简单,只有一个时间层,采用 ConcurrentHashMap 存储任务集合,集合中存放一个List的 JobId 集合;任务需要推入时间轮中时,通过 jobInfo.getTriggerNextTime()/1000)%60 计算 Map 的 key,

//int ringSecond = (int)((jobInfo.getTriggerNextTime()/1000)%60);
//推入集合
private void pushTimeRing(int ringSecond, int jobId){
    // push async ring
    List<Integer> ringItemData = ringData.get(ringSecond);
    if (ringItemData == null) {
        ringItemData = new ArrayList<Integer>();
        ringData.put(ringSecond, ringItemData);
    }
    ringItemData.add(jobId);
}
1
2
3
4
5
6
7
8
9
10
11
//从时间轮中获取任务
List<Integer> ringItemData = new ArrayList<>();
// 避免处理耗时太长,跨过刻度,向前校验一个刻度;
// 这里取两秒内的任务,来触发
int nowSecond = Calendar.getInstance().get(Calendar.SECOND);//获取当前时刻的秒数
for (int i = 0; i < 2; i++) {//取两次,根据当前秒数刻度和前一个刻度进行时间轮的任务获取
//(nowSecond+ 60-k)%60 也就是当前秒数和前一秒。比如当前秒数是 40,就获取时间轮 40 和 39 的任务。
//从 ringData 里面拿出来,添加到 ringltemData,这里面存的是这 2 秒内需要触发的所有任务的 jobld.
    List<Integer> tmpData = ringData.remove((nowSecond+60-i)%60);
    if (tmpData != null) {
        ringItemData.addAll(tmpData);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13

# 2.3. 路由策略

当执行器为了高可用,采用集群部署时,调度中心为了避免在不同的机器触发相同任务,需要一种处理机制来规避这种问题;在 XXL-Job 通过路由策略来选择对应的执行器机器(分配广播除外),任务调度故障转移、忙碌转移都是通过路由策略算法实现的,当前内置支持如下 9 种实现。

Router-Strategy

路由算法 描述
ExecutorRouteFirst 选择第一个策略,实现简单,只需要取列表中第一个即可
ExecutorRouteLast 选择最后一个策略,实现简单,只需要取列表中最后一个即可
ExecutorRouteRound 轮询策略,内部通过 Map 结合缓存任务轮询索引,使用 index/size 取摸获取服务;
采用缓存 24 小时惰性重置轮询索引, 第一次或索引大于100万时,随机生成
ExecutorRouteRandom 随机策略,根据执行器服务集群数量随机获取服务
ExecutorRouteLFU 最近不经常使用策略,使用频率比较低的机器
ExecutorRouteLRU 最近最久未使用策略,基于LinkedHashMap实现
ExecutorRouteBusyover 忙碌转移策略,调用执行器忙碌检测接口,排除忙碌服务发起调用
ExecutorRouteFailover 故障转移策略,调用执行器心跳检测接口,找出正常的服务调用
ExecutorRouteConsistentHash 一致性 Hash 策略,采用虚拟节点保证调度均匀分布到不同的机器

下面简单介绍一致性 Hash 、 LFU、LRU 算法的实现,在工作是场景的算法实现,遇到特定场景时可以使用,实现也不复杂。

# 2.3.1. 一致性Hash策略

先构造一个长度为 2^32 的整数环(这个环被称为一致性Hash环),并计算节点地址的 Hash 值(分布在 [0, 2^32-1]),将服务器节点放置在这个 Hash 环上;然后计算 jobId 的 Hash 值,接着在 Hash 环上顺时针查找距离这个 Key 值的 Hash 值最近的服务器节点,完成 Key 到服务器的映射查找,关于 Hash 环介绍详解:Redis:一致性哈希算法原理 (opens new window)

public class ExecutorRouteConsistentHash extends ExecutorRouter {
    private static int VIRTUAL_NODE_NUM = 100;
    /**
     * get hash code on 2^32 ring (md5散列的方式计算hash值)
     * @param key
     * @return
     */
    private static long hash(String key) {
        // md5 byte
        MessageDigest md5;
        try {
            md5 = MessageDigest.getInstance("MD5");
        } catch (NoSuchAlgorithmException e) {
            throw new RuntimeException("MD5 not supported", e);
        }
        md5.reset();
        byte[] keyBytes = null;
        try {
            keyBytes = key.getBytes("UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new RuntimeException("Unknown string :" + key, e);
        }
        md5.update(keyBytes);
        byte[] digest = md5.digest();
        // hash code, Truncate to 32-bits
        // 清除高 32 位值
        long hashCode = ((long) (digest[3] & 0xFF) << 24)| ((long) (digest[2] & 0xFF) << 16)| ((long) (digest[1] & 0xFF) << 8)| (digest[0] & 0xFF);
        long truncateHashCode = hashCode & 0xffffffffL;
        return truncateHashCode;
    }
    
    public String hashJob(int jobId, List<String> addressList) {
        //根据虚拟节点数量计算节点  
        TreeMap<Long, String> addressRing = new TreeMap<Long, String>();
        for (String address: addressList) {
            for (int i = 0; i < VIRTUAL_NODE_NUM; i++) {
                long addressHash = hash("SHARD-" + address + "-NODE-" + i);
                addressRing.put(addressHash, address);
            }
        }
        long jobHash = hash(String.valueOf(jobId));
        //顺时针查找,大于jobHash的数据,然后取出最近的一个服务器
        SortedMap<Long, String> lastRing = addressRing.tailMap(jobHash);
        if (!lastRing.isEmpty()) {
            return lastRing.get(lastRing.firstKey());
        }
        return addressRing.firstEntry().getValue();
    }
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = hashJob(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54

# 2.3.2. 最近不经常使用策略

Least Frequently Used,选择使用频率比较低的机器。

public class ExecutorRouteLFU extends ExecutorRouter {
    private static ConcurrentMap<Integer, HashMap<String, Integer>> jobLfuMap = new ConcurrentHashMap<Integer, HashMap<String, Integer>>();
    private static long CACHE_VALID_TIME = 0;
    public String route(int jobId, List<String> addressList) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLfuMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;//24小时惰性过期
        }
        // lfu item init
        // Key排序可以用TreeMap+构造入参Compare;Value排序暂时只能通过ArrayList;
        HashMap<String, Integer> lfuItemMap = jobLfuMap.get(jobId);
        if (lfuItemMap == null) {
            lfuItemMap = new HashMap<String, Integer>();
            jobLfuMap.putIfAbsent(jobId, lfuItemMap);   // 避免重复覆盖
        }

        // put new
        for (String address: addressList) {
            // 初始化时主动Random一次,缓解首次压力
            if (!lfuItemMap.containsKey(address) || lfuItemMap.get(address) > 1_000_000 ) {
                lfuItemMap.put(address, new Random().nextInt(addressList.size())); 
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lfuItemMap.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lfuItemMap.remove(delKey);
            }
        }
        // 排序
        List<Map.Entry<String, Integer>> lfuItemList = new ArrayList<Map.Entry<String, Integer>>(lfuItemMap.entrySet());
        Collections.sort(lfuItemList, new Comparator<Map.Entry<String, Integer>>() {
            @Override
            public int compare(Map.Entry<String, Integer> o1, Map.Entry<String, Integer> o2) {
                return o1.getValue().compareTo(o2.getValue());
            }
        });
        //获取最少使用的服务返回,并+1标记
        Map.Entry<String, Integer> addressItem = lfuItemList.get(0);
        String minAddress = addressItem.getKey();
        addressItem.setValue(addressItem.getValue() + 1);
        return addressItem.getKey();
    }
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56

# 2.3.3. 最近最少使用策略

least recently used,基于 LinkedHashMap 集合特性实现非常简单。

    private static ConcurrentMap<Integer, LinkedHashMap<String, String>> jobLRUMap = new ConcurrentHashMap<Integer, LinkedHashMap<String, String>>();
    private static long CACHE_VALID_TIME = 0;

    public String route(int jobId, List<String> addressList) {
        // cache clear
        if (System.currentTimeMillis() > CACHE_VALID_TIME) {
            jobLRUMap.clear();
            CACHE_VALID_TIME = System.currentTimeMillis() + 1000*60*60*24;
        }
        // init lru
        LinkedHashMap<String, String> lruItem = jobLRUMap.get(jobId);
        if (lruItem == null) {
            /**
             * LinkedHashMap
             *      a、accessOrder:true=访问顺序排序(get/put时排序);false=插入顺序排序;
             *      b、removeEldestEntry:新增元素时将会调用,返回true时会删除最老元素;可封装LinkedHashMap并重写该方法,比如定义最大容量,超出是返回true即可实现固定长度的LRU算法;
             */
            lruItem = new LinkedHashMap<String, String>(16, 0.75f, true);
            jobLRUMap.putIfAbsent(jobId, lruItem);
        }
        // put new
        for (String address: addressList) {
            if (!lruItem.containsKey(address)) {
                lruItem.put(address, address);
            }
        }
        // remove old
        List<String> delKeys = new ArrayList<>();
        for (String existKey: lruItem.keySet()) {
            if (!addressList.contains(existKey)) {
                delKeys.add(existKey);
            }
        }
        if (delKeys.size() > 0) {
            for (String delKey: delKeys) {
                lruItem.remove(delKey);
            }
        }
        // load,获取第一个值,最近没有被使用的,以为LinkedHashMap,accessOrder:true,每次访问集合数据,就会被放到链表最后
        String eldestKey = lruItem.entrySet().iterator().next().getKey();
        String eldestValue = lruItem.get(eldestKey);
        return eldestValue;
    }
    @Override
    public ReturnT<String> route(TriggerParam triggerParam, List<String> addressList) {
        String address = route(triggerParam.getJobId(), addressList);
        return new ReturnT<String>(address);
    }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48

# 3. 总结

XXL-Job 分布式任务调度是基于关系型数据库实现,调度中心与业务执行器都支持集群部署,对于系统任务资源处理运用了大量的线程和线程池技术,通过阅读源码随处可见,整个处理逻辑相对简单,也并不复杂;在日常开发中也可以使用线程技术异步化处理,提高数据处理效率,但需要保证线程安全。

调度中心为了支持集群部署,合理分配任务调度,使用数据库排他锁解决分布式任务调度,同时保证任务在秒级触发,通过时间轮算法计算任务触发时间;执行器集群部署时,通过一系列路由策略算法实现任务高可用,只要引入分布式服务,都需要考虑 HA,在业务实现中值得学习借鉴。

由于任务调度是基于关系数据库实现,当任务量大、服务机器较多时,数据库可能成为一个性能瓶颈。

Task-Execute-Flow

# 4. 参考

  1. 任务调度:时间轮算法经典案例解析及应用实现 (opens new window)
  2. 知乎:时间轮详解 (opens new window)
  3. xxl-job:官方手册 (opens new window)
#任务调度
上次更新: 2024/03/02, 14:21:03
Mybatis的运作机制探索
深入Nacos

← Mybatis的运作机制探索 深入Nacos→

最近更新
01
从0到1:开启商业与未来的秘密
11-26
02
如何阅读一本书: 读懂一本书,精于一件事
10-25
03
深入理解Lambda
06-27
更多文章>
Theme by Vdoing | Copyright © 2019-2024 uRick | CC BY 4.0
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式