api.js

const http = require('http');
const https = require('https');
const fs = require('fs');
const cluster = require('cluster');
const formidable = require('formidable');
const os = require('os');
const countlyConfig = require('./config', 'dont-enclose');
const plugins = require('../plugins/pluginManager.js');
const jobs = require('./parts/jobs');
const log = require('./utils/log.js')('core:api');
const common = require('./utils/common.js');
const {processRequest} = require('./utils/requestProcessor');
const frontendConfig = require('../frontend/express/config.js');
const {CacheMaster, CacheWorker} = require('./parts/data/cache.js');
const {WriteBatcher, ReadBatcher, InsertBatcher} = require('./parts/data/batcher.js');
const pack = require('../package.json');
const versionInfo = require('../frontend/express/version.info.js');
const moment = require("moment");
const tracker = require('./parts/mgmt/tracker.js');

var t = ["countly:", "api"];
common.processRequest = processRequest;

if (cluster.isMaster) {
    console.log("Starting Countly", "version", versionInfo.version, "package", pack.version);
    if (!common.checkDatabaseConfigMatch(countlyConfig.mongodb, frontendConfig.mongodb)) {
        log.w('API AND FRONTEND DATABASE CONFIGS ARE DIFFERENT');
    }
    t.push("master");
    t.push("node");
    t.push(process.argv[1]);
}
else {
    t.push("worker");
    t.push("node");
}

// Finaly set the visible title
process.title = t.join(' ');

