'use strict';
const cp = require('child_process'),
EventEmitter = require('events'),
ipc = require('./ipc.js'),
JOB = require('./job.js'),
log = require('../../utils/log.js')('jobs:resource');
// Command names sent over the IPC channel between ResourceFaçade and Resource.
const CMD = {
    RUN: 'resource:run',
    ABORT: 'resource:abort',
    DONE: 'resource:done',
    OPENED: 'resource:opened',
    CLOSED: 'resource:closed',
    CRASH: 'resource:crash',
    TIMEOUT: 'resource:timeout',
    ONLINE: 'resource:online'
};

// Names of the local EventEmitter events used by resources and the resource pool.
const EVT = {
    ONLINE: 'online',
    OPENED: 'opened',
    CLOSED: 'closed',
    ABORT: 'abort',
    EXIT: 'exit',
    TIMEOUT: 'timeout',
    CRASH: 'crash'
};

// Default interval between checkActive() polls of an opened Resource, in milliseconds.
const RESOURCE_PING_INTERVAL = 10 * 1000;

// Default idle time after a finished job before a Resource closes itself, in milliseconds.
const RESOURCE_CLOSE_TIMEOUT = 6 * 60 * 1000;

// How long ResourceFaçade waits for an open / close command to be acknowledged, in milliseconds.
const RESOURCE_CMD_TIMEOUT = 15 * 60 * 1000;
/**
 * Build a short pseudo-random string: the fractional digits of Math.random() written in base 36.
 * Not cryptographically secure.
 * @returns {string} random string of characters 0-9 / a-z
 **/
function random() {
    return Math.random().toString(36).substring(2);
}
/**
 * Base class for both: Resource & ResourceFaçade which implements interface for talking to Job / JobFaçade & Manager.
 * Keeps the online / open / busy state and relies on subclasses to provide open() and close(),
 * which onceOpened() and onceClosed() call.
 */
class ResourceInterface extends EventEmitter {
    /**
     * Constructor
     * @param {string} id - id of resource
     * @param {string} name - name of the resource
     **/
    constructor(id, name) {
        super();
        this._online = false;
        // null until the resource has been opened once, true / false afterwards
        this._open = null;
        this._job = null;
        this._id = id;
        this._name = name;
    }

    /**
     * Check if resource is busy, that is has a job assigned
     * @returns {boolean} if resource is busy
     **/
    get isBusy() {
        return !!this._job;
    }

    /**
     * Check if resource is open
     * @returns {boolean} if resource is open
     **/
    get isOpen() {
        return !!this._open;
    }

    /**
     * Get id of resource
     * @returns {string} resource id
     **/
    get id() {
        return this._id;
    }

    /**
     * Get name of resource
     * @returns {string} resource name
     **/
    get name() {
        return this._name;
    }

    /**
     * Get resource job
     * @returns {Job} resource job, null when idle
     **/
    get job() {
        return this._job;
    }

    /**
     * Set resource job
     * @param {Job} job - resource job
     **/
    set job(job) {
        this._job = job;
    }

    /**
     * Run job, must be overridden
     * @throws {Error} always, in this base implementation
     **/
    run(/*job*/) {
        throw new Error('Resource.run must be overridden to return promise');
    }

    /**
     * Abort job, must be overridden
     * @throws {Error} always, in this base implementation
     **/
    abort(/*job*/) {
        throw new Error('Resource.abort must be overridden to return promise');
    }

    /**
     * Whether manager is allowed to terminate process on master exit
     * @return {boolean} true if manager can;
     */
    canBeTerminated() {
        return true;
    }

