parts/jobs/job.js

'use strict';

/* jshint ignore:start */

const EventEmitter = require('events'),
    later = require('later'),
    ipc = require('./ipc.js'),
    log = require('../../utils/log.js')('jobs:job'),
    retry = require('./retry.js'),
    ObjectID = require('mongodb').ObjectID;

const STATUS = {
        SCHEDULED: 0,
        RUNNING: 1,
        DONE: 2,
        CANCELLED: 3,
        ABORTED: 4,
        PAUSED: 5,
        WAITING: 6
    },
    STATUS_MAP = {
        0: "SCHEDULED",
        1: "RUNNING",
        2: "DONE",
        3: "CANCELLED",
        4: "ABORTED",
        5: "PAUSED",
        6: "WAITING"
    },
    ERROR = {
        CRASH: 'crash',
        TIMEOUT: 'timeout'
    },
    EVT = {
        UPDATE: 'job:update',
        DONE: 'job:done',
        TRANSIENT_CHANNEL: 'jobs:transient',
        TRANSIENT_RUN: 'jobs:transient:run',
        TRANSIENT_DONE: 'jobs:transient:done'
    };

/**
 * Debounce function which decreases number of calls to func to be once in minWait ... maxWait.
 * @param {function} func - function to debounce
 * @param {number} minWait - minimal waiting time
 * @param {number} maxWait - maximal waiting time
 * @returns {function} debounced function
 */
const debounce = function(func, minWait, maxWait) {
    var timeout, first, args, context,
        delater = function() {
            func.apply(context, args);
            timeout = first = args = context = null;
        };
    return function() {
        context = this;
        args = arguments;
        if (!first) {
            first = Date.now();
        }

        clearTimeout(timeout);

        if (maxWait < (Date.now() - first)) {
            delater();
        }
        else {
            timeout = setTimeout(delater, Math.min(minWait, maxWait - (Date.now() - first)));
        }
    };
};

/**
 * Job superclass for all jobs. To add new job type you need to:
 * 1. Define job class which has to be single export from a node.js module. This module can be either in api/jobs folder (API job), 
 *    or in a plugins/plugin/api/jobs folder (plugin job). Job name is assigned automatically to have the form of "[API or plugin name]:[job file name]".
 * 2. Schedule a job by running any of the following (replace 'api:clear' with your Job name from point 1 above):
 *    - require('api/parts/jobs').job('api:clear', {some: 'data'}).now() - to run the job ASAP.
 *    - require('api/parts/jobs').job('api:clear', {some: 'data'}).in(5) - to run the job in 5 seconds from now.
 *    - require('api/parts/jobs').job('api:clear', {some: 'data'}).once(new Date() or Date.now()) - to run the job on specified date.
 *    - require('api/parts/jobs').job('api:clear', {some: 'data'}).schedule("once in 2 hours") - to run the job on specified schedule, see https://bunkat.github.io/later/parsers.html#text for examples.
 *    NOTE! For most jobs, scheduling must be done either from master process, or from plugins.register("/master", ...) to eliminate multiple replacing on app start.
 * 3. Optionally set allowed concurrency (maximum running jobs of this type at any point in time) of the job by overriding getConcurrency method in your job class.
 * 4. Optionally allow the job to be divided by overriding divide in your job class. Resulted subjobs (or subs) will be run as separte jobs in parallel to improve performance.
 *
 * There are 3 useful Job subclasses already implemented:
 * ResourcefulJob is used when job requires some persistent resource to run on. Read - network connection, memory cache, etc.
 * IPCJob which extends ResourcefulJob is used when resource needs to live in a separate from Countly master process. Example - push which runs a 
 * TransientJob which extends IPCJob is used when job shouldn't be saved in jobs collection with all status updates being run through IPC.
 */
