package monterey.venue.jms;

import com.google.common.base.Preconditions;
import java.io.Serializable;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
import javax.jms.Session;
import monterey.actor.ActorRef;
import monterey.logging.Logger;
import monterey.logging.LoggerFactory;
import monterey.venue.ControlMessages;
import monterey.venue.jms.spi.JmsAdmin;
import monterey.venue.jms.spi.JmsNaming;
import monterey.venue.management.ActorMigrationMode;
import monterey.venue.management.BasicActorRef;
import monterey.venue.management.BrokerId;
import monterey.venue.management.TransitionId;
import monterey.venue.management.VenueId;
import org.apache.activemq.transport.stomp.Stomp;

/* loaded from: input_file:monterey/venue/jms/JmsMessageConsumer.class */
public class JmsMessageConsumer {
    private static final Logger LOG;
    private final JmsAdmin jmsAdmin;
    private final JmsNaming jmsNaming;
    private final ActorRef actorRef;
    private final JmsMessageListener messageListener;
    private final ActorMigrationMode actorMigrationMode;
    static final /* synthetic */ boolean $assertionsDisabled;
    private final Map<BrokerId, MessageConsumer> directConsumers = new HashMap();
    private final Map<String, Map<BrokerId, MessageConsumer>> topicConsumers = Collections.synchronizedMap(new HashMap());
    private final AtomicBoolean running = new AtomicBoolean(true);

    public JmsMessageConsumer(JmsAdmin jmsAdmin, ActorRef actorRef, JmsMessageListener jmsMessageListener) {
        this.jmsAdmin = (JmsAdmin) Preconditions.checkNotNull(jmsAdmin, "jmsAdmin");
        this.jmsNaming = jmsAdmin.getNaming();
        this.messageListener = (JmsMessageListener) Preconditions.checkNotNull(jmsMessageListener, "messageListener");
        this.actorRef = (ActorRef) Preconditions.checkNotNull(actorRef, "actorRef");
        this.actorMigrationMode = jmsAdmin.getActorMigrationMode();
    }

    public void start(boolean z) throws JMSException {
        for (Map.Entry<BrokerId, Session> entry : this.jmsAdmin.getConsumerSessions(this.actorRef).entrySet()) {
            subscribeToDirect(entry.getKey(), entry.getValue(), z);
        }
    }

    public Set<String> getTopics() {
        return this.topicConsumers.keySet();
    }

    public synchronized void addBroker(TransitionId transitionId, BrokerId brokerId) throws JMSException {
        Preconditions.checkState(this.running.get(), "Message producer %s not running", this.actorRef);
        Session session = this.jmsAdmin.getConsumerSessions(this.actorRef).get(brokerId);
        subscribeToDirect(brokerId, session, false);
        Iterator<String> it = this.topicConsumers.keySet().iterator();
        while (it.hasNext()) {
            subscribeToTopic(brokerId, session, it.next(), false);
        }
    }

    public synchronized void removeBroker(TransitionId transitionId, BrokerId brokerId) throws JMSException {
        Preconditions.checkState(this.running.get(), "Message producer %s not running", this.actorRef);
        MessageConsumer remove = this.directConsumers.remove(brokerId);
        if (remove != null) {
            remove.close();
        }
        Iterator<Map<BrokerId, MessageConsumer>> it = this.topicConsumers.values().iterator();
        while (it.hasNext()) {
            MessageConsumer remove2 = it.next().remove(brokerId);
            if (remove2 != null) {
                remove2.close();
            }
        }
    }

    public synchronized void close() throws JMSException {
        if (this.running.compareAndSet(true, false)) {
            this.jmsAdmin.closeActorConsumer(this.actorRef);
        }
    }

    public synchronized void registerTopicListener(String str, boolean z) throws JMSException {
        Preconditions.checkNotNull(str, "topicName");
        Preconditions.checkState(this.running.get(), "Message producer %s not running", this.actorRef);
        if (this.topicConsumers.containsKey(str)) {
            LOG.info("Actor %s already subscribed to topic %s", this.actorRef, str);
            return;
        }
        for (Map.Entry<BrokerId, Session> entry : this.jmsAdmin.getConsumerSessions(this.actorRef).entrySet()) {
            subscribeToTopic(entry.getKey(), entry.getValue(), str, z);
        }
    }