    /**
     * Resolved returned promise, once resource is online.
     * Resolves immediately (with no value) when already online, with true once the online event
     * arrives, or with false if it did not arrive within 30 seconds.
     * @returns {Promise} promise
     **/
    onceOnline() {
        if (this._online) {
            return Promise.resolve();
        }
        return new Promise((resolve) => {
            const timer = setTimeout(resolve.bind(null, false), 30000);
            // The listener is intentionally kept after a timeout so a late online event is still recorded.
            this.once(EVT.ONLINE, () => {
                // Stop the pending timer, otherwise it keeps the event loop alive for no reason.
                clearTimeout(timer);
                this._online = true;
                resolve(true);
            });
        });
    }

    /**
     * Resolved returned promise, once resource is opened.
     * When the resource is not online yet, waits for onceOnline() and tries again, whether or not
     * that wait timed out.
     * @returns {Promise} promise
     **/
    onceOpened() {
        if (this._open) {
            return Promise.resolve();
        }
        if (!this._online) {
            // Chain directly, so an error thrown by the retry rejects the returned promise.
            return this.onceOnline().then(() => this.onceOpened());
        }
        this.once(EVT.OPENED, () => {
            this._open = true;
        });
        return this.open();
    }

    /**
     * Resolved returned promise, once resource is closed.
     * Errors from close() are logged and not propagated.
     * @returns {Promise} promise
     **/
    onceClosed() {
        if (!this._open) {
            return Promise.resolve();
        }
        this.once(EVT.CLOSED, () => {
            this._open = false;
        });
        return this.close().catch(e => log.w('[%d]: Error in onceClosed of resource %s', process.pid, this.id, e.stack || e));
    }
}
/**
 * ResourceFaçade is a thin IPC façade for actual resource running in a separate process.
 * Constructor requires actual job instance just to instantiate a resource from its createResource call, separate run call is required to start it.
 * The constructor forks executor.js and translates channel commands (CMD.*) into local events (EVT.*)
 * and into settlement of the promise returned by run().
 */
class ResourceFaçade extends ResourceInterface {
    /**
     * Constructor
     * @param {Job} job for resource
     * @param {string} file - handed to the executor process inside its startup JSON argument
     **/
    constructor(job, file) {
        super('res:' + job.resourceName() + ':' + random(), 'res:' + job.resourceName());
        this._file = file;
        // settle functions of the promise returned by run(), null when nothing is pending
        this._resolve = null;
        this._reject = null;
        this._canBeTerminated = false;
        // NOTE(review): nothing in this file ever sets _crashed to true - confirm whether it is set elsewhere
        this._crashed = false;
        this._worker = cp.fork(__dirname + '/executor.js', [JSON.stringify({
            _id: this.id,
            name: this.name,
            file: this._file,
            job: job._json
        })]);

        log.i('[façade]: Started resource %j in %d for %j: %j', this.name, process.pid, this._worker.pid, this.id);

        this.channel = new ipc.IdChannel(this.id);
        this.channel.attach(this._worker);

        this.channel.on(CMD.DONE, (json) => {
            if (json.error) {
                log.i('[façade]: Done running %s with error %j %j (%j) in %d', json._id, json.error, this.name, this.id, this._worker.pid);
                this.reject(json.error);
            }
            else {
                log.i('[façade]: Done running %s with success %j (%j) in %d', json._id, this.name, this.id, this._worker.pid);
                this.resolve(json);
            }
        });

        this.channel.on(CMD.ABORT, () => {
            this.reject(EVT.ABORT);
        });

        this.channel.on(CMD.OPENED, (json) => {
            log.i('[façade]: Resource %j opened in %d: %j, %j', this.name, this._worker.pid, this.id, json);
            this._open = true;
            this._canBeTerminated = json.canBeTerminated;
            this.emit(EVT.OPENED, json);
        });

        this.channel.on(CMD.CLOSED, (err) => {
            log.i('[façade]: Resource %j closed in %d: %j', this.name, this._worker.pid, this.id, err);
            this._open = false;
            if (!this._crashed) {
                if (this._job) {
                    log.i('[façade]: Resource %j closed in %d (%j) while running %s, will reject', this.name, this._worker.pid, this.id, this._job.channel, err);
                    this.reject(err || EVT.CLOSED);
                }
                this.emit(EVT.CLOSED, err);
            }
        });

        this.channel.on('exit', () => {
            log.i('[façade]: Resource %j exited in %d: %j', this.name, this._worker.pid, this.id);
            if (!this._crashed) {
                this.emit(EVT.CLOSED);
                if (this._job) {
                    log.i('[façade]: Resource %j exited in %d (%j) while running %s, will reject', this.name, this._worker.pid, this.id, this._job.channel);
                    this.reject('Process exited');
                }
                this.emit(EVT.EXIT);
            }
        });

        this.channel.on(CMD.CRASH, (err) => {
            if (!this._crashed) {
                log.e('[façade]: Resource %s (%s) crashed in %d: %j', this.name, this.id, this._worker.pid, err);
                if (this.job) {
                    this.reject([JOB.ERROR.CRASH, err]);
                }
                else {
                    this.emit(EVT.CRASH);
                }
                this.close().catch(e => log.w('[%d]: Error in .on(CMD.CRASH) of resource %s', process.pid, this.id, e.stack || e));
            }
        });

        this.channel.on(CMD.TIMEOUT, (err) => {
            if (!this._crashed) {
                log.e('[façade]: Resource %s (%s) timed out in %d: %j', this.name, this.id, this._worker.pid, err);
                if (this.job) {
                    this.reject([JOB.ERROR.TIMEOUT, err]);
                }
                else {
                    this.emit(EVT.TIMEOUT);
                }
                this.close().catch(e => log.w('[%d]: Error in .on(CMD.TIMEOUT) of resource %s', process.pid, this.id, e.stack || e));
            }
        });

        this.channel.on(CMD.ONLINE, () => {
            log.d('ResourceFaçade %s is online', this.id);
            this.emit(EVT.ONLINE);
        });
    }

