parts/jobs/runner.js

const cluster = require('cluster'),
    common = require('../../utils/common'),
    config = require('../../config'),
    log = common.log('core:runners'),

    // Timings below can be overridden through `config.runners` (values in ms; a falsy value falls back to the default).
    ERROR_COOLDOWN = config.runners && config.runners.error_cooldown || 60000, // 1min to cool down after an error before retrying
    PERIODICITY = config.runners && config.runners.periodicity || 30000, // 30s between "I'm alive" updates
    MAX_LAST_SEEN = config.runners && config.runners.max_last_seen || 600000, // 10min without a leader last seen update before considering it dead

    FORCE_UNLOCK = 60 * 60 * 1000, // 1h: a runners lock older than this is overridden and runners are started again
    UNLOCK_ATTEMPTS = 100, // how many times to retry unlocking runners before terminating the process

    COLLECTION = 'runners'; // MongoDB collection holding the per-server docs and the 'leader' doc

// Mutable state of the election state machine driven by periodic().
let leader; // last known copy of the `_id: 'leader'` doc
let me; // this server's own doc in the runners collection
let collection; // runners collection handle, obtained by setup()
let runners = []; // functions registered through module.exports.push()
let advertized = false; // whether '/master/runners' has already been dispatched to plugins

/**
 * Bootstrap step driven by periodic(): obtain the runners collection and register this server in it.
 *
 * Each call advances at most one stage and then reschedules periodic() (immediately on success,
 * after ERROR_COOLDOWN on failure):
 * 1. no collection handle yet: create the collection; if creation reports an error (treated as
 *    "collection exists") reuse it and delete docs whose `ls` is older than 10 * MAX_LAST_SEEN;
 * 2. no own doc yet: upsert this server's doc (id from config.runners.id or a new ObjectID string);
 * 3. otherwise there is nothing left to do.
 */
function setup() {
    if (collection && me) {
        setImmediate(periodic);
        return;
    }

    if (collection) {
        const self = {_id: config.runners && config.runners.id || common.db.ObjectID().toString(), ls: Date.now()};
        collection.updateOne({_id: self._id}, {$set: self}, {upsert: true}, upsertErr => {
            if (upsertErr) {
                log.e(upsertErr);
                setTimeout(periodic, ERROR_COOLDOWN);
                return;
            }
            me = self;
            log.d('inserted me', me);
            setImmediate(periodic);
        });
        return;
    }

    common.db.createCollection(COLLECTION, createErr => {
        if (!createErr) {
            collection = common.db.collection(COLLECTION);
            setImmediate(periodic);
            return;
        }

        log.d('collection exists');
        collection = common.db.collection(COLLECTION);
        const staleBefore = Date.now() - 10 * MAX_LAST_SEEN;
        collection.deleteMany({ls: {$lt: staleBefore}}, deleteErr => {
            if (deleteErr) {
                // drop the handle so that the next setup() retries this whole stage
                collection = undefined;
                log.e(deleteErr);
                setTimeout(periodic, ERROR_COOLDOWN);
                return;
            }
            log.d('deleted stale records');
            setImmediate(periodic);
        });
    });
}

/**
 * Returns whether config allows this server to take leadership: either `config.runners.leader`
 * is not set or it equals this server's id.
 *
 * @returns {boolean} true if this server may become the leader
 */
function mayLead() {
    return !config.runners || !config.runners.leader || me._id === config.runners.leader;
}

/**
 * Tries to make this server the leader with a single findOneAndUpdate that sets `runner`/`ls` to this
 * server's values and removes any `lock`. `filter` acts as the compare-and-swap condition: when it
 * matches nothing (another server changed the doc first) discovery starts over.
 * Reschedules periodic() or discoverLeader() in every outcome.
 *
 * @param {object} filter condition the leader doc must satisfy for the takeover
 * @param {object} options findOneAndUpdate options (callers pass returnDocument: 'after')
 * @param {string} message debug message logged after a successful takeover
 */
