parts/data/cache.js

'use strict';

const log = require('../../utils/log.js')('cache:' + process.pid),
    // common = require('../../utils/common.js'),
    { CentralWorker, CentralMaster } = require('../jobs/ipc.js'),
    { Jsonable } = require('../../utils/models'),
    LRU = require('lru-cache'),
    config = require('../../config.js');

const CENTRAL = 'cache', OP = {INIT: 'i', PURGE: 'p', READ: 'r', WRITE: 'w', UPDATE: 'u'};
// new job: {o: 2, k: 'ObjectId', g: 'jobs', d: '{"name": "jobs:clean", "status": 0, ...}'}
// job update: {o: 3, k: 'ObjectId', g: 'jobs', d: '{"status": 1}'}
// job retreival: {o: 1, k: 'ObjectId', g: 'jobs'}
// job removal: {o: 2, k: 'ObjectId', g: 'jobs', d: null}
// job removal from cache: {o: 0, k: 'ObjectId', g: 'jobs'}

/**
 * Get value in nested objects
 * @param  {object} obj         - object to checl
 * @param  {string|array} is    - keys for nested value
 * @param  {any} value          - if provided acts as setter setting this value in nested object
 * @return {varies} returns value in provided key in nested object
 */
const dot = function(obj, is, value) {
    if (typeof is === 'string') {
        return dot(obj, is.split('.'), value);
    }
    else if (is.length === 1 && value !== undefined) {
        obj[is[0]] = value;
        return value;
    }
    else if (is.length === 0) {
        return obj;
    }
    else if (!obj) {
        return obj;
    }
    else {
        return dot(obj[is[0]], is.slice(1), value);
    }
};

/**
 * Fifo size-bound key-value store.
 */
class DataStore {
    /**
     * Constructor
     * @param  {int} size           max number of items to store
     * @param  {int} age            max life of an object in ms
     * @param  {Function} dispose   called whenever object is shifted from cache
     * @param  {Class} Cls          class for data objects
     */
    constructor(size, age, dispose, Cls) {
        this.size = size;
        this.age = age;
        this.lru = new LRU({max: size || 10000, ttl: age || Number.MAX_SAFE_INTEGER, dispose: dispose, noDisposeOnSet: true, updateAgeOnGet: true});
        if (Cls) {
            this.Cls = Cls;
            this.Clas = require('../../../' + Cls[0])[Cls[1]];
        }
    }

    /**
     * Length getter
     * @return {int} current store size
     */
    get length() {
        return this.lru.size;
    }

    /**
     * Read value by key
     *
     * @param  {String} id key
     * @return {Any}    value or undefined if no value under such key is stored
     */
    read(id) {
        if (id && id.toString) {
            return this.lru.get(id.toString());
        }
    }

    /**
     * Store value
     *
     * @param  {String} id              key
     * @param  {Object} data            value (pass null to delete data record)
     * @return {Object}                 returns the data supplied if it was stored, undefined otherwise
     */
    write(id, data) {
        if (data) {
            if (this.Clas && !(data instanceof this.Clas)) {
                data = new this.Clas(data);
            }
            this.lru.set(id.toString(), data);
            return data;
        }
        else if (!id) {
            this.lru.clear();
        }
        else if (this.read(id) !== null) {
            this.lru.delete(id.toString());
        }
    }

    /**
     * Update value stored under key with $set-like update
     *
     * @param  {String} id  key
     * @param  {Object} set flattened object of {attr1: 2, 'obj.attr2': 3} kind
     * @return {Boolean}    true if updated, false in case no object is stored under key
     */
    update(id, set) {
        let existing = this.read(id);
        if (!existing) {
            return false;
        }
        if (this.Clas) {
            existing.updateData(set);
        }
        else {
            for (let k in set) {
                dot(existing, k, set[k]);
            }
        }
        return true;
    }

    /**
     * Remove an item from the store
     * @param {String} id id of the item to remove
     */
    remove(id) {
        this.write(id, null);
    }

    /**
     * Iterate through all stored items
     * @param {Function} f function(id, item) to call with every item
     */
    iterate(f) {
        this.lru.forEach((v, k) => f(k, v));
    }
}