    /**
     * Check if resource is busy
     * @returns {boolean} if resource is busy
     **/
    get isBusy() {
        return !!this._job;
    }

    /**
     * Check if resource is ready: either open, or never opened yet (state is still null).
     * A resource which has been closed (state false) is not ready.
     * @returns {boolean} if resource is ready
     **/
    get isReady() {
        // must look at the _open state flag, this.open is the method
        return this._open === true || this._open === null;
    }

    /**
     * Whether manager is allowed to terminate process on master exit
     * Uses opened event data to know if underlying resource can be terminated
     * @return {boolean} true if manager can;
     */
    canBeTerminated() {
        return !!this._canBeTerminated;
    }

    /**
     * Run job: opens the resource if needed, then sends CMD.RUN with the job JSON.
     * The returned promise is settled later by resolve() / reject(), driven by channel commands.
     * @param {Job} job to run
     * @returns {Promise} promise, rejected with 'busy' if another job is assigned
     **/
    run(job) {
        if (this.isBusy) {
            log.w('[façade]: Resource façade %j is busy in %d: %j', this.name, this._worker.pid, this.id);
            return Promise.reject('busy');
        }

        this.job = job;
        log.i('[façade]: Resource façade %j in %d (%j) is going to run %s', this.name, this._worker.pid, this.id, job.channel);

        return new Promise((resolve, reject) => {
            this._resolve = resolve;
            this._reject = reject;

            this.onceOpened().then(this.channel.send.bind(this.channel, CMD.RUN, job._json), (e) => {
                // e can be undefined when the resource closed without an error
                log.e('Error in job resource façade .run() promise ', e, e && e.stack);
                this.close().catch(err => log.w('[%d]: Error in onceOpened of resource %s', process.pid, this.id, err.stack || err));
                if (this._reject === reject) {
                    // still pending: settle through this.reject() so the job is released and the façade stops being busy
                    this.reject(e);
                }
                else {
                    // already settled by a channel handler, rejecting again is a no-op
                    reject(e);
                }
            });
        });
    }