class Job extends EventEmitter {
    /**
    * Create job instance
    * @param {string} name - name of the job
    * @param {object} data - data about the job
    **/
    constructor(name, data) {
        super();
        if (typeof name === 'object') {
            this._json = name;
            if (this._json._id && typeof this._json._id === 'string') {
                this._json._id = new ObjectID(this._json._id);
            }
        }
        else {
            this._json = {
                //	_id: ObjectId
                name: name,
                created: Date.now(),
                status: STATUS.SCHEDULED,
                started: null,	// timestamp
                finished: null,	// timestamp
                duration: 0,	// seconds
                data: data || {}
                //	size: 0,
                //	done: 0,
                //	bookmark: null,	// anything
                //	error: ''
                //  data: {},
                //  subs: [
                //  	{_id: same ObjectId, idx: 0, name: 'same or other name', created, status, started, finished, duration, size, done, bookmark, error, data},
                //  	{_id: same ObjectId, idx: 1, name: 'same or other name', created, status, started, finished, duration, size, done, bookmark, error, data}
                //	],
            };
        }
        this._replace = false;
        this._errorCount = 0;

    }

    /**
    * Get job id with fallback
    * @returns {string} job id
    **/
    get id() {
        return this._json._id ? '' + this._json._id : undefined;
    }

    /**
    * Get job id
    * @returns {string} job id
    **/
    get _id() {
        return this._json._id;
    }

    /**
    * Get job channel
    * @returns {string} job channel id
    **/
    get channel() {
        return (this.id || '') + (this.isInExecutor ? ':executor' : '');
    }

    /**
    * Get job name
    * @returns {string} job name
    **/
    get name() {
        return this._json.name;
    }

    /**
    * Get job data
    * @returns {object} job data
    **/
    get data() {
        return this._json.data || {};
    }

    /**
    * Get job status
    * @returns {number} job status
    **/
    get status() {
        return this._json.status;
    }

    /**
    * Check if job is completed
    * @returns {boolean} if job was completed
    **/
    get isCompleted() {
        return this._json.status === STATUS.DONE;
    }

    /**
    * Check if job was aborted
    * @returns {boolean} if job was aborted
    **/
    get isAborted() {
        return this._json.status === STATUS.ABORTED;
    }

    /**
    * Check if job is running
    * @returns {boolean} if job is running
    **/
    get canRun() {
        return this._json.status === STATUS.RUNNING;
    }

    /**
    * Get schedule object
    * @returns {object} schedule object from later js
    **/
    get scheduleObj() {
        return this._json.schedule;
    }

    /**
    * Get strict schedule value
    * @returns {boolean} strict
    **/
    get strict() {
        return this._json.strict;
    }

    /**
    * Get next run time
    * @returns {object} next run
    **/
    get next() {
        return this._json.next;
    }


    /**
    * Schedule job
    * @param {object} schedule - schedule object from later js
    * @param {number} strict (optional) - maximum time in ms after each schedule occurrence date the job must be run, otherwise it'd be discarded
    * @param {object} nextTime (optional) - next run time
    * @returns {object} result of saving job
    **/
    schedule(schedule, strict, nextTime) {
        this._json.schedule = schedule;
        this._json.status = STATUS.SCHEDULED;

        if (strict !== undefined) {
            this._json.strict = strict;
        }

        if (nextTime) {
            this._json.next = nextTime;
        }
        else {
            schedule = typeof schedule === 'string' ? later.parse.text(schedule) : schedule;
            var next = later.schedule(schedule).next(1);
            if (!next) {
                return null;
            }

            this._json.next = next.getTime();
        }

        return this._save();
    }

    /**
    * Run job once
    * @param {number|Date} date - date when to run
    * @param {number} strict (optional) - maximum time in ms after scheduled date the job must be run, otherwise it'd be discarded
    * @returns {object} result of saving job
    **/
    once(date, strict) {
        this._json.next = typeof date === 'number' ? date : date.getTime();
        if (strict !== undefined) {
            this._json.strict = strict;
        }

        return this._save();
    }

    /**
    * Run job now
    * @returns {object} result of saving job
    **/
    now() {
        this._json.next = Date.now();
        return this._save();
    }

