utils/taskmanager.js

/**
* Module for handling possibly long running tasks
* @module api/utils/taskmanager
*/

/** @lends module:api/utils/taskmanager */
var taskmanager = {};
var common = require("./common.js");
var countlyConfig = require("../../frontend/express/config.js");
var countlyFs = require("./countlyFs.js");
var crypto = require("crypto");
var plugins = require('../../plugins/pluginManager.js');


const log = require('./log.js')('core:taskmanager');

/**
* Monitors DB query or some other potentially long task and switches to long task manager if it exceeds threshold
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {params} options.params - params object
* @param {number} options.threshold - amount of seconds to wait before switching to long running task
* @param {boolean} options.force - force to use taskmanager, ignoring threshold
* @param {string} options.id - id of an existing task; when omitted, params.qstring.task_id is used or a new task is created
* @param {object} options.request - api request to be able to rerun this task; built from params.qstring and params.fullPath when omitted
* @param {string} options.type - type of data, as which module or plugin uses this data
* @param {object|string} options.meta - any information about the task
* @param {string} options.name - provide user friendly task running condition string (Like, "Session (sc > 1024)" shows the report will filter by session count bigger than 1024)
* @param {string} options.report_name - name entered by the user in the report form
* @param {string} options.report_desc - report description from the report form
* @param {string} options.period_desc - target period of the report data from the report form
* @param {string} options.view - browser side view hash prepended with job id to display result
* @param {string} options.app_id - id of the app for which data is meant for; derived from params when omitted
* @param {function} options.processData - function to which to feed fetched data to post process it if needed, should accept err, data and callback to which to feed processed data
* @param {function} options.outputData - function to which to feed post processed data if task did not exceed threshold; otherwise it receives {task_id: id} once the task switches to long running mode
* @param {string} options.creator - the task creator; taken from params.member when available
* @param {boolean} options.global - whether the task is global (visible to everyone) or private
* @param {boolean} options.autoRefresh - whether the task will be rerun periodically
* @param {number} options.r_hour - local hour at which to run the task, when autoRefresh is true
* @param {boolean} options.forceCreateTask - force createTask with id supplied (for import)
* @param {boolean} options.gridfs - store result in gridfs instead of MongoDB document
* @returns {function} standard nodejs callback function accepting error as first parameter and result as second one. This result is passed to processData function, if such is available.
* @example
* common.db.collection("data").findOne({_id:"test"}, taskmanager.longtask({
*   db:common.db, 
*   threshold:30, 
*   force:false,
*   app_id:"58b6d13bf1de9562e5a8029f",
*   params: params,
*   type:"funnels", 
*   meta: {},
*   name:"FunnelName",
*   view:"#/funnels/task/",
*   processData:function(err, res, callback){
*       if(!err)
*           callback(null, res);
*       else
*           callback(null, {});
*   }, outputData:function(err, data){
*       common.returnOutput(params, data);
*   }
* }));
*/
taskmanager.longtask = function(options) {
    options.db = options.db || common.db;
    var exceeds = false;
    var start = new Date().getTime();
    var timeout;

    /**
    * Fills options.creator and options.app_id from the request params when they are available.
    * Every property access is guarded: params.app and params.qstring are not always present (e.g. imports).
    */
    function fillOwnerInfo() {
        var params = options.params;
        if (params && params.member && params.member._id) {
            options.creator = params.member._id + "";
        }
        if (!options.app_id && params) {
            options.app_id = (params.app_id || (params.app && params.app._id) || (params.qstring && params.qstring.app_id)) + "";
        }
    }

    /** switching to long task */
    function switchToLongTask() {
        timeout = null;
        exceeds = true;

        if (!options.request && options.params && options.params.qstring) {
            //deep copy so the cleanup below does not touch the live request params
            var json = JSON.parse(JSON.stringify(options.params.qstring));
            //we don't need to have task_id, it will be automatically applied
            delete json.task_id;
            //we want to get raw json data without jsonp
            delete json.callback;
            //delete jquery param to prevent caching
            delete json._;
            //delete api_key param
            delete json.api_key;

            options.request = {
                uri: (process.env.COUNTLY_CONFIG_PROTOCOL || "http") + "://" + (process.env.COUNTLY_CONFIG_HOSTNAME || "localhost") + (countlyConfig.path || "") + options.params.fullPath,
                method: 'POST',
                json: json
            };
        }

        var created = false;
        if (!options.id) {
            fillOwnerInfo();
            if (options.params && options.params.qstring && options.params.qstring.task_id) {
                //rerun of an existing task, reuse its id
                options.id = options.params.qstring.task_id;
            }
            else {
                options.id = taskmanager.getId();
                options.start = start;
                taskmanager.createTask(options);
                created = true;
            }
        }
        // force createTask with id supplied (for import); skipped if the task was just created above
        if (options.id && options.forceCreateTask && !created) {
            fillOwnerInfo();
            options.start = start;
            taskmanager.createTask(options);
        }
        options.outputData(null, {task_id: options.id});
    }

    if (options.force) {
        switchToLongTask();
    }
    else {
        timeout = setTimeout(switchToLongTask, options.threshold * 1000);
    }

    return function(err, res) {
        if (timeout) {
            clearTimeout(timeout);
            timeout = null;
        }
        if (typeof options.processData === "function") {
            options.processData(err, res, function(err1, res1) {
                if (!exceeds) {
                    options.outputData(err1, res1);
                }
                else {
                    if (err1) {
                        options.errored = true;
                        //error objects are kept whole so saveResult can inspect them (e.g. their code)
                        if (typeof err1 === "object") {
                            options.error = err1;
                        }
                        else {
                            options.errormsg = err1;
                        }
                    }
                    taskmanager.saveResult(options, res1);
                }
            });
        }
        else if (!exceeds) {
            options.outputData(err, res);
        }
        else {
            if (err) {
                options.errored = true;
                options.errormsg = err;
            }
            taskmanager.saveResult(options, res);
        }
    };
};