/**
 * Cache instance for a worker process:
 * - keeps local copy of a cache;
 * - listens for updates from master;
 * - notifies master about updates;
 * - loads data from master when local copy misses a particular key.
 */
class CacheWorker {
    /**
     * Constructor
     *
     * @param  {Number} size max number of cache groups
     */
    constructor(size = 100) {
        this.data = new DataStore(size);
        this.started = false;

        this.ipc = new CentralWorker(CENTRAL, (m, reply) => {
            let {o, k, g, d} = m || {};

            if (!g) {
                return;
            }
            log.d('handling %s: %j', reply ? 'reply' : 'broadcast', m);

            let store = this.data.read(g);

            if (o === OP.INIT) {
                this.data.write(g, new DataStore(d.size, d.age, undefined, d.Cls));
                return;
            }
            else if (!store) {
                log.d('Group store is not initialized');
                return;
            }

            if (o === OP.PURGE) {
                if (k) {
                    store.write(k, null);
                }
                else { // purgeAll
                    store.iterate(id => store.write(id, null));
                }
            }
            else if (o === OP.READ) {
                store.write(k, d);
            }
            else if (o === OP.WRITE) {
                store.write(k, d);
            }
            else if (o === OP.UPDATE) {
                store.update(k, d);
            }
            else {
                throw new Error(`Illegal cache operaion: ${o}, ${k}, ${g}, ${d}`);
            }

            // store.iterate((k, v) => {
            //     log.d('have %s: %j', k, v);
            // });
        });
    }

    /**
     * Start listening to IPC messages
     */
    async start() {
        if (this.started === true) {
            return;
        }

        if (this.started === false) {
            log.d('starting worker');
            this.started = new Promise((resolve, reject) => {
                let timeout = setTimeout(() => {
                    reject(new Error('Failed to start CacheWorker on timeout'));
                }, 10000);
                this.ipc.attach();
                this.ipc.request({o: OP.INIT}).then(ret => {
                    log.d('got init response: %j', ret);
                    Object.keys(ret).forEach(g => {
                        if (!this.data.read(g)) {
                            this.data.write(g, new DataStore(ret[g].size, ret[g].age, undefined, ret[g].Cls));
                            if (ret[g].data) {
                                log.d('got %d data objects in init response', Object.keys(ret[g].data).length);
                                for (let k in ret[g].data) {
                                    this.data.read(g).write(k, ret[g].data[k]);
                                }
                            }
                        }
                    });
                    this.started = true;
                    clearTimeout(timeout);
                    resolve();
                });
            });
        }

        await this.started;
    }

    /**
     * Stop worker
     */
    async stop() {
        this.ipc.detach();
    }

    /**
     * Write data to cache:
     * - send a write to the master;
     * - wait for a response with write status;
     * - write the data to local copy and return it in case of success, throw error otherwise.
     *
     * @param  {String} group group key
     * @param  {String} id    data key
     * @param  {Object} data  data to store
     * @return {Object}       data if succeeded, null otherwise, throws in case of an error
     */
    async write(group, id, data) {
        await this.start();

        if (!group || !id || !data || typeof id !== 'string') {
            throw new Error('Where are my args?');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`writing ${group}:${id}`);
        let rsp = await this.ipc.request({o: OP.WRITE, g: group, k: id, d: data instanceof Jsonable ? data.json : data});
        if (rsp) {
            this.data.read(group).write(id, rsp);
        }

        return this.has(group, id);
    }

    /**
     * Update data in the cache:
     * - send an update to the master;
     * - wait for a response with update status;
     * - update the data in the local copy and return updated object in case of success, throw error otherwise.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {Object} update data to store
     * @return {Object}        data if succeeded, null otherwise, throws in case of an error
     */
    async update(group, id, update) {
        await this.start();

        if (!group || !id || !update || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`updating ${group}:${id}`);
        let rsp = await this.ipc.request({o: OP.UPDATE, g: group, k: id, d: update}),
            store = this.data.read(group);
        if (rsp) {
            store.update(id, rsp);
        }
        else {
            store.remove(id);

        }
        return this.has(group, id);
    }