    /**
     * Close resource: asks the worker to close and waits for its CMD.CLOSED acknowledgement.
     * @returns {Promise} promise, rejected with JOB.ERROR.TIMEOUT if not acknowledged within RESOURCE_CMD_TIMEOUT
     **/
    close() {
        if (this.isOpen) {
            log.i('Closing underlying resource %s from façade', this.id);
            log.i('Stack %j', new Error().stack);
            return new Promise((resolve, reject) => {
                const timer = setTimeout(reject.bind(null, JOB.ERROR.TIMEOUT), RESOURCE_CMD_TIMEOUT);
                this.channel.once(CMD.CLOSED, () => {
                    // do not leave the long timeout timer pending after a successful close
                    clearTimeout(timer);
                    this.channel.remove();
                    resolve();
                });
                this.channel.send(CMD.CLOSED);
            });
        }
        else {
            return Promise.resolve();
        }
    }

    /**
     * Kill resource: kills the worker process and resets open / job state
     * @returns {Promise} promise
     **/
    kill() {
        return new Promise((resolve) => {
            this._worker.kill();
            this._open = false;
            this._job = null;
            resolve();
        });
    }

    /**
     * Open resource: asks the worker to open and waits for CMD.OPENED (resolve) or CMD.CLOSED (reject).
     * @returns {Promise} promise, rejected with JOB.ERROR.TIMEOUT if nothing arrives within RESOURCE_CMD_TIMEOUT
     **/
    open() {
        if (this.isOpen) {
            return Promise.resolve();
        }
        else {
            log.i('Opening underlying resource %s from façade', this.id);
            return new Promise((resolve, reject) => {
                let to = setTimeout(() => {
                    reject(JOB.ERROR.TIMEOUT);
                    this.close().catch(e => log.w('[%d]: Error in .open() of resource %s', process.pid, this.id, e.stack || e));
                }, RESOURCE_CMD_TIMEOUT);

                this.channel.send(CMD.OPENED);
                this.channel.once(CMD.OPENED, arg => {
                    clearTimeout(to);
                    resolve(arg);
                });
                this.channel.once(CMD.CLOSED, arg => {
                    clearTimeout(to);
                    reject(arg);
                });
            });
        }
    }

    /**
     * Abort job: sends CMD.ABORT for the job currently assigned to this façade.
     * @param {Job} job to abort
     * @returns {Promise} promise, rejected when no job or a different job is assigned, resolved once the command is sent
     **/
    abort(job) {
        if (!this.job) {
            log.w('[façade]: Resource façade %j is not open in %d: %j', this.name, this._worker.pid, this.id);
            return Promise.reject('no job is running to abort');
        }
        if (this.job._id !== job._id) {
            log.w('[façade]: Resource façade %j is not open in %d: %j', this.name, this._worker.pid, this.id);
            return Promise.reject('busy with job other than requested to abort');
        }
        this.channel.send(CMD.ABORT, job._json);
        // the actual outcome arrives through the promise returned by run()
        return Promise.resolve();
    }

    /**
     * Resolve job: fulfils the promise returned by run() and releases the job
     **/
    resolve(...args) {
        if (this._resolve) {
            log.i('[façade]: Resolving %s', this.job && this.job.channel);
            this._finishJob(this._resolve, args);
        }
        else {
            log.i('ResourceFaçade %s already returned, nothing to resolve', this.id);
        }
    }

    /**
     * Reject job: rejects the promise returned by run() and releases the job
     * @param {Error} error with which to reject
     **/
    reject(error) {
        if (this._reject) {
            log.i('[façade]: Rejecting %s', this.job && this.job.channel);
            this._finishJob(this._reject, [error]);
        }
        else {
            log.i('ResourceFaçade %s already returned, nothing to reject with %j', this.id, error);
        }
    }