/**
* Generates a new unique ID for a task
* The ID is the SHA-1 hex digest of 16 random bytes combined with the current timestamp.
* @returns {string} 40 character hex id to be used when saving the task
*/
taskmanager.getId = function() {
    var randomPart = crypto.randomBytes(16).toString("hex");
    var seed = randomPart + "" + new Date().getTime();
    var hasher = crypto.createHash('sha1');
    hasher.update(seed);
    return hasher.digest('hex');
};

/**
* Create task with data, without result
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string} options.id - id to use for this task
* @param {string} options.type - type of data, as which module or plugin uses this data
* @param {object|string} options.meta - any information about the task
* @param {string} options.name - provide user friendly task running condition string (Like, "Session (sc > 1024)" shows the report will filter by session count bigger than 1024)
* @param {string} options.report_name - name entered by the user in the report form
* @param {string} options.report_desc - report description from the report form
* @param {string} options.period_desc - target period of the report data from the report form
* @param {string} options.view - browser side view hash prepended with job id to display result
* @param {object} options.request - api request to be able to rerun this task
* @param {string} options.app_id - id of the app for which data is for
* @param {number} options.start - start time of the task in milliseconds (by default now)
* @param {string} options.creator - the task creator
* @param {boolean} options.global - whether the task is global (visible to everyone) or private
* @param {boolean} options.autoRefresh - whether the task will be rerun periodically
* @param {number} options.r_hour - local hour at which to run the task, when autoRefresh is true
* @param {boolean} options.manually_create - the task is created from form input (also emits a systemlog entry)
* @param {boolean} options.gridfs - store result in gridfs instead of MongoDB document
* @param {string=} options.subtask - id of the parent task; the parent's "subtasks.<id>" entry is marked as running first
* @param {string=} options.subtask_key - key of this task inside its task group
* @param {boolean=} options.taskgroup - whether this task is a group of subtasks
* @param {object=} options.linked_to - information about what the task is linked to (e.g. a widget)
* @param {params=} options.params - params object, passed on to the systemlog entry
* @param {function=} callback - callback when data is stored
*/
taskmanager.createTask = function(options, callback) {
    options.db = options.db || common.db;

    /**
    * Shared completion handler: logs database failures and forwards to the optional callback
    * @param {Error} err - database error, if any
    * @param {object} res - database result
    */
    function finish(err, res) {
        if (err) {
            log.e("Error while creating task " + options.id, err);
        }
        if (typeof callback === "function") {
            callback(err, res);
        }
    }

    var now = new Date().getTime();
    var update = {};
    update.ts = now;
    update.start = options.start || now;
    update.status = "running";
    update.type = options.type || "";
    update.meta = options.meta || "";
    update.name = options.name || null;
    update.view = options.view || "";
    update.request = JSON.stringify(options.request || {});
    update.app_id = options.app_id || "";
    update.creator = options.creator;
    update.global = options.global || false;
    update.r_hour = options.r_hour || 0;
    update.autoRefresh = options.autoRefresh || false;
    update.report_name = options.report_name || "";
    update.report_desc = options.report_desc || "";
    update.period_desc = options.period_desc || "";
    update.manually_create = options.manually_create || false;
    update.subtask_key = options.subtask_key || "";
    update.taskgroup = options.taskgroup || false;
    update.linked_to = options.linked_to;

    if (options.subtask) {
        update.subtask = options.subtask;
        //mark this subtask as running inside the parent task before creating the task itself
        var updateSub = {$set: {}};
        updateSub.$set["subtasks." + options.id + ".status"] = "running";
        updateSub.$set["subtasks." + options.id + ".start"] = new Date().getTime();
        options.db.collection("long_tasks").update({_id: update.subtask}, updateSub, {'upsert': false}, function(err, res) {
            if (err) {
                finish(err, res);
            }
            else {
                options.db.collection("long_tasks").update({_id: options.id}, {$set: update}, {'upsert': true}, finish);
            }
        });
    }
    else {
        options.db.collection("long_tasks").update({_id: options.id}, {$set: update}, {'upsert': true}, finish);
        if (options.manually_create) {
            plugins.dispatch("/systemlogs", {params: options.params, action: "task_manager_task_created", data: JSON.stringify(update)});
        }
    }
};