    /**
     * Remove a record from cache.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @return {Boolean}       true if removed
     */
    async remove(group, id) {
        await this.start();

        if (!group || !id || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`removing ${group}:${id}`);
        await this.ipc.request({o: OP.WRITE, g: group, k: id, d: null});
        let store = this.data.read(group);
        if (store) {
            store.remove(id);
        }
        return this.has(group, id) === null;
    }

    /**
     * Remove a record from cache.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @return {Boolean}       true if removed
     */
    async purge(group, id) {
        await this.start();

        if (!group || !id || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`purging ${group}:${id}`);
        await this.ipc.request({o: OP.PURGE, g: group, k: id});
        let store = this.data.read(group);
        if (store) {
            store.remove(id);
        }
        return this.has(group, id) === null;
    }

    /**
     * Remove from cache all records for a given group.
     *
     * @param  {String} group  group key
     */
    async purgeAll(group) {
        await this.start();

        if (!group) {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`purging ${group}`);
        await this.ipc.request({o: OP.PURGE, g: group});
        let store = this.data.read(group);
        store.iterate(id => store.write(id, null));
    }

    /**
     * Read a record from cache:
     * - from local copy if exists;
     * - send a read request to master otherwise.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @return {Object}        data if any, null otherwise
     */
    async read(group, id) {
        await this.start();

        if (!group || !id) {
            throw new Error('Where are my args?!');
        }
        let data = this.has(group, id);
        if (data) {
            return data;
        }
        else {
            let rsp = await this.ipc.request({o: OP.READ, g: group, k: id});
            if (rsp) {
                let store = this.data.read(group);
                if (!store) {
                    throw new Error(`No store for a group ${group}?!`);
                    // store = this.data.write(group, new DataStore(this.size));
                }
                store.write(id, rsp);
            }
            return this.has(group, id);
        }
    }

    /**
     * Check if local copy has data under the key.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @return {Object}        data if any, null otherwise
     */
    has(group, id) {
        if (!group) {
            throw new Error('Where are my args?!');
        }
        let store = this.data.read(group);
        if (id) {
            return store && store.read(id) || null;
        }
        else {
            return store;
        }
    }

    /**
     * Just a handy method which returns an object with partials with given group.
     *
     * @param  {String} group group name
     * @return {Object}       object with all the {@code CacheWorker} methods without group
     */
    cls(group) {
        return {
            read: this.read.bind(this, group),
            write: this.write.bind(this, group),
            update: this.update.bind(this, group),
            remove: this.remove.bind(this, group),
            purge: this.purge.bind(this, group),
            purgeAll: this.purgeAll.bind(this, group),
            has: this.has.bind(this, group),
            iterate: f => {
                let g = this.data.read(group);
                if (g) {
                    g.iterate(f);
                }
                else {
                    log.e('no cache group %s to iterate on', group);
                }
            }
        };
    }
}

/**
 * Cache instance for master process:
 * - listen for requests from workers;
 * - call group operators to read/write/udpate
 */
class CacheMaster {
    /**
     * Constructor
     *
     * @param  {Number} size max number of cache groups
     */
    constructor(size = 100) {
        this.data = new DataStore(size, Number.MAX_SAFE_INTEGER);
        this.operators = {};
        this.initialized = {};
        this.delayed_messages = [];
        this.ipc = new CentralMaster(CENTRAL, ({o, g, k, d}, reply, from) => {
            log.d('handling %s: %j / %j / %j / %j', reply ? 'reply' : 'broadcast', o, g, k, d);

            if (o === OP.INIT) {
                this.initialized[from] = true;
                let ret = {};
                this.data.iterate((group, store) => {
                    let data = {};
                    store.iterate((key, obj) => {
                        data[key] = obj instanceof Jsonable ? obj.json : obj;
                    });
                    ret[group] = {size: store.size, age: store.age, Cls: store.Cls, data};
                });
                setImmediate(() => {
                    let remove = [];
                    this.delayed_messages.filter(arr => arr[0] === from).forEach(arr => {
                        remove.push(arr);
                        this.ipc.send(arr[0], arr[1]);
                    });
                    if (remove.length) {
                        log.d('sent %d delayed messages after %d worker\'s init', remove.length, from);
                        remove.forEach(m => {
                            const i = this.delayed_messages.indexOf(m);
                            if (i !== -1) {
                                this.delayed_messages.splice(i, 1);
                            }
                        });
                    }
                });
                return ret;
            }

            let store = this.data.read(g);
            if (!store) {
                log.d(`No store for group ${g}`);
                throw new Error('No such store ' + g);
            }

            if (o === OP.PURGE) {
                if (k) {
                    return this.purge(g, k, from);
                }
                else {
                    return this.purgeAll(g, from);
                }
            }
            else if (o === OP.READ) {
                return this.read(g, k, from);
            }
            else if (o === OP.WRITE) {
                return this.write(g, k, d, from);
            }
            else if (o === OP.UPDATE) {
                return this.update(g, k, d, from);
            }
            else if (o === OP.REMOVE) {
                return this.remove(g, k, from);
            }
            else {
                throw new Error(`Illegal cache operaion: ${o}, ${k}, ${g}, ${d}`);
            }
        });
    }

