package monterey.venue.jms.spi;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Queue;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import monterey.actor.ActorRef;
import monterey.logging.Logger;
import monterey.logging.LoggerFactory;
import monterey.venue.management.ActorMigrationMode;
import monterey.venue.management.BrokerId;
import monterey.venue.management.VenueId;

/* loaded from: input_file:monterey/venue/jms/spi/AbstractJmsAdmin.class */
public abstract class AbstractJmsAdmin implements JmsAdmin {
    protected static final Logger LOG = new LoggerFactory().getLogger(JmsAdmin.class);
    protected final Map<BrokerId, ConnectionFactory> factories;
    protected final ConcurrentMap<ActorRef, ActorBrokerContext> actorBrokerContexts;
    protected final AtomicBoolean killed;
    protected ActorMigrationMode actorMigrationMode;
    protected int connectionPoolSize;
    private volatile BrokerId primaryBroker;
    private volatile VenueBrokerContext venueBrokerContext;
    private JmsNaming naming;
    private volatile VenueId venueId;
    private final ConcurrentMap<ConnectionFactory, List<Connection>> connectionPool;
    private final Random random;

    /* renamed from: monterey.venue.jms.spi.AbstractJmsAdmin$1, reason: invalid class name */
    /* loaded from: input_file:monterey/venue/jms/spi/AbstractJmsAdmin$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$monterey$venue$management$ActorMigrationMode = new int[ActorMigrationMode.values().length];

        static {
            try {
                $SwitchMap$monterey$venue$management$ActorMigrationMode[ActorMigrationMode.USE_BROKER_WITH_ATOMIC_SUBSCRIBER_SWITCH.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$monterey$venue$management$ActorMigrationMode[ActorMigrationMode.USE_DURABLE_SUBSCRIPTION.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
        }
    }

    public abstract ConnectionFactory newConnectionFactory(String str);

    public AbstractJmsAdmin() {
        this((Map) ImmutableMap.of());
    }

    public AbstractJmsAdmin(ActorMigrationMode actorMigrationMode) {
        this((Map) ImmutableMap.of(JmsAdmin.ACTOR_MIGRATION_MODE, actorMigrationMode.name()));
    }

    public AbstractJmsAdmin(Map map) {
        this.factories = new HashMap();
        this.actorBrokerContexts = new ConcurrentHashMap();
        this.killed = new AtomicBoolean(false);
        this.actorMigrationMode = ActorMigrationMode.USE_DURABLE_SUBSCRIPTION;
        this.connectionPoolSize = 5;
        this.primaryBroker = null;
        this.connectionPool = new ConcurrentHashMap();
        this.random = new Random();
        configure(map);
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public void configure(Map map) {
        if (map.containsKey(JmsAdmin.ACTOR_MIGRATION_MODE)) {
            this.actorMigrationMode = ActorMigrationMode.valueOf("" + map.get(JmsAdmin.ACTOR_MIGRATION_MODE));
        }
        if (map.containsKey(JmsAdmin.JMS_CONNECTION_POOL_SIZE)) {
            this.connectionPoolSize = Integer.parseInt("" + map.get(JmsAdmin.JMS_CONNECTION_POOL_SIZE));
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public void initSubscriptions(ActorRef actorRef) throws JMSException {
        LOG.debug("initSubscriptions for actorRef %s", new Object[]{actorRef});
        switch (AnonymousClass1.$SwitchMap$monterey$venue$management$ActorMigrationMode[this.actorMigrationMode.ordinal()]) {
            case 1:
                return;
            case 2:
                for (Session session : getConsumerSessions(actorRef).values()) {
                    session.createDurableSubscriber(doLookupTopic(session, getNaming().toJmsConsumerTopicName(actorRef)), actorRef.getId()).close();
                }
                return;
            default:
                throw new IllegalStateException("Unhandeled actorMigrationMode: " + this.actorMigrationMode);
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void start(VenueId venueId) {
        LOG.debug("Starting jms-admin for %s", new Object[]{venueId});
        this.venueId = venueId;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized JmsNaming getNaming() {
        if (this.naming == null) {
            this.naming = newNaming(this.venueId);
        }
        return this.naming;
    }

    protected JmsNaming newNaming(VenueId venueId) {
        return new DefaultJmsNaming(this.actorMigrationMode, venueId);
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public ActorMigrationMode getActorMigrationMode() {
        return this.actorMigrationMode;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized boolean addBroker(BrokerId brokerId) throws JMSException {
        assertNotKilled();
        if (this.factories.containsKey(brokerId)) {
            return false;
        }
        ConnectionFactory newConnectionFactory = newConnectionFactory(brokerId.getUrl());
        this.factories.put(brokerId, newConnectionFactory);
        Iterator<ActorBrokerContext> it = this.actorBrokerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().addConnection(this.factories.get(brokerId), brokerId);
        }
        if (this.primaryBroker == null) {
            this.primaryBroker = brokerId;
        }
        if (this.venueBrokerContext == null) {
            return true;
        }
        this.venueBrokerContext.addConnection(newConnectionFactory, brokerId);
        return true;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized boolean removeBroker(BrokerId brokerId) throws JMSException {
        assertNotKilled();
        if (brokerId.equals(this.primaryBroker)) {
            throw new IllegalArgumentException("Cannot remove the primary broker for this venue");
        }
        Iterator<ActorBrokerContext> it = this.actorBrokerContexts.values().iterator();
        while (it.hasNext()) {
            it.next().removeBroker(brokerId);
        }
        if (this.venueBrokerContext != null) {
            this.venueBrokerContext.removeBroker(brokerId);
        }
        return this.factories.remove(brokerId) != null;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public void kill() {
        if (this.killed.compareAndSet(false, true)) {
            this.factories.clear();
            Iterator<ActorBrokerContext> it = this.actorBrokerContexts.values().iterator();
            while (it.hasNext()) {
                it.next().kill();
            }
            this.actorBrokerContexts.clear();
            if (this.venueBrokerContext != null) {
                this.venueBrokerContext.kill();
            }
            for (List<Connection> list : this.connectionPool.values()) {
                synchronized (list) {
                    for (Connection connection : list) {
                        try {
                            connection.close();
                        } catch (JMSException e) {
                            LOG.warn(e, "Error killing connection %s; continuing...", new Object[]{connection});
                        }
                    }
                    list.clear();
                }
            }
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void close() {
        assertNotKilled();
        LOG.debug("Closing jms-admin for %s", new Object[]{this.venueId});
        Iterator it = ImmutableList.copyOf(this.actorBrokerContexts.keySet()).iterator();
        while (it.hasNext()) {
            closeActorComms((ActorRef) it.next());
        }
        if (!this.actorBrokerContexts.isEmpty()) {
            throw new RuntimeException("Did not close all actor-broker contexts?");
        }
        if (this.venueBrokerContext != null) {
            this.venueBrokerContext.close();
        }
        for (List<Connection> list : this.connectionPool.values()) {
            synchronized (list) {
                for (Connection connection : list) {
                    try {
                        connection.close();
                    } catch (JMSException e) {
                        LOG.warn(e, "Error killing connection %s; continuing...", new Object[]{connection});
                    }
                }
                list.clear();
            }
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized BrokerId getPrimaryBroker() {
        return this.primaryBroker;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized Set<BrokerId> getBrokerIds() {
        return ImmutableSet.copyOf(this.factories.keySet());
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void setPrimaryBroker(BrokerId brokerId) throws JMSException {
        assertNotKilled();
        if (LOG.isDebugEnabled()) {
            LOG.debug("For jms-admin at " + this.venueId + ", setting primary broker to " + brokerId + (this.primaryBroker != null ? "; previously " + this.primaryBroker : "") + (this.actorBrokerContexts.size() > 0 ? " " + this.actorBrokerContexts.size() + " actor(s) require switchover" : ""), new Object[0]);
        }
        this.primaryBroker = brokerId;
        addBroker(brokerId);
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public final Connection getConnection(ConnectionFactory connectionFactory, String str) throws JMSException {
        Connection newTransientConnection;
        switch (AnonymousClass1.$SwitchMap$monterey$venue$management$ActorMigrationMode[this.actorMigrationMode.ordinal()]) {
            case 1:
                if (this.connectionPoolSize <= 0) {
                    newTransientConnection = newTransientConnection(connectionFactory);
                    break;
                } else {
                    newTransientConnection = getPooledTransientConnection(connectionFactory);
                    break;
                }
            case 2:
                newTransientConnection = createDurableConnection(connectionFactory, str);
                break;
            default:
                throw new IllegalStateException("Unrecognized actorMigrationMode: " + this.actorMigrationMode);
        }
        return newTransientConnection;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public final Connection createDurableConnection(ConnectionFactory connectionFactory, String str) throws JMSException {
        LOG.debug("creating connection for durable subscription", new Object[0]);
        return newDurableConnection(connectionFactory, str);
    }

    protected Connection newDurableConnection(ConnectionFactory connectionFactory, String str) throws JMSException {
        assertNotKilled();
        Connection createConnection = connectionFactory.createConnection();
        createConnection.setClientID(str);
        createConnection.start();
        return createConnection;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public final Connection getPooledTransientConnection(ConnectionFactory connectionFactory) throws JMSException {
        Connection connection;
        LOG.debug("Creating transient connection", new Object[0]);
        List<Connection> list = this.connectionPool.get(connectionFactory);
        if (list == null) {
            LinkedList linkedList = new LinkedList();
            List<Connection> putIfAbsent = this.connectionPool.putIfAbsent(connectionFactory, linkedList);
            list = putIfAbsent == null ? linkedList : putIfAbsent;
        }
        synchronized (list) {
            if (list.isEmpty()) {
                LOG.debug("Filling connection pool", new Object[0]);
                for (int i = 0; i < this.connectionPoolSize; i++) {
                    list.add(newTransientConnection(connectionFactory));
                }
            }
            connection = list.get(this.random.nextInt(this.connectionPoolSize));
        }
        return connection;
    }

    protected Connection newTransientConnection(ConnectionFactory connectionFactory) throws JMSException {
        assertNotKilled();
        Connection createConnection = connectionFactory.createConnection();
        createConnection.start();
        return createConnection;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized Map<BrokerId, Session> getConsumerSessions(ActorRef actorRef) throws JMSException {
        assertNotKilled();
        return buildActorConnections(actorRef).getConsumerSessions();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized Map<BrokerId, Session> getConsumerSessions(VenueId venueId) throws JMSException {
        assertNotKilled();
        buildVenueConnections(venueId);
        return this.venueBrokerContext.getConsumerSessions();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized Session getProducerSession(ActorRef actorRef) throws JMSException {
        assertNotKilled();
        ActorBrokerContext buildActorConnections = buildActorConnections(actorRef);
        if (buildActorConnections.getProducerSession() == null) {
            buildActorConnections.setPrimaryBroker(this.factories.get(this.primaryBroker), this.primaryBroker);
        }
        return buildActorConnections.getProducerSession();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized Session getProducerSession(VenueId venueId) throws JMSException {
        assertNotKilled();
        buildVenueConnections(venueId);
        if (this.venueBrokerContext.getProducerSession() == null) {
            this.venueBrokerContext.setPrimaryBroker(this.factories.get(this.primaryBroker), this.primaryBroker);
        }
        return this.venueBrokerContext.getProducerSession();
    }

    private ActorBrokerContext buildActorConnections(ActorRef actorRef) throws JMSException {
        assertNotKilled();
        if (!this.actorBrokerContexts.containsKey(actorRef)) {
            LOG.debug("Creating jms-actor-context for %s @ %s", new Object[]{actorRef, this.venueId});
            this.actorBrokerContexts.put(actorRef, new ActorBrokerContext(this, this.factories, actorRef, this.actorMigrationMode));
        }
        return this.actorBrokerContexts.get(actorRef);
    }

    private void buildVenueConnections(VenueId venueId) throws JMSException {
        assertNotKilled();
        if (this.venueBrokerContext == null) {
            LOG.debug("Creating jms-venue-comms for %s", new Object[]{venueId});
            this.venueBrokerContext = new VenueBrokerContext(this, this.factories, venueId, ActorMigrationMode.USE_DURABLE_SUBSCRIPTION);
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void terminateActorComms(ActorRef actorRef) {
        assertNotKilled();
        LOG.debug("Terminating durable subscriptions for %s @ %s", new Object[]{actorRef, this.venueId});
        ActorBrokerContext remove = this.actorBrokerContexts.remove(actorRef);
        if (remove != null) {
            remove.unsubscribeAndClose();
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void closeActorComms(ActorRef actorRef) {
        assertNotKilled();
        LOG.trace("Closing jms-actor-comms for %s @ %s", new Object[]{actorRef, this.venueId});
        ActorBrokerContext remove = this.actorBrokerContexts.remove(actorRef);
        if (remove != null) {
            remove.close();
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void closeActorConsumer(ActorRef actorRef) {
        assertNotKilled();
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        if (actorBrokerContext != null) {
            actorBrokerContext.closeConsumer();
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void terminateActorConsumer(ActorRef actorRef) {
        assertNotKilled();
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        if (actorBrokerContext != null) {
            actorBrokerContext.unsubscribeAndCloseConsumer();
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void closeActorProducer(ActorRef actorRef) {
        assertNotKilled();
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        if (actorBrokerContext != null) {
            actorBrokerContext.closeProducer();
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void resetActorProducer(ActorRef actorRef) throws JMSException {
        assertNotKilled();
        LOG.trace("Resetting jms-actor-producer for %s @ %s", new Object[]{actorRef, this.venueId});
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        if (actorBrokerContext != null) {
            actorBrokerContext.closeProducer();
            actorBrokerContext.setPrimaryBroker(this.factories.get(this.primaryBroker), this.primaryBroker);
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void resetVenueProducer() throws JMSException {
        assertNotKilled();
        LOG.trace("Resetting jms-venue-producer for %s", new Object[]{this.venueId});
        if (this.venueBrokerContext != null) {
            this.venueBrokerContext.closeProducer();
            this.venueBrokerContext.setPrimaryBroker(this.factories.get(this.primaryBroker), this.primaryBroker);
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public synchronized void terminateVenueComms(VenueId venueId) {
        assertNotKilled();
        LOG.trace("Terminating jms-venue-comms for venue %s", new Object[]{venueId});
        if (this.venueBrokerContext != null) {
            this.venueBrokerContext.unsubscribeAndClose();
        }
    }

    protected MessageProducer configureProducer(MessageProducer messageProducer) throws JMSException {
        messageProducer.setDeliveryMode(2);
        return messageProducer;
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public MessageProducer createProducerToActorTopic(Session session, ActorRef actorRef) throws JMSException {
        assertNotKilled();
        String jmsTopicName = getNaming().toJmsTopicName(actorRef);
        LOG.debug("Creating producer for actor topic %s", new Object[]{jmsTopicName});
        return configureProducer(session.createProducer(doLookupTopic(session, jmsTopicName)));
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public MessageProducer createProducerToTopic(Session session, String str) throws JMSException {
        assertNotKilled();
        return configureProducer(session.createProducer(doLookupTopic(session, getNaming().toJmsTopicName(str))));
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public MessageProducer createProducerToVenueTopic(Session session, VenueId venueId) throws JMSException {
        assertNotKilled();
        return configureProducer(session.createProducer(doLookupTopic(session, getNaming().toJmsTopicName(venueId))));
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public MessageConsumer createSubscriptionToTopic(Session session, String str, String str2) throws JMSException {
        assertNotKilled();
        return session.createConsumer(doLookupTopic(session, getNaming().toJmsConsumerTopicName(str, str2)));
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public MessageConsumer createSubscriptionToActorTopic(Session session, ActorRef actorRef) throws JMSException {
        assertNotKilled();
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        Topic doLookupTopic = doLookupTopic(session, getNaming().toJmsConsumerTopicName(actorRef));
        switch (AnonymousClass1.$SwitchMap$monterey$venue$management$ActorMigrationMode[actorBrokerContext.getActorMigrationMode().ordinal()]) {
            case 1:
                return session.createConsumer(doLookupTopic);
            case 2:
                return session.createDurableSubscriber(doLookupTopic, actorRef.getId());
            default:
                throw new IllegalStateException("Unrecognized actorMigrationMode: " + actorBrokerContext.getActorMigrationMode());
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public void unsubscribeToActorTopic(Session session, ActorRef actorRef) throws JMSException {
        assertNotKilled();
        ActorBrokerContext actorBrokerContext = this.actorBrokerContexts.get(actorRef);
        switch (AnonymousClass1.$SwitchMap$monterey$venue$management$ActorMigrationMode[actorBrokerContext.getActorMigrationMode().ordinal()]) {
            case 1:
                return;
            case 2:
                session.unsubscribe(actorRef.getId());
                return;
            default:
                throw new IllegalStateException("Unrecognized actorMigrationMode: " + actorBrokerContext.getActorMigrationMode());
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public void unsubscribeToVenueTopic(Session session, VenueId venueId) throws JMSException {
        assertNotKilled();
        session.unsubscribe(venueId.getId());
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public TopicSubscriber createSubscriptionToVenueTopic(Session session, VenueId venueId) throws JMSException {
        assertNotKilled();
        return session.createDurableSubscriber(doLookupTopic(session, getNaming().toJmsTopicName(venueId)), venueId.getId());
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public boolean destroyActorTopic(Session session, ActorRef actorRef) {
        assertNotKilled();
        throw new UnsupportedOperationException();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public boolean destroyTopic(Session session, String str) {
        assertNotKilled();
        throw new UnsupportedOperationException();
    }

    protected Topic doLookupTopic(Session session, String str) throws JMSException {
        return session.createTopic(str);
    }

    protected void assertNotKilled() {
        if (this.killed.get()) {
            throw new IllegalStateException("JMS Admin has been killed");
        }
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public String getMessageSender(Message message) throws JMSException {
        return message.getStringProperty(JmsAdmin.SENDER_ID_PROPERTY);
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public String getMessageDestination(Message message) throws JMSException {
        Topic jMSDestination = message.getJMSDestination();
        return jMSDestination instanceof Topic ? jMSDestination.getTopicName() : ((Queue) jMSDestination).getQueueName();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public Serializable getMessagePayload(Message message) throws JMSException {
        return ((ObjectMessage) message).getObject();
    }

    @Override // monterey.venue.jms.spi.JmsAdmin
    public Map<String, Object> getMessageProperties(Message message) throws JMSException {
        ImmutableMap.Builder builder = ImmutableMap.builder();
        Enumeration propertyNames = message.getPropertyNames();
        while (propertyNames.hasMoreElements()) {
            String str = (String) propertyNames.nextElement();
            builder.put(str, message.getObjectProperty(str));
        }
        return builder.build();
    }

    public String toString() {
        return getClass().getSimpleName() + "@" + this.venueId;
    }
}