plugins.connectToAllDatabases().then(function() {
    plugins.loadConfigs(common.db, function() {
        tracker.enable();
    });
    common.writeBatcher = new WriteBatcher(common.db);
    common.readBatcher = new ReadBatcher(common.db);
    common.insertBatcher = new InsertBatcher(common.db);
    if (common.drillDb) {
        common.drillReadBatcher = new ReadBatcher(common.drillDb);
    }

    let workers = [];

    /**
    * Set Max Sockets
    */
    http.globalAgent.maxSockets = countlyConfig.api.max_sockets || 1024;

    /**
    * Set Plugins APIs Config
    */
    plugins.setConfigs("api", {
        domain: "",
        safe: false,
        session_duration_limit: 86400,
        country_data: true,
        city_data: true,
        event_limit: 500,
        event_segmentation_limit: 100,
        event_segmentation_value_limit: 1000,
        array_list_limit: 10,
        metric_limit: 1000,
        sync_plugins: false,
        session_cooldown: 15,
        request_threshold: 30,
        total_users: true,
        export_limit: 10000,
        prevent_duplicate_requests: true,
        metric_changes: true,
        offline_mode: false,
        reports_regenerate_interval: 3600,
        send_test_email: "",
        //data_retention_period: 0,
        batch_processing: true,
        //batch_on_master: false,
        batch_period: 10,
        batch_read_processing: true,
        //batch_read_on_master: false,
        batch_read_ttl: 600,
        batch_read_period: 60,
        user_merge_paralel: 1,
        trim_trailing_ending_spaces: false
    });

    /**
    * Set Plugins APPs Config
    */
    plugins.setConfigs("apps", {
        country: "TR",
        timezone: "Europe/Istanbul",
        category: "6"
    });

    /**
    * Set Plugins Security Config
    */
    plugins.setConfigs("security", {
        login_tries: 3,
        login_wait: 5 * 60,
        password_min: 8,
        password_char: true,
        password_number: true,
        password_symbol: true,
        password_expiration: 0,
        password_rotation: 3,
        password_autocomplete: true,
        robotstxt: "User-agent: *\nDisallow: /",
        dashboard_additional_headers: "X-Frame-Options:deny\nX-XSS-Protection:1; mode=block\nStrict-Transport-Security:max-age=31536000; includeSubDomains; preload\nX-Content-Type-Options: nosniff",
        api_additional_headers: "X-Frame-Options:deny\nX-XSS-Protection:1; mode=block\nStrict-Transport-Security:max-age=31536000; includeSubDomains; preload\nAccess-Control-Allow-Origin:*",
        dashboard_rate_limit_window: 60,
        dashboard_rate_limit_requests: 500,
        proxy_hostname: "",
        proxy_port: "",
        proxy_username: "",
        proxy_password: "",
        proxy_type: "https"
    });

    /**
    * Set Plugins Logs Config
    */
    plugins.setConfigs('logs', {
        debug: (countlyConfig.logging && countlyConfig.logging.debug) ? countlyConfig.logging.debug.join(', ') : '',
        info: (countlyConfig.logging && countlyConfig.logging.info) ? countlyConfig.logging.info.join(', ') : '',
        warn: (countlyConfig.logging && countlyConfig.logging.warn) ? countlyConfig.logging.warn.join(', ') : '',
        error: (countlyConfig.logging && countlyConfig.logging.error) ? countlyConfig.logging.error.join(', ') : '',
        default: (countlyConfig.logging && countlyConfig.logging.default) ? countlyConfig.logging.default : 'warn',
    }, undefined, () => {
        const cfg = plugins.getConfig('logs'), msg = {
            cmd: 'log',
            config: cfg
        };
        if (process.send) {
            process.send(msg);
        }
        require('./utils/log.js').ipcHandler(msg);
    });

    /**
    * Set tracking config
    */
    plugins.setConfigs("tracking", {
        self_tracking_app: "",
        self_tracking_url: "",
        self_tracking_app_key: "",
        self_tracking_id_policy: "_id",
        self_tracking_sessions: true,
        self_tracking_events: true,
        self_tracking_views: true,
        self_tracking_feedback: true,
        self_tracking_user_details: true,
        server_sessions: true,
        server_events: true,
        server_crashes: true,
        server_views: true,
        server_feedback: true,
        server_user_details: true,
        /*user_sessions: true,
        user_events: true,
        user_crashes: true,
        user_views: true,
        user_feedback: true,
        user_details: true*/
    });

    /*plugins.setUserConfigs("tracking", {
        user_sessions: false,
        user_events: false,
        user_crashes: false,
        user_views: false,
        user_feedback: false
    });*/

    /**
    * Initialize Plugins
    */
    plugins.init();

    /**
    *  Trying to gracefully handle the batch state
    *  @param {number} code - error code
    */
    async function storeBatchedData(code) {
        try {
            await common.writeBatcher.flushAll();
            await common.insertBatcher.flushAll();
            console.log("Successfully stored batch state");
        }
        catch (ex) {
            console.log("Could not store batch state", ex);
        }
        process.exit(typeof code === "number" ? code : 1);
    }

    /**
    *  Handle before exit for gracefull close
    */
    process.on('beforeExit', (code) => {
        console.log('Received exit, trying to save batch state: ', code);
        storeBatchedData(code);
    });

    /**
    *  Handle exit events for gracefull close
    */
    ['SIGHUP', 'SIGINT', 'SIGQUIT', 'SIGILL', 'SIGTRAP', 'SIGABRT',
        'SIGBUS', 'SIGFPE', 'SIGSEGV', 'SIGTERM',
    ].forEach(function(sig) {
        process.on(sig, async function() {
            storeBatchedData(sig);
            console.log('Got signal: ' + sig);
        });
    });

    /**
    * Uncaught Exception Handler
    */
    process.on('uncaughtException', (err) => {
        console.log('Caught exception: %j', err, err.stack);
        if (log && log.e) {
            log.e('Logging caught exception');
        }
        console.trace();
        storeBatchedData(1);
    });

    /**
    * Unhandled Rejection Handler
    */
    process.on('unhandledRejection', (reason, p) => {
        console.log('Unhandled rejection for %j with reason %j stack ', p, reason, reason ? reason.stack : undefined);
        if (log && log.e) {
            log.e('Logging unhandled rejection');
        }
        console.trace();
    });

    /**
    * Pass To Master
    * @param {cluster.Worker} worker - worker thatw as spawned by master
    */
    const passToMaster = (worker) => {
        worker.on('message', (msg) => {
            if (msg.cmd === 'log') {
                workers.forEach((w) => {
                    if (w !== worker) {
                        w.send({
                            cmd: 'log',
                            config: msg.config
                        });
                    }
                });
                require('./utils/log.js').ipcHandler(msg);
            }
            else if (msg.cmd === "checkPlugins") {
                plugins.checkPluginsMaster();
            }
            else if (msg.cmd === "startPlugins") {
                plugins.startSyncing();
            }
            else if (msg.cmd === "endPlugins") {
                plugins.stopSyncing();
            }
            else if (msg.cmd === "batch_insert") {
                const {collection, doc, db} = msg.data;
                common.insertBatcher.insert(collection, doc, db);
            }
            else if (msg.cmd === "batch_write") {
                const {collection, id, operation, db} = msg.data;
                common.writeBatcher.add(collection, id, operation, db);
            }
            else if (msg.cmd === "batch_read") {
                const {collection, query, projection, multi, msgId} = msg.data;
                common.readBatcher.get(collection, query, projection, multi).then((data) => {
                    worker.send({ cmd: "batch_read", data: {msgId, data} });
                })
                    .catch((err) => {
                        worker.send({ cmd: "batch_read", data: {msgId, err} });
                    });
            }
            else if (msg.cmd === "batch_invalidate") {
                const {collection, query, projection, multi} = msg.data;
                common.readBatcher.invalidate(collection, query, projection, multi);
            }
            else if (msg.cmd === "dispatchMaster" && msg.event) {
                plugins.dispatch(msg.event, msg.data);
            }
            else if (msg.cmd === "dispatch" && msg.event) {
                workers.forEach((w) => {
                    w.send(msg);
                });
            }
        });
    };

    if (cluster.isMaster) {
        plugins.installMissingPlugins(common.db);
        common.runners = require('./parts/jobs/runner');
        common.cache = new CacheMaster();
        common.cache.start().then(() => {
            setImmediate(() => {
                plugins.dispatch('/cache/init', {});
            });
        }, e => {
            console.log(e);
            process.exit(1);
        });

        const workerCount = (countlyConfig.api.workers)
            ? countlyConfig.api.workers
            : os.cpus().length;

        for (let i = 0; i < workerCount; i++) {
            // there's no way to define inspector port of a worker in the code. So if we don't
            // pick a unique port for each worker, they conflict with each other.
            let nodeOptions = {};
            if (countlyConfig?.symlinked !== true) { // countlyConfig.symlinked is passed when running in a symlinked setup
                const inspectorPort = i + 1 + (common?.config?.masterInspectorPort || 9229);
                nodeOptions = { NODE_OPTIONS: "--inspect-port=" + inspectorPort };
            }
            const worker = cluster.fork(nodeOptions);
            workers.push(worker);
        }

        workers.forEach(passToMaster);

        cluster.on('exit', (worker) => {
            workers = workers.filter((w) => {
                return w !== worker;
            });
            const newWorker = cluster.fork();
            workers.push(newWorker);
            passToMaster(newWorker);
        });

        plugins.dispatch("/master", {});

        // Allow configs to load & scanner to find all jobs classes
        setTimeout(() => {
            jobs.job('api:topEvents').replace().schedule('at 00:01 am ' + 'every 1 day');
            jobs.job('api:ping').replace().schedule('at 00:01 am ' + 'every 1 day');
            jobs.job('api:clear').replace().schedule('every 1 day');
            jobs.job('api:clearTokens').replace().schedule('every 1 day');
            jobs.job('api:clearAutoTasks').replace().schedule('every 1 day');
            jobs.job('api:task').replace().schedule('every 5 minutes');
            jobs.job('api:userMerge').replace().schedule('every 10 minutes');
            jobs.job("api:ttlCleanup").replace().schedule("every 1 minute");
            //jobs.job('api:appExpire').replace().schedule('every 1 day');
        }, 10000);

        //Record as restarted

        var utcMoment = moment.utc();

        var incObj = {};
        incObj.r = 1;
        incObj[`d.${utcMoment.format("D")}.${utcMoment.format("H")}.r`] = 1;
        common.db.collection("diagnostic").updateOne({"_id": "no-segment_" + utcMoment.format("YYYY:M")}, {"$set": {"m": utcMoment.format("YYYY:M")}, "$inc": incObj}, {"upsert": true}, function(err) {
            if (err) {
                log.e(err);
            }
        });
    }
    else {
        console.log("Starting worker", process.pid, "parent:", process.ppid);
        const taskManager = require('./utils/taskmanager.js');

        common.cache = new CacheWorker();
        common.cache.start();

        //since process restarted mark running tasks as errored
        taskManager.errorResults({db: common.db});

        process.on('message', common.log.ipcHandler);

        process.on('message', (msg) => {
            if (msg.cmd === 'log') {
                common.log.ipcHandler(msg);
            }
            else if (msg.cmd === "dispatch" && msg.event) {
                plugins.dispatch(msg.event, msg.data || {});
            }
        });

        process.on('exit', () => {
            console.log('Exiting due to master exited');
        });

        plugins.dispatch("/worker", {common: common});

        const serverOptions = {
            port: common.config.api.port,
            host: common.config.api.host || ''
        };

        let server;
        if (common.config.api.ssl && common.config.api.ssl.enabled) {
            const sslOptions = {
                key: fs.readFileSync(common.config.api.ssl.key),
                cert: fs.readFileSync(common.config.api.ssl.cert)
            };
            if (common.config.api.ssl.ca) {
                sslOptions.ca = fs.readFileSync(common.config.api.ssl.ca);
            }
            server = https.createServer(sslOptions, handleRequest);
        }
        else {
            server = http.createServer(handleRequest);
        }

        server.listen(serverOptions.port, serverOptions.host).timeout = common.config.api.timeout || 120000;
    }
});

