jobs/userMerge.js

'use strict';

const job = require('../parts/jobs/job.js'),
    plugins = require('../../plugins/pluginManager.js'),
    log = require('../utils/log.js')('job:userMerge');
var Promise = require("bluebird");
var usersApi = require('../parts/mgmt/app_users.js');


var getMergeDoc = function(data) {
    if (data && data.list && data.list.length > data.pointer) {
        var copy = data.pointer;
        data.pointer++;
        return data.list[copy];
    }
    else {
        return null;
    }
};


var handleMerges = function(db, callback) {
    log.d('looking for unfinished merges ...');
    var paralel_cn = plugins.getConfig("api").user_merge_paralel;
    try {
        paralel_cn = parseInt(paralel_cn);
    }
    catch {
        paralel_cn = 1;
    }
    paralel_cn = Math.max(1, paralel_cn);

    var date = Math.round(new Date().getTime() / 1000) - 60;
    var limit = 100;
    if (paralel_cn && paralel_cn > limit) {
        limit = paralel_cn;
    }

    /**
     * process merging
     * @param {object} dataObj - data object
     * @param {function} resolve - callback
     **/
    function processMerging(dataObj, resolve) {
        var user = getMergeDoc(dataObj);
        if (user) {
            mergeUserData(user, function() {
                processMerging(dataObj, resolve);
            });
        }
        else {
            resolve();
        }
    }

    /** 
     *  merge user data
     * @param {object} user - user object
     * @param {function} resolve - callback
     * **/
    function mergeUserData(user, resolve) {
        var dd = user._id.split("_");
        if (dd.length !== 3) {
            log.e("deleting unexpected document in merges with bad _id: " + user._id);
            db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
                if (err2) {
                    log.e("error deleting document in merges with bad _id: " + user._id);
                    log.e(err2);
                }
                resolve();
            });
        }
        else if (user.t > 100) {
            log.e("deleting document in merges with too many retries: " + user._id);
            db.collection('app_user_merges').remove({"_id": user._id}, (err2)=>{
                if (err2) {
                    log.e("error deleting document in merges with _id: " + user._id);
                    log.e(err2);
                }
                resolve();
            });
        }
        else {
            var app_id = dd[0];
            var olduid = dd[2];
            //user docuument is not saved  merged - try merginfg it at first
            if (user.merged_to) {
                if (!user.u) { //user documents are not merged. Could be just failed state.
                    log.e("user doc not saved as merged. Processing it.");
                    db.collection('app_users' + app_id).find({"uid": {"$in": [olduid, user.merged_to]}}).toArray((err5, docs)=>{
                        if (err5) {
                            log.e("error fetching users for merge", err5);
                            resolve();
                            return;
                        }
                        var oldAppUser;
                        var newAppUser;
                        for (var z = 0; z < docs.length;z++) {
                            if (docs[z].uid === olduid) {
                                oldAppUser = docs[z];
                            }
                            if (docs[z].uid === user.merged_to) {
                                newAppUser = docs[z];
                            }
                        }
                        if (!oldAppUser && newAppUser) {
                            //old user was merged to new user, but state update failed - we can mark it as merged and process other plugins
                            usersApi.mergeOtherPlugins({db: db, app_id: app_id, newAppUser: {uid: user.merged_to}, oldAppUser: {uid: olduid}, updateFields: {"mc": true, "cc": true, "u": true}, mergeDoc: user}, resolve);
                        }
                        if (!newAppUser) {
                            //new user do not exists - we can delete merging record
                            db.collection('app_user_merges').remove({"_id": user._id}, (err4)=>{
                                if (err4) {
                                    log.e("error deleting document in merges with bad _id: " + user._id);
                                    log.e(err4);
                                }
                                resolve();
                            });
                        }
                        else if (oldAppUser && newAppUser) {
                            db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
                                if (err0) {
                                    log.e(err0);
                                }
                                //Both documents exists. We can assume that documents were not merged
                                plugins.dispatch("/i/user_merge", {
                                    app_id: app_id,
                                    newAppUser: newAppUser,
                                    oldAppUser: oldAppUser
                                }, function() {
                                    //merge user data
                                    usersApi.mergeUserProperties(newAppUser, oldAppUser);
                                    //update new user

                                    db.collection('app_users' + app_id).update({_id: newAppUser._id}, {'$set': newAppUser}, function(err6) {
                                        //Dispatch to other plugins only after callback.
                                        if (!err6) {
                                            //update metric changes document
                                            db.collection("metric_changes" + app_id).update({uid: oldAppUser.uid}, {'$set': {uid: newAppUser.uid}}, {multi: true}, function(err7) {
                                                if (err7) {
                                                    log.e("Failed metric changes update in app_users merge", err7);
                                                }
                                            });
                                            //delete old app users document
                                            db.collection('app_users' + app_id).remove({_id: oldAppUser._id}, function(errRemoving) {
                                                if (errRemoving) {
                                                    log.e("Failed to remove merged user from database", errRemoving);
                                                }
                                                else {
                                                    usersApi.mergeOtherPlugins({db: db, app_id: app_id, newAppUser: {uid: user.merged_to}, oldAppUser: {uid: olduid}, updateFields: {"cc": true, "u": true}, mergeDoc: user}, resolve);
                                                }
                                            });
                                        }
                                        else {
                                            resolve();//will retry after
                                        }
                                    });
                                });
                            });
                        }
                    });
                }
                else if (!user.mc) { //documents are merged, but metric changes and other plugins are not yet
                    db.collection('app_user_merges').update({"_id": user._id}, {"$inc": {"t": 1}}, {upsert: false}, function(err0) {
                        if (err0) {
                            log.e(err0);
                        }
                        db.collection("metric_changes" + app_id).update({uid: olduid}, {'$set': {uid: usersApi.merged_to}}, {multi: true}, function(err7) {
                            if (err7) {
                                log.e("Failed metric changes update in app_users merge", err7);
                            }
                            else {
                                usersApi.mergeOtherPlugins({db: db, app_id: app_id, newAppUser: {uid: user.merged_to}, oldAppUser: {uid: olduid}, updateFields: {"cc": true, "mc": true}, mergeDoc: user}, resolve);
                            }
                        });
                    });
                }
                else {
                    usersApi.mergeOtherPlugins({db: db, app_id: app_id, newAppUser: {uid: user.merged_to}, oldAppUser: {uid: olduid}, updateFields: {"cc": true}, mergeDoc: user}, resolve);
                }
            }
            else {
                resolve();
            }
        }
    }

    db.collection('app_user_merges').find({"lu": {"$lt": date}}).limit(limit).toArray(function(err, mergedocs) {
        if (err) {
            callback(err);
        }
        if (mergedocs && mergedocs.length > 0) {
            var dataObj = {'list': mergedocs, pointer: 0};
            log.d('found ' + mergedocs.length + ' unfinished merges');
            var promises = [];

            for (var z = 0; z < paralel_cn; z++) {
                promises.push(new Promise((resolve)=>{
                    processMerging(dataObj, resolve);

                }));
            }

            Promise.all(promises).then(()=>{
                if (mergedocs.length === 100) {
                    setTimeout(()=>{
                        handleMerges(db, callback);
                    }, 0); //To do not grow stack.
                }
                else {
                    callback();
                }
            }).catch((errThrown)=>{
                log.e("finished with errors");
                log.e(errThrown);
                callback(errThrown);
            });
        }
        else {
            log.d('all users merged');
            callback();
        }
    });
};
/** Class for the user mergind job **/
class UserMergeJob extends job.Job {
    /**
     * Run the job
     * @param {Db} db connection
     * @param {done} done callback
     * @param {function} progressJob - callback when progress made
     */
    run(db, done, progressJob) {
        var total = 0;
        var current = 0;
        var bookmark = "";

        /**
         * check job status periodically
         */
        function ping() {
            log.d('Pinging user merging job');
            if (timeout) {
                progressJob(total, current, bookmark);
                timeout = setTimeout(ping, 10000);
            }
        }
        /**
         * end job
         * @returns {varies} job done
         */
        function endJob() {
            log.d('Ending user merging job');
            clearTimeout(timeout);
            timeout = 0;
            return done();
        }
        var timeout = setTimeout(ping, 10000);

        log.d('finishing up not finished merges merges...');
        plugins.loadConfigs(db, function() {
            handleMerges(db, ()=>{
                endJob();
            });
        });
    }
}

module.exports = UserMergeJob;