/**
* Checks whether every rule is satisfied by a (possibly nested) query object.
* A single rule matches when:
* - the value in data is strictly equal to it,
* - the rule is an object and all of its nested rules match the nested data, or
* - data holds {$in: [...]} containing the rule value, or {$nin: [...]} not containing it.
* Keys missing from data (undefined / null) never match; falsy values such as 0 or "" are compared normally.
* @param {object} rules - expected key/value pairs (may be nested)
* @param {object} data - query object to test against; may be missing
* @returns {boolean} true if all rules match
*/
var checkIfAllRulesMatch = function(rules, data) {
    for (var key in rules) {
        var expected = rules[key];
        var value = (data === null || data === undefined) ? undefined : data[key];
        if (value === undefined || value === null) {
            return false;
        }
        if (expected === value) {
            continue;
        }
        if (typeof expected === "object") {
            if (!checkIfAllRulesMatch(expected, value)) {
                return false;
            }
        }
        else if (value.$in) {
            if (value.$in.indexOf(expected) === -1) {
                return false;
            }
        }
        else if (value.$nin) {
            if (value.$nin.indexOf(expected) !== -1) {
                return false;
            }
        }
        else {
            return false;
        }
    }
    return true;
};

/**
* Marks auto refreshing reports as dirty when the query object of their stored request matches the given rules.
* @param {object} options - options
* @param {object} options.rules - rules matched against each task's request.json.queryObject (see checkIfAllRulesMatch)
* @param {object=} options.db - database connection (defaults to common.db)
* @param {function=} callback - called without arguments once processing has finished, also after errors
*/
taskmanager.markReportsDirtyBasedOnRule = function(options, callback) {
    var db = (options && options.db) || common.db;

    /** invokes the optional callback */
    function done() {
        if (typeof callback === "function") {
            callback();
        }
    }

    db.collection("long_tasks").find({
        autoRefresh: true,
    }).toArray(function(err, tasks) {
        if (err) {
            log.e("Error while fetching tasks", err);
            done();
            return;
        }
        var ids_to_mark_dirty = [];
        (tasks || []).forEach(function(task) {
            try {
                var req = JSON.parse(task.request);
                //tasks created without a rerunnable request are stored as "{}" and have nothing to match
                if (req && req.json && checkIfAllRulesMatch(options.rules, req.json.queryObject)) {
                    ids_to_mark_dirty.push(task._id);
                }
            }
            catch (e) {
                log.e(' got error while process task request parse', e);
            }
        });
        if (ids_to_mark_dirty.length === 0) {
            done();
            return;
        }
        db.collection("long_tasks").updateMany({_id: {$in: ids_to_mark_dirty}}, {$set: {dirty: new Date().getTime()}}, function(err3) {
            if (err3) {
                log.e("Error while updating reports", err3);
            }
            done();
        });
    });
};
/**
* Save result from the task and mark the task as finished
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string} options.id - id to use for this task
* @param {string=} options.subtask - id of the parent task; its "subtasks.<id>" entry is updated as well
* @param {boolean} options.errored - if errored then true
* @param {object|string} options.errormsg - error message, or an object whose `message` property holds the message
* @param {object=} options.error - error object (e.g. from the database); its `errormsg` or `message` is used when options.errormsg is empty, and code 11601 marks the task as "stopped" instead of "errored"
* @param {boolean} options.gridfs - store result in gridfs instead of MongoDB document
* @param {boolean=} options.binary - when storing in gridfs, data is passed to gridfs saveStream as-is instead of being serialized
* @param {object} data - result data of the task
* @param {function=} callback - callback when data is stored
*/
taskmanager.saveResult = function(options, data, callback) {
    options = options || {};
    options.db = options.db || common.db;
    var update = {
        end: new Date().getTime(),
        status: "completed",
        hasData: true
    };

    if (options.errored) {
        var message = "";
        if (options.errormsg) {
            message = options.errormsg;
        }
        else if (options.error && (options.error.errormsg || options.error.message)) {
            message = options.error.errormsg || options.error.message;
        }
        if (message && message.message) {
            message = message.message;
        }
        //11601 (Interrupted) means the operation was killed on purpose, so the task is "stopped";
        //any other failure is "errored". The typeof check keeps non-object errors from throwing.
        if (options.error && typeof options.error === "object" && options.error.code === 11601) {
            update.status = "stopped";
        }
        else {
            update.status = "errored";
        }
        update.errormsg = message;
    }
    else {
        update.errormsg = "";//rewrite any old error message
    }

    /** Mirrors the final state of this task into the parent task's "subtasks" map */
    function updateSubtasks() {
        var prefix = "subtasks." + options.id;
        var updateObj = {$set: {}};
        updateObj.$set[prefix + ".status"] = options.errored ? "errored" : "completed";
        updateObj.$set[prefix + ".hasData"] = true;
        updateObj.$set[prefix + ".end"] = new Date().getTime();
        if (update.errormsg) {
            updateObj.$set[prefix + ".errormsg"] = update.errormsg;
        }
        else {
            updateObj.$unset = {};
            updateObj.$unset[prefix + ".errormsg"] = "";
        }
        options.db.collection("long_tasks").update({_id: options.subtask}, updateObj, {'upsert': false}, function() {});
    }

    options.db.collection("long_tasks").findOne({_id: options.id}, function(error, task) {
        if (task && task.dirty && task.dirty < task.start) {
            update.dirty = false;
        }

        if (!options.gridfs && !(task && task.gridfs)) {
            //store the serialized result directly in the task document
            update.data = JSON.stringify(data || {});
            options.db.collection("long_tasks").update({_id: options.id}, {
                $set: update
            }, {'upsert': false}, function(err, res) {
                if (options.subtask && !err) {
                    updateSubtasks();
                }
                if (callback) {
                    callback(err, res);
                }
            });
            return;
        }

        //the result goes to gridfs, the task document only keeps a marker
        update.data = {};
        update.gridfs = true;

        if (options.errored) {
            //update the task status first, then store whatever data came with the error
            options.db.collection("long_tasks").update({_id: options.id}, {$set: update}, function(err) {
                if (options.subtask && !err) {
                    updateSubtasks();
                }
                countlyFs.gridfs.saveData("task_results", options.id, JSON.stringify(data || {}), {id: options.id}, function(err2, res2) {
                    if (callback) {
                        callback(err2, res2);
                    }
                });
            });
            return;
        }

        /**
        * Marks the task as finished once its data is stored in gridfs
        * @param {Error} err2 - gridfs storage error
        * @param {object} res2 - gridfs storage result
        */
        var onStored = function(err2, res2) {
            options.db.collection("long_tasks").update({_id: options.id}, {$set: update}, function(err) {
                if (options.subtask && !err) {
                    updateSubtasks();
                }
                if (callback) {
                    callback(err2, res2);
                }
            });
        };
        if (options.binary) {
            countlyFs.gridfs.saveStream("task_results", options.id, data, {id: options.id}, onStored);
        }
        else {
            countlyFs.gridfs.saveData("task_results", options.id, JSON.stringify(data || {}), {id: options.id}, onStored);
        }
    });
};