    /**
     * Settle the pending run() promise and detach the current job from this façade.
     * The job is captured up front, because kill() may null it before releaseResource() settles.
     * @param {function} settle - stored resolve or reject function of the run() promise
     * @param {Array} args - arguments to settle with
     **/
    _finishJob(settle, args) {
        const job = this.job;

        settle.apply(this, args);
        this._resolve = this._reject = null;

        if (!job) {
            return;
        }

        const forget = () => {
            // the façade stays busy until the release has finished
            if (this.job === job) {
                this.job = null;
            }
        };

        job.releaseResource(this).then(() => {
            log.i('[façade]: Released resource for %s', job.channel);
            forget();
        }, err => {
            log.e('[façade]: Resource release returned error for %s: %j', job.channel, err);
            forget();
        });
        job.resource = null;
    }
}
/** Pool holding up to maxResources resources created on demand by a factory function **/
class ResourcePool extends EventEmitter {
    /**
     * Constructor
     * @param {function} construct - factory called without arguments to create a new resource
     * @param {number} maxResources - upper bound on the number of pooled resources
     **/
    constructor(construct, maxResources) {
        super();
        this.pool = [];
        this.maxResources = maxResources;
        this.construct = construct;
    }

    /**
     * Tell whether getResource() would succeed: some pooled resource is idle or the pool can still grow
     * @returns {boolean} if any available
     **/
    canRun() {
        const hasIdle = this.pool.some(candidate => !candidate.isBusy);
        return hasIdle || this.pool.length < this.maxResources;
    }

    /**
     * Return the first idle pooled resource, or create, register and return a new one.
     * A created resource is removed from the pool on its closed event; the pool emits
     * closed itself when that leaves it empty.
     * @returns {object} resource to use
     * @throws {Error} when every resource is busy and the pool is full
     **/
    getResource() {
        const idle = this.pool.find(candidate => !candidate.isBusy);
        if (idle !== undefined) {
            return idle;
        }

        if (!(this.pool.length < this.maxResources)) {
            throw new Error('ResourcePool should be checked with canRun() before calling getResource()');
        }

        const created = this.construct();
        const onClosed = () => {
            const position = this.pool.indexOf(created);
            if (position === -1) {
                return;
            }
            log.d('[jobs]: Resource %j is closed, removing from pool', created._id);
            this.pool.splice(position, 1);
            if (this.pool.length === 0) {
                this.emit(EVT.CLOSED);
            }
        };
        created.on(EVT.CLOSED, onClosed);
        this.pool.push(created);
        return created;
    }

    /**
     * Close every pooled resource; individual failures are logged and do not reject the result
     * @returns {Promise} promise
     **/
    close() {
        const closing = this.pool.map((r) => {
            return r.close().catch(e => log.w('[%d]: Error in .close() of pool for resource %s', process.pid, r.id, e.stack || e));
        });
        return Promise.all(closing).catch((error) => {
            log.w('Error while closing pooled resources', error);
        });
    }

    /**
     * Kill every pooled resource; a failure is logged and does not reject the result
     * @returns {Promise} promise
     **/
    kill() {
        const killing = this.pool.map(r => r.kill());
        return Promise.all(killing).catch((error) => {
            log.w('Error while killing pooled resources', error);
        });
    }

    /**
     * Whether the pool can be terminated; only the first pooled resource is consulted
     * @returns {boolean} true - if pool is empty or termination allowed
     */
    canBeTerminated() {
        if (this.pool.length === 0) {
            return true;
        }
        return this.pool[0].canBeTerminated();
    }
}
/**
 * Main class for custom resources to override.
 * Subclasses implement open() / close() (calling opened() / closed() on success) and checkActive();
 * start() wires the instance to the IPC channel it receives commands on.
 */