    /**
    * Run job in providd amount of seconds
    * @param {number} seconds - after how many seconds to run the job
    * @returns {object} result of saving job
    **/
    in(seconds) {
        this._json.next = Date.now() + seconds * 1000;
        return this._save();
    }

    /**
    * Replace existing job if, it exists
    * @returns {object} self
    **/
    replace() {
        if (this.isInExecutor) {
            throw new Error('Replace cannot be run from executor');
        }
        this._replace = true;
        return this;
    }

    /**
    * Update job atomically
    * @param {object} db - database connection
    * @param {object} match - query for job
    * @param {object} update - update query for job
    * @param {boolean=} neo - should return new document
    * @returns {Promise} promise
    **/
    static updateAtomically(db, match, update, neo = true) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').findAndModify(match, [['_id', 1]], update, {new: neo}, (err, doc) => {
                if (err) {
                    reject(err);
                }
                else if (!doc || !doc.ok || !doc.value) {
                    reject('Job not found');
                }
                else {
                    resolve(doc.value);
                }
            });
        });
    }

    /**
    * Update job
    * @param {object} db - database connection
    * @param {object} match - query for job
    * @param {object} update - update query for job
    * @returns {Promise} promise
    **/
    static update(db, match, update) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').updateOne(match, update, (err, res) => {
                if (err || !res) {
                    reject(err || 'no res');
                }
                else {
                    resolve(res.matchedCount ? true : false);
                }
            });
        });
    }

    /**
    * Update multiple jobs
    * @param {object} db - database connection
    * @param {object} match - query for job
    * @param {object} update - update query for job
    * @returns {Promise} promise
    **/
    static updateMany(db, match, update) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').updateMany(match, update, (err, res) => {
                if (err || !res) {
                    reject(err || 'no res');
                }
                else {
                    resolve(res.matchedCount || 0);
                }
            });
        });
    }

    /**
    * Insert new job
    * @param {object} db - database connection
    * @param {object} data - job document to insert
    * @returns {Promise} promise
    **/
    static insert(db, data) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').insertOne(data, (err, res) => {
                if (err || !res) {
                    reject(err || 'no res');
                }
                else if (res.insertedCount) {
                    data._id = data._id || res.insertedId;
                    resolve(data);
                }
                else {
                    resolve(null);
                }
            });
        });
    }

    /**
    * Read job
    * @param {object} db - database connection
    * @param {string} id - id of the job
    * @returns {Promise} promise
    **/
    static load(db, id) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').findOne({_id: typeof id === 'string' ? db.ObjectID(id) : id}, (err, job) => {
                if (err || !job) {
                    reject(err || 'no res');
                }
                else {
                    resolve(job);
                }
            });
        });
    }

    /**
    * Read multiple jobs
    * @param {object} db - database connection
    * @param {object} match - query for jobs
    * @returns {Promise} promise
    **/
    static findMany(db, match) {
        return new Promise((resolve, reject) => {
            db.collection('jobs').find(match).toArray((err, jobs) => {
                if (err) {
                    reject(err);
                }
                else {
                    resolve(jobs || []);
                }
            });
        });
    }

    /**
    * Replace jobs that will run after provided timestamp
    * @param {number} next - timestamp
    * @param {function} queror - function which modifies query request
    * @returns {Promise} promise
    **/
    replaceAfter(next, queror) {
        log.d('[!!!!!!!!] %s: replaceAfter %d (%d %j)', this._id, next, this.next, this.data);
        return new Promise((resolve, reject) => {
            let query = {
                status: STATUS.SCHEDULED,
                name: this.name,
                next: {$gte: next}
            };
            if (this._id) {
                query._id = {$ne: this._id};
            }
            if (this.data && Object.keys(this.data).length) {
                query.data = this.data;
            }
            if (queror) {
                queror(query);
            }

            log.d('[!!!!!!!!] %s: replaceAfter %d query', this._id, next, query);

            Job.updateAtomically(this.db(), query, {$set: {next: next}}).then(arg => {
                log.d('[!!!!!!!!] %s: replaceAfter %d RESOLVE UPDATED: %j', this._id, next, arg);
                resolve(arg);
            }, err => {
                log.d('[!!!!!!!!] %s: replaceAfter %d ERROR %j', this._id, next, err);
                if (err === 'Job not found') {
                    query.next = {$lt: next};
                    Job.findMany(this.db(), query).then(existing => {
                        log.d('[!!!!!!!!] %s: replaceAfter %d EXISTING %j', this._id, next, existing);
                        if (existing && existing.length) {
                            log.d('[!!!!!!!!] %s: replaceAfter %d RESOLVE EXISTING', this._id, next);
                            resolve(existing[0]);
                        }
                        else {
                            log.d('[!!!!!!!!] %s: replaceAfter %d NEW JOB', this._id, next);
                            new Job(this.name, this.data).once(next).then((arg) => {
                                log.d('[!!!!!!!!] %s: replaceAfter %d NEW JOB CREATED %j', this._id, next, arg);
                                resolve(arg);
                            }, (e2) => {
                                log.d('[!!!!!!!!] %s: replaceAfter %d NEW JOB ERROR %j', this._id, next, e2);
                                reject(e2);
                            });
                        }
                    }, reject);
                }
                else {
                    reject(err);
                }
            });
        });
    }

    /**
    * Replace all jobs
    * @param {number} next - timestamp
    * @returns {object} job data
    **/
    async _replaceAll() {

        let query = {
            status: STATUS.SCHEDULED,
            name: this.name
        };
        if (this.data && Object.keys(this.data).length) {
            query.data = this.data;
        }

        log.i('Replacing jobs %s (%j)', this.name, query);

        if (this._json.schedule) {
            // query.next = {$lte: Date.now() + 30000};
            let updated = await Job.updateMany(this.db(), query, {$set: {status: STATUS.CANCELLED}});
            if (updated) {
                log.i('Cancelled %d previous jobs %s (%j)', updated, this.name, query);
            }

            // delete query.next;
        }

        let existing = await Job.findMany(this.db(), query);
        if (existing.length === 1) {
            log.i('No need for replace of %s (%j): existing job %j', this.name, this.data, existing[0]);
            this._json = existing[0];
            return existing[0];

        }
        else if (existing.length === 0) {
            log.i('No job to replace, inserting %j', this._json);
            let inserted = await Job.insert(this.db(), this._json);
            if (!inserted) {
                throw new Error('Cannot insert a job');
            }
            this._json = inserted;
            return inserted;
        }
        else {
            let last = existing.sort((a, b) => b.next - a.next)[0],
                others = existing.filter(a => a !== last);

            log.i('replacing last job %j with %j', last, this._json);

            if (others.length) {
                await Job.updateMany(this.db(), {_id: {$in: others.map(o => o._id)}}, {$set: {status: STATUS.CANCELLED}});
            }

            let neo;
            try {
                neo = await Job.updateAtomically(this.db(), query, {$set: this._json});
            }
            catch (e) {
                log.w('Job was modified while rescheduling, skipping', e);
            }

            if (!neo) {
                log.w('Job was modified while rescheduling, skipping');
                return null;
            }
            this._json = neo;
            return neo;
        }
    }

    /**
    * Save job
    * @param {boolean} set - if should update instead of creating
    * @returns {object} job data
    **/
    async _save(set) {
        if (set) {
            log.d('Updating job %s with %j', this.id, set);
        }
        else {
            log.d('Creating job %s with data %j', this.name, this.data);
        }

        if (this._replace) {
            return await this._replaceAll();
        }
        else if (this._id) {
            if (set) {
                Object.keys(set).forEach(k => {
                    this._json[k] = set[k];
                });
            }
            else {
                set = Object.assign({}, this._json);
                delete set._id;
            }

            set.modified = Date.now();
            if (this._json.started || set.started) {
                set.duration = set.modified - (this._json.started || set.started);
            }

            let updated = await Job.update(this.db(), {_id: this._id}, {$set: set});
            if (updated) {
                return set;
            }
            else {
                throw new Error('No such job in db');
            }
        }
        else {
            Object.keys(set || {}).forEach(k => {
                this._json[k] = set[k];
            });

            let inserted = await Job.insert(this.db(), this._json);
            if (inserted) {
                return inserted;
            }
            else {
                throw new Error('Cannot insert a job');
            }
        }
    }

    // _save (set) {
    // 	return new Promise((resolve, reject) => {
    // 		try {
    // 			this._json.modified = Date.now();
    // 			var query, update, clb = (err, res) => {
    // 				if (err) { 
    // 					if (this._errorCount++ < MAXIMUM_SAVE_ERRORS) {
    // 						log.w('Error while saving job: %j', err);
    // 						setTimeout(() => {
    // 							this._save(set).then(resolve.bind(null, set), reject);
    // 						}, 1000); 
    // 					} else {
    // 						log.e('Error while saving job: %j', err);
    // 						reject(err);
    // 					}
    // 				} else if (res.result.nModified === 0) {
    // 					log.e('Job %s has been changed while doing _save: %j / setting %j for query %j', this._id, this._json, update, query);
    // 					reject('Job cannot be found while doing _save');
    // 				} else {
    // 					resolve(set || this._json);
    // 				}
    // 			};

    // 			if (this._replace) {
    // 				query = {status: STATUS.SCHEDULED, name: this.name};
    // 				if (this.data) { query.data = this.data; }

    // 				if (this._json.schedule) {
    // 					let schedule = typeof this._json.schedule === 'string' ? later.parse.text(this._json.schedule) : this._json.schedule,
    // 						prev = later.schedule(schedule).prev(1);

    // 					log.i('replacing job %j with', query, this._json);
    // 					this.db().collection('jobs').find(query).toArray((err, jobs) => {
    // 						if (err) {
    // 							log.e('job replacement error when looking for existing jobs to replace', err);
    // 							this.db().collection('jobs').save(this._json, clb);
    // 						} else if (jobs && jobs.length) {
    // 							try {
    // 							let last = jobs.sort((a, b) => b.next - a.next)[0];
    // 							let others = jobs.filter(a => a !== last);
    // 							if (others.length) {
    // 								log.i('found %d jobs with %j, going to cancel %j', jobs.length, query, others.map(j => j._id));
    // 								Promise.all(others.map(j => {
    // 									return require('./index.js').create(j).cancel(this.db(), false);
    // 								}));
    // 								// this.db().collection('jobs').update({_id: {$in: others.map(j => j._id)}}, {$set: {status: STATUS.CANCELLED}}, {multi: true}, log.logdb(''));
    // 							}

    // 							if (last.schedule === this._json.schedule && last.next > prev.getTime()) {
    // 								// just do nothing
    // 								log.i('last job is scheduled correctly, won\'t replace anything for %j: current %j, won\'t replace to %j', query, new Date(last.next), new Date(this.next));
    // 								resolve(set);
    // 							} else {
    // 								log.i('replacing last job %j with %j', last, this._json);
    // 								this.db().collection('jobs').findAndModify(query, [['_id', 1]], {$set: this._json}, {new: true}, (err, job) => {
    // 									if (err) {
    // 										log.e('job replacement error, saving new job', err, job);
    // 										this.db().collection('jobs').save(this._json, clb);
    // 									} else if (job && !job.value){
    // 										log.i('no job found to replace, saving new job', err, job);
    // 										this.db().collection('jobs').save(this._json, clb);
    // 									} else {
    // 										log.i('job replacing done', job.value);
    // 										resolve(set);
    // 									}
    // 								});
    // 							}
    // 						}catch(e) { log.e(e, e.stack); }
    // 						} else {
    // 							log.i('no jobs found to replace for %j, saving new one', query);
    // 							this.db().collection('jobs').save(this._json, clb);
    // 						}
    // 					});
    // 				} else {
    // 					this.db().collection('jobs').findAndModify(query, [['_id', 1]], {$set: this._json}, {new: true}, (err, job) => {
    // 						if (err) {
    // 							log.e('job replacement error, saving new job', err, job);
    // 							this.db().collection('jobs').save(this._json, clb);
    // 						} else if (job && !job.value){
    // 							log.i('no job found to replace, saving new job', err, job);
    // 							this.db().collection('jobs').save(this._json, clb);
    // 						} else {
    // 							log.i('job replacing done', job.value);
    // 							resolve(set);
    // 						}
    // 					});
    // 				}


    // 			} else if (this._json._id) {
    // 				if (set) {
    // 					for (let k in set) {
    // 						if (k !== '_id') {
    // 							this._json[k] = set[k];
    // 						}
    // 					}
    // 				}
    // 				query = {_id: this._json._id};
    // 				update = {$set: set || this._json};
    // 				update.$set.modified = this._json.modified;
    // 				delete update.$set._id;
    // 				log.d('saving %j: %j', query, update);
    // 				this.db().collection('jobs').updateOne(query, update, clb);
    // 			} else {
    // 				log.d('saving %j', this._json);
    // 				this._json._id = this.db().ObjectID();
    // 				this.db().collection('jobs').save(this._json, clb);
    // 			}
    // 		} catch(e) {
    // 			log.e(e, e.stack);
    // 			throw e;
    // 		}
    // 	}).then(() => { this._replace = false; return set; });
    // }

    /**
    * Get database connection
    * @returns {object} db
    **/
    db() {
        return require('./index.js').db;
    }

    /**
    * Abort job
    * @param {Error} err - error with which to abort
    * @returns {Promise} promise
    **/
    _abort(err) {
        log.d('%s: aborting', this.channel);
        return this._finish(err || 'Aborted');
        // if (this.retryPolicy().errorIsRetriable(err)) {
        // 	log.d('%s: won\'t abort since error %j is retriable', this._idIpc, err);
        // } else {
        // 	log.d('%s: aborting', this._idIpc);
        // 	return this._finish(err || 'Aborted');
        // }
    }

    /**
    * Finish job
    * @param {Error=} err - error with which to finish
    * @returns {Promise} promise
    **/
    _finish(err) {
        if (this.isCompleted) {
            return Promise.resolve();
        }
        else {
            log.d('%s: finishing', this.id);
            this._json.status = STATUS.DONE;
            this._json.finished = Date.now();
            this._json.duration = this._json.finished - this._json.started;
            this._json.error = err ? (err.message || err.code || (typeof err === 'string' ? err : JSON.stringify(err))) : null;
            this.emit(EVT.DONE, this._json);
            return this._save({
                status: this._json.status,
                finished: this._json.finished,
                duration: this._json.duration,
                error: this._json.error
            });
        }
    }

    /**
    * Internal run function for managing states
    * @returns {Promise} promise
    **/
    _run() {
        return new Promise((resolve, reject) => {
            this._json.status = STATUS.RUNNING;
            this._json.started = Date.now();
            this._save({
                status: STATUS.RUNNING,
                started: this._json.started
            }).then(() => {
                try {
                    let promise = this.run(
                        this.db(),
                        (err) => {
                            log.d('%s: done running: error %j', this.id, err);
                            if (!this.isCompleted) {
                                this._finish(err).then(
                                    err ? reject.bind(null, err) : resolve,
                                    err ? reject.bind(null, err) : reject
                                );
                            }
                        },
                        () => {});

                    if (promise && promise instanceof Promise) {
                        promise.then(() => {
                            log.d('%s: done running', this.id);
                            this._finish().then(resolve, resolve);
                        }, (err) => {
                            log.d('%s: done running: error %j', this.id, err);
                            if (!this.isCompleted) {
                                this._finish(err).then(reject.bind(null, err), reject.bind(null, err));
                            }
                        });
                    }
                }
                catch (e) {
                    log.e('[%s] caught error when running: %j / %j', this.channel, e, e.stack);
                    this._finish(e).then(reject.bind(null, e), reject.bind(null, e));
                }
            }, reject);
        });
    }

    /**
    * Run job with retry policy applied
    * @returns {Promise} promise
    **/
    _runWithRetries() {
        return this.retryPolicy().run(this._run.bind(this));
    }

    /**
	 * Override in actual job class
	 * Takes 2 parameters omitted for the sake of ESLint:
	 * 		- db - db connection which must be used for this job in most cases, otherwise you're responsible for opening / closing an appropriate connection
	 * 		- done - function which must be called (when promise is not returned) when job processing is done with either no arguments or error string
	 * 		
	 */
    run(/*db, done*/) {
        throw new Error('Job must be overridden');
    }

    /**
	 * Override if job needs a graceful cancellation. Job is cancelled in two cases:
	 * 		1. When server is restarted and last modification of the job was too long ago to consider it not running (becauseOfRestart = true).
	 * 		2. When server was not running at the time strict job should have been run (becauseOfRestart = false).
     * @returns {Promise} promise
	 */
    cancel(/*db, becauseOfRestart*/) {
        return this._save({
            status: STATUS.CANCELLED,
            error: 'Cancelled on restart',
            modified: Date.now(),
            finished: Date.now()
        });
    }

    /**
	 * Override if default policy isn't good enough
     * @returns {RetryPolicy} retry policy
	 */
    retryPolicy() {
        return new retry.DefaultRetryPolicy(3);
    }

    /**
	 * Override if job needs a manager instance to run
     * @returns {Promise} promise
	 */
    prepare(/*manager, db*/) {
        return Promise.resolve();
    }

    /**
	 * Override if 0 doesn't work for this job:
	 *  0 = default = run jobs of this type on any number of servers, with any number of jobs running at the same time
	 *  1 ... N = run not more than N jobs of this time at the same time
     * @returns {number} concurrency
	 */
    getConcurrency() {
        return 0;
    }
}