/**
* Give a name to task result or rename it
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string} options.id - id of the task to (re)name
* @param {string} options.name - name of the task result, for later reference
* @param {object} data - not used at all; pass anything, kept for compatibility
* @param {function=} callback - callback when data is stored
*/
taskmanager.nameResult = function(options, data, callback) {
    var db = options.db || common.db;
    options.db = db;
    var selector = {_id: options.id};
    var rename = {$set: {name: options.name}};
    db.collection("long_tasks").update(selector, rename, {'upsert': false}, callback);
};

/**
* Get specific task result
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string} options.id - id of the task result
* @param {boolean=} options.only_info - when truthy, the `data` field is excluded from the fetched document
* @param {function} callback - callback for the result
*/
taskmanager.getResult = function(options, callback) {
    options.db = options.db || common.db;
    var collection = options.db.collection("long_tasks");
    var selector = {_id: options.id};
    if (!options.only_info) {
        collection.findOne(selector, getResult(callback, options));
        return;
    }
    //projection skips the potentially large stored result
    collection.findOne(selector, {data: 0}, getResult(callback, options));
};

/**
* Get the first task result matching a query
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {object} options.query - mongodb query selecting the task
* @param {function} callback - callback for the result
*/
taskmanager.getResultByQuery = function(options, callback) {
    if (!options.db) {
        options.db = common.db;
    }
    var tasks = options.db.collection("long_tasks");
    tasks.findOne(options.query, getResult(callback, options));
};

/**
* Edit specific task
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string} options.id - ID of the target task
* @param {object} options.data - fields of the task to modify; `request` is recomputed from the stored request
* @param {string=} options.data.period_desc - new period; "today" is stored as period "hour" in the request
* @param {function} callback - callback(err, {before, after}); err is set when the task cannot be read, parsed or updated
*/
taskmanager.editTask = function(options, callback) {
    options.db = options.db || common.db;
    options.db.collection("long_tasks").findOne({_id: options.id}, function(err, data) {
        if (err) {
            callback(err);
            return;
        }
        try {
            //a missing task (data === null) or an unparsable request ends up here as well
            var req = JSON.parse(data.request);
            var period = options.data.period_desc;
            if (period && period !== "" && period !== "false") {
                req.json.period = period === 'today' ? 'hour' : period;
                req.json.period_desc = period;
            }
            options.data.request = JSON.stringify(req);
        }
        catch (e) {
            log.e(' got error while process task request parse', e);
            callback(e);
            return;
        }
        options.db.collection("long_tasks").update({_id: options.id}, {$set: options.data}, function(err2) {
            callback(err2 || null, {before: data, after: options.data});
        });
    });
};

/**
* Check task's status
* For a single id the callback receives the stored {status} document (or null).
* For an array of ids it receives one entry per id, with `status` renamed to `result`;
* ids that no longer exist are reported with result "deleted".
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string|string[]} options.id - id (or list of ids) of the task result
* @param {function} callback - callback for the result
*/
taskmanager.checkResult = function(options, callback) {
    options.db = options.db || common.db;
    var collection = options.db.collection("long_tasks");
    if (!Array.isArray(options.id)) {
        collection.findOne({_id: options.id}, {_id: 0, status: 1}, callback);
        return;
    }
    var projection = {_id: 1, status: 1, report_name: 1, type: 1, manually_create: 1, view: 1};
    collection.find({_id: {$in: options.id}}, projection).toArray(function(err, found) {
        if (err) {
            callback(err);
            return;
        }
        //every requested id defaults to "deleted" and is replaced by the stored document when it exists
        var byId = {};
        options.id.forEach(function(taskId) {
            byId[taskId] = {_id: taskId, status: "deleted"};
        });
        found.forEach(function(doc) {
            byId[doc._id] = doc;
        });
        var entries = Object.keys(byId).map(function(key) {
            var entry = byId[key];
            entry.result = entry.status;
            delete entry.status;
            return entry;
        });
        callback(null, entries);
    });
};