/**
 * Handle incoming HTTP/HTTPS requests
 * @param {http.IncomingMessage} req - The request object
 * @param {http.ServerResponse} res - The response object
 */
function handleRequest(req, res) {
    const params = {
        qstring: {},
        res: res,
        req: req
    };

    if (req.method.toLowerCase() === 'post') {
        const formidableOptions = {};
        if (countlyConfig.api.maxUploadFileSize) {
            formidableOptions.maxFileSize = countlyConfig.api.maxUploadFileSize;
        }

        const form = new formidable.IncomingForm(formidableOptions);
        if (/crash_symbols\/(add_symbol|upload_symbol)/.test(req.url)) {
            req.body = [];
            req.on('data', (data) => {
                req.body.push(data);
            });
        }
        else {
            req.body = '';
            req.on('data', (data) => {
                req.body += data;
            });
        }

        let multiFormData = false;
        // Check if we have 'multipart/form-data'
        if (req.headers['content-type']?.startsWith('multipart/form-data')) {
            multiFormData = true;
        }

        form.parse(req, (err, fields, files) => {
            //handle bakcwards compatability with formiddble v1
            for (let i in files) {
                if (files[i].filepath) {
                    files[i].path = files[i].filepath;
                }
                if (files[i].mimetype) {
                    files[i].type = files[i].mimetype;
                }
                if (files[i].originalFilename) {
                    files[i].name = files[i].originalFilename;
                }
            }
            params.files = files;
            if (multiFormData) {
                let formDataUrl = [];
                for (const i in fields) {
                    params.qstring[i] = fields[i];
                    formDataUrl.push(`${i}=${fields[i]}`);
                }
                params.formDataUrl = formDataUrl.join('&');
            }
            else {
                for (const i in fields) {
                    params.qstring[i] = fields[i];
                }
            }
            if (!params.apiPath) {
                processRequest(params);
            }
        });
    }
    else if (req.method.toLowerCase() === 'options') {
        const headers = {};
        headers["Access-Control-Allow-Origin"] = "*";
        headers["Access-Control-Allow-Methods"] = "POST, GET, OPTIONS";
        headers["Access-Control-Allow-Headers"] = "countly-token, Content-Type";
        res.writeHead(200, headers);
        res.end();
    }
    //attempt process GET request
    else if (req.method.toLowerCase() === 'get') {
        processRequest(params);
    }
    else {
        common.returnMessage(params, 405, "Method not allowed");
    }
}