// import mqtt from 'mqtt';
import mqtt from "mqtt";
import wildcardDict from './wildcardDict';
import messageTimestampFilter from './messageTimestampFilter';
import callbackModel from './models/callbackModel';
import commManagerMsgTypes from './commManagerMsgTypes';
import heartbeatHelper from './heartbeatHelper';
import imgUtils from './imgUtils';
import mqttConstants from './mqttConstants';

const MAX_HEARTBEAT_RESPONSE = 5000;

class CommManagerWorker {

    constructor(
        product_name,
        broker_config,
        accountId,
        runHeartbeat,
        non_json_topic_transform_dict,
        relayMessageCallback,
        on_connect = null,
        on_disconnect = null) {

        this.relayMessageCallback = relayMessageCallback;
        this.timestampFilter = new messageTimestampFilter();
        this.clients = [];
        this.callbacks = new wildcardDict();
        this.subscriptions = {};
        this.productName = product_name;
        this.accountId = accountId;
        this.non_json_topic_transform_dict = non_json_topic_transform_dict;

        for (const broker of broker_config) {
            console.log(`${this.productName}: Connection to mqtt in CommManager: host: ${broker.host}, ${broker.name}`);

            var mqttc = mqtt.connect(broker.host, broker.options);
            mqttc.on('message', this.handle_message.bind(this, broker.name, broker.host));
            mqttc.on('connect', this.handle_connect.bind(this, broker.name, broker.host));
            mqttc.on('disconnect', this.handle_disconnect.bind(this, broker.name, broker.host));

            mqttc.on('reconnect', this.handle_reconnect.bind(this, broker.name, broker.host));
            mqttc.on('error', this.handle_error.bind(this, broker.name, broker.host));
            mqttc.on('close', this.handle_close.bind(this, broker.name, broker.host));
    
            mqttc.smartag_broker_name = broker.name;
            mqttc.priority_broker = broker.priority_broker;
            this.clients.push(mqttc);
            // initialze an empty list of subscriptions for the new broker
            this.subscriptions[broker.name] = []
        }
        this.connect_cb = on_connect;
        this.disconnect_cb = on_disconnect;

        this.manualResubscribe.bind(this);
        this.getSubscriptions.bind(this);

        if (runHeartbeat) {
            this.heartbeat = new heartbeatHelper(this, MAX_HEARTBEAT_RESPONSE);
            this.heartbeat.start();
        }
    }

    getSubscriptions() {
        return this.subscriptions;
    }

    handle_error(broker_name, broker_host, error) {
        console.log(`${this.productName}: handle mqtt error for broker: ${broker_name} :${broker_host}`);

        console.log(`${this.productName}: handle mqtt error: ${JSON.stringify(error)}`);
    }

    manualResubscribe(broker_name) {
        var manualResubscribeClients = this.clients.filter(x => x.smartag_broker_name === broker_name);
        manualResubscribeClients.forEach(client => {
            console.log(`${this.productName}: resubbing for: ${client.options.href}`)
            this.subscriptions[client.smartag_broker_name].forEach(topic => {
                console.log(`${this.productName}: MRS: unsubscribe: ${topic}`);
                client.unsubscribe(topic);
                client.subscribe(topic, { qos: 0 });
                console.log(`${this.productName}: MRS: subscribe: ${topic}`);
            });
        });
    }

    updateIotToken(iotToken) {
        var cloudClients = this.clients.filter(x => x.smartag_broker_name === mqttConstants.CLOUD_BROKER); 
        cloudClients.forEach(client => {
            client.options.properties.userProperties.iotToken = iotToken;
        });
    }

    handle_reconnect(broker_name, broker_host) {
        console.log(`${this.productName}: handle mqtt reconnect for broker: ${broker_name} :${broker_host}`);
    }

    handle_close(broker_name, broker_host) {
        console.log(`${this.productName}: handle mqtt close for broker: ${broker_name} :${broker_host}`);
    }

    handle_connect(broker_name, broker_host) {
        console.log(`${this.productName}: handle mqtt connect for broker: ${broker_name} :${broker_host}`);

        this.manualResubscribe(broker_name);

        if (this.connect_cb != null) {
            this.connect_cb(broker_name);
        }
    }

    handle_disconnect(broker_name, broker_host) {
        console.log(`${this.productName}: handle mqtt disconnect for broker: ${broker_name} :${broker_host}`);
        if (this.disconnect_cb != null) {
            this.disconnect_cb(broker_name);
        }
    }

