关于`node-schedule`使用的一些请教

首先项目是通过node-schedule包来实现定时任务功能的,如果想要通过数据库动态管理定时任务(添加任务、删除任务、变更任务执行时间),有什么好的思路和方法么?求各位大佬指导。

附上现在使用的代码:

// 定义任务列表
function startJob(code) {
    switch (code) {
        case '001':
            // 任务执行文件
            break;
        default:
            break;
    }
}

// 每5分钟检查定时任务设置
schedule.scheduleJob('0 */5 * * * *', function () {

    TimingTaskSwitchModel.find(
        {
            state: 'on',
            status: 'normal'
        },
        function (e, models) {

            if (e || !models) {
                originalJobs.forEach(function (j) {
                    try {
                        schedule.cancelJob(j);
                    } catch (error) {
                        console.log(moment().format('YYYY-MM-DD HH:mm:ss.SSS') + ' 取消任务:', j, error);
                    }
                });
            }

            openedJobs = [];
            allJobs = [];

            models.forEach(function (taskSwitch) {

                time = null;
                taskSwitch = taskSwitch.toObject();

                if (taskSwitch['rule'] && typeof taskSwitch['rule'] === 'object') {
                    time = new schedule.RecurrenceRule();
                    (((taskSwitch['rule'] || {})['second'] || [])[0] || ((taskSwitch['rule'] || {})['second'] || [])[0] === 0) && (time['second'] = (taskSwitch['rule'] || {})['second']);
                    (((taskSwitch['rule'] || {})['minute'] || [])[0] || ((taskSwitch['rule'] || {})['minute'] || [])[0] === 0) && (time['minute'] = (taskSwitch['rule'] || {})['minute']);
                    (((taskSwitch['rule'] || {})['hour'] || [])[0] || ((taskSwitch['rule'] || {})['hour'] || [])[0] === 0) && (time['hour'] = (taskSwitch['rule'] || {})['hour']);
                    (((taskSwitch['rule'] || {})['dayOfWeek'] || [])[0] || ((taskSwitch['rule'] || {})['dayOfWeek'] || [])[0] === 0) && (time['dayOfWeek'] = (taskSwitch['rule'] || {})['dayOfWeek']);
                } else if (!taskSwitch['rule'] && taskSwitch['date']) {
                    time = new Date(taskSwitch['date']);
                }

                if (time) {
                    openedJobs.push({
                        code: taskSwitch['code'],
                        time: time
                    });
                    allJobs.push(taskSwitch['code']);
                }
            });

            executionJobs = openedJobs;

            originalJobs.forEach(function (originalJob) {
                if (allJobs.indexOf(originalJob) < 0) {
                    console.log(moment().format('YYYY-MM-DD HH:mm:ss.SSS') + ' 取消任务:' + originalJob);
                    if (schedule.scheduledJobs[originalJob]) {
                        schedule.cancelJob(originalJob);
                    }
                    delete jobTimes[originalJob];
                }
            });

            originalJobs = allJobs;

        }
    );

});

// 设置定时任务(每5分执行一次)
schedule.scheduleJob('0 */5 * * * *', function () {

    executionJobs.forEach(function (executionJob) {

        if (String.trim(JSON.stringify(jobTimes[executionJob['code']])) !== String.trim(JSON.stringify(executionJob['time']))) {

            console.log(moment().format('YYYY-MM-DD HH:mm:ss.SSS') + ' 任务变更:' + executionJob['code']);

            jobTimes[executionJob['code']] = executionJob['time'];

            if (schedule.scheduledJobs[executionJob['code']]) {
                schedule.cancelJob(executionJob['code']);
            }

            schedule.scheduleJob(executionJob['code'], jobTimes[executionJob['code']], function () {
                startJob(executionJob['code']);
            });

        }

    });

});
阅读 3.7k
1 个回答

我写了个电影cms是这样的,pm2守护电影系统的进程。单独写了一个cron.js,用于执行定时任务,使用另一个进程守护工具forever守护cron.js。动态的开启,关闭定时任务。app.js和cron.js通讯,使用http服务。
app.js监听的是9999端口,cron.js监听的是8888端口。app.js后台管理系统创建一个定时任务的时候,通过request或者axios发生一个post请求。那边cron.js收到http请求,创建删除的时候通过参数识别,比如创建createCron,删除removeCron。然后带上要输入的时间参数( )。就是这样实现的,只不过cron.js这样没有负载,是单线程。想高并发就得自己写一个并发模型。