function claimLeadership(filter, options, message) {
    collection.findOneAndUpdate(filter, {$set: {runner: me._id, ls: me.ls}, $unset: {lock: 1}}, options, (e, ok) => {
        if (e) {
            leader = undefined;
            log.e(e);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
        else if (ok && ok.ok) {
            if (ok.value) {
                leader = ok.value;
                log.d(message, leader);
                setImmediate(periodic);
            }
            else {
                log.d('failed to become a leader');
                leader = undefined;
                setImmediate(discoverLeader);
            }
        }
        else {
            leader = undefined;
            log.e('not ok from findOneAndUpdate', ok);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
    });
}

/**
 * Leader discovery & taking leadership.
 *
 * Without a cached leader, loads the `_id: 'leader'` doc:
 * - a fresh one (seen within MAX_LAST_SEEN) is adopted;
 * - a stale one is replaced with a compare-and-swap on its `ls`, so concurrent takeovers cannot both win;
 * - a missing one is created by upsert.
 * Taking leadership is only attempted when mayLead() allows it.
 *
 * With a cached leader that belongs to another server and is stale, tries to take over,
 * guarded by the cached `runner` and `ls`.
 */
function discoverLeader() {
    if (!leader) {
        log.d('looking for a leader');
        // No `ls` filter here: stale leaders must be visible so they can be replaced conditionally
        // instead of being overwritten by the unconditional upsert below.
        collection.findOne({_id: 'leader'}, (err, doc) => {
            if (err) {
                leader = undefined;
                log.e(err);
                setTimeout(periodic, ERROR_COOLDOWN);
            }
            else if (doc && doc.ls < Date.now() - MAX_LAST_SEEN) {
                if (mayLead()) {
                    log.d('leader is stale', doc);
                    claimLeadership({_id: 'leader', ls: doc.ls}, {returnDocument: 'after'}, 'took leadership from a stale one');
                }
                else {
                    setTimeout(periodic, PERIODICITY);
                }
            }
            else if (doc) {
                leader = doc;
                log.d('leader is', leader);
                setImmediate(periodic);
            }
            else if (mayLead()) {
                log.d('no leader, becoming one');
                claimLeadership({_id: 'leader'}, {upsert: true, returnDocument: 'after'}, 'became a leader');
            }
            else {
                setTimeout(periodic, PERIODICITY);
            }
        });
    }
    else if (leader.runner !== me._id && leader.ls < Date.now() - MAX_LAST_SEEN && mayLead()) {
        log.d('haven\'t seen a leader for %d, taking leadership', MAX_LAST_SEEN);
        // compare against the stored runner id; leader._id is always the constant 'leader'
        claimLeadership({_id: 'leader', runner: leader.runner, ls: leader.ls}, {returnDocument: 'after'}, 'took leadership');
    }
    else {
        setTimeout(periodic, PERIODICITY);
    }
}

/**
 * Releases the runners lock taken by renewLeadership(); handed to imAlive's callback as the unlock function.
 *
 * The update only matches while this server is still the recorded leader and the leader `ls` equals the
 * locally cached one, so it can fail transiently (e.g. while a leader `ls` refresh is in flight); callers retry.
 *
 * @param {function} clb called with true once the lock is removed, false otherwise
 */
function unlockRunners(clb) {
    if (!leader || !me) {
        return clb(false);
    }
    collection.findOneAndUpdate({_id: 'leader', runner: me._id, ls: leader.ls}, {$unset: {lock: 1}}, {returnDocument: 'after'}, (error, ret) => {
        if (error) {
            log.e('failed to unlock leader', error);
            clb(false);
        }
        else if (ret && ret.ok && ret.value) {
            leader = ret.value;
            log.d('leader unlocked', leader);
            clb(true);
        }
        else {
            log.d('leader unlock matched nothing', ret);
            clb(false);
        }
    });
}

/**
 * Leader branch of imAlive(): refreshes the leader doc's `ls` (guarded by the cached `ls`) and, when no lock
 * is held or the held one is older than FORCE_UNLOCK, also takes the lock and calls `callback` with
 * unlockRunners so the caller can run the runners and release the lock afterwards.
 *
 * @param {function} callback receives the unlock function once the lock has been acquired
 */
function renewLeadership(callback) {
    log.d('updating leader ls');
    let update = {ls: me.ls},
        locking = !leader.lock;
    if (locking) {
        update.lock = Date.now();
        log.d('locking leader');
    }
    else if (leader.lock < Date.now() - FORCE_UNLOCK) {
        // the lock was never released (presumably its holder died or hung): take it over
        update.lock = Date.now();
        locking = true;
        log.d('leader lock is older than %dms, forcing a new lock', FORCE_UNLOCK);
    }
    collection.findOneAndUpdate({_id: 'leader', runner: me._id, ls: leader.ls}, {$set: update}, {returnDocument: 'after'}, (e, ok) => {
        if (e) {
            leader = undefined;
            log.e(e);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
        else if (ok && ok.ok) {
            if (ok.value) {
                leader = ok.value;
                if (locking) {
                    log.d('updated leader ls (%j), calling runners', leader);
                    callback(unlockRunners);
                }
                else {
                    log.d('updated leader ls (%j), runners are locked for %dms', leader, Date.now() - leader.lock);
                }
                setTimeout(periodic, PERIODICITY);
            }
            else {
                log.e('someone updated leader', leader);
                leader = undefined;
                setImmediate(discoverLeader);
            }
        }
        else {
            leader = undefined;
            log.e('not ok from imalive leader findOneAndUpdate', ok);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
    });
}

/**
 * Follower branch of imAlive(): reloads the leader doc if it was seen within MAX_LAST_SEEN; otherwise
 * `leader` becomes null and the next periodic() runs discoverLeader().
 */
function refreshLeader() {
    collection.findOne({_id: 'leader', ls: {$gt: Date.now() - MAX_LAST_SEEN}}, (e, ld) => {
        if (e) {
            leader = undefined;
            log.e(e);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
        else {
            leader = ld;
            setTimeout(periodic, PERIODICITY);
        }
    });
}

/**
 * Update last seen of this server's doc, then either renew leadership (and possibly acquire the runners
 * lock) when this server is the leader, or refresh the cached leader doc otherwise.
 *
 * @param {function} callback called with an unlock function `(clb) => void` when the runners lock was
 *                            acquired; `clb` receives true once the lock has been released
 */
function imAlive(callback) {
    log.d('updating ls');
    collection.findOneAndUpdate({_id: me._id}, {$set: {ls: Date.now()}}, {returnDocument: 'after'}, (err, doc) => {
        if (err) {
            leader = undefined;
            log.e(err);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
        else if (doc && doc.ok) {
            // doc.value is null when our doc vanished; periodic() then re-registers via setup()
            me = doc.value;
            log.d('updated ls');
            if (me && leader && leader.runner === me._id) {
                renewLeadership(callback);
            }
            else {
                refreshLeader();
            }
        }
        else {
            leader = undefined;
            log.e('not ok from imalive findOneAndUpdate', doc);
            setTimeout(periodic, ERROR_COOLDOWN);
        }
    });
}

/**
 * Drives the election state machine on each server. The first call happens PERIODICITY ms after the
 * module loads in the master process; afterwards every step reschedules periodic() itself (after
 * PERIODICITY, ERROR_COOLDOWN or immediately). On the first call it also dispatches '/master/runners'
 * to plugins with this module's exports.
 *
 * Steps: setup() until this server has its own doc, discoverLeader() until a leader is known,
 * imAlive() afterwards. When imAlive() acquires the runners lock it hands over an unlock function:
 * all runners are started in parallel, their failures are logged, and once all settle the lock is released.
 */
function periodic() {
    if (!advertized) {
        advertized = true;
        common.plugins.dispatch('/master/runners', module.exports);
    }
    if (!me) {
        setup();
    }
    else if (!leader) {
        discoverLeader();
    }
    else {
        imAlive((done) => {
            Promise.all(runners.map(runner => {
                let result;
                try {
                    result = runner();
                }
                catch (e) {
                    log.e('Error in runner', e);
                    return undefined;
                }
                // Promise.resolve() also tolerates runners returning a plain value instead of a promise
                return Promise.resolve(result).catch(e => {
                    log.e('Rejection in runner', e);
                });
            })).catch(e => {
                log.e('Error in runners', e);
            }).then(() => {
                /**
                 * Try to release the runners lock. Failed attempts are retried after `attempt` seconds
                 * (linear backoff); when attempt number UNLOCK_ATTEMPTS fails too, the process terminates.
                 *
                 * @param {integer} attempt zero-based number of the current attempt
                 */
                let trydone = (attempt) => {
                    done(ok => {
                        if (ok) {
                            log.d('Unlocked runners', ok);
                        }
                        else if (attempt < UNLOCK_ATTEMPTS) {
                            setTimeout(() => trydone(attempt + 1), attempt * 1000);
                        }
                        else {
                            log.e(`Failed to unlock runners for ${UNLOCK_ATTEMPTS}-th time, terminating`);
                            process.exit(1);
                        }
                    });
                };
                trydone(0);
            });
        });
    }
}

// Leader election only runs in the cluster master; the first iteration starts after PERIODICITY ms.
if (cluster.isMaster) {
    setTimeout(periodic, PERIODICITY);
}
else {
    // only log: requiring this module from a worker must not crash it
    console.error('Runners can only work in master process');
}

/**
 * Simple lib which allows leader election within a cluster of servers.
 * Designed to process push queue on one server in multi server environment.
 * 
 * Works on MongoDB's findOneAndUpdate on `runners` collection:
 * - On server start assigns a random id to this server.
 * - Deletes stale docs from `runners`.
 * - Then checks `runners` collection for a doc with _id = 'leader'. If a leader with last seen < MAX_LAST_SEEN is found, then the leader is responsible for running the functions.
 * - If there's no such leader, tries to take leadership by modifying that doc with findOneAndUpdate and then runs the functions.
 * - In case a leader dies, waits for MAX_LAST_SEEN and picks new leader automagically.
 * 
 * Apart from automagical mode, one can assign configuration in api/config.js on each server:
 * {
 *   ...
 *   runners: {
 *     id: 'unique server id (optional)',
 *     leader: 'id of the leader'
 *   }
 *   ...
 * }
 *
 * runners collection:
 * {
 *   _id: 'leader'
 *   ls: Date.now(), // last seen date
 *   runner: ObjectID.toString() // id of the doc of last or current leader
 * }
 * {
 *   _id: ObjectID.toString()
 *   ls: Date.now(), // last seen date
 * }
 */
module.exports = {
    /**
     * Add runner function.
     * 
     * IMPORTANT: A runner function must not throw of fail. It WILL be called multiple times at once, ensure you use some locking mechanism to prevent that.
     * 
     * @param {function} runner runner function to add
     * @returns {integer} number of runners registered in the process
     */
    push: runner => {
        runners.push(runner);
        return runners.length;
    },
    /**
     * Remove runner function.
     * 
     * @param {function} runner runner function to remove
     * @returns {boolean|undefined} true if removed, undefined if not found
     */
    pop: runner => {
        let idx = runners.indexOf(runner);
        if (idx !== -1) {
            runners.splice(idx, 1);
            return true;
        }
    },
};



// module.exports.push(() => {
//     if (Math.random() < .33) {
//         console.log('throwing outside');
//         throw new Error('outside');
//     }
//     else if (Math.random() < .66) {
//         console.log('rejecting');
//         return new Promise((res, rej) => setTimeout(() => rej(new Error('wat')), 10000 * Math.random()));
//     }
//     else {
//         console.log('resolving');
//         return new Promise(res => setTimeout(res, 10000 * Math.random()));
//     }
// });