class Resource extends ResourceInterface {
    /**
     * Constructor
     * @param {string} _id - resource id
     * @param {string} name - resource name
     * @param {number} checkInterval - resource ping interval in milliseconds, RESOURCE_PING_INTERVAL if falsy
     * @param {number} autoCloseTimeout - resource close timeout in milliseconds, RESOURCE_CLOSE_TIMEOUT if falsy
     **/
    constructor(_id, name, checkInterval, autoCloseTimeout) {
        super(_id, name);
        this._resourceCheckMillis = checkInterval || RESOURCE_PING_INTERVAL;
        this._resourceAutoCloseMillis = autoCloseTimeout || RESOURCE_CLOSE_TIMEOUT;
    }

    /**
     * Called when resource opens: announces it locally and over the channel,
     * then starts polling checkActive() and closes the resource once it is inactive or the check fails.
     **/
    opened() {
        this._open = true;
        log.i('[%d]: Opened resource %j (%j)', process.pid, this.name, this.id);
        this.emit(EVT.OPENED, {canBeTerminated: this.canBeTerminated()});
        this.channel.send(CMD.OPENED, {canBeTerminated: this.canBeTerminated()});

        // never stack a second poller if opened() is called again
        clearInterval(this._checkInterval);
        this._checkInterval = setInterval(() => {
            log.i('[%d]: Checking resource %j (%j)', process.pid, this.name, this.id);
            this.checkActive().then((active) => {
                log.i('[%d]: Resource %j (%j) is ' + (active ? 'active' : 'inactive'), process.pid, this.name, this.id);
                if (!active) {
                    this._open = false;
                    this.close().catch(e => log.w('[%d]: Error in .opened() of resource %s', process.pid, this.id, e.stack || e));
                }
            }, (error) => {
                log.e('[%d]: Couldn\'t check resource %j (%j): %j', process.pid, this.name, this.id, error);
                this.close().catch(e => log.w('[%d]: Error in onceClosed of resource %s', process.pid, this.id, e.stack || e));
            });
        }, this._resourceCheckMillis);
    }

    /**
     * Called when resource closes: stops timers, announces the close locally and over the channel,
     * detaches the channel and exits the process one second later.
     **/
    closed() {
        this._open = false;
        clearInterval(this._checkInterval);
        clearTimeout(this._closeTimeout);
        this.emit(EVT.CLOSED);
        this.channel.send(CMD.CLOSED);
        this.channel.remove();
        log.i('[%d]: Closed resource %j (%j), exiting', process.pid, this.name, this.id);
        setTimeout(() => {
            process.exit(0);
        }, 1000);
    }

    /**
     * Open resource, must be overridden
     * @throws {Error} always, in this base implementation
     **/
    open() {
        throw new Error('Resource.open must be overridden to return a Promise which calls Resource.opened in case of success');
    }

    /**
     * Close resource, must be overridden
     * @throws {Error} always, in this base implementation
     **/
    close() {
        throw new Error('Resource.close must be overridden to return a Promise which calls Resource.closed in case of success');
    }

    /**
     * Kill resource
     * @throws {Error} always
     **/
    kill() {
        throw new Error('Resource.kill should not be ever called');
    }

    /**
     * Check if resource is active. opened() calls .then() on the result, so a Promise must be returned;
     * this default implementation reports the resource as active.
     * @returns {Promise} promise resolved with a boolean: true if resource is active
     **/
    checkActive() {
        log.i('[%d]: Checking resource %j (%j)', process.pid, this.name, this.id);
        return Promise.resolve(true);
    }