/**
 * Job which needs some resource to run: socket, memory cache, etc.
 * Resource lives longer than a single job and reassigned from one job to another when first one is done. 
 */
class ResourcefulJob extends Job {
    /** Create resource **/
    createResource(/*_id, name, options */) {
        throw new Error('ResourcefulJob.createResource must be overridden to return possibly open resource instance');
    }

    /** Release resource **/
    releaseResource(/* resource */) {
        throw new Error('ResourcefulJob.releaseResource must be overridden to return possibly open resource instance');
    }

    /** Get resource name **/
    resourceName() {
        throw new Error('ResourcefulJob.resourceName must be overridden to return non-unique string which identifies type of a resource');
    }
}

/**
 * Job which runs in 2 processes:
 * - Initiator process (Countly master) creates subprocess (fork of executor.js) and processes IPC messages from subprocess
 * - Subprocess actually runs the job, but sends state updates through IPC, keeping all listeners aware of its lifecycle
 *
 * This complex structure gives following advantage:
 * - Subprocess can safely crash, initiator process will be able to just restart the job in a new subprocess from the point it stopped.
 * 
 * Extends ResourcefulJob, meaning job resource can live longer than job and reassigned to other job after current job is done.
 *
 * Works in combination with IPCFaçadeJob which listens for IPC status messages from subprocess and persists them in DB.
 */