    handle_message(broker_name, broker_host, topic, message) {

        if (this.timestampFilter.shouldThrottleMessage(topic)) {
            console.log(`${this.productName}: throttling message on --> topic: ${topic}`);
            return;
        }

        //Handle messages which are non json (camera feed)
        for (var t in this.non_json_topic_transform_dict) {
            if (topic.endsWith(t)) {
                var transform_fun = this.non_json_topic_transform_dict[t];
                var transformed_msg = transform_fun(message, t, broker_name);
                var formatted_msg = {
                    message: transformed_msg,
                    broker_name: broker_name,
                    topic: topic,
                    type: commManagerMsgTypes.MACHINE_NON_JSON_MSG
                };
                this.relayMessageCallback(formatted_msg);
                return 1;
            }
        }

        var isMessageOld = false;

        // try/catch in case the message is not an object
        var messageJson = "n/a";
        try {
            messageJson = JSON.parse(message);
            // verify the message has a timestamp property
            isMessageOld = this.timestampFilter.isMessageOldPerMachine(messageJson.timestamp, topic);
        } catch (err) {
            var message = message.toString();
            console.log(`${this.productName}: handle message error: ${err} --> topic: ${topic} -> message: ${message}`);
        }

        /* TEMP USED FOR LOGGING
        if (topic.indexOf('response') > 0) {
            var timestampDiff = Date.now() / 1000 - parseFloat(messageJson.timestamp);
            console.log(`[MC] received response: ${broker_name} \n topic: ${topic} \n roundtrip: ${timestampDiff} seconds`);
        }
        */

        if (!isMessageOld) {

            // check last message received over topic
            var isMessageOldPerTopic = false;
            try {
                // verify the message has a timestamp property
                isMessageOldPerTopic = this.timestampFilter.isMessageOldPerTopic(topic, messageJson.timestamp);
            } catch (err) {
                console.log(`${this.productName}: handle message error: ${err} --> topic: ${topic} -> message: ${message}`);
            }

            if (!isMessageOldPerTopic) {
                var is_coverage_topic = topic.indexOf('coverage') > -1;

                // callback
                var messageJsonString = {
                    message: messageJson,
                    broker_name: broker_name,
                    topic: topic
                };

                // either we run a callback in this thread or relay the message back to the main thread
                var cb_list = this.callbacks.get(topic);
                if (cb_list.length == 0) {
                    if (is_coverage_topic){
                        messageJsonString.type = commManagerMsgTypes.COVERAGE_JSON_MSG;
                    }
                    else{
                        messageJsonString.type = commManagerMsgTypes.MACHINE_MSG;
                    }
                    this.relayMessageCallback(messageJsonString);
                } else {
                    for (const cb of cb_list) {
                        cb.callback(messageJson, message.broker_name, message.topic);
                    }
                }
            }
        } else {
            console.log(`${this.productName}: ignoring found old message. Topic: ${topic}. Broker: ${broker_name} ts: ${messageJson.timestamp}`);
        }
    }

    // we need a default value for qos in CommManagerReceiver and CommManagerWorker publish function
    // since Receiver calls comes from the main thread, but some other publish calls come
    // directly from the worker role
    publish(topic, payload, options={ qos: 0 }) {
        for (const client of this.clients) {
            if (client.priority_broker) {
                client.publish(topic, payload, options);
            }
        }
    }

    // DON'T Call this directly. Use subscribe() return instead
    __unsubscribe(topic, model) {
        for (const client of this.clients) {
            client.unsubscribe(topic);
            var topicIndex = this.subscriptions[client.smartag_broker_name].indexOf(topic);
            if (topicIndex > -1) {
                this.subscriptions[client.smartag_broker_name].splice(topicIndex, 1);
            }
        }

        // remove callback
        var index = this.callbacks.get(topic).indexOf(model);
        if (index > -1) {
            this.callbacks.remove(topic, model);
            console.log(`${this.productName}: Unsubscribing for topic: ${topic}`)
        }
        else {
            console.warn(`We did not find a topic or callback for topic: ${topic}, model: ${model}, cb dict length: ${Object.keys(this.callbacks.dict).length}`);
        }

        this.timestampFilter.remove(topic);
    }

    subscribe(topic, data = null, callback = null, serviceName = null) {
        var model = null;
        if (callback) {
            model = new callbackModel(callback, serviceName);
            this.callbacks.push(topic, model);
        }

        console.log(`${this.productName}: subscribing to: ${topic}`);
        // if for priority brokers, subscribe for all priority brokers
        let clientsToSubscribe = null;
        if (data === null || data.data.priority_broker) {
            clientsToSubscribe = this.clients.filter(x => x.priority_broker === true);
        } else {
            // subscribe for all non-priority brokers
            clientsToSubscribe = this.clients.filter(x => x.priority_broker === false);
        }

        for (const client of clientsToSubscribe) {

            // only priority broker subscribe with the general subscribe function
            this.subscriptions[client.smartag_broker_name].push(topic);
            client.subscribe(topic, { qos: 0 });

            var sub_length = this.subscriptions[client.smartag_broker_name].length;
            if (sub_length > 1000){
                console.error(`Warning!! subscriptions have grown larger than 1k for ${client.smartag_broker_name} client. Size=${sub_length}`);
            }
        }


        if (data && data.data && data.data.max_rate_hertz)
            this.timestampFilter.initThrottleMessage(topic, data.data.max_rate_hertz);


        const cb_len = Object.keys(this.callbacks.dict).length > 1000;
        if (cb_len > 1000) {
            console.error(`Warning!! callbacks have grown larger than 1k. Size=${sub_length}`);
        }

        return () => {
            this.__unsubscribe(topic, model);
        };
    }

    end() {
        console.log(`${this.productName}: mqtt ending`);
        for (const client of this.clients) {
            client.end();
        }
    }
};

export default CommManagerWorker;