    /**
     * Start channel communication: subscribes to run / close / open commands, installs a process-wide
     * uncaughtException handler which reports a crash, and announces the resource as online.
     * @param {object} channel - channel to use
     * @param {object} db - database connection
     * @param {function} Constructor - constructor for job, must produce a JOB.IPCJob instance
     **/
    start(channel, db, Constructor) {
        this.db = db;
        this.channel = channel;

        this.channel.on(CMD.RUN, (json) => {
            if (this.job) {
                log.e('[%d]: Resource is already running a job %j', process.pid, this.job.channel);
                throw new Error('Resource is already running a job');
            }

            // validate before assigning, so an unsupported job does not leave the resource marked busy
            const job = new Constructor(json);
            if (!(job instanceof JOB.IPCJob)) {
                throw new Error('Only IPCJob subclasses can be run on a resource');
            }
            this.job = job;
            job.resource = this;

            clearTimeout(this._closeTimeout);

            log.i('[%d]: Running job %j (%j) in resource %j', process.pid, job.name, job.channel, this.id);
            const failed = this.done.bind(this, job);
            this.onceOpened().then(() => {
                log.d('[%d]: Resource is open for %j', process.pid, job.channel);
                job.prepare(null, db).then(() => {
                    job._run(this.db, this).then(this.done.bind(this, job, null), failed);
                }, failed);
            }, failed);
        });

        this.channel.on(CMD.CLOSED, () => {
            this.close().catch(e => log.w('[%d]: Error in CMD.CLOSED of resource %s', process.pid, this.id, e.stack || e));
        });

        this.channel.on(CMD.OPENED, () => {
            log.d('[%d]: Opening %s by command of façade', process.pid, this.id);
            this.open().then(() => {}, (err) => {
                log.w('[%d]: Error while opening %s by command of façade: %j', process.pid, this.id, err);
                this.channel.send(CMD.CLOSED, err);
                setTimeout(() => {
                    this.close().catch(e => log.w('[%d]: Error in CMD.OPENED of resource %s', process.pid, this.id, e.stack || e));
                }, 1000);
            });
        });

        process.on('uncaughtException', (err) => {
            log.e('[%d]: Crash in resource %s (%s):', process.pid, this.name, this.id, err, err && err.stack);
            // a crash can happen while idle, there is nothing to save then
            if (this.job) {
                this.job._sendSave();
            }
            this.channel.send(CMD.CRASH, `uncaughtException: ${err}`);
            setTimeout(() => {
                this.close().catch(e => log.w('[%d]: Error when closing crashed resource %s', process.pid, this.id, e.stack || e));
            }, 1200);
        });

        this.channel.send(CMD.ONLINE);
        log.d('Resource is online');
    }

    /**
     * Job done. On JOB.ERROR.TIMEOUT the job is saved, CMD.TIMEOUT is sent and the resource closes shortly after;
     * otherwise the job JSON is sent back with CMD.DONE and the auto-close timer is restarted.
     * @param {Job} job that is completed
     * @param {Error} error - error if any happened
     **/
    done(job, error) {
        if (error === JOB.ERROR.TIMEOUT) {
            log.w('[%d]: Timeout for job %s (%s) in resource %s', process.pid, job.name, job.channel, this.id);
            job._sendSave();
            this.channel.send(CMD.TIMEOUT);
            setTimeout(() => {
                this.close().catch(e => log.w('[%d]: Error when closing resource %s for job %s', process.pid, this.id, job.channel, e.stack || e));
            }, 1200);
        }
        else {
            log.i('[%d]: Done running job %j (%j) in resource %s', process.pid, job.name, job.channel, this.id);
            job._json.error = job._json.error || error;
            job.resource = null;
            this.job = null;
            this.channel.send(CMD.DONE, job._json);

            if (this._closeTimeout) {
                clearTimeout(this._closeTimeout);
            }
            this._closeTimeout = setTimeout(() => {
                log.i('[%d]: Auto-closing resource %s after %dms', process.pid, this.id, this._resourceAutoCloseMillis);
                this.close().catch(e => log.w('[%d]: Error when closing resource %s for job %s', process.pid, this.id, job.channel, e.stack || e));
            }, this._resourceAutoCloseMillis);
        }
    }
}
module.exports = {
CMD: CMD,
EVT: EVT,
Resource: Resource,
ResourceFaçade: ResourceFaçade,
ResourcePool: ResourcePool
};