    /**
     * Attach to IPC
     *
     * @return {Promise<undefined>} void
     */
    async start() {
        this.ipc.attach();
        log.d('started master');
    }

    /**
     * Detaches IPC instance
     */
    stop() {
        this.ipc.detach();
    }

    /**
     * Register a set of operators for a particular cache group.
     *
     * @param  {String} group            group key
     * @param  {Function} options.init   initializer - an "async () => [Object]" kind of function, preloads data to cache on startup
     * @param  {string[]} options.Cls    class - an optional array of ["require path", "export name"] which resolves to a Jsonable subclass to construct instances
     * @param  {Function} options.read   reader - an "async (key) => Object" kind of function, returns data to cache if any for the key supplied
     * @param  {Function} options.write  writer - an "async (key, data) => Object" kind of function, persists the data cached if needed (must return the data persisted on success)
     * @param  {Function} options.update updater - an "async (key, update) => Object" kind of function, updates persisted data if needed
     * @param  {Function} options.remove remover - an "async (key) => Object" kind of function, removes persisted data if needed
     * @param  {int} age                 how long in ms to keep records in memory for the group
     * @param  {int} size                how much records to keep in memory for the group
     */
    init(group, {init, Cls, read, write, update, remove}, size = null, age = null) {
        this.operators[group] = {init, Cls, read, write, update, remove};

        if (!size && size !== 0) {
            size = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].size !== undefined ? config.api.cache[group].size : 10000;
        }

        if (!age && age !== 0) {
            age = config.api && config.api.cache && config.api.cache[group] && config.api.cache[group].age !== undefined ? config.api.cache[group].age : Number.MAX_SAFE_INTEGER;
        }

        this.data.write(group, new DataStore(size, age, k => {
            this.ipc.send(0, {o: OP.PURGE, g: group, k});
        }, Cls));

        this.ipc.send(0, {o: OP.INIT, g: group, d: {size, age, Cls}});