class IPCJob extends ResourcefulJob {

    /**
    * Check if job is run in executor
    * @returns {boolean} if job is run in executor
    **/
    get isInExecutor() {
        return process.argv[1].endsWith('executor.js');
    }

    /**
    * Get retry policy
    * @returns {RetryPolicy} retry policy
    **/
    retryPolicy() {
        return new retry.IPCRetryPolicy(1);
    }

    /**
    * Release resource
    * @returns {Promise} promise
    **/
    releaseResource(/* resource */) {
        return Promise.resolve();
    }

    /**
    * Save resource data
    * @param {object} data - resource data
    * @returns {Promise} promise
    **/
    _save(data) {
        if (process.send) {
            log.d('[%d]: Sending progress update %j', process.pid, {
                _id: this.channel,
                cmd: EVT.UPDATE,
                from: process.pid,
                data: data
            });
            process.send({
                _id: this.channel,
                cmd: EVT.UPDATE,
                from: process.pid,
                data: data
            });
        }
        return super._save.apply(this, arguments);
    }
}
/** Listens for IPC status messages from subprocess and persists them in DB **/
class IPCFaçadeJob extends ResourcefulJob {
    /**
    * Constructor
    * @param {Job} job - job
    * @param {function} getResourceFaçade - function to get resource
    **/
    constructor(job, getResourceFaçade) {
        super(job._json, null, null);
        this.job = job;
        this.getResourceFaçade = getResourceFaçade;
    }

