/**
* This module processes events for aggregated data
* @module "api/parts/data/events"
*/
/** @lends module:api/parts/data/events */
var countlyEvents = {},
common = require('./../../utils/common.js'),
async = require('async'),
crypto = require('crypto'),
Promise = require("bluebird"),
plugins = require('../../../plugins/pluginManager.js');
/**
* Process JSON decoded events data from request
* @param {params} params - params object
* @returns {Promise} resolved when procesing finished
**/
countlyEvents.processEvents = function(params) {
return new Promise(function(resolve) {
var forbiddenSegValues = [];
for (let i = 1; i < 32; i++) {
forbiddenSegValues.push(i + "");
}
common.readBatcher.getOne("events", {'_id': params.app_id}, {
list: 1,
segments: 1,
omitted_segments: 1,
whitelisted_segments: 1
}, (err, eventColl) => {
var appEvents = [],
appSegments = {},
metaToFetch = {},
omitted_segments = {},
whitelisted_segments = {},
pluginsGetConfig = plugins.getConfig("api", params.app && params.app.plugins, true);
if (!err && eventColl) {
if (eventColl.list) {
appEvents = eventColl.list;
}
if (eventColl.segments) {
appSegments = eventColl.segments;
}
if (eventColl.omitted_segments) {
omitted_segments = eventColl.omitted_segments;
}
if (eventColl.whitelisted_segments) {
whitelisted_segments = eventColl.whitelisted_segments;
}
}
for (let i = 0; i < params.qstring.events.length; i++) {
if (typeof params.qstring.events[i].key !== 'string') {
try {
params.qstring.events[i].key = JSON.stringify(params.qstring.events[i].key);
}
catch (error) {
params.qstring.events[i].key += "";
}
}
var currEvent = params.qstring.events[i],
shortEventName = "",
eventCollectionName = "";
if (!currEvent.segmentation) {
continue;
}
// Key fields is required
if (!currEvent.key || (currEvent.key.indexOf('[CLY]_') === 0 && plugins.internalEvents.indexOf(currEvent.key) === -1)) {
continue;
}
if (currEvent.count && common.isNumber(currEvent.count)) {
currEvent.count = parseInt(currEvent.count, 10);
}
else {
currEvent.count = 1;
}
if (pluginsGetConfig.event_limit &&
appEvents.length >= pluginsGetConfig.event_limit &&
appEvents.indexOf(currEvent.key) === -1) {
continue;
}
shortEventName = common.fixEventKey(currEvent.key);
if (!shortEventName) {
continue;
}
eventCollectionName = crypto.createHash('sha1').update(shortEventName + params.app_id).digest('hex');
if (currEvent.segmentation) {
for (var segKey in currEvent.segmentation) {
//check if segment should be ommited
if (plugins.internalOmitSegments[currEvent.key] && Array.isArray(plugins.internalOmitSegments[currEvent.key]) && plugins.internalOmitSegments[currEvent.key].indexOf(segKey) !== -1) {
continue;
}
//check if segment should be ommited
if (omitted_segments[currEvent.key] && Array.isArray(omitted_segments[currEvent.key]) && omitted_segments[currEvent.key].indexOf(segKey) !== -1) {
continue;
}
//check if whitelisted is set and this one not in whitelist
if (whitelisted_segments[currEvent.key] && Array.isArray(whitelisted_segments[currEvent.key]) && whitelisted_segments[currEvent.key].indexOf(segKey) === -1) {
continue;
}
if (pluginsGetConfig.event_segmentation_limit &&
appSegments[currEvent.key] &&
appSegments[currEvent.key].indexOf(segKey) === -1 &&
appSegments[currEvent.key].length >= pluginsGetConfig.event_segmentation_limit) {
continue;
}
var tmpSegVal;
var myValues = [];
if (Array.isArray(currEvent.segmentation[segKey])) {
currEvent.segmentation[segKey] = currEvent.segmentation[segKey].splice(0, (pluginsGetConfig.array_list_limit || 10));
//myValues = currEvent.segmentation[segKey];
myValues = []; //ignore array values.
}
else {
myValues = [currEvent.segmentation[segKey]];
}
for (var z = 0; z < myValues.length; z++) {
try {
tmpSegVal = myValues[z] + "";
tmpSegVal = tmpSegVal.replace(/^\$+/, "").replace(/\./g, ":");
tmpSegVal = common.encodeCharacters(tmpSegVal);
if (forbiddenSegValues.indexOf(tmpSegVal) !== -1) {
tmpSegVal = "[CLY]" + tmpSegVal;
}
var postfix = common.crypto.createHash("md5").update(tmpSegVal).digest('base64')[0];
metaToFetch[eventCollectionName + "no-segment_" + common.getDateIds(params).zero + "_" + postfix] = {
coll: eventCollectionName,
id: "no-segment_" + common.getDateIds(params).zero + "_" + postfix,
app_id: params.app_id
};
}
catch (ex) {
console.log("Incorrect segment value", params.app_id, currEvent.key, "segment", segKey, ex);
delete currEvent.segmentation[segKey];
tmpSegVal = "";
}
}
}
}
}
async.map(Object.keys(metaToFetch), fetchEventMeta, function(err2, eventMetaDocs) {
var appSgValues = {};
for (let i = 0; i < eventMetaDocs.length; i++) {
if (eventMetaDocs[i].coll) {
if (eventMetaDocs[i].meta_v2) {
if (!appSgValues[eventMetaDocs[i].coll]) {
appSgValues[eventMetaDocs[i].coll] = {};
}
if (!appSgValues[eventMetaDocs[i].coll][eventMetaDocs[i]._id]) {
appSgValues[eventMetaDocs[i].coll][eventMetaDocs[i]._id] = {};
}
for (var segment in eventMetaDocs[i].meta_v2) {
appSgValues[eventMetaDocs[i].coll][eventMetaDocs[i]._id][segment] = Object.keys(eventMetaDocs[i].meta_v2[segment]);
}
}
}
}
processEvents(appEvents, appSegments, appSgValues, params, omitted_segments, whitelisted_segments, resolve);
});
/**
* Fetch event meta
* @param {string} id - id to of event to fetchEventMeta
* @param {function} callback - for result
**/
function fetchEventMeta(id, callback) {
common.readBatcher.getOne("events_data", {'_id': metaToFetch[id].app_id + "_" + metaToFetch[id].coll + "_" + metaToFetch[id].id}, {meta_v2: 1}, (err2, eventMetaDoc) => {
var retObj = eventMetaDoc || {};
retObj.coll = metaToFetch[id].coll;
callback(null, retObj);
});
}
});
});
};
/**
* Process events from params
* @param {array} appEvents - aray with existing event keys
* @param {object} appSegments - object with event key as key, and segments as array value
* @param {object} appSgValues - object in format [collection][document_id][segment] and array of values as value for inserting in database
* @param {params} params - params object
* @param {array} omitted_segments - array of segments to omit
* @param {array} whitelisted_segments - array of segments to keep
* @param {function} done - callback function to call when done processing
**/
function processEvents(appEvents, appSegments, appSgValues, params, omitted_segments, whitelisted_segments, done) {
var events = [],
eventCollections = {},
eventSegments = {},
eventSegmentsZeroes = {},
tmpEventObj = {},
tmpEventColl = {},
shortEventName = "",
eventCollectionName = "",
eventHashMap = {},
forbiddenSegValues = [],
pluginsGetConfig = plugins.getConfig("api", params.app && params.app.plugins, true);
for (let i = 1; i < 32; i++) {
forbiddenSegValues.push(i + "");
}
for (let i = 0; i < params.qstring?.events.length; i++) {
var currEvent = params.qstring.events[i];
tmpEventObj = {};
tmpEventColl = {};
var tmpTotalObj = {};
// Key fields is required
if (!currEvent.key || (currEvent.key.indexOf('[CLY]_') === 0 && plugins.internalEvents.indexOf(currEvent.key) === -1)) {
continue;
}
if (currEvent.count && common.isNumber(currEvent.count)) {
currEvent.count = parseInt(currEvent.count, 10);
}
else {
currEvent.count = 1;
}
if (pluginsGetConfig.event_limit &&
appEvents.length >= pluginsGetConfig.event_limit &&
appEvents.indexOf(currEvent.key) === -1) {
continue;
}
plugins.dispatch("/i/events", {
params: params,
currEvent: currEvent
});
shortEventName = common.fixEventKey(currEvent.key);
if (!shortEventName) {
continue;
}
// Create new collection name for the event
eventCollectionName = crypto.createHash('sha1').update(shortEventName + params.app_id).digest('hex');
eventHashMap[eventCollectionName] = shortEventName;
// If present use timestamp inside each event while recording
var time = params.time;
if (params.qstring.events[i].timestamp) {
params.time = common.initTimeObj(params.appTimezone, params.qstring.events[i].timestamp);
}
common.arrayAddUniq(events, shortEventName);
if (currEvent.sum && common.isNumber(currEvent.sum)) {
currEvent.sum = parseFloat(parseFloat(currEvent.sum).toFixed(5));
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.sum, currEvent.sum);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.sum, currEvent.sum);
}
if (currEvent.dur && common.isNumber(currEvent.dur)) {
currEvent.dur = parseFloat(currEvent.dur);
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.dur, currEvent.dur);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.dur, currEvent.dur);
}
if (currEvent.count && common.isNumber(currEvent.count)) {
currEvent.count = parseInt(currEvent.count, 10);
}
common.fillTimeObjectMonth(params, tmpEventObj, common.dbMap.count, currEvent.count);
common.fillTimeObjectMonth(params, tmpTotalObj, shortEventName + '.' + common.dbMap.count, currEvent.count);
var dateIds = common.getDateIds(params);
var postfix2 = common.crypto.createHash("md5").update(shortEventName).digest('base64')[0];
tmpEventColl["no-segment" + "." + dateIds.month] = tmpEventObj;
if (!eventCollections.all) {
eventCollections.all = {};
}
var ee = {};
ee["key." + dateIds.month + "." + postfix2] = tmpTotalObj;
mergeEvents(eventCollections.all, ee);
//fill time object for total event count
if (currEvent.segmentation) {
for (let segKey in currEvent.segmentation) {
var tmpSegKey = "";
if (segKey.indexOf('.') !== -1 || segKey.substr(0, 1) === '$') {
tmpSegKey = segKey.replace(/^\$+|\./g, "");
currEvent.segmentation[tmpSegKey] = currEvent.segmentation[segKey];
delete currEvent.segmentation[segKey];
}
}
for (let segKey in currEvent.segmentation) {
//check if segment should be ommited
if (plugins.internalOmitSegments[currEvent.key] && Array.isArray(plugins.internalOmitSegments[currEvent.key]) && plugins.internalOmitSegments[currEvent.key].indexOf(segKey) !== -1) {
continue;
}
//check if segment should be ommited
if (omitted_segments[currEvent.key] && Array.isArray(omitted_segments[currEvent.key]) && omitted_segments[currEvent.key].indexOf(segKey) !== -1) {
continue;
}
if (whitelisted_segments[currEvent.key] && Array.isArray(whitelisted_segments[currEvent.key]) && whitelisted_segments[currEvent.key].indexOf(segKey) === -1) {
continue;
}
//if segKey is empty
if (segKey === "") {
continue;
}
if (pluginsGetConfig.event_segmentation_limit &&
appSegments[currEvent.key] &&
appSegments[currEvent.key].indexOf(segKey) === -1 &&
appSegments[currEvent.key].length >= pluginsGetConfig.event_segmentation_limit) {
continue;
}
var myValues = [];
var tmpSegVal;
if (Array.isArray(currEvent.segmentation[segKey])) {
//myValues = currEvent.segmentation[segKey];
myValues = [];//ignore array values
}
else {
myValues = [currEvent.segmentation[segKey]];
}
for (var z = 0; z < myValues.length; z++) {
tmpEventObj = {};
tmpSegVal = myValues[z];
try {
tmpSegVal = tmpSegVal + "";
}
catch (ex) {
tmpSegVal = "";
}
if (tmpSegVal === "") {
continue;
}
// Mongodb field names can't start with $ or contain .
tmpSegVal = tmpSegVal.replace(/^\$+/, "").replace(/\./g, ":");
if (forbiddenSegValues.indexOf(tmpSegVal) !== -1) {
tmpSegVal = "[CLY]" + tmpSegVal;
}
tmpSegVal = common.encodeCharacters(tmpSegVal);
var postfix = common.crypto.createHash("md5").update(tmpSegVal).digest('base64')[0];
if (pluginsGetConfig.event_segmentation_value_limit &&
appSgValues[eventCollectionName] &&
appSgValues[eventCollectionName]["no-segment" + "_" + dateIds.zero + "_" + postfix] &&
appSgValues[eventCollectionName]["no-segment" + "_" + dateIds.zero + "_" + postfix][segKey] &&
appSgValues[eventCollectionName]["no-segment" + "_" + dateIds.zero + "_" + postfix][segKey].indexOf(tmpSegVal) === -1 &&
appSgValues[eventCollectionName]["no-segment" + "_" + dateIds.zero + "_" + postfix][segKey].length >= pluginsGetConfig.event_segmentation_value_limit) {
continue;
}
if (currEvent.sum && common.isNumber(currEvent.sum)) {
common.fillTimeObjectMonth(params, tmpEventObj, tmpSegVal + '.' + common.dbMap.sum, currEvent.sum);
}
if (currEvent.dur && common.isNumber(currEvent.dur)) {
common.fillTimeObjectMonth(params, tmpEventObj, tmpSegVal + '.' + common.dbMap.dur, currEvent.dur);
}
common.fillTimeObjectMonth(params, tmpEventObj, tmpSegVal + '.' + common.dbMap.count, currEvent.count);
if (!eventSegmentsZeroes[eventCollectionName]) {
eventSegmentsZeroes[eventCollectionName] = [];
common.arrayAddUniq(eventSegmentsZeroes[eventCollectionName], dateIds.zero + "." + postfix);
}
else {
common.arrayAddUniq(eventSegmentsZeroes[eventCollectionName], dateIds.zero + "." + postfix);
}
if (!eventSegments[eventCollectionName + "." + dateIds.zero + "." + postfix]) {
eventSegments[eventCollectionName + "." + dateIds.zero + "." + postfix] = {};
}
eventSegments[eventCollectionName + "." + dateIds.zero + "." + postfix]['meta_v2.' + segKey + '.' + tmpSegVal] = true;
eventSegments[eventCollectionName + "." + dateIds.zero + "." + postfix]["meta_v2.segments." + segKey] = true;
tmpEventColl[segKey + "." + dateIds.month + "." + postfix] = tmpEventObj;
}
}
}
if (!eventCollections[eventCollectionName]) {
eventCollections[eventCollectionName] = {};
}
if (!eventSegmentsZeroes.all) {
eventSegmentsZeroes.all = [];
common.arrayAddUniq(eventSegmentsZeroes.all, dateIds.zero + "." + postfix2);
}
else {
common.arrayAddUniq(eventSegmentsZeroes.all, dateIds.zero + "." + postfix2);
}
if (!eventSegments["all." + dateIds.zero + "." + postfix2]) {
eventSegments["all." + dateIds.zero + "." + postfix2] = {};
}
eventSegments["all." + dateIds.zero + "." + postfix2]['meta_v2.key.' + shortEventName] = true;
eventSegments["all." + dateIds.zero + "." + postfix2]["meta_v2.segments.key"] = true;
mergeEvents(eventCollections[eventCollectionName], tmpEventColl);
//switch back to request time
params.time = time;
}
if (!pluginsGetConfig.safe && !(params.qstring?.safe_api_response)) {
for (let collection in eventCollections) {
if (eventSegmentsZeroes[collection] && eventSegmentsZeroes[collection].length) {
for (let i = 0; i < eventSegmentsZeroes[collection].length; i++) {
let zeroId = "";
if (!eventSegmentsZeroes[collection] || !eventSegmentsZeroes[collection][i]) {
continue;
}
else {
zeroId = eventSegmentsZeroes[collection][i];
}
eventSegments[collection + "." + zeroId].m = zeroId.split(".")[0];
eventSegments[collection + "." + zeroId].s = "no-segment";
eventSegments[collection + "." + zeroId].a = params.app_id + "";
eventSegments[collection + "." + zeroId].e = eventHashMap[collection] || collection;
eventSegments[collection + "." + zeroId]._id = params.app_id + "_" + collection + "_" + "no-segment_" + zeroId.replace(".", "_");
common.writeBatcher.add("events_data", params.app_id + "_" + collection + "_" + "no-segment_" + zeroId.replace(".", "_"), {$set: eventSegments[collection + "." + zeroId]});
}
}
for (let segment in eventCollections[collection]) {
let collIdSplits = segment.split("."),
collId = params.app_id + "_" + collection + "_" + segment.replace(/\./g, "_");
common.writeBatcher.add("events_data", collId, {
$set: {
"m": collIdSplits[1],
"s": collIdSplits[0],
"a": params.app_id + "",
"e": eventHashMap[collection] || collection
},
"$inc": eventCollections[collection][segment]
});
}
}
}
else {
var eventDocs = [];
for (let collection in eventCollections) {
if (eventSegmentsZeroes[collection] && eventSegmentsZeroes[collection].length) {
for (let i = 0; i < eventSegmentsZeroes[collection].length; i++) {
let zeroId = "";
if (!eventSegmentsZeroes[collection] || !eventSegmentsZeroes[collection][i]) {
continue;
}
else {
zeroId = eventSegmentsZeroes[collection][i];
}
eventSegments[collection + "." + zeroId].m = zeroId.split(".")[0];
eventSegments[collection + "." + zeroId].s = "no-segment";
eventSegments[collection + "." + zeroId].a = params.app_id + "";
eventSegments[collection + "." + zeroId].e = eventHashMap[collection] || collection;
eventDocs.push({
"collection": "events_data",
"_id": params.app_id + "_" + collection + "_" + "no-segment_" + zeroId.replace(".", "_"),
"updateObj": {$set: eventSegments[collection + "." + zeroId]}
});
}
}
for (let segment in eventCollections[collection]) {
let collIdSplits = segment.split("."),
collId = params.app_id + "_" + collection + "_" + segment.replace(/\./g, "_");
eventDocs.push({
"collection": "events_data",
"_id": collId,
"updateObj": {
$set: {
"m": collIdSplits[1],
"s": collIdSplits[0],
"a": params.app_id + "",
"e": eventHashMap[collection] || collection
},
"$inc": eventCollections[collection][segment]
},
"rollbackObj": eventCollections[collection][segment]
});
}
}
for (var k = 0; k < eventDocs.length; k++) {
common.writeBatcher.add(eventDocs[k].collection, eventDocs[k]._id, eventDocs[k].updateObj);
}
/*async.map(eventDocs, updateEventDb, function(err, eventUpdateResults) {
var needRollback = false;
for (let i = 0; i < eventUpdateResults.length; i++) {
if (eventUpdateResults[i].status === "failed") {
needRollback = true;
break;
}
}
if (needRollback) {
async.map(eventUpdateResults, rollbackEventDb, function() {
if (!params.bulk) {
common.returnMessage(params, 500, 'Failure');
}
});
}
else if (!params.bulk) {
common.returnMessage(params, 200, 'Success');
}
});*/
}
if (events.length) {
var eventSegmentList = {'$addToSet': {'list': {'$each': events}}};
for (let event in eventSegments) {
var eventSplits = event.split("."),
eventKey = eventSplits[0];
var realEventKey = ((eventHashMap[eventKey] || "") + "").replace(/\./g, ':');
//for segment describing all events there is no event key.
if (realEventKey && !eventSegmentList.$addToSet["segments." + realEventKey]) {
eventSegmentList.$addToSet["segments." + realEventKey] = {};
}
if (eventSegments[event] && realEventKey) {
for (let segment in eventSegments[event]) {
if (segment.indexOf("meta_v2.segments.") === 0) {
var name = segment.replace("meta_v2.segments.", "");
if (eventSegmentList.$addToSet["segments." + realEventKey] && eventSegmentList.$addToSet["segments." + realEventKey].$each) {
common.arrayAddUniq(eventSegmentList.$addToSet["segments." + realEventKey].$each, name);
}
else {
eventSegmentList.$addToSet["segments." + realEventKey] = {$each: [name]};
}
}
}
}
}
common.writeBatcher.add('events', common.db.ObjectID(params.app_id), eventSegmentList);
}
done();
}
/**
* Merge multiple event document objects
* @param {object} firstObj - first object to merge
* @param {object} secondObj - second object to merge
**/
function mergeEvents(firstObj, secondObj) {
for (let firstLevel in secondObj) {
if (!Object.prototype.hasOwnProperty.call(secondObj, firstLevel)) {
continue;
}
if (!firstObj[firstLevel]) {
firstObj[firstLevel] = secondObj[firstLevel];
continue;
}
for (var secondLevel in secondObj[firstLevel]) {
if (!Object.prototype.hasOwnProperty.call(secondObj[firstLevel], secondLevel)) {
continue;
}
if (firstObj[firstLevel][secondLevel]) {
firstObj[firstLevel][secondLevel] += secondObj[firstLevel][secondLevel];
}
else {
firstObj[firstLevel][secondLevel] = secondObj[firstLevel][secondLevel];
}
}
}
}
/**
* Merge multiple event document objects
* @param {object} eventDoc - document with information about event
* @param {function} callback - to call when update done
**/
/*function updateEventDb(eventDoc, callback) {
common.db.collection(eventDoc.collection).update({'_id': eventDoc._id}, eventDoc.updateObj, {
'upsert': true,
'safe': true
}, function(err, result) {
if (!err && result && result.result && result.result.ok === 1) {
callback(null, {
status: "ok",
obj: eventDoc
});
}
else {
callback(null, {
status: "failed",
obj: eventDoc
});
}
});
}*/
/**
* Rollback already updated events in case error happened and we have safe api enabled
* @param {object} eventUpdateResult - db result object of updating event document
* @param {function} callback - to call when rollback done
**/
/*function rollbackEventDb(eventUpdateResult, callback) {
if (eventUpdateResult.status === "failed") {
callback(null, {});
}
else {
var eventDoc = eventUpdateResult.obj;
if (eventDoc.rollbackObj) {
common.db.collection(eventDoc.collection).update({'_id': eventDoc._id}, {'$inc': getInvertedValues(eventDoc.rollbackObj)}, {'upsert': false}, function() {});
callback(true, {});
}
else {
callback(true, {});
}
}
}*/
/**
* Invert updated object to deduct updated values
* @param {object} obj - object with properties and values to deduct
* @returns {object} inverted update object, to deduct inserted values
**/
/*function getInvertedValues(obj) {
var invObj = {};
for (var objProp in obj) {
invObj[objProp] = -obj[objProp];
}
return invObj;
}*/
module.exports = countlyEvents;