/**
* Check if task like that is already running or not
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {string=} options.id - id of the task result
* @param {string=} options.type - type of data, as which module or plugin uses this data
* @param {string=} options.meta - any information about the taskManager
* @param {params=} options.params - params object; used to build the request when options.request is missing, and to include the member's own private tasks
* @param {object=} options.request - api request to be able to rerun this task
* @param {function} callback - called with the id of a matching running task, or false if there is none
*/
taskmanager.checkIfRunning = function(options, callback) {
    options = options || {};
    options.db = options.db || common.db;
    var query = {};
    if (options.id) {
        query._id = options.id;
    }
    if (options.type) {
        query.type = options.type;
    }
    if (options.meta) {
        query.meta = options.meta;
    }
    if (options.request) {
        query.request = options.request;
    }
    if (!query.request && options.params && options.params.qstring) {
        var json = JSON.parse(JSON.stringify(options.params.qstring));
        //make sure not to have same task already running
        if (json.task_id) {
            query._id = {$ne: json.task_id};
            delete json.task_id;
        }
        //we want to get raw json data without jsonp && api_key
        delete json.callback;
        delete json.api_key;

        //delete jquery param to prevent caching
        delete json._;
        query.request = {
            uri: (process.env.COUNTLY_CONFIG_PROTOCOL || "http") + "://" + (process.env.COUNTLY_CONFIG_HOSTNAME || "localhost") + (countlyConfig.path || "") + options.params.fullPath,
            method: 'POST',
            json: json
        };
    }
    if (query.request) {
        query.request = JSON.stringify(query.request);
    }
    //global tasks are visible to everyone, private ones only to their creator (when a member is known)
    var visibility = [{"global": {"$ne": false}}];
    if (options.params && options.params.member && options.params.member._id) {
        visibility.push({"creator": options.params.member._id + ""});
    }
    query.$and = [
        {$or: [ { status: "running" }, { status: "rerunning" } ]},
        {$or: visibility}
    ];
    options.db.collection("long_tasks").findOne(query, {status: 1}, function(err, res) {
        if (res && res.status && (res.status === "running" || res.status === "rerunning")) {
            callback(res._id);
        }
        else {
            callback(false);
        }
    });
};

/**
* Get multiple task results based on query
* Missing options are normalized in place: db defaults to common.db, query to {} and projection to {data: 0}.
* @param {object} options - options for the task
* @param {object} options.db - database connection
* @param {object} options.query - mongodb query
* @param {object} options.projection - mongodb projection
* @param {function} callback - callback for the result
*/
taskmanager.getResults = function(options, callback) {
    var db = options.db || common.db;
    var query = options.query || {};
    var projection = options.projection || {data: 0};
    options.db = db;
    options.query = query;
    options.projection = projection;
    db.collection("long_tasks").find(query, projection).toArray(callback);
};

/**
* Get task counts based on query and grouped by app_id
* @param {object} options - options for the task
* @param {object} options.db - database connection (defaults to common.db)
* @param {object} options.query - mongodb query (defaults to {})
* @param {function} callback - callback(err, docs) where each doc is {_id: app_id, c: count}
*/
taskmanager.getCounts = function(options, callback) {
    options.db = options.db || common.db;
    options.query = options.query || {};
    var matchStage = {$match: options.query};
    var groupByApp = {$group: {_id: '$app_id', c: {$sum: 1}}};
    var tasks = options.db.collection("long_tasks");
    tasks.aggregate([matchStage, groupByApp], {allowDiskUse: true}, function(error, counts) {
        //forward only (err, docs), whatever else the driver passes
        callback(error, counts);
    });
};