    /**
    * Create resource
    * @returns {object} resource
    **/
    createResource() {
        log.d('[%s] IPCFaçadeJob creates a resource', this.job.channel);
        return this.getResourceFaçade();
    }

    /**
    * Get resource name
    * @returns {string} resource name
    **/
    resourceName() {
        return this.job.resourceName();
    }

    /**
    * Release resource
    * @param {object} resource to release
    * @returns {Promise} promise
    **/
    releaseResource(resource) {
        log.d('[%s] IPCFaçadeJob releases its resource', this.job.channel);
        return this.job.releaseResource(resource);
    }

    /**
    * Get retry policy
    * @returns {RetryPolicy} retry policy
    **/
    retryPolicy() {
        return this.job.retryPolicy();
    }

    /**
    * Run the job
    * @returns {Promise} promise
    **/
    _run() {
        log.d('[%s] Running in IPCFaçadeJob', this.job.channel);
        try {
            this.resourceFaçade = this.getResourceFaçade();
        }
        catch (e) {
            log.d('[%s] Caught Façade error: %j', this.job.channel, e);
            return Promise.reject(e);
        }

        this.ipcChannel = new ipc.IdChannel(this.job.channel).attach(this.resourceFaçade._worker);
        this.ipcChannel.on(EVT.UPDATE, (json) => {
            log.d('[%s] UPDATE in IPCFaçadeJob: %j', this.job.channel, json);
            for (var k in json) {
                this.job._json[k] = json[k];
            }
            if (json.status) {
                this.emit(EVT.UPDATE, json);
            }
        });

        return this.resourceFaçade.run(this).then(() => {
            this.ipcChannel.remove();
        }, (error) => {
            this.ipcChannel.remove();
            log.e('[%s] Error in IPCFaçadeJob %s: %j / %j', this.job.channel, this.resourceFaçade._id, error, error.stack);
            this.job._finish(error || 'Aborted').catch(()=>{});
            throw error;
        });
    }

