package com.cloudsoftcorp.monterey.comms.socket;

import com.cloudsoftcorp.monterey.comms.api.Address;
import com.cloudsoftcorp.monterey.comms.api.CommsException;
import com.cloudsoftcorp.monterey.comms.api.Message;
import com.cloudsoftcorp.monterey.comms.basic.BasicMessageSerialisation;
import com.cloudsoftcorp.monterey.comms.basic.MessageReader;
import com.cloudsoftcorp.monterey.comms.simlatency.AddressWithLocation;
import com.cloudsoftcorp.util.Loggers;
import com.cloudsoftcorp.util.NetworkUtil;
import com.cloudsoftcorp.util.annotation.Nullable;
import com.cloudsoftcorp.util.condition.Consumer;
import com.cloudsoftcorp.util.condition.Filter;
import com.cloudsoftcorp.util.exception.ExceptionUtils;
import com.cloudsoftcorp.util.exception.RuntimeInterruptedException;
import com.cloudsoftcorp.util.executors.CallbackWithResult;
import com.cloudsoftcorp.util.proc.CloudsoftThreadFactory;
import com.cloudsoftcorp.util.proc.ResourceUsageUtils;
import com.cloudsoftcorp.util.proc.ThreadStack;
import java.io.BufferedInputStream;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.mortbay.jetty.HttpVersions;

/* loaded from: input_file:com/cloudsoftcorp/monterey/comms/socket/SocketMessageReader.class */
public class SocketMessageReader implements MessageReader {
    private static final Logger LOG;
    private static final int JAVA_BUFFER_SOCKET_INPUT_SIZE = 1024;
    public static final int MAX_READER_MESSAGE_QUEUE_LENGTH = 0;
    private static final int NEXT_MESSAGE_WAKEUP_PERIOD = 1000;
    public static boolean INCLUDE_BACKSTAMPS;
    private final ServerSocket serverSocket;
    private final BasicMessageSerialisation serialisation;
    private final SocketAddress socketAddress;
    private final BlockingQueue<Message> incomingMessages;
    private final Collection<ResponseCorrelator> expectedResponses;
    private final Collection<FilteredMessageConsumer> unqueuedMessageConsumers;
    private volatile boolean disposed;
    private final List<SocketChildReader> children;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/comms/socket/SocketMessageReader$FilteredMessageConsumer.class */
    /**
     * Pairs a message filter with the consumer that should receive every inbound
     * message the filter accepts, bypassing the incoming-message queue.
     */
    public static class FilteredMessageConsumer {
        /** Decides whether {@link #consumer} is handed a given inbound message. */
        final Filter<Message> filter;
        /** Receives the messages accepted by {@link #filter}. */
        final Consumer<Message> consumer;