/**
* Get dataTable query results for tasks
* @param {object} options - options for the task
* @param {object} options.db - database connection
* @param {object} options.query - mongodb query
* @param {object} options.projection - mongodb projection
* @param {object} options.page - mongodb offset & limit
* @param {object} options.keyword - search task "report_name" or "report_desc"
* @param {funciton} callback - callback for the result
*/
taskmanager.getTableQueryResult = async function(options, callback) {
    options.db = options.db || common.db;
    options.query = options.query || {};
    options.projection = options.projection || {data: 0};

    if (options.keyword) {
        options.query.$and = options.query.$and ? options.query.$and : [];
        const keywordRegx = new RegExp(options.keyword, 'i');
        options.query.$and.push({
            $or: [
                {"report_name": {$regex: keywordRegx}},
                {"report_desc": {$regex: keywordRegx}},
            ]
        });
    }
    let sortBy = {'end': -1};
    if (options.sort.sortBy) {
        const orderbyKey = { 0: 'report_name', 2: 'status', 3: 'type', 7: 'end', 8: 'start'};
        const keyName = orderbyKey[options.sort.sortBy];
        const seq = options.sort.sortSeq === 'desc' ? -1 : 1;
        sortBy = {[keyName]: seq};
    }

    let skip = 0;
    let limit = 10;
    try {
        skip = parseInt(options.page.skip, 10);
        limit = parseInt(options.page.limit, 10);
    }
    catch (e) {
        log.e(' got error while process task request parse', e);
    }
    const count = await options.db.collection("long_tasks").count(options.query);
    return options.db.collection("long_tasks").find(options.query, options.projection).sort(sortBy).skip(skip).limit(limit).toArray((err, list) => {
        //check if there are any reports connected to widgets and change view links to correct dashboards if there are any.
        if (!err) {
            var ids_drill = [];
            var ids_formulas = [];
            for (var z = 0; z < list.length; z++) {
                if (list[z].linked_to) {
                    list[z].dashboard_report = true;
                    if (list[z].linked_to._issuer === "wqm:drill") {
                        ids_drill.push(list[z]._id);
                    }

                    if (list[z].linked_to._issuer === "wqm:formulas") {
                        ids_formulas.push(list[z]._id);
                    }
                }
            }
            if (ids_drill.length > 0 || ids_formulas.length > 0) {
                var query = {};
                if (ids_formulas.length > 0 && ids_drill.length > 0) {
                    query.$or = [{"cmetrics": {$in: ids_formulas}}, {"drill_report": {$in: ids_drill}}];
                }
                else if (ids_formulas.length > 0) {
                    query.cmetrics = {$in: ids_formulas};
                }
                else if (ids_drill.length > 0) {
                    query.drill_report = {$in: ids_drill};
                }
                common.db.collection("widgets").aggregate([
                    {"$match": query},
                    {"$project": {"wid": {"$toString": "$_id"}, "drill_report": "$drill_report", "cmetrics": "$cmetrics"}},
                    {"$unionWith": {"coll": "dashboards", "pipeline": [{"$project": {"did": "$_id", "wid": "$widgets"}}, {"$unwind": "$wid"}, {"$project": {"did": "$did", "wid": {"$toString": "$wid"}}}]}},
                    {"$group": {"_id": "$wid", "did": {"$push": "$did"}, "drill_report": {"$push": "$drill_report"}, "cmetrics": {"$push": "$cmetrics"}}}
                ], function(err1, res) {
                    if (err1) {
                        log.e(err1);
                    }
                    if (res) {
                        var map_report = {};
                        for (var k = 0; k < res.length; k++) {
                            if (Array.isArray(res[k].did)) {
                                res[k].did = res[k].did[0];
                            }

                            if (Array.isArray(res[k].drill_report)) {
                                res[k].drill_report = res[k].drill_report[0];
                            }

                            if (Array.isArray(res[k].cmetrics)) {
                                res[k].cmetrics = res[k].cmetrics[0];
                            }
                            if (res[k].drill_report && res[k].drill_report.length > 0) {
                                for (var p = 0; p < res[k].drill_report.length; p++) {
                                    map_report[res[k].drill_report[p]] = res[k].did;
                                }
                            }
                            if (res[k].cmetrics && res[k].cmetrics.length > 0) {
                                for (var p2 = 0; p2 < res[k].cmetrics.length; p2++) {
                                    map_report[res[k].cmetrics[p2]] = res[k].did;
                                }
                            }
                        }
                        for (var kk = 0; kk < list.length; kk++) {
                            if (map_report[list[kk]._id]) {
                                list[kk].view = "#/custom/" + map_report[list[kk]._id];
                                list[kk].have_dashboard_widget = true;
                            }
                        }
                    }
                    callback(null, {list, count});
                });
            }
            else {
                callback(null, {list, count});
            }
        }
        else {
            callback(err, {});
        }
    });
};

/**
* Delete specific task result, including its GridFS payload, type specific
* additional data and, for task groups, all of its subtasks
* @param {object} options - options for the task
* @param {object} options.db - database connection
* @param {string} options.id - id of the task result
* @param {function} callback - callback for the result, called as (err, task); task is the deleted document, or undefined when no task with this id exists
*/
taskmanager.deleteResult = function(options, callback) {
    options.db = options.db || common.db;
    options.db.collection("long_tasks").findOne({_id: options.id}, function(err, task) {
        if (err || !task) {
            return callback(err);
        }
        if (task.gridfs) {
            // best effort: the outcome of the file deletion is not awaited
            countlyFs.gridfs.deleteFile("task_results", options.id, {id: options.id}, function() {});
        }

        // Delete additional results specific to task type (best effort, not awaited;
        // removal of the task document below proceeds immediately)
        taskmanager.deleteAdditionalResults(task, options, function() {});

        options.db.collection("long_tasks").remove({_id: options.id}, function(removeErr) {
            // surface removal failures instead of always reporting success
            callback(removeErr || null, task);
        });
        if (task.taskgroup) {
            // task groups own subtasks that reference them through the "subtask" field
            options.db.collection("long_tasks").find({subtask: options.id}, {_id: 1}).toArray(function(err2, tasks) {
                if (err2) {
                    log.e(err2);
                    return;
                }
                if (tasks && tasks.length) {
                    for (var i = 0; i < tasks.length; i++) {
                        taskmanager.deleteResult({id: tasks[i]._id, db: options.db}, function() {});
                    }
                }
            });
        }
    });
};

