package com.cloudsoftcorp.monterey.amqp;

import com.cloudsoftcorp.monterey.comms.api.Communications;
import com.cloudsoftcorp.monterey.comms.api.Message;
import com.cloudsoftcorp.monterey.comms.api.MessageEnvelope;
import com.cloudsoftcorp.monterey.comms.api.PubSubCommunications;
import com.cloudsoftcorp.monterey.comms.basic.BasicMessageSerialisation;
import com.cloudsoftcorp.monterey.comms.basic.BasicPreSerializedMessage;
import com.cloudsoftcorp.monterey.comms.basic.DelegatingComms;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateContributor;
import com.cloudsoftcorp.util.Loggers;
import com.cloudsoftcorp.util.exception.RuntimeWrappedException;
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Address;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.ConnectionParameters;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;

/* loaded from: input_file:com/cloudsoftcorp/monterey/amqp/AmqpRabbitComms.class */
public class AmqpRabbitComms extends DelegatingComms implements PubSubCommunications, WorkrateContributor {
    private static final Logger LOG = Loggers.getLogger(AmqpRabbitComms.class);
    private String userName;
    private String password;
    private Address[] hostName;
    private int portNumber;
    Map<com.cloudsoftcorp.monterey.comms.api.Address, Connection> openConnections;
    Map<com.cloudsoftcorp.monterey.comms.api.Address, Channel> openChannels;
    BasicMessageSerialisation serialisation;

    public AmqpRabbitComms(Communications.InjectableCommunications injectableCommunications) {
        super(injectableCommunications);
        this.userName = "guest";
        this.password = "guest";
        this.hostName = new Address[]{new Address("dev.rabbitmq.com")};
        this.portNumber = 5672;
        this.openConnections = new LinkedHashMap();
        this.openChannels = new LinkedHashMap();
        this.serialisation = new BasicMessageSerialisation();
    }

    Connection getConnection(com.cloudsoftcorp.monterey.comms.api.Address address) {
        synchronized (this.openConnections) {
            Connection connection = this.openConnections.get(address);
            if (connection != null) {
                return connection;
            }
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("AMQP opening connection to " + address);
            }
            ConnectionParameters connectionParameters = new ConnectionParameters();
            connectionParameters.setUsername(this.userName);
            connectionParameters.setPassword(this.password);
            connectionParameters.setVirtualHost("/");
            connectionParameters.setRequestedHeartbeat(0);
            try {
                Connection newConnection = new ConnectionFactory(connectionParameters).newConnection(this.hostName, this.portNumber);
                this.openConnections.put(address, newConnection);
                this.openChannels.put(address, newConnection.createChannel());
                return newConnection;
            } catch (IOException e) {
                throw new RuntimeWrappedException("cannot open AMQP connection to " + address, e);
            }
        }
    }

    Channel getChannel(com.cloudsoftcorp.monterey.comms.api.Address address) {
        synchronized (this.openConnections) {
            Channel channel = this.openChannels.get(address);
            if (channel != null) {
                return channel;
            }
            getConnection(address);
            Channel channel2 = this.openChannels.get(address);
            if (channel2 != null) {
                return channel2;
            }
            try {
                throw new IOException("connection missing its channel");
            } catch (IOException e) {
                throw new RuntimeWrappedException("cannot open AMQP channel with " + address, e);
            }
        }
    }

    public void addSubscriptions(Collection<String> collection, Collection<? extends com.cloudsoftcorp.monterey.comms.api.Address> collection2) {
        String obj = getAddress().toString();
        if (LOG.isLoggable(Level.FINER)) {
            LOG.finer("AMQP adding subscriptions for " + getAddress() + " to " + collection + " at " + collection2);
        }
        for (com.cloudsoftcorp.monterey.comms.api.Address address : collection2) {
            for (String str : collection) {
                try {
                    Channel channel = getChannel(address);
                    String obj2 = address.toString();
                    channel.exchangeDeclare(obj2, "direct");
                    channel.queueDeclare(getAddress().toString());
                    channel.basicConsume(getAddress().toString(), true, new DefaultConsumer(channel) { // from class: com.cloudsoftcorp.monterey.amqp.AmqpRabbitComms.1
                        public void handleDelivery(String str2, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
                            if (AmqpRabbitComms.LOG.isLoggable(Level.FINER)) {
                                AmqpRabbitComms.LOG.finer("AMQP received at " + AmqpRabbitComms.this.getAddress() + " message on " + envelope.getRoutingKey() + " from " + envelope.getExchange() + ", length " + bArr.length);
                            }
                            AmqpRabbitComms.this.comms.injectMessage(AmqpRabbitComms.this.serialisation.readMessage(new ByteArrayInputStream(bArr), (MessageEnvelope.Backstamp) null));
                        }
                    });
                    channel.queueBind(obj, obj2, str);
                } catch (IOException e) {
                    throw new RuntimeWrappedException("cannot open AMQP connection for " + str + " to " + address, e);
                }
            }
        }
    }

    public void removeSubscriptions(Collection<String> collection, Collection<? extends com.cloudsoftcorp.monterey.comms.api.Address> collection2) {
        String obj = getAddress().toString();
        if (LOG.isLoggable(Level.FINER)) {
            LOG.finer("AMQP removing subscriptions for " + getAddress() + " to " + collection + " at " + collection2);
        }
        for (com.cloudsoftcorp.monterey.comms.api.Address address : collection2) {
            for (String str : collection) {
                try {
                    getChannel(address).queueUnbind(obj, address.toString(), str);
                } catch (IOException e) {
                    throw new RuntimeWrappedException("cannot unsubscribe on AMQP queue for " + str + " at " + address, e);
                }
            }
        }
    }

    public void publish(String str, com.cloudsoftcorp.monterey.comms.api.Address address, Message message) {
        Channel channel = getChannel(address);
        try {
            BasicPreSerializedMessage basicPreSerializedMessage = new BasicPreSerializedMessage(message, this.serialisation);
            if (LOG.isLoggable(Level.FINER)) {
                LOG.finer("AMQP " + getAddress() + " publishing on " + str + " at " + address + ", msg " + message + ", length " + basicPreSerializedMessage.getCachedSerialization().length);
            }
            channel.basicPublish(address.toString(), str, (AMQP.BasicProperties) null, basicPreSerializedMessage.getCachedSerialization());
        } catch (IOException e) {
            throw new RuntimeWrappedException("cannot send to AMQP router " + address, e);
        }
    }

    public void dispose() {
        super.dispose();
        synchronized (this.openConnections) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("AMQP closing all connections (" + this.openConnections + ")");
            }
            for (Connection connection : this.openConnections.values()) {
                try {
                    connection.close();
                } catch (IOException e) {
                    throw new RuntimeWrappedException("cannot close connection " + connection, e);
                }
            }
            this.openConnections.clear();
            this.openChannels.clear();
        }
    }
}