        /**
         * @param filter   selects the messages of interest
         * @param consumer receives each selected message
         */
        public FilteredMessageConsumer(Filter<Message> filter, Consumer<Message> consumer) {
            this.consumer = consumer;
            this.filter = filter;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/comms/socket/SocketMessageReader$ResponseCorrelator.class */
    /**
     * Records one outstanding request/response exchange: the node the response is
     * expected from, the filter that recognises the response, and the callback that
     * is told about the outcome.
     */
    public static class ResponseCorrelator {
        /** Node the response is expected from; compared against a failed node's address. */
        final Address dest;
        /** Accepts the inbound message that answers the request. */
        final Filter<Message> responseFilter;
        /** Notified with the matching message, or with the failure if none can arrive. */
        final CallbackWithResult<Message> callback;

        /**
         * @param address            node the response is expected from
         * @param filter             recognises the response message
         * @param callbackWithResult receives the response or the failure
         */
        public ResponseCorrelator(Address address, Filter<Message> filter, CallbackWithResult<Message> callbackWithResult) {
            this.callback = callbackWithResult;
            this.responseFilter = filter;
            this.dest = address;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/comms/socket/SocketMessageReader$SocketChildReader.class */
    public class SocketChildReader implements Runnable {
        Socket clientSocket;
        Thread t;
        static final /* synthetic */ boolean $assertionsDisabled;

        /**
         * Wraps a freshly accepted client socket, tunes it and registers this reader with
         * the enclosing {@link SocketMessageReader} so that the parent's dispose can close it.
         *
         * <p>The socket is configured <em>before</em> the reader is registered, so a reader
         * whose construction fails is never left behind in the children list. On every
         * failure path the socket is closed here, because the caller has no reader to dispose.
         *
         * @param socket the accepted connection to read messages from
         * @throws IOException    if the enclosing reader has already been disposed
         * @throws CommsException if the socket options cannot be applied while the
         *                        enclosing reader is still live
         */
        public SocketChildReader(Socket socket) throws IOException {
            this.clientSocket = socket;
            try {
                socket.setTcpNoDelay(true);
                socket.setKeepAlive(true);
                socket.setPerformancePreferences(0, 2, 1);
            } catch (SocketException e) {
                closeRejectedSocket(socket);
                if (!SocketMessageReader.this.disposed) {
                    throw new CommsException("Failed to configure socket", e);
                }
                throw new IOException("server is shutdown", e);
            }
            // The disposed check and the registration share the children lock so that a
            // reader is either seen by the parent's dispose loop or rejected here.
            synchronized (SocketMessageReader.this.children) {
                if (SocketMessageReader.this.disposed) {
                    closeRejectedSocket(socket);
                    throw new IOException("server is shutdown");
                }
                SocketMessageReader.this.children.add(this);
            }
        }

        /** Best-effort close of a socket whose reader could not be constructed. */
        private void closeRejectedSocket(Socket socket) {
            try {
                socket.close();
            } catch (IOException ignored) {
                // The constructor is already failing with a more useful exception;
                // a secondary close failure is only worth a trace line.
                if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    SocketMessageReader.LOG.fine("ignoring failure closing rejected socket " + socket + ": " + ignored);
                }
            }
        }

        /**
         * Creates the listener thread for this connection and remembers it in {@link #t}
         * so that {@link #dispose()} can interrupt it.
         *
         * <p>With assertions enabled, calling this twice fails with an {@link AssertionError}.
         */
        public void start() {
            boolean alreadyStarted = this.t != null;
            if (alreadyStarted && !$assertionsDisabled) {
                throw new AssertionError("already started");
            }
            String threadName = "child socket listener spawned from " + SocketMessageReader.this.serverSocket.getLocalPort();
            // NOTE(review): the trailing 'true' presumably asks the factory to start the
            // thread as well as create it - confirm against CloudsoftThreadFactory.
            this.t = CloudsoftThreadFactory.createThread(threadName, this, true);
        }

        /**
         * Reads messages from the client socket until the stream signals normal termination
         * (a {@code null} message) or reading fails, handing each message to
         * {@link SocketMessageReader#addMessage(Message)}.
         *
         * <p>Whatever the outcome, the socket is closed and this reader deregisters itself
         * from the enclosing reader's children. If the enclosing reader has been disposed,
         * read/close problems are only logged; otherwise they are rethrown wrapped in a
         * {@link CommsException} carrying the remote address.
         *
         * @throws CommsException if reading or closing failed while the enclosing reader
         *                        is still live
         */
        @Override // java.lang.Runnable
        public void run() {
            InetSocketAddress remote = new InetSocketAddress(this.clientSocket.getInetAddress(), this.clientSocket.getPort());
            InetSocketAddress local = new InetSocketAddress(this.clientSocket.getLocalAddress(), this.clientSocket.getLocalPort());
            Exception problem = null;
            try {
                // 1024 is the same value as JAVA_BUFFER_SOCKET_INPUT_SIZE.
                BufferedInputStream in = new BufferedInputStream(this.clientSocket.getInputStream(), 1024);
                while (true) {
                    Message message = SocketMessageReader.this.serialisation.readMessage(in, SocketMessageReader.INCLUDE_BACKSTAMPS ? new SocketBackstamp(new SocketAddress(remote), new SocketAddress(local)) : null);
                    if (message == null) {
                        break;
                    }
                    if (SocketMessageReader.LOG.isLoggable(Level.FINER)) {
                        if (SocketMessageReader.LOG.isLoggable(Level.FINEST)) {
                            SocketMessageReader.LOG.finest(this + " read message " + message + ", mem " + ResourceUsageUtils.getMemoryUsageString() + ", total gc " + ResourceUsageUtils.getGarbageCollectionInfo());
                        } else {
                            SocketMessageReader.LOG.finer(this + " read message " + message);
                        }
                    }
                    SocketMessageReader.this.addMessage(message);
                }
                if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    SocketMessageReader.LOG.fine(this + " detected normal termination (null message)");
                }
            } catch (Exception e) {
                problem = e;
            } finally {
                // Runs for Errors too: the socket is still closed and the Error then
                // propagates unless closing itself has to be reported.
                closeAndReport(remote, problem);
            }
        }

        /**
         * Closes the client socket, deregisters this reader and reports the outcome.
         *
         * @param remote      address of the peer, used to label a rethrown failure
         * @param readProblem failure seen while reading, or {@code null} if reading ended cleanly
         * @throws CommsException if there was a read or close problem and the enclosing
         *                        reader has not been disposed
         */
        private void closeAndReport(InetSocketAddress remote, Exception readProblem) {
            Exception problem = readProblem;
            boolean problemDuringClose = false;
            try {
                if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    // NOTE(review): HttpVersions.HTTP_0_9 looks like a decompiler substitution
                    // for the empty string - confirm before replacing it with "".
                    SocketMessageReader.LOG.fine(this + " closing" + (problem != null ? " (problem " + problem + ")" : HttpVersions.HTTP_0_9));
                }
                this.clientSocket.close();
            } catch (IOException e) {
                // A read failure is the more interesting cause; only report the close
                // failure when nothing went wrong before it.
                if (problem == null) {
                    problemDuringClose = true;
                    problem = e;
                }
            }
            // Drop the finished reader so the children list does not grow for the life of
            // the server. 'disposed' is checked under the lock: once the parent is being
            // disposed it owns the list and clears it itself.
            synchronized (SocketMessageReader.this.children) {
                if (!SocketMessageReader.this.disposed) {
                    SocketMessageReader.this.children.remove(this);
                }
            }
            if (SocketMessageReader.this.disposed) {
                if (problem != null && SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    SocketMessageReader.LOG.fine(this + " after closing had problems (ignoring because disposed): " + problem);
                }
                return;
            }
            if (problem != null) {
                if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    SocketMessageReader.LOG.fine(this + " had problems " + (problemDuringClose ? "during close " : HttpVersions.HTTP_0_9) + "(closed and rethrowing): " + problem);
                }
                throw new CommsException(new SocketAddress(remote), problem);
            }
        }