    public synchronized void unregisterTopicListener(String str) throws JMSException {
        Preconditions.checkNotNull(str, "topicName");
        Preconditions.checkState(this.running.get(), "Message producer %s not running", this.actorRef);
        Iterator<MessageConsumer> it = this.topicConsumers.remove(str).values().iterator();
        while (it.hasNext()) {
            it.next().close();
        }
    }

    public void switchoverSubscriptions(Collection<String> collection, VenueId venueId, VenueId venueId2) throws JMSException {
        if (!$assertionsDisabled && this.actorMigrationMode != ActorMigrationMode.USE_BROKER_WITH_ATOMIC_SUBSCRIBER_SWITCH) {
            throw new AssertionError();
        }
        Preconditions.checkState(this.running.get(), "Message producer %s not running", this.actorRef);
        Set<BrokerId> brokerIds = this.jmsAdmin.getBrokerIds();
        LOG.trace("Migrated actor switching over subscribion for actor %s, from venue %s->%s; topics %s @ %s", this.actorRef, venueId, venueId2, collection, brokerIds);
        Iterator<BrokerId> it = brokerIds.iterator();
        while (it.hasNext()) {
            Session session = this.jmsAdmin.getConsumerSessions(this.actorRef).get(it.next());
            publishControlMessageToActor(session, this.actorRef, newBrokerSwitchoverSubscriptionMessage(session, venueId.getId(), venueId2.getId()));
            ObjectMessage newBrokerSwitchoverSubscriptionMessage = newBrokerSwitchoverSubscriptionMessage(session, this.actorRef.getId() + Stomp.Headers.SEPERATOR + venueId.getId(), this.actorRef.getId() + Stomp.Headers.SEPERATOR + venueId2.getId());
            Iterator<String> it2 = collection.iterator();
            while (it2.hasNext()) {
                publishControlMessageToTopic(session, it2.next(), newBrokerSwitchoverSubscriptionMessage);
            }
        }
    }

    private void subscribeToDirect(final BrokerId brokerId, Session session, boolean z) throws JMSException {
        Preconditions.checkState(!this.directConsumers.containsKey(brokerId), "Message consumer already exists for direct @ %s", brokerId);
        MessageConsumer createSubscriptionToActorTopic = this.jmsAdmin.createSubscriptionToActorTopic(session, this.actorRef);
        this.directConsumers.put(brokerId, createSubscriptionToActorTopic);
        createSubscriptionToActorTopic.setMessageListener(new MessageListener() { // from class: monterey.venue.jms.JmsMessageConsumer.1
            @Override // javax.jms.MessageListener
            public void onMessage(Message message) {
                JmsMessageConsumer.this.onMessage(brokerId, message);
            }
        });
        if (this.actorMigrationMode != ActorMigrationMode.USE_BROKER_WITH_ATOMIC_SUBSCRIBER_SWITCH || z) {
            return;
        }
        publishControlMessageToActor(session, this.actorRef, newBrokerSwitchoverSubscriptionMessage(session, this.jmsAdmin.getNaming().toJmsConsumerTopicSuffix(this.actorRef)));
    }

    private void subscribeToTopic(final BrokerId brokerId, Session session, String str, boolean z) throws JMSException {
        Map<BrokerId, MessageConsumer> map = this.topicConsumers.get(str);
        if (map == null) {
            map = new HashMap();
            this.topicConsumers.put(str, map);
        }
        Preconditions.checkState(!map.containsKey(brokerId), "Message consumer already exists for %s @ %s", str, brokerId);
        MessageConsumer createSubscriptionToTopic = this.jmsAdmin.createSubscriptionToTopic(session, str, this.actorRef.getId());
        map.put(brokerId, createSubscriptionToTopic);
        createSubscriptionToTopic.setMessageListener(new MessageListener() { // from class: monterey.venue.jms.JmsMessageConsumer.2
            @Override // javax.jms.MessageListener
            public void onMessage(Message message) {
                JmsMessageConsumer.this.onMessage(brokerId, message);
            }
        });
        if (this.actorMigrationMode != ActorMigrationMode.USE_BROKER_WITH_ATOMIC_SUBSCRIBER_SWITCH || z) {
            return;
        }
        publishControlMessageToTopic(session, str, newBrokerSwitchoverSubscriptionMessage(session, this.jmsAdmin.getNaming().toJmsConsumerTopicSuffix(str, this.actorRef.getId())));
    }