/**
* Delete additional results specific to task type.
* For "journey_engine" tasks this drops the per-task "journey_task_data_<id>" collection
* and, if it holds no documents, the shared "journey_task_data" collection.
* Drop failures are only logged; a missing collection (error code 26) is not treated as a failure.
* @param {object} task - the task object
* @param {object} options - options for the task
* @param {object} [options.db] - database connection, defaults to common.db
* @param {string} options.id - id of the task, used to build the per-task collection name
* @param {function} callback - called without arguments; it does not wait for the per-task collection drop to finish
*/
taskmanager.deleteAdditionalResults = function(task, options, callback) {
    var db = options.db || common.db;
    if (task.type === "journey_engine") {
        const collectionName = "journey_task_data_" + options.id;
        db.collection(collectionName).drop(function(dropErr) {
            if (dropErr && dropErr.code !== 26) { // 26 = namespace not found, which is fine
                log.w("Failed to drop journey task data collection:", collectionName, dropErr);
            }
            else if (!dropErr) {
                // only report success when a collection was actually dropped
                log.d("Successfully dropped journey task data collection:", collectionName);
            }
        });

        // Also try to clean up the default collection if it exists and is empty
        // This is a safety measure for cases where the default collection might have been used
        db.collection("journey_task_data").countDocuments({}, function(countErr, count) {
            if (!countErr && count === 0) {
                db.collection("journey_task_data").drop(function(defaultDropErr) {
                    if (defaultDropErr && defaultDropErr.code !== 26) {
                        log.w("Failed to drop default journey task data collection:", defaultDropErr);
                    }
                    else if (!defaultDropErr) {
                        log.d("Successfully dropped empty default journey task data collection");
                    }
                });
            }
            callback();
        });
    }
    else {
        callback();
    }
};

/**
* Mark all running or rerunning tasks as errored (they were killed by a server restart)
* and mirror that status into the "subtasks" map of their parent task groups
* @param {object} options - options for the task
* @param {object} options.db - database connection
* @param {function} [callback] - callback for the result, called as (err, {})
*/
taskmanager.errorResults = function(options, callback) {
    options.db = options.db || common.db;
    var killedMsg = "Task was killed during server restart.";
    options.db.collection("long_tasks").update({status: {$in: ["running", "rerunning"]}}, {$set: {status: "errored", errormsg: killedMsg}}, {multi: true}, function(updateErr) {
        if (updateErr) {
            log.e(updateErr);
        }
        // Only subtasks killed by a restart: subtasks that errored for other reasons
        // must keep their own error message in the parent's subtasks map
        options.db.collection("long_tasks").find({status: "errored", errormsg: killedMsg, subtask: {$exists: true}}).toArray(function(err, res) {
            if (res && res.length > 0) {
                for (var k = 0; k < res.length; k++) {
                    var updateSub = {$set: {}};
                    updateSub.$set["subtasks." + res[k]._id + ".status"] = "errored";
                    updateSub.$set["subtasks." + res[k]._id + ".errormsg"] = killedMsg;
                    options.db.collection("long_tasks").update({_id: res[k].subtask}, updateSub, {}, function(/*err,res*/) {});
                }
            }
            if (callback) {
                callback(err || updateErr, {});
            }
        });
    });
};