        init().then(arr => {
            (arr || []).forEach(([k, d]) => {
                this.data.read(group).write(k, d);
                const msg = {o: OP.READ, g: group, k, d: d && (d instanceof Jsonable) ? d.json : d};
                for (const pid in this.ipc.workers) {
                    if (this.initialized[pid]) {
                        this.ipc.send(parseInt(pid), msg);
                    }
                    else {
                        this.delayed_messages.push([parseInt(pid), msg]);
                    }
                }
            });
        }, log.e.bind(log, 'Error during initialization of cache group %s', group));
    }

    /**
     * Write data to the cache
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {Object} data   data to store
     * @param  {int} from      originating pid if any
     * @return {Object}        data if succeeded, null otherwise, throws in case of an error
     */
    async write(group, id, data, from = 0) {
        if (!group || !id || (data === undefined) || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`writing ${group}:${id}: %j`, data);

        if (group in this.operators) {
            return this.operators[group][data === null ? 'remove' : 'write'](id, data).then(rc => {
                if (rc) {
                    if (data === null) {
                        rc = null;
                    }
                    if (rc instanceof Jsonable) {
                        rc = rc.json;
                    }
                    this.data.read(group)[data === null ? 'remove' : 'write'](id, rc);
                    this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: rc});
                    return data === null ? true : rc;
                }
                else {
                    return null;
                }
            });
        }
        else {
            return null;
        }
    }

    /**
     * Update data in the cache
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {Object} update data to store
     * @param  {int} from      originating pid if any
     * @return {Object}        data if succeeded, null otherwise, throws in case of an error
     */
    async update(group, id, update, from = 0) {
        if (!group || !id || !update || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`updating ${group}:${id} with %j`, update);

        if (group in this.operators) {
            return this.operators[group].update(id, update).then(() => {
                this.data.read(group).update(id, update);
                this.ipc.send(-from, {o: OP.UPDATE, g: group, k: id, d: update});
                return update;
            });
        }
        else {
            return null;
        }
    }

    /**
     * Remove a record from cache.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {int} from      originating pid if any
     * @return {Boolean}       true if removed
     */
    async remove(group, id, from) {
        if (!group || !id || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }

        log.d(`removing ${group}:${id}`);
        if (group in this.operators) {
            return this.operators[group].remove(id).then(rc => {
                if (rc) {
                    this.data.read(group).remove(id);
                    this.ipc.send(-from, {o: OP.WRITE, g: group, k: id, d: null});
                    return true;
                }
                else {
                    return null;
                }
            });
        }
        else {
            return null;
        }
    }

    /**
     * Remove a record from cache.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {int} from      originating pid if any
     * @return {Boolean}       true if removed
     */
    async purge(group, id, from = 0) {
        if (!group || !id || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }
        log.d(`purging ${group}:${id}`);

        this.data.read(group).write(id, null);
        this.ipc.send(-from, {o: OP.PURGE, g: group, k: id});
        return true;
    }

    /**
     * Remove from cache all record for given group.
     *
     * @param  {String} group  group key
     * @param  {int} from      originating pid if any
     * @return {Boolean}       true if removed
     */
    async purgeAll(group, from = 0) {
        if (!group) {
            throw new Error('Where are my args?!');
        }
        log.d(`purging ${group}`);

        let grp = this.data.read(group);
        grp.iterate(k => grp.write(k, null));
        this.ipc.send(-from, {o: OP.PURGE, g: group});
        return true;
    }

    /**
     * Read a record from cache:
     * - from local copy if exists;
     * - send a read request to master otherwise.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @param  {int} from      originating pid if any
     * @return {Object}        data if any, null otherwise
     */
    async read(group, id, from = 0) {
        if (!group || !id || typeof id !== 'string') {
            throw new Error('Where are my args?!');
        }
        else if (!this.data.read(group)) {
            throw new Error('No such cache group');
        }

        let store = this.data.read(group),
            rc = store.read(id);
        if (rc) {
            return rc;
        }
        else if (group in this.operators) {
            return this.operators[group].read(id).then(x => {
                if (x) {
                    this.ipc.send(-from, {o: OP.READ, g: group, k: id, d: x instanceof Jsonable ? x.json : x});
                    store.write(id, x);
                    return x;
                }
                else {
                    return null;
                }
            });
        }
        else {
            return null;
        }
    }

    /**
     * Check if local copy has data under the key.
     *
     * @param  {String} group  group key
     * @param  {String} id     data key
     * @return {Object}        data if any, undefined otherwise
     */
    has(group, id) {
        if (!group) {
            throw new Error('Where are my args?!');
        }
        let store = this.data.read(group);
        if (id) {
            return store && store.read(id) || null;
        }
        else {
            return store;
        }
    }

    /**
     * Just a handy method which returns an object with partials with given group.
     *
     * @param  {String} group group name
     * @return {Object}       object with all the {@code CacheWorker} methods without group
     */
    cls(group) {
        return {
            read: this.read.bind(this, group),
            write: this.write.bind(this, group),
            update: this.update.bind(this, group),
            remove: this.remove.bind(this, group),
            purge: this.purge.bind(this, group),
            purgeAll: this.purgeAll.bind(this, group),
            has: this.has.bind(this, group),
            iterate: f => {
                let g = this.data.read(group);
                if (g) {
                    g.iterate(f);
                }
                else {
                    log.e('no cache group %s to iterate on', group);
                }
            }
        };
    }

}

/**
 * Data class for tests
 */
class TestDataClass extends Jsonable {
    /**
     * @returns {boolean} true
     */
    get isClassInstance() {
        return true;
    }
}


module.exports = {CacheMaster, CacheWorker, TestDataClass};