parts/jobs/handle.js

'use strict';

const logger = require('../../utils/log.js'),
    log = logger('jobs:handle'),
    ipc = require('./ipc.js'),
    job = require('./job.js'),
    scan = require('./scanner.js'),
    manager = require('../../../plugins/pluginManager.js'),
    common = require('../../utils/common.js');

const TRANSIENT_JOB_TIMEOUT = 300000;

/** handle running jobs **/
class Handle {
    /** constructor **/
    constructor() {
        log.i('Starting job handle in %d', process.pid);
        var self = this;
        manager.singleDefaultConnection().then((db) => {
            manager.reloadEnabledPluginList(db, function() {
                self.db = db;
                self.classes = {}; // {'job name': Constructor}
                self.files = {}; // {'ping': '/usr/local/countly/api/jobs/ping.js'}
                scan(self.db, self.files, self.classes);
            });
        });
    }

    /**
    * create job instance
    * @param {string} name - job name
    * @param {object} data - data about job
    * @returns {Job} job
    **/
    job(name, data) {
        let Constructor = this.classes[name];
        if (Constructor) {
            return new Constructor(name, data);
        }
        else {
            throw new Error('Couldn\'t find job file named ' + name);
        }
    }

    /**
    * Cancel job if any exists
    * 
    * @param {string} name - job name
    * @param {object} data - data about job
    * @returns {Job} job
    **/
    cancel(name, data) {
        return this.db.collection('jobs').updateMany({name, data, status: {$in: [job.STATUS.SCHEDULED, job.STATUS.WAITING, job.STATUS.PAUSED]}}, {$set: {status: job.STATUS.CANCELLED, error: 'Cancelled by user'}});
    }

    /**
    * run transient job
    * @param {string} name - job name
    * @param {object} data - data about job
    * @returns {Promise} promise
    **/
    runTransient(name, data) {
        data._id = data.id = '' + (data._id || this.db.ObjectID());

        let Constructor = this.classes[name];
        if (Constructor) {
            return new Promise((resolve, reject) => {

                var timeout = setTimeout(() => {
                        if (channel !== null) {
                            channel.remove();
                            channel = null;
                            reject('Transient timeout');
                        }
                    }, TRANSIENT_JOB_TIMEOUT),

                    j = new Constructor(name, data),

                    channel = new ipc.IdChannel(job.EVT.TRANSIENT_CHANNEL).attach(process).on(job.EVT.TRANSIENT_DONE, (json) => {
                        log.d('[%d]: Got transient job response %j', process.pid, j._json, json);
                        if (json._id === data._id) {
                            if (channel === null) {
                                return;
                            }
                            else {
                                channel.remove();
                                channel = null;
                                clearTimeout(timeout);
                                if (json.error) {
                                    reject(json);
                                }
                                else {
                                    resolve(json.result);
                                }
                            }
                        }
                    });

                j._json._id = data._id;

                log.d('[%d]: Sending transient job %j', process.pid, j._json);
                channel.send(job.EVT.TRANSIENT_RUN, j._json);
            });

        }
        else {
            throw new Error('Couldn\'t find job file named ' + name);
        }
    }

    /**
    * get ipc
    * @returns {ipc} ipc
    **/
    get ipc() {
        return ipc;
    }


    /**
     * Suspend a job from UI
     * @param {object} params - params object with job id
     * @returns {object} - result of the operation
     **/
    async suspendJob(params) {
        try {
            let jobStatus = JSON.parse(params.qstring.suspend) ? job.STATUS.SUSPENDED : job.STATUS.SCHEDULED;
            let currentStatus = JSON.parse(params.qstring.suspend) ? job.STATUS.SCHEDULED : job.STATUS.SUSPENDED;

            const result = await common.db.collection('jobs').findOneAndUpdate(
                {
                    _id: common.db.ObjectID(params.qstring.id),
                    status: currentStatus
                },
                {$set: {status: jobStatus}},
                {upsert: false}
            );

            if (result.value) {
                common.returnOutput(params, {result: true, message: jobStatus ? "Job suspended successfully" : "Job scheduled successfully"});
            }
            else {
                log.e("Updating job status failed. Job id: " + params.qstring.id, " Job Status: " + params.qstring.suspend);
                common.returnOutput(params, {result: false, message: "Updating job status failed, please check api logs"});
            }
        }
        catch (err) {
            log.e("Error while suspending job", err);
            common.returnOutput(params, {result: false, message: "Updating job status failed. Please check API logs"});
        }
    }
}

if (!Handle.instance) {
    Handle.instance = new Handle();
}

module.exports = Handle.instance;