/**
* Rerun specific task by replaying the request stored with it
* @param {object} options - options for the task
* @param {object} options.db - database connection
* @param {string} options.id - id of the task result
* @param {object} [options.additionalQuery] - extra conditions the stored task must match; it is copied, never modified
* @param {boolean} options.autoUpdate - if auto update is needed or not
* @param {boolean} options.dirty - if dirty is true then it means some part of report is wrong. It should be regenerated fully.
* @param {function} callback - callback for the result, called as (null, message) with "Success" or a reason why the task cannot be rerun
*/
taskmanager.rerunTask = function(options, callback) {
    options.db = options.db || common.db;
    /**
    * Marks the task as rerunning and replays its request through common.processRequest
    * @param {object} options1 - options for the task
    * @param {object} options1.db - database connection
    * @param {string} options1.id - id of the task result
    * @param {string} options1.subtask - id of the parent task group, if any
    * @param {object} reqData  -  request data
    * @param {function} callback1 - callback for the result
    */
    function runTask(options1, reqData, callback1) {
        options.db.collection("long_tasks").update({_id: options1.id}, {
            $set: {
                status: "rerunning",
                start: new Date().getTime()
            }
        }, function(err1) {
            if (err1) {
                log.e(err1);
            }
            reqData = reqData || {};
            log.d("calling request");
            log.d(JSON.stringify(reqData));
            // the stored request uses uri/json; the request object given to processRequest uses url/body
            reqData.url = reqData.uri;
            reqData.body = reqData.json;

            var params = {
                no_checksum: true,
                //providing data in request object
                'req': reqData,
                'APICallback': function(err, responseData, headers, returnCode) {
                    //sending response to client
                    responseData = responseData || {};
                    log.d(JSON.stringify(responseData));
                    log.d(err);
                    if (err) {
                        taskmanager.saveResult({
                            db: options1.db,
                            id: options1.id,
                            subtask: options1.subtask,
                            errormsg: err,
                            errored: true,
                            request: reqData
                        }, responseData);
                    }
                    else if (!responseData.task_id) {
                        // NOTE(review): responses carrying task_id are not saved here; presumably the request became a task again
                        log.d("returned result for this");
                        log.d(JSON.stringify(responseData));
                        var body = responseData;
                        if (returnCode === 200) {
                            taskmanager.saveResult({
                                db: options1.db,
                                id: options1.id,
                                subtask: options1.subtask,
                                request: reqData
                            }, body);
                        }
                        else {
                            if (body.result) {
                                body = body.result;
                            }
                            taskmanager.saveResult({
                                db: options1.db,
                                id: options1.id,
                                subtask: options1.subtask,
                                errormsg: body,
                                errored: true,
                                request: reqData
                            }, body);
                        }
                    }
                    if (options.autoUpdate) { //as it is auto from task, do callback only after finishing calculations.
                        callback1(null, "Success");
                    }
                }
            };
            // NOTE(review): if processRequest is not set while autoUpdate is true, callback1 is never called
            if (common.processRequest) {
                common.processRequest(params);
            }
            if (!options.autoUpdate) {
                callback1(null, "Success");
            }
        });
    }

    // copy, so the caller's additionalQuery object is not modified
    var qq = Object.assign({}, options.additionalQuery, {_id: options.id});
    log.d("Fetching from long_tasks to rerun: " + JSON.stringify(qq));
    options.db.collection("long_tasks").findOne(qq, function(err, res) {
        if (err || !res || !res.request) {
            callback(null, "This task cannot be run again");
            return;
        }
        var reqData = {};
        try {
            reqData = JSON.parse(res.request);
        }
        catch (ex) {
            reqData = {};
        }
        // a stored request of "null" parses to null, which cannot be replayed either
        if (!reqData || !reqData.uri) {
            callback(null, "This task cannot be run again");
            return;
        }
        reqData.json = reqData.json || {};
        reqData.json.task_id = options.id;
        reqData.strictSSL = false;
        if (Array.isArray(reqData.json.period)) {
            reqData.json.period = JSON.stringify(reqData.json.period);
        }
        options.subtask = res.subtask;
        reqData.json.autoUpdate = ((!options.dirty) && (options.autoUpdate || false)); //If dirty  set autoUpdate to false
        if (reqData.json.api_key || !res.creator) {
            runTask(options, reqData, callback);
            return;
        }
        // no api key stored with the request: use the creator's api key
        options.db.collection("members").findOne({_id: common.db.ObjectID(res.creator)}, function(err1, member) {
            if (err1) {
                log.e(err1);
            }
            if (member && member.api_key) {
                reqData.json.api_key = member.api_key;
                runTask(options, reqData, callback);
            }
            else if (res.global) {
                //AD and other outer login users might not have their user documents
                options.db.collection("members").findOne({global_admin: true}, function(err2, admin) {
                    if (admin && admin.api_key) {
                        reqData.json.api_key = admin.api_key;
                        runTask(options, reqData, callback);
                    }
                    else {
                        callback(null, "No permission to run this task");
                    }
                });
            }
            else {
                callback(null, "No permission to run this task");
            }
        });
    });
};

/**
 *  Wrap a result callback so the task document is completed before delivery.
 *  Errors and info-only requests pass straight through. Task groups queried with a
 *  subtask_key resolve to the matching subtask; when none is found, the group task is rerun.
 *  GridFS-backed results get their stored payload loaded into data.data.
 *  @param {function} callback - callback for the result, called as (err, data)
 *  @param {object} options - options object (only_info, subtask_key, db)
 *  @returns {function} callback to use for db query
 */
function getResult(callback, options) {
    return function(err, data) {
        const infoOnly = options && options.only_info;
        if (err || infoOnly) {
            callback(err, data);
            return;
        }

        const isGroupLookup = data && options && options.subtask_key && data.taskgroup === true;
        if (isGroupLookup) {
            const subtaskQuery = {subtask: data._id, subtask_key: options.subtask_key};
            const onSubtask = function(subErr, subtask) {
                if (!subtask) {
                    // no matching subtask result: rerun the group task
                    taskmanager.rerunTask({db: options.db, id: data._id}, function() {});
                }
                callback(subErr, subtask);
            };
            taskmanager.getResultByQuery({db: options.db, query: subtaskQuery}, getResult(onSubtask));
            return;
        }

        if (!(data && data.gridfs)) {
            callback(err, data);
            return;
        }

        countlyFs.gridfs.getData("task_results", data._id + "", {id: data._id}, function(fsErr, largeData) {
            if (fsErr) {
                callback(fsErr, data);
                return;
            }
            data.data = largeData;
            callback(null, data);
        });
    };
}
// Export the taskmanager object with all taskmanager.* functions attached in this module
module.exports = taskmanager;