    private ObjectMessage newBrokerSwitchoverSubscriptionMessage(Session session, String str) throws JMSException {
        ObjectMessage newBrokerControlMessage = newBrokerControlMessage(session, null);
        newBrokerControlMessage.setStringProperty(ControlMessages.BROKER_SUBSCRIPTION_NEW_DESTINATION_HEADER_KEY, str);
        return newBrokerControlMessage;
    }

    private ObjectMessage newBrokerSwitchoverSubscriptionMessage(Session session, String str, String str2) throws JMSException {
        ObjectMessage newBrokerControlMessage = newBrokerControlMessage(session, null);
        newBrokerControlMessage.setStringProperty(ControlMessages.BROKER_SUBSCRIPTION_SWITCHOVER_OLD_DESTINATION_HEADER_KEY, str);
        newBrokerControlMessage.setStringProperty(ControlMessages.BROKER_SUBSCRIPTION_SWITCHOVER_NEW_DESTINATION_HEADER_KEY, str2);
        return newBrokerControlMessage;
    }

    private ObjectMessage newBrokerControlMessage(Session session, Serializable serializable) throws JMSException {
        ObjectMessage createObjectMessage = session.createObjectMessage(serializable);
        createObjectMessage.setBooleanProperty(ControlMessages.CONTROL_HEADER_KEY, true);
        createObjectMessage.setBooleanProperty(ControlMessages.BROKER_CONTROL_HEADER_KEY, true);
        return createObjectMessage;
    }

    private void publishControlMessageToTopic(Session session, String str, ObjectMessage objectMessage) throws JMSException {
        MessageProducer createProducerToTopic = this.jmsAdmin.createProducerToTopic(session, str);
        createProducerToTopic.send(objectMessage);
        createProducerToTopic.close();
    }

    private void publishControlMessageToActor(Session session, ActorRef actorRef, ObjectMessage objectMessage) throws JMSException {
        MessageProducer createProducerToActorTopic = this.jmsAdmin.createProducerToActorTopic(session, actorRef);
        createProducerToActorTopic.send(objectMessage);
        createProducerToActorTopic.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onMessage(BrokerId brokerId, Message message) {
        BasicActorRef basicActorRef;
        String str = null;
        String str2 = null;
        try {
            str2 = this.jmsAdmin.getMessageSender(message);
            str = this.jmsAdmin.getMessageDestination(message);
            Serializable messagePayload = this.jmsAdmin.getMessagePayload(message);
            Map<String, ? extends Object> messageProperties = this.jmsAdmin.getMessageProperties(message);
            if (str2 != null) {
                try {
                    basicActorRef = new BasicActorRef(str2);
                } catch (Exception e) {
                    LOG.error(e, "Error in actor.onMessage, for actor %s", this.messageListener);
                }
            } else {
                basicActorRef = null;
            }
            BasicActorRef basicActorRef2 = basicActorRef;
            LOG.trace("Receiving message from actor %s via %s @ %s, at %s: %s (%s)", basicActorRef2, str, brokerId, this.jmsAdmin, message, messagePayload);
            if (this.jmsNaming.isActorDestination(str)) {
                String montereyActorName = this.jmsNaming.toMontereyActorName(str);
                if (this.actorRef.getId().equals(montereyActorName)) {
                    this.messageListener.onDirectMessage(brokerId, basicActorRef2, messagePayload, messageProperties);
                } else {
                    LOG.warn("Unexpected JMS direct-message from %s via %s to %s at actor %s", basicActorRef2, brokerId, montereyActorName, this.actorRef);
                }
            } else if (this.jmsNaming.isTopicDestination(str)) {
                String montereyTopicName = this.jmsNaming.toMontereyTopicName(str);
                if (this.topicConsumers.containsKey(montereyTopicName)) {
                    this.messageListener.onPublishedMessage(brokerId, basicActorRef2, montereyTopicName, messagePayload, messageProperties);
                } else {
                    LOG.warn("Unexpected JMS topic-message from %s via %s to %s at actor %s", basicActorRef2, brokerId, montereyTopicName, this.actorRef);
                }
            } else {
                LOG.warn("Unexpected JMS message via %s @ %s, at actor %s @ %s", str, brokerId, str, this.actorRef, this.jmsAdmin);
            }
        } catch (Exception e2) {
            LOG.error(e2, "Error preparing message for invoking actor.onMessage, from %s to %s, in actor %s", str2, str, this.messageListener);
        }
    }

    static {
        $assertionsDisabled = !JmsMessageConsumer.class.desiredAssertionStatus();
        LOG = new LoggerFactory().getLogger(JmsMessageConsumer.class);
    }
}