首先解决的就是:nodejs定时任务通过pm2负载之后的一个问题就是,你无法关闭的定时任务,比如双核心四线程,那么pm2会启动四个nodejs,你发送的请求创建的时候可能分到A线程里面执行。但是第二次关闭的时候pm2可能会给你分发到B线程,但是B线程里面根本就没有执行定时任务,所以你无法关闭。

所以想要能找到这个任务,并且关闭,就必须单独出来。

// cron.js
require('shelljs/global');
const Koa = require('koa');
const path = require('path');
const bodyParser = require('koa-bodyparser');
const schedule = require('node-schedule');

const app = new Koa();
let queue = {};

app.use(bodyParser());

app.use(async (ctx, next) => {

ctx.set('Content-Type', 'application/json');

let { type } = ctx.request.body;

try{
    // 创建任务
    if(type === 'createCron'){

        (() => {

            let { time, name, script, clientTime, serverTime } = ctx.request.body;

            let initTime = time;
            let id = new Date().getTime() + '';
            let posTime = time.split(' ');
            let posHour = posTime[2];
            // 计算时区差异 + 计算差值
            if(posHour !== '*'){
                let clientHour = new Date(clientTime).getHours();
                let serverHour = new Date(serverTime).getHours();
                // 说明服务器和用户不在一个时区
                let maxHour = clientHour > serverHour ? clientHour : serverHour;
                let minHour = clientHour < serverHour ? clientHour : serverHour;
                // max - min = gap
                let gapHour = maxHour - minHour;
                let resultHour = clientHour > serverHour ? (clientHour - gapHour) : (clientHour + gapHour);
                posTime[2] = resultHour;
                time = posTime.join(' ');
            }


            let task = schedule.scheduleJob(time, () => {
                console.log('触发计划任务');
                let scriptPath = path.resolve(__dirname, `./script/${script}/app.js`);
                exec(`node ${scriptPath}`, {async: true}, (code, stdout, stderr) => {
                    console.log('单次任务执行完成,子进程退出状态码 ' + code);
                });
            });
            queue[id] = {
                id: id,
                el: task,
                name,
                time: initTime,
                script,
            }

        })();

    }else if(type === 'removeCron'){

        let { id } = ctx.request.body;
        queue[id].el.cancel();
        Reflect.deleteProperty(queue, id);

    }else if(type === 'getAllCrons'){
        let cronArr = [];
        for(let attr in queue){
            cronArr.push({
                id: queue[attr].id,
                name: queue[attr].name,
                time: queue[attr].time,
                script: queue[attr].script
            })
        }
        ctx.body = {
            code: 200,
            text: '任务列表获取成功',
            value: cronArr
        }
        return
    }
    ctx.body = {
        code: 200,
        text: '操作成功!'
    }
}catch(err){
    ctx.body = {
        code: 500,
        text: '发生错误!'
    }
}
})

app.listen(8899, () => {
    console.log('cron app启动,端口 8899');
})


// app.js 中的和cron.js通讯的部分
// 创建任务
let createCron = async (ctx, next) => {

await authToken(ctx, next, async () => {

    let { time, name, script, state, clientTime } = ctx.request.body;
    let serverTime = new Date().getTime()
    let res = await axios({
        method: 'POST',
        headers: {
            "Content-Type": "application/json"
        },
        url: 'http://127.0.0.1:8899/',
        data: JSON.stringify({ time, name, script, state, clientTime, serverTime, type: 'createCron' })
    });

    let promise = res && res.data && res.data.code === 200 ? Promise.resolve() : Promise.reject();
    await setResponse(ctx, promise, {success: '操作成功!', error: '操作失败!'});

}, {admin: true}, {insRole: true, childrenKey: 'createCron', parentKey: 'cron'})

}


image.png
image.png

撰写回答
你尚未登录,登录后可以
  • 和开发者交流问题的细节
  • 关注并接收问题和回答的更新提醒
  • 参与内容的编辑和改进,让解决方法与时俱进
推荐问题