        /**
         * Closes the client socket and interrupts the listener thread, if one was started.
         * The interrupt is attempted whether or not closing succeeds; an {@link IOException}
         * from the close is handed to {@code ExceptionUtils.throwRuntime}.
         */
        public void dispose() {
            try {
                if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                    SocketMessageReader.LOG.fine("disposing " + this);
                }
                this.clientSocket.close();
            } catch (IOException e) {
                ExceptionUtils.throwRuntime(e);
            } finally {
                if (this.t != null) {
                    this.t.interrupt();
                }
            }
        }

        /**
         * Describes this reader as {@code SimpleClassName[<client socket>, via <parent address>]}.
         */
        public String toString() {
            StringBuilder description = new StringBuilder();
            description.append(ThreadStack.getSimpleClassName(getClass()));
            description.append("[").append(this.clientSocket);
            description.append(", via ").append(SocketMessageReader.this.getAddress());
            description.append("]");
            return description.toString();
        }

        static {
            $assertionsDisabled = !SocketMessageReader.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/cloudsoftcorp/monterey/comms/socket/SocketMessageReader$SocketServerReader.class */
    /**
     * Accept loop for the enclosing reader's server socket: every accepted connection
     * gets its own {@link SocketChildReader}.
     */
    private class SocketServerReader implements Runnable {
        private SocketServerReader() {
        }

        /**
         * Accepts connections for as long as the server socket is bound and open.
         * An {@link IOException} ends the loop quietly when the enclosing reader has been
         * disposed; otherwise it is rethrown as a {@link CommsException}.
         */
        @Override // java.lang.Runnable
        public void run() {
            final ServerSocket server = SocketMessageReader.this.serverSocket;
            for (;;) {
                if (!server.isBound() || server.isClosed()) {
                    return;
                }
                try {
                    Socket connection = server.accept();
                    if (SocketMessageReader.LOG.isLoggable(Level.FINE)) {
                        SocketMessageReader.LOG.fine(this + " spawning new reader for new connection on " + connection);
                    }
                    SocketChildReader child = new SocketChildReader(connection);
                    child.start();
                } catch (IOException e) {
                    if (SocketMessageReader.this.disposed) {
                        return;
                    }
                    throw new CommsException("Failed to start socket connection: " + e, e);
                }
            }
        }

        /** Creates the accept-loop thread, named after the server socket's local port. */
        public void start() {
            String threadName = "main socket listener for port " + SocketMessageReader.this.serverSocket.getLocalPort();
            CloudsoftThreadFactory.createThread(threadName, this, true);
        }
    }

    /**
     * Opens a server socket and starts the accept loop for it.
     *
     * <p>The server socket is created from the port alone (no bind address is passed);
     * {@code inetAddress} only determines the address reported by {@link #getAddress()},
     * falling back to {@code NetworkUtil.findPreferredLocalIp4()} when it is {@code null}.
     *
     * @param inetAddress address to advertise, or {@code null} to pick one
     * @param i           port to listen on; zero or negative values are passed to
     *                    {@link ServerSocket} as port 0
     * @throws CommsException if the server socket cannot be opened
     */
    public SocketMessageReader(@Nullable("to use any address") InetAddress inetAddress, int i) {
        this.serialisation = new BasicMessageSerialisation();
        this.incomingMessages = new LinkedBlockingQueue<Message>();
        this.expectedResponses = new LinkedBlockingQueue<ResponseCorrelator>();
        this.unqueuedMessageConsumers = new LinkedBlockingQueue<FilteredMessageConsumer>();
        this.children = new ArrayList<SocketChildReader>();
        this.disposed = false;
        final InetAddress advertised = (inetAddress == null) ? NetworkUtil.findPreferredLocalIp4() : inetAddress;
        final int requestedPort = Math.max(i, 0);
        try {
            this.serverSocket = new ServerSocket(requestedPort, 0);
            this.socketAddress = new SocketAddress(advertised, this.serverSocket.getLocalPort());
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("opening new SocketMessageReader on port " + i + ", on " + this.socketAddress);
            }
            SocketServerReader acceptLoop = new SocketServerReader();
            acceptLoop.start();
        } catch (IOException e) {
            throw new CommsException("Failed to create reader on port " + i + ": " + e, e);
        }
    }

