/*
 * Decompiled with CFR 0.152.
 */
package edu.jas.util;

import edu.jas.util.SocketChannel;
import edu.jas.util.TaggedMessage;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.log4j.Logger;

public class TaggedSocketChannel
extends Thread {
    private static final Logger logger = Logger.getLogger(TaggedSocketChannel.class);
    private static final boolean debug = logger.isDebugEnabled();
    private volatile boolean isRunning = false;
    private static final String DONE = "TaggedSocketChannel Done";
    private final AtomicInteger blockedCount;
    protected final SocketChannel sc;
    protected final Map<Integer, BlockingQueue> queues;

    public TaggedSocketChannel(SocketChannel s) {
        this.sc = s;
        this.blockedCount = new AtomicInteger(0);
        this.queues = new HashMap<Integer, BlockingQueue>();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void init() {
        Map<Integer, BlockingQueue> map = this.queues;
        synchronized (map) {
            if (!this.isRunning) {
                this.start();
                this.isRunning = true;
            }
        }
        logger.info((Object)("TaggedSocketChannel at " + this.sc));
    }

    public SocketChannel getSocket() {
        return this.sc;
    }

    public void send(Integer tag, Object v) throws IOException {
        if (tag == null) {
            throw new IllegalArgumentException("tag " + tag + " not allowed");
        }
        if (v instanceof Exception) {
            throw new IllegalArgumentException("message " + v + " not allowed");
        }
        TaggedMessage tm = new TaggedMessage(tag, v);
        this.sc.send(tm);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public Object receive(Integer tag) throws InterruptedException, IOException, ClassNotFoundException {
        BlockingQueue tq = null;
        int i = 0;
        do {
            Map<Integer, BlockingQueue> map = this.queues;
            synchronized (map) {
                tq = this.queues.get(tag);
                if (tq == null) {
                    if (!this.isRunning) {
                        throw new IOException("receiver not running for " + this);
                    }
                    try {
                        try {
                            logger.debug((Object)("receive wait, tag = " + tag));
                            i = this.blockedCount.incrementAndGet();
                            this.queues.wait();
                        }
                        catch (InterruptedException e) {
                            logger.info((Object)("receive wait exception, tag = " + tag + ", blockedCount = " + i));
                            throw e;
                        }
                    }
                    finally {
                        i = this.blockedCount.decrementAndGet();
                    }
                }
            }
        } while (tq == null);
        Object v = null;
        try {
            i = this.blockedCount.incrementAndGet();
            v = tq.take();
        }
        finally {
            i = this.blockedCount.decrementAndGet();
        }
        if (v instanceof IOException) {
            throw (IOException)v;
        }
        if (v instanceof ClassNotFoundException) {
            throw (ClassNotFoundException)v;
        }
        if (v instanceof Exception) {
            throw new RuntimeException(v.toString());
        }
        return v;
    }

    public void close() {
        this.terminate();
    }

    @Override
    public String toString() {
        return "socketChannel(" + this.sc + ", tags = " + this.queues.keySet() + ")";
    }

    public int tagSize() {
        return this.queues.keySet().size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int messages() {
        int m = 0;
        Map<Integer, BlockingQueue> map = this.queues;
        synchronized (map) {
            for (BlockingQueue tq : this.queues.values()) {
                m += tq.size();
            }
        }
        return m;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        Map<Integer, BlockingQueue> map;
        int i;
        if (this.sc == null) {
            this.isRunning = false;
            return;
        }
        this.isRunning = true;
        while (this.isRunning) {
            try {
                Map<Integer, BlockingQueue> tm;
                Object r = null;
                try {
                    logger.debug((Object)"waiting for tagged object");
                    r = this.sc.receive();
                    if (this.isInterrupted()) {
                        this.isRunning = false;
                    }
                }
                catch (IOException e) {
                    r = e;
                }
                catch (ClassNotFoundException e) {
                    r = e;
                }
                catch (Exception e) {
                    r = e;
                }
                logger.debug((Object)"object recieved");
                if (r instanceof TaggedMessage) {
                    tm = (TaggedMessage)r;
                    LinkedBlockingQueue<Object> tq = null;
                    Iterator<BlockingQueue> iterator = this.queues;
                    synchronized (iterator) {
                        tq = this.queues.get(((TaggedMessage)((Object)tm)).tag);
                        if (tq == null) {
                            tq = new LinkedBlockingQueue<Object>();
                            this.queues.put(((TaggedMessage)((Object)tm)).tag, tq);
                            this.queues.notifyAll();
                        }
                    }
                    tq.put(((TaggedMessage)((Object)tm)).msg);
                    continue;
                }
                if (r instanceof Exception) {
                    if (debug) {
                        logger.debug((Object)("exception " + r));
                    }
                    tm = this.queues;
                    synchronized (tm) {
                        this.isRunning = false;
                        for (BlockingQueue q : this.queues.values()) {
                            int bc = this.blockedCount.get();
                            i = 0;
                            while (i <= bc) {
                                q.put(r);
                                ++i;
                            }
                            if (bc <= 0) continue;
                            logger.debug((Object)("put exception to queue, blockedCount = " + bc));
                        }
                        this.queues.notifyAll();
                        continue;
                    }
                }
                if (debug) {
                    logger.debug((Object)("no tagged message and no exception " + r));
                }
                tm = this.queues;
                synchronized (tm) {
                    this.isRunning = false;
                    Exception e = r.equals(DONE) ? new Exception("DONE message") : new IllegalArgumentException("no tagged message and no exception '" + r + "'");
                    for (BlockingQueue q : this.queues.values()) {
                        int bc = this.blockedCount.get();
                        int i2 = 0;
                        while (i2 <= bc) {
                            q.put(e);
                            ++i2;
                        }
                        if (bc <= 0) continue;
                        logger.debug((Object)("put '" + e.toString() + "' to queue, blockedCount = " + bc));
                    }
                    this.queues.notifyAll();
                }
                if (!r.equals(DONE)) continue;
                logger.info((Object)"run terminating by request");
                try {
                    this.sc.send(DONE);
                }
                catch (IOException e) {
                    logger.warn((Object)("send other done failed " + e));
                }
                return;
            }
            catch (InterruptedException e) {
                if (debug) {
                    logger.debug((Object)("exception " + e));
                }
                map = this.queues;
                synchronized (map) {
                    this.isRunning = false;
                    for (BlockingQueue q : this.queues.values()) {
                        try {
                            int bc = this.blockedCount.get();
                            i = 0;
                            while (i <= bc) {
                                q.put(e);
                                ++i;
                            }
                            if (bc <= 0) continue;
                            logger.debug((Object)("put interrupted to queue, blockCount = " + bc));
                        }
                        catch (InterruptedException bc) {
                            // empty catch block
                        }
                    }
                    this.queues.notifyAll();
                }
            }
        }
        if (this.isInterrupted()) {
            InterruptedException e = new InterruptedException("terminating via interrupt");
            map = this.queues;
            synchronized (map) {
                for (BlockingQueue q : this.queues.values()) {
                    try {
                        int bc = this.blockedCount.get();
                        i = 0;
                        while (i <= bc) {
                            q.put(e);
                            ++i;
                        }
                        if (bc <= 0) continue;
                        logger.debug((Object)("put terminating via interrupt to queue, blockCount = " + bc));
                    }
                    catch (InterruptedException interruptedException) {
                        // empty catch block
                    }
                }
                this.queues.notifyAll();
            }
        }
        logger.info((Object)"run terminated");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void terminate() {
        this.isRunning = false;
        this.interrupt();
        if (this.sc != null) {
            try {
                this.sc.send(DONE);
            }
            catch (IOException e) {
                logger.warn((Object)("send done failed " + e));
            }
            logger.debug((Object)(this.sc + " not yet closed"));
        }
        this.interrupt();
        Map<Integer, BlockingQueue> map = this.queues;
        synchronized (map) {
            this.isRunning = false;
            for (Map.Entry<Integer, BlockingQueue> tq : this.queues.entrySet()) {
                BlockingQueue q = tq.getValue();
                if (q.size() != 0) {
                    logger.info((Object)("queue for tag " + tq.getKey() + " not empty " + q));
                }
                int bc = 0;
                try {
                    bc = this.blockedCount.get();
                    int i = 0;
                    while (i <= bc) {
                        q.put(new IOException("queue terminate"));
                        ++i;
                    }
                }
                catch (InterruptedException interruptedException) {
                    // empty catch block
                }
                if (bc <= 0) continue;
                logger.debug((Object)("put IO-end to queue for tag " + tq.getKey() + ", blockCount = " + bc));
            }
            this.queues.notifyAll();
        }
        try {
            this.join();
        }
        catch (InterruptedException interruptedException) {
            // empty catch block
        }
        logger.info((Object)"terminated");
    }
}

