/*
 * Decompiled with CFR 0.152.
 */
import ca.nanometrics.naqs.stndb.StationDatabase;
import ca.nanometrics.packet.EventHandler;
import ca.nanometrics.packet.EventPacket;
import ca.nanometrics.packet.NmxPacket;
import ca.nanometrics.packet.NmxPacketHandler;
import ca.nanometrics.packet.TriggerHandler;
import ca.nanometrics.packet.TriggerPacket;
import ca.nanometrics.util.BufferedConsumer;
import ca.nanometrics.util.Log;
import ca.nanometrics.util.QueueImpl;
import java.io.IOException;
import java.net.Socket;

/**
 * Buffered consumer that feeds packets to a single {@link DSSubscription}.
 *
 * <p>Data packets offered through {@link #put(NmxPacket)} are appended to the
 * inherited consumer buffer and handed to the subscription in
 * {@link #process(Object)}. Trigger and event packets are parked in their own
 * queues and forwarded to the subscription from {@link #tick()}.
 *
 * <p>A {@link DSRequestBroker} is created on the same connection as the
 * subscription; once the broker is no longer alive this consumer asks to be
 * shut down by clearing {@code stayAlive}.
 */
public class DSSubscriber
extends BufferedConsumer
implements NmxPacketHandler,
TriggerHandler,
EventHandler {
    /** Timeout applied to the connection right after it is created (units as defined by DSConnection.setTimeout). */
    static final int INITIAL_TIMEOUT = 30000;
    /** Retransmission requests are only allowed while the input buffer holds fewer entries than this. */
    private static final int RETX_THRESHOLD = 2;
    /** Argument handed to {@code DSSubscription.allowRetxRequest} when the buffer is nearly empty. */
    private static final int RETX_LIMIT = 50;
    private final DSRequestBroker broker;
    private final DSSubscription subscription;
    private final QueueImpl triggerQueue = new QueueImpl();
    private final QueueImpl eventQueue = new QueueImpl();

    /**
     * Wraps the socket in a {@link DSConnection}, applies the initial timeout
     * and wires up the subscription and its request broker.
     *
     * @param socket      client socket to serve
     * @param stndb       station database passed through to the subscription
     * @param bufferTable buffer table passed through to the subscription
     * @param retxSource  packet source passed through to the subscription
     * @param msgHandler  message handler passed through to the request broker
     * @throws IOException if the connection (or a collaborator) cannot be set up
     */
    public DSSubscriber(Socket socket, StationDatabase stndb, NmxBufferTable bufferTable, NmxPacketSource retxSource, MsgHandler msgHandler) throws IOException {
        super("DSSubscriber", 1000);
        DSConnection connection = new DSConnection(socket);
        connection.setTimeout(INITIAL_TIMEOUT);
        this.subscription = new DSSubscription(connection, stndb, bufferTable, retxSource, this);
        this.broker = new DSRequestBroker(connection, this.subscription, msgHandler);
    }

    /** Sets this consumer's priority to 4, logs it, and starts the request broker. */
    protected void open() {
        this.setPriority(4);
        Log.report(this, 5, 2, "starting with priority " + this.getPriority());
        this.broker.start();
    }

    /**
     * Logs that the subscription is closing.
     * NOTE(review): nothing is released here; presumably the connection is
     * torn down by the broker - confirm.
     */
    protected void close() {
        Log.report(this, 8, 2, "closing subscription " + this.subscription);
    }

    /**
     * Hands one buffered item to the subscription, then checks whether
     * retransmission requests may be allowed.
     *
     * @param obj buffered item; must be an {@link NmxPacket}
     */
    protected void process(Object obj) {
        this.subscription.put((NmxPacket)obj);
        this.checkForRetx();
    }

    /**
     * Forwards every queued trigger packet to the subscription.
     * Failures on a single packet are logged and draining continues; an
     * interrupt stops draining and leaves the remainder queued.
     */
    private void sendTriggers() {
        while (this.triggerQueue.size() > 0) {
            try {
                TriggerPacket packet = (TriggerPacket)this.triggerQueue.get(1);
                if (packet == null) continue;
                this.subscription.put(packet);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller instead of losing it,
                // and stop: retrying while interrupted would only fail again.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                Log.report(this, 4, 4, "Exception in sendTriggers(): " + e);
            }
        }
    }

    /**
     * Forwards every queued event packet to the subscription.
     * Failures on a single packet are logged and draining continues; an
     * interrupt stops draining and leaves the remainder queued.
     */
    private void sendEvents() {
        while (this.eventQueue.size() > 0) {
            try {
                EventPacket packet = (EventPacket)this.eventQueue.get(1);
                if (packet == null) continue;
                this.subscription.put(packet);
            }
            catch (InterruptedException interrupted) {
                // Keep the interrupt visible to the caller instead of losing it,
                // and stop: retrying while interrupted would only fail again.
                Thread.currentThread().interrupt();
                return;
            }
            catch (Exception e) {
                Log.report(this, 4, 4, "Exception in sendEvents(): " + e);
            }
        }
    }

    /**
     * When the input buffer holds fewer than {@link #RETX_THRESHOLD} entries,
     * lets the subscription issue retransmission requests and then pauses.
     */
    private void checkForRetx() {
        if (this.getQsize() < RETX_THRESHOLD) {
            this.subscription.allowRetxRequest(RETX_LIMIT);
            this.sleep(50L);
        }
    }

    /**
     * Periodic housekeeping: while the broker is alive, flushes the trigger
     * and event queues, checks for retransmission and ticks the subscription;
     * otherwise clears {@code stayAlive}.
     */
    protected void tick() {
        if (this.broker.isAlive()) {
            this.sendTriggers();
            this.sendEvents();
            this.checkForRetx();
            this.subscription.tick();
        } else {
            // Broker is gone, so there is nobody left to serve requests.
            this.stayAlive = false;
        }
    }

    /**
     * Buffers a data packet if the subscription contains the packet's key;
     * other packets are dropped.
     *
     * @param packet data packet offered to this subscriber
     */
    public void put(NmxPacket packet) {
        if (this.subscription.contains(packet.getKey())) {
            this.append(packet);
        }
    }

    /**
     * Queues a trigger packet if the subscription wants triggers.
     *
     * @param packet trigger packet offered to this subscriber
     */
    public void put(TriggerPacket packet) {
        if (this.subscription.wantsTriggers()) {
            this.triggerQueue.put(packet);
        }
    }

    /**
     * Queues an event packet if the subscription wants events.
     *
     * @param packet event packet offered to this subscriber
     */
    public void put(EventPacket packet) {
        if (this.subscription.wantsEvents()) {
            this.eventQueue.put(packet);
        }
    }

    /**
     * Stops the request broker (without joining it) and then this consumer.
     *
     * @param doJoin passed through to the superclass {@code stop}
     */
    public void stop(boolean doJoin) {
        this.broker.stop(false);
        super.stop(doJoin);
    }

    /** @return the string form of the underlying subscription */
    public String toString() {
        return this.subscription.toString();
    }
}