    /**
     * Convenience constructor taking the address and port from a {@link SocketAddress}.
     *
     * @param socketAddress address/port to use, or {@code null} for no preferred address
     *                      and port 0
     */
    public SocketMessageReader(@Nullable("to use any address") SocketAddress socketAddress) {
        this(socketAddress != null ? socketAddress.getInetSocketAddress().getAddress() : null,
                socketAddress != null ? socketAddress.getInetSocketAddress().getPort() : 0);
    }

    /**
     * @return the address built in the constructor from the advertised IP and the
     *         server socket's local port
     */
    @Override // com.cloudsoftcorp.monterey.comms.basic.MessageReader
    public SocketAddress getAddress() {
        final SocketAddress advertised = this.socketAddress;
        return advertised;
    }

    /**
     * @return the number of received messages currently waiting in the incoming queue
     */
    public int getIncomingQueueLength() {
        final int queued = this.incomingMessages.size();
        return queued;
    }

    /**
     * Routes one inbound message. The first match wins, in this order:
     * <ol>
     *   <li>a pending response correlator - it is removed and its callback receives the message;</li>
     *   <li>a registered unqueued consumer - it consumes the message directly;</li>
     *   <li>otherwise the message is appended to the incoming queue for {@code nextMessage}.</li>
     * </ol>
     *
     * @param message the message that has just been read
     */
    public void addMessage(Message message) {
        ResponseCorrelator correlator = removeMatchingCorrelator(message);
        if (correlator != null) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Inbound message matches response-correlator: msg=" + message + "; correlator=" + correlator);
            }
            correlator.callback.onSuccess(message);
            return;
        }
        FilteredMessageConsumer consumer = findMatchingUnqueuedConsumer(message);
        if (consumer == null) {
            this.incomingMessages.add(message);
            return;
        }
        if (LOG.isLoggable(Level.FINE)) {
            // Log the consumer that matched; the correlator is always null on this path.
            LOG.fine("Inbound message matches unqueued-consumer: msg=" + message + "; consumer=" + consumer);
        }
        consumer.consumer.consume(message);
    }

    /** Renders this reader as {@code SocketMessageReader[<address>]}. */
    public String toString() {
        StringBuilder description = new StringBuilder();
        description.append(SocketMessageReader.class.getSimpleName());
        description.append("[").append(getAddress()).append("]");
        return description.toString();
    }

    /**
     * Blocks until a message is available and returns it.
     *
     * <p>The queue is polled in one-second slices so that disposal of this reader is
     * noticed even when no message ever arrives.
     *
     * @return the next queued message, never {@code null}
     * @throws IllegalStateException       if the reader is disposed and nothing is queued
     * @throws RuntimeInterruptedException if the calling thread is interrupted while
     *                                     waiting; the interrupt flag is restored first
     */
    @Override // com.cloudsoftcorp.monterey.comms.basic.MessageReader
    public Message nextMessage() {
        if (this.disposed && this.incomingMessages.isEmpty()) {
            throw new IllegalStateException("reader is disposed and no messages available");
        }
        while (true) {
            Message next;
            try {
                next = this.incomingMessages.poll(1000L, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeInterruptedException("Interrupted while waiting for message on " + this, e);
            }
            if (next != null) {
                return next;
            }
            if (this.disposed) {
                break;
            }
        }
        // Desugared form of "assert disposed": only checked when assertions are enabled.
        if (!$assertionsDisabled && !this.disposed) {
            throw new AssertionError();
        }
        throw new IllegalStateException("reader is disposed and no messages available");
    }

    /**
     * Waits up to the given time for a message.
     *
     * @param j maximum time to wait, in milliseconds
     * @return the next queued message, or {@code null} if none arrived in time
     * @throws IllegalStateException       if the reader is disposed and nothing is queued
     * @throws RuntimeInterruptedException if the calling thread is interrupted while
     *                                     waiting; the interrupt flag is restored first
     */
    @Nullable("if no message available within the time")
    public Message nextMessage(long j) {
        boolean nothingCanArrive = this.disposed && this.incomingMessages.isEmpty();
        if (nothingCanArrive) {
            throw new IllegalStateException("reader is disposed and no messages available");
        }
        Message next;
        try {
            next = this.incomingMessages.poll(j, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeInterruptedException("Interrupted while waiting for message on " + this, e);
        }
        return next;
    }

    /**
     * Registers interest in a response: the first inbound message accepted by
     * {@code filter} is delivered to {@code callbackWithResult} instead of being queued.
     *
     * <p>If this reader is already disposed, the registration is withdrawn again and the
     * callback is failed immediately. Registering and withdrawing happen under the same
     * monitor that the correlator-removal methods use, so the callback is failed exactly
     * once even when {@link #dispose()} runs concurrently, and no dead correlator is left
     * behind to match a later message.
     *
     * @param address            node the response is expected from
     * @param filter             recognises the response message
     * @param callbackWithResult notified with the response, or with a {@link CommsException}
     */
    public void expectResponse(Address address, Filter<Message> filter, CallbackWithResult<Message> callbackWithResult) {
        ResponseCorrelator correlator = new ResponseCorrelator(address, filter, callbackWithResult);
        boolean withdrawn;
        synchronized (this.expectedResponses) {
            // Add first, then look at 'disposed': a dispose() that drained the collection
            // before the add is caught here, while one that drains after the add fails the
            // callback itself and the remove below reports false.
            this.expectedResponses.add(correlator);
            withdrawn = this.disposed && this.expectedResponses.remove(correlator);
        }
        if (withdrawn) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Failing send-receive callback because comms is disposed: comms=" + getAddress() + "; callback=" + callbackWithResult);
            }
            callbackWithResult.onFailure(new CommsException("Local comms " + getAddress() + " is disposed"));
        }
    }

    /**
     * Registers a consumer that receives matching inbound messages directly rather than
     * via the incoming queue.
     *
     * @param filter   selects the messages to divert
     * @param consumer receives each diverted message
     */
    public void addUnqueuedMessageConsumer(Filter<Message> filter, Consumer<Message> consumer) {
        FilteredMessageConsumer registration = new FilteredMessageConsumer(filter, consumer);
        this.unqueuedMessageConsumers.add(registration);
    }

    /**
     * Shuts this reader down: closes every child connection and the server socket, then
     * fails all callbacks that are still waiting for a response. Calling it again once
     * the reader is disposed does nothing.
     *
     * <p>Failures while closing sockets are logged and do not stop the shutdown. A callback
     * that throws is logged and the remaining callbacks are still notified, matching
     * {@link #onRemoteCommsDown(Address)}; interrupts are propagated.
     */
    @Override // com.cloudsoftcorp.monterey.comms.basic.MessageReader
    public void dispose() {
        if (this.disposed) {
            return;
        }
        this.disposed = true;
        // Every access to 'children' - including the size() in the log line and the final
        // clear() - stays inside the lock that child readers take to register themselves.
        synchronized (this.children) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("disposing " + this + ", " + this.children.size() + " active children first");
            }
            for (SocketChildReader socketChildReader : this.children) {
                try {
                    socketChildReader.dispose();
                } catch (Exception e) {
                    LOG.log(Level.WARNING, "Error closing child socket " + socketChildReader.clientSocket.getLocalPort() + " (from " + socketChildReader.clientSocket.getRemoteSocketAddress() + ") on dispose, for " + this + ": " + e, (Throwable) e);
                }
            }
            try {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("disposing " + this + ", main server socket now");
                }
                this.serverSocket.close();
            } catch (IOException e2) {
                LOG.log(Level.WARNING, "Error closing socket on dispose, for " + this + ": " + e2, (Throwable) e2);
            }
            this.children.clear();
        }
        for (ResponseCorrelator responseCorrelator : removeAllCorrelators()) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Failing exisiting send-receive callback because comms is disposed: comms=" + getAddress() + "; callback=" + responseCorrelator.callback);
            }
            try {
                responseCorrelator.callback.onFailure(new CommsException("Local comms " + getAddress() + " is disposed"));
            } catch (RuntimeInterruptedException e3) {
                throw e3;
            } catch (Exception e4) {
                if (e4 instanceof InterruptedException) {
                    throw ExceptionUtils.throwRuntime(e4);
                }
                // One misbehaving callback must not leave the others waiting forever.
                LOG.log(Level.WARNING, "Error notifying callbacks of request-response failure", (Throwable) e4);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Fails every pending response callback whose destination is the given node, because
     * the connection to that node has gone down.
     *
     * <p>A callback that throws is logged and the remaining callbacks are still notified;
     * interrupts are propagated.
     *
     * @param address the remote node whose connection is down
     */
    public void onRemoteCommsDown(Address address) {
        for (ResponseCorrelator responseCorrelator : removeAllCorrelatorsForNode(address)) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("Failing exisiting send-receive callback because remote-comms down: comms=" + getAddress() + "; callback=" + responseCorrelator.callback);
            }
            try {
                // Name the remote node that went down, not this reader's own address.
                responseCorrelator.callback.onFailure(new CommsException("Connection to remote comms " + address + " is down"));
            } catch (RuntimeInterruptedException e) {
                throw e;
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    throw ExceptionUtils.throwRuntime(e2);
                }
                LOG.log(Level.WARNING, "Error notifying callbacks of request-response failure", (Throwable) e2);
            }
        }
    }

    /**
     * Finds the first registered unqueued consumer whose filter accepts the message.
     *
     * @return the matching registration, or {@code null} if no filter accepts the message
     */
    private FilteredMessageConsumer findMatchingUnqueuedConsumer(Message message) {
        Iterator<FilteredMessageConsumer> registrations = this.unqueuedMessageConsumers.iterator();
        while (registrations.hasNext()) {
            FilteredMessageConsumer candidate = registrations.next();
            if (!candidate.filter.accept(message)) {
                continue;
            }
            return candidate;
        }
        return null;
    }

    /**
     * Removes and returns the first pending correlator whose response filter accepts the
     * message, holding the {@code expectedResponses} monitor throughout.
     *
     * @return the removed correlator, or {@code null} if the message answers no pending request
     */
    private ResponseCorrelator removeMatchingCorrelator(Message message) {
        synchronized (this.expectedResponses) {
            Iterator<ResponseCorrelator> pending = this.expectedResponses.iterator();
            while (pending.hasNext()) {
                ResponseCorrelator candidate = pending.next();
                if (!candidate.responseFilter.accept(message)) {
                    continue;
                }
                this.expectedResponses.remove(candidate);
                return candidate;
            }
            return null;
        }
    }

    /**
     * Empties the set of pending correlators, holding the {@code expectedResponses} monitor.
     *
     * @return a snapshot of the correlators that were pending when the set was emptied
     */
    private Collection<ResponseCorrelator> removeAllCorrelators() {
        synchronized (this.expectedResponses) {
            List<ResponseCorrelator> drained = new ArrayList<ResponseCorrelator>(this.expectedResponses);
            this.expectedResponses.clear();
            return drained;
        }
    }

    /**
     * Removes every pending correlator addressed to the given node, holding the
     * {@code expectedResponses} monitor. A destination that is an
     * {@link AddressWithLocation} is compared by its wrapped address.
     *
     * @param address the node whose correlators should be withdrawn
     * @return the correlators that were removed, possibly empty
     */
    private Collection<ResponseCorrelator> removeAllCorrelatorsForNode(Address address) {
        synchronized (this.expectedResponses) {
            List<ResponseCorrelator> removed = new ArrayList<ResponseCorrelator>();
            for (Iterator<ResponseCorrelator> pending = this.expectedResponses.iterator(); pending.hasNext();) {
                ResponseCorrelator candidate = pending.next();
                boolean sameNode = (candidate.dest instanceof AddressWithLocation ? ((AddressWithLocation) candidate.dest).getWrappedAddress() : candidate.dest).equals(address);
                if (sameNode) {
                    removed.add(candidate);
                    pending.remove();
                }
            }
            return removed;
        }
    }

    static {
        $assertionsDisabled = !SocketMessageReader.class.desiredAssertionStatus();
        LOG = Loggers.getLogger(SocketMessageReader.class);
        INCLUDE_BACKSTAMPS = false;
    }
}
