'use strict';
const log = require('../../utils/log.js')('cache:' + process.pid),
// common = require('../../utils/common.js'),
{CentralWorker, CentralMaster} = require('../jobs/ipc.js'),
LRU = require('lru-cache'),
config = require('../../config.js');
const CENTRAL = 'cache', OP = {INIT: 'i', PURGE: 'p', READ: 'r', WRITE: 'w', UPDATE: 'u', REMOVE: 'd'};
// new job: {o: 'w', k: 'ObjectId', g: 'jobs', d: '{"name": "jobs:clean", "status": 0, ...}'}
// job update: {o: 'u', k: 'ObjectId', g: 'jobs', d: '{"status": 1}'}
// job retrieval: {o: 'r', k: 'ObjectId', g: 'jobs'}
// job removal: {o: 'w', k: 'ObjectId', g: 'jobs', d: null}
// job removal from cache: {o: 'p', k: 'ObjectId', g: 'jobs'}
/**
* Get or set a value in nested objects
* @param {object} obj - object to check
* @param {string|array} is - keys for nested value
* @param {any} value - if provided, acts as a setter storing this value under the nested key
* @return {any} value stored under the provided key in the nested object
*/
const dot = function(obj, is, value) {
if (typeof is === 'string') {
return dot(obj, is.split('.'), value);
}
else if (is.length === 1 && value !== undefined) {
obj[is[0]] = value;
return value;
}
else if (is.length === 0) {
return obj;
}
else if (!obj) {
return obj;
}
else {
return dot(obj[is[0]], is.slice(1), value);
}
};
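// Illustrative usage of dot (not executed here) - it doubles as getter and setter:
//   dot({a: {b: 1}}, 'a.b');      // => 1
//   dot({a: {b: 1}}, 'a.b', 2);   // sets a.b = 2 and returns 2
//   dot({a: {}}, 'a.b.c');        // => undefined (missing intermediate objects are not created)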
/**
* LRU size- and age-bound key-value store (backed by lru-cache).
*/
class DataStore {
/**
* Constructor
* @param {int} size max number of items to store
* @param {int} age max life of an object in ms
* @param {Function} dispose called whenever object is shifted from cache
*/
constructor(size, age, dispose) {
this.size = size;
this.age = age;
this.lru = new LRU({max: size || Number.MAX_SAFE_INTEGER, maxAge: age || Number.MAX_SAFE_INTEGER, dispose: dispose, noDisposeOnSet: true, updateAgeOnGet: true});
}
/**
* Length getter
* @return {int} current store size
*/
get length() {
return this.lru.length;
}
/**
* Read value by key
*
* @param {String} id key
* @return {Any} value or undefined if no value under such key is stored
*/
read(id) {
return this.lru.get(id.toString());
}
/**
* Store value
*
* @param {String} id key
* @param {Object} data value (pass null to delete data record)
* @return {Object} returns the data supplied if it was stored, undefined otherwise
*/
write(id, data) {
if (data) {
this.lru.set(id.toString(), data);
return data;
}
else if (this.read(id) !== undefined) {
this.lru.del(id.toString());
}
}
/**
* Update value stored under key with $set-like update
*
* @param {String} id key
* @param {Object} set flattened object of {attr1: 2, 'obj.attr2': 3} kind
* @return {Boolean} true if updated, false in case no object is stored under key
*/
update(id, set) {
let existing = this.read(id);
if (!existing) {
return false;
}
for (let k in set) {
dot(existing, k, set[k]);
}
return true;
}
/**
* Remove an item from the store
* @param {String} id id of the item to remove
*/
remove(id) {
this.write(id, null);
}
/**
* Iterate through all stored items
* @param {Function} f function(id, item) to call with every item
*/
iterate(f) {
this.lru.forEach((v, k) => f(k, v));
}
}
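// Minimal illustrative DataStore usage (not executed here): keep up to 2 records for 60s each.
//   const store = new DataStore(2, 60000);
//   store.write('a', {count: 1, meta: {}});
//   store.update('a', {count: 2, 'meta.updated': true}); // $set-like, dot notation for nested keys
//   store.read('a');                                      // => {count: 2, meta: {updated: true}}
//   store.remove('a');                                    // same as store.write('a', null)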
/**
* Cache instance for a worker process:
* - keeps local copy of a cache;
* - listens for updates from master;
* - notifies master about updates;
* - loads data from master when local copy misses a particular key.
*/
class CacheWorker {
/**
* Constructor
*
* @param {Mongo} db database instance
* @param {Number} size max number of cache groups
*/
constructor(db, size = 100) {
this.db = db;
this.size = size;
this.data = new DataStore(size);
this.ipc = new CentralWorker(CENTRAL, (m, reply) => {
log.d('handling %s: %j', reply ? 'reply' : 'broadcast', m);
let {o, k, g, d} = m || {};
if (!g) {
return;
}
let store = this.data.read(g);
if (o === OP.INIT) {
this.data.write(g, new DataStore(d.size, d.age));
return;
}
else if (!store) {
log.e('Group store is not initialized');
return;
}
if (o === OP.PURGE) {
store.write(k, null);
}
else if (o === OP.READ) {
store.write(k, d);
}
else if (o === OP.WRITE) {
store.write(k, d);
}
else if (o === OP.UPDATE) {
store.update(k, d);
}
else {
throw new Error(`Illegal cache operation: ${o}, ${k}, ${g}, ${d}`);
}
// store.iterate((k, v) => {
// log.d('have %s: %j', k, v);
// });
});
}
/**
* Start listening to IPC messages
*/
start() {
log.d('starting worker');
this.ipc.attach();
this.ipc.request({o: OP.INIT}).then(ret => {
log.d('got init response: %j', ret);
Object.keys(ret).forEach(g => {
if (!this.data.read(g)) {
this.data.write(g, new DataStore(ret[g].size, ret[g].age));
}
});
});
}
/**
* Write data to cache:
* - send a write to the master;
* - wait for a response with write status;
* - write the data to local copy and return it in case of success, throw error otherwise.
*
* @param {String} group group key
* @param {String} id data key
* @param {Object} data data to store
* @return {Object} data if succeeded, null otherwise, throws in case of an error
*/
async write(group, id, data) {
if (!group || !id || !data || typeof id !== 'string') {
throw new Error('Where are my args?');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`writing ${group}:${id}`);
let rsp = await this.ipc.request({o: OP.WRITE, g: group, k: id, d: data});
if (rsp) {
this.data.read(group).write(id, rsp);
}
return this.has(group, id);
}
/**
* Update data in the cache:
* - send an update to the master;
* - wait for a response with update status;
* - update the data in the local copy and return updated object in case of success, throw error otherwise.
*
* @param {String} group group key
* @param {String} id data key
* @param {Object} update data to store
* @return {Object} data if succeeded, null otherwise, throws in case of an error
*/
async update(group, id, update) {
if (!group || !id || !update || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`updating ${group}:${id}`);
let rsp = await this.ipc.request({o: OP.UPDATE, g: group, k: id, d: update}),
store = this.data.read(group);
if (rsp) {
store.update(id, rsp);
}
else {
store.remove(id);
}
return this.has(group, id);
}
/**
* Remove a record from cache and database.
*
* @param {String} group group key
* @param {String} id data key
* @return {Boolean} true if removed
*/
async remove(group, id) {
if (!group || !id || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`removing ${group}:${id}`);
await this.ipc.request({o: OP.WRITE, g: group, k: id, d: null});
let store = this.data.read(group);
if (store) {
store.remove(id);
}
return this.has(group, id) === undefined;
}
/**
* Remove a record from cache.
*
* @param {String} group group key
* @param {String} id data key
* @return {Boolean} true if removed
*/
async purge(group, id) {
if (!group || !id || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`purging ${group}:${id}`);
await this.ipc.request({o: OP.PURGE, g: group, k: id});
let store = this.data.read(group);
if (store) {
store.remove(id);
}
return this.has(group, id) === undefined;
}
/**
* Read a record from cache:
* - from local copy if exists;
* - send a read request to master otherwise.
*
* @param {String} group group key
* @param {String} id data key
* @return {Object} data if any, null otherwise
*/
async read(group, id) {
if (!group || !id) {
throw new Error('Where are my args?!');
}
let data = this.has(group, id);
if (data) {
return data;
}
else {
let rsp = await this.ipc.request({o: OP.READ, g: group, k: id});
if (rsp) {
let store = this.data.read(group);
if (!store) {
store = this.data.write(group, new DataStore(this.size));
}
store.write(id, rsp);
}
return this.has(group, id);
}
}
/**
* Check if local copy has data under the key.
*
* @param {String} group group key
* @param {String} id data key
* @return {Object} data if any, undefined otherwise
*/
has(group, id) {
if (!group) {
throw new Error('Where are my args?!');
}
let store = this.data.read(group);
if (id) {
return store && store.read(id) || undefined;
}
else {
return store;
}
}
/**
* Convenience method returning an object of partials bound to the given group.
*
* @param {String} group group name
* @return {Object} object exposing all {@code CacheWorker} methods with the group argument already applied
*/
cls(group) {
return {
read: this.read.bind(this, group),
write: this.write.bind(this, group),
update: this.update.bind(this, group),
remove: this.remove.bind(this, group),
purge: this.purge.bind(this, group),
has: this.has.bind(this, group),
iterate: f => {
let g = this.data.read(group);
if (g) {
g.iterate(f);
}
else {
log.e('no cache group %s to iterate on', group);
}
}
};
}
}
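// Illustrative CacheWorker usage in a worker process (names are hypothetical, not executed here):
//   const cache = new CacheWorker(db);
//   cache.start();                                  // attach IPC & load group metadata from master
//   const jobs = cache.cls('jobs');                 // group-bound partials
//   await jobs.write(someId, {name: 'jobs:clean'}); // forwarded to the master over IPC
//   await jobs.read(someId);                        // served from the local copy, master on miss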
/**
* Cache instance for master process:
* - (1) listen for requests from workers;
* - (2) listen for updates from capped collection;
* - send (1) to (2) and vice versa;
* - call group operators to read / write / update data from the db or any other backing store.
*/
class CacheMaster {
/**
* Constructor
*
* @param {Mongo} db database instance
* @param {Number} size max number of cache groups
*/
constructor(db, size = 100) {
this.data = new DataStore(Number.MAX_SAFE_INTEGER, Number.MAX_SAFE_INTEGER);
this.operators = {};
this.col = new StreamedCollection(db, CENTRAL, doc => {
log.d('collection doc %j', doc);
if (!doc || !doc.g || doc.o === OP.READ) {
// ignore malformed / bootstrap documents (like the initial {first: true} record) and READ ops, which are never put into the collection
return;
}
let store = this.data.read(doc.g);
if (!store) {
store = this.data.write(doc.g, new DataStore(size));
}
if (doc.o === OP.PURGE || doc.o === OP.REMOVE) {
store.write(doc.k, null);
}
else if (doc.o === OP.WRITE) {
store.write(doc.k, doc.d);
}
else if (doc.o === OP.READ) {
log.w('Reading from collection instruction shouldn\'t happen');
store.write(doc.k, doc.d);
}
else if (doc.o === OP.UPDATE) {
store.update(doc.k, doc.d);
}
else {
throw new Error('Bad op ' + doc.o + ': ' + JSON.stringify(doc));
}
this.ipc.send(0, doc);
});
this.ipc = new CentralMaster(CENTRAL, ({o, g, k, d}, reply, from) => {
log.d('handling %s: %j / %j / %j / %j', reply ? 'reply' : 'broadcast', o, g, k, d);
if (o === OP.INIT) {
let ret = {};
this.data.iterate((group, store) => {
ret[group] = {size: store.size, age: store.age};
});
return ret;
}
let store = this.data.read(g);
if (!store) {
throw new Error('No such store ' + g);
}
if (o === OP.PURGE) {
return this.purge(g, k, from);
}
else if (o === OP.READ) {
return this.read(g, k, from);
}
else if (o === OP.WRITE) {
return this.write(g, k, d, from);
}
else if (o === OP.UPDATE) {
return this.update(g, k, d, from);
}
else if (o === OP.REMOVE) {
return this.remove(g, k, from);
}
else {
throw new Error(`Illegal cache operation: ${o}, ${k}, ${g}, ${d}`);
}
});
}
/**
* Attach to IPC & tailable cursor.
*
* @return {Promise} with cursor open result
*/
start() {
log.d('starting master');
this.ipc.attach();
return this.col.start().then(() => new Promise(res => setTimeout(() => {
log.d('started master');
res();
}, 10000)));
}
/**
* Stop cursor.
*/
stop() {
this.col.stop();
}
/**
* Register a set of operators for a particular cache group (see the illustrative example after this class).
*
* @param {String} group group key
* @param {Object} options group operators
* @param {Function} options.init initializer - an "async () => [[key, data], ...]" kind of function, preloads data to cache on startup
* @param {Function} options.read reader - an "async (key) => Object" kind of function, returns data to cache if any for the key supplied
* @param {Function} options.write writer - an "async (key, data) => Object" kind of function, persists the data cached if needed (must return the data persisted on success)
* @param {Function} options.update updater - an "async (key, update) => Object" kind of function, updates persisted data if needed
* @param {Function} options.remove remover - an "async (key) => Object" kind of function, removes persisted data if needed
* @param {int} size how many records to keep in memory for the group
* @param {int} age how long in ms to keep records in memory for the group
*/
init(group, {init, read, write, update, remove}, size = null, age = null) {
this.operators[group] = {init, read, write, update, remove};
if (!size && size !== 0) {
size = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].size !== undefined ? config.api.cache[group].size : Number.MAX_SAFE_INTEGER;
}
if (!age && age !== 0) {
age = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].age !== undefined ? config.api.cache[group].age : Number.MAX_SAFE_INTEGER;
}
this.data.write(group, new DataStore(size, age, k => {
this.ipc.send(0, {o: OP.PURGE, g: group, k});
}));
this.ipc.send(0, {o: OP.INIT, g: group, d: {size, age}});
init().then(arr => {
(arr || []).forEach(([k, d]) => {
this.data.read(group).write(k, d);
this.ipc.send(0, {o: OP.READ, g: group, k, d});
});
}, log.e.bind(log, 'Error during initialization of cache group %s', group));
}
/**
* Write data to the cache & db
*
* @param {String} group group key
* @param {String} id data key
* @param {Object} data data to store
* @param {int} from originating pid if any
* @return {Object} data if succeeded, null otherwise, throws in case of an error
*/
async write(group, id, data, from = 0) {
if (!group || !id || (data === undefined) || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`writing ${group}:${id}: %j`, data);
if (group in this.operators) {
return this.operators[group][data === null ? 'remove' : 'write'](id, data).then(rc => {
if (rc) {
if (!data) {
rc = null;
}
this.data.read(group)[data === null ? 'remove' : 'write'](id, rc);
return this.col.put(OP.WRITE, group, id, rc).then(() => {
this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: rc});
return data === null ? true : rc;
});
}
else {
return null;
}
});
}
else {
return null;
}
}
/**
* Update data in the cache
*
* @param {String} group group key
* @param {String} id data key
* @param {Object} update data to store
* @param {int} from originating pid if any
* @return {Object} data if succeeded, null otherwise, throws in case of an error
*/
async update(group, id, update, from = 0) {
if (!group || !id || !update || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`updating ${group}:${id} with %j`, update);
if (group in this.operators) {
return this.operators[group].update(id, update).then(() => {
this.data.read(group).update(id, update);
return this.col.put(OP.UPDATE, group, id, update).then(() => {
this.ipc.send(-from, {o: OP.UPDATE, g: group, k: id, d: update});
return update;
});
});
}
else {
return null;
}
}
/**
* Remove a record from cache and database.
*
* @param {String} group group key
* @param {String} id data key
* @param {int} from originating pid if any
* @return {Boolean} true if removed
*/
async remove(group, id, from = 0) {
if (!group || !id || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`removing ${group}:${id}`);
if (group in this.operators) {
return this.operators[group].remove(id).then(rc => {
if (rc) {
this.data.read(group).remove(id);
return this.col.put(OP.WRITE, group, id, null).then(() => {
this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: null});
return true;
});
}
else {
return null;
}
});
}
else {
return null;
}
}
/**
* Remove a record from cache.
*
* @param {String} group group key
* @param {String} id data key
* @param {int} from originating pid if any
* @return {Boolean} true if removed
*/
async purge(group, id, from = 0) {
if (!group || !id || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
log.d(`purging ${group}:${id}`);
this.data.read(group).write(id, null);
this.ipc.send(-from, {o: OP.PURGE, g: group, k: id});
return this.col.put(OP.PURGE, group, id).then(() => {
return true;
});
}
/**
* Read a record from cache:
* - from the master copy if it exists;
* - load it through the group's read operator otherwise.
*
* @param {String} group group key
* @param {String} id data key
* @param {int} from originating pid if any
* @return {Object} data if any, null otherwise
*/
async read(group, id, from = 0) {
if (!group || !id || typeof id !== 'string') {
throw new Error('Where are my args?!');
}
else if (!this.data.read(group)) {
throw new Error('No such cache group');
}
let store = this.data.read(group),
rc = store.read(id);
if (rc) {
return rc;
}
else if (group in this.operators) {
return this.operators[group].read(id).then(x => {
if (x) {
this.ipc.send(-from, {o: OP.READ, g: group, k: id, d: x});
store.write(id, x);
return x;
}
else {
return null;
}
});
}
else {
return null;
}
}
/**
* Check if local copy has data under the key.
*
* @param {String} group group key
* @param {String} id data key
* @return {Object} data if any, undefined otherwise
*/
has(group, id) {
if (!group) {
throw new Error('Where are my args?!');
}
let store = this.data.read(group);
if (id) {
return store && store.read(id) || undefined;
}
else {
return store;
}
}
/**
* Convenience method returning an object of partials bound to the given group.
*
* @param {String} group group name
* @return {Object} object exposing all {@code CacheMaster} methods with the group argument already applied
*/
cls(group) {
return {
read: this.read.bind(this, group),
write: this.write.bind(this, group),
update: this.update.bind(this, group),
remove: this.remove.bind(this, group),
purge: this.purge.bind(this, group),
has: this.has.bind(this, group),
iterate: f => {
let g = this.data.read(group);
if (g) {
g.iterate(f);
}
else {
log.e('no cache group %s to iterate on', group);
}
}
};
}
}
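// Illustrative CacheMaster setup in the master process (the 'jobs' group and its collection-backed
// operators below are hypothetical, just to show the expected shapes; not executed here):
//   const cache = new CacheMaster(db);
//   cache.init('jobs', {
//       init: () => db.collection('jobs').find().toArray().then(docs => docs.map(d => [d._id.toString(), d])),
//       read: k => db.collection('jobs').findOne({_id: db.ObjectID(k)}),
//       write: (k, data) => Promise.resolve(data),
//       update: (k, upd) => db.collection('jobs').updateOne({_id: db.ObjectID(k)}, {$set: upd}),
//       remove: k => db.collection('jobs').deleteOne({_id: db.ObjectID(k)}).then(r => r.deletedCount > 0)
//   }, 1000, 60000);
//   cache.start().then(() => log.d('cache master ready'));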
/**
* Ensure capped collection exists and return latest document in it to start streaming from.
*
* @param {Mongo} db db object
* @param {String} name collection name
* @param {Number} size collection size
* @return {Promise} resolves to array of [collection, _id of last record]
*/
function createCollection(db, name, size = 1e7) {
return new Promise((resolve, reject) => {
db.createCollection(name, {capped: true, size: size}, (e) => {
if (e && e.codeName !== "NamespaceExists") {
log.e(`Error while creating capped collection ${name}:`, e);
return reject(e);
}
let col = db.collection(name);
col.find().sort({_id: -1}).limit(1).toArray((err, arr) => {
if (err) {
log.e(`Error while looking for last record in ${name}:`, err);
return reject(err);
}
if (arr && arr.length) {
log.d('Last change id %s', arr[0]._id);
resolve([col, arr[0]._id]);
}
else {
col.insertOne({first: true}, (error, res) => {
if (error) {
log.e(`Error while inserting first record into ${name}:`, error);
return reject(error);
}
log.d('Inserted first change id %s', res.insertedId);
resolve([col, res.insertedId]);
});
}
});
});
});
}
/**
* Class encapsulating a capped collection used to stream modifications of a particular cache across processes.
*/
class StreamedCollection {
/**
* Constructor
* @param {Mongo} db db object
* @param {String} name collection name
* @param {Function} handler a function handling IPC requests
*/
constructor(db, name, handler) {
this.db = db;
this.name = name;
this.handler = handler;
// ids of documents inserted by this process, used to skip their echoes coming back from the tailable cursor
this.inserts = [];
}
/**
* Start change stream
*/
async start() {
if (this.stream) {
log.w('Stream already started');
return;
}
log.i('Starting watcher stream in %d', process.pid);
try {
let [col, last] = await createCollection(this.db, this.name, 1e7);
this.col = col;
this.stream = col.find({_id: {$gt: last}}, {tailable: true, awaitData: true, noCursorTimeout: true, numberOfRetries: -1}).stream();
this.stream.on('data', doc => {
// skip documents this process inserted itself (already applied locally before put())
if (this.inserts.indexOf(doc._id.toString()) !== -1) {
return this.inserts.splice(this.inserts.indexOf(doc._id.toString()), 1);
}
log.d('new in the collection', doc);
if (doc.d !== undefined && doc.d !== null) {
try {
doc.d = JSON.parse(doc.d);
this.handler(doc);
}
catch (e) {
log.e(e);
}
}
else {
this.handler(doc);
}
});
this.stream.on('end', () => {
log.w('Stream ended');
this.stop();
});
this.stream.on('close', () => {
log.d('Stream closed');
this.stream = undefined;
setImmediate(() => {
this.start().catch(e => {
log.e('Cannot start watcher', e);
});
});
});
this.stream.on('error', error => {
log.e('Stream error', error);
this.stop();
});
}
catch (e) {
log.e('Error while starting watcher stream, will retry in 1s', e);
setTimeout(() => {
this.start().catch(() => {
// errors are logged & retried inside start() itself
});
}, 1000);
}
}
/**
* Close change stream
*/
stop() {
if (this.stream) {
this.stream.close(e => {
this.stream = undefined;
if (e) {
log.e('Error while stopping stream', e);
}
log.d('Stream stopped');
});
}
}
/**
* Add a record to the collection
* @param {int} o operation type (see OP above)
* @param {String} g group name
* @param {String} k key - document key
* @param {Object} d data
* @return {Promise} resolves to the inserted change document
*/
put(o, g, k, d) {
let doc = {
_id: new this.db.ObjectID(),
o: o,
g: g,
k: k,
d: d ? JSON.stringify(d) : d
};
log.d('putting to the collection', d);
this.inserts.push(doc._id.toString());
return new Promise((res, rej) => this.col.insertOne(doc, err => {
if (err) {
this.inserts.splice(this.inserts.indexOf(doc._id.toString()), 1);
rej(err);
}
else {
res(doc);
}
}));
}
}
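// Illustrative StreamedCollection usage (hypothetical names, not executed here):
//   const changes = new StreamedCollection(db, 'cache', doc => log.d('replayed change %j', doc));
//   await changes.start();                                    // opens the tailable cursor
//   await changes.put(OP.WRITE, 'jobs', someId, {status: 1}); // own inserts are skipped by the data handler above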
module.exports = {CacheMaster, CacheWorker};