    /**
    * Abort the job
    * @param {Error} error - error with which to abort
    **/
    _abort(error) {
        log.w('%s: ABORTING in IPCFaçadeJob', this.job.channel);
        this.resourceFaçade.abort(error);
    }
}

/** Class for transiend jobs **/
class TransientJob extends IPCJob {
    /**
    * Send data for current channel
    * @param {object} data - data to send
    **/
    _sendAndSave(data) {
        log.d('[%s] transient _sendAndSave: %j', this.channel, data);
    }

    /**
    * Save data
    * @param {object} data - data to save
    * @returns {Promise} promise
    **/
    _save(data) {
        if (process.send) {
            log.d('[%d]: Sending progress update %j', process.pid, {
                _id: this.channel,
                cmd: EVT.UPDATE,
                from: process.pid,
                data: data
            });
            process.send({
                _id: this.channel,
                cmd: EVT.UPDATE,
                from: process.pid,
                data: data
            });
        }
        if (data) {
            for (let k in data) {
                if (k !== '_id') {
                    this._json[k] = data[k];
                }
            }
        }
        return Promise.resolve(data);
    }
}

module.exports = {
    EVT: EVT,
    ERROR: ERROR,
    Job: Job,
    IPCJob: IPCJob,
    IPCFaçadeJob: IPCFaçadeJob,
    TransientJob: TransientJob,
    STATUS: STATUS,
    STATUS_MAP: STATUS_MAP,
    debounce: debounce
};