package monterey.actor.impl;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import monterey.actor.ActorRef;
import monterey.actor.impl.HandoverForwarder;
import monterey.logging.Logger;
import monterey.logging.LoggerFactory;
import monterey.venue.ControlMessages;
import monterey.venue.jms.JmsMessageListener;
import monterey.venue.management.BrokerId;
import monterey.venue.management.VenueId;

/* loaded from: input_file:monterey/actor/impl/HandoverForwardeeReceiver.class */
/**
 * Message listener used while an actor is "moving in" to a venue (terminology taken from this
 * class's log messages).
 *
 * <p>Each tracked {@link HandoverForwarder.JmsSource} is in one of three states:
 * <ul>
 *   <li><b>pending</b> - registered in the constructor; messages arriving through
 *       {@link #onDirectMessage} / {@link #onPublishedMessage} for such a source are buffered;</li>
 *   <li><b>finished</b> - a forwarded "last to subscriber" marker has been seen for the source;
 *       its buffer has been flushed to the delegate and later messages pass straight through;</li>
 *   <li><b>unknown</b> - never registered; messages pass straight through to the delegate.</li>
 * </ul>
 *
 * <p>Forwarded messages (see {@link #onForwardedMessage}) that are not "last to subscriber"
 * markers are always handed to the delegate immediately.
 *
 * <p>NOTE(review): this class performs no synchronization of its own; presumably callers invoke
 * it from a single thread or serialize access externally - confirm against the caller.
 */
public class HandoverForwardeeReceiver implements JmsMessageListener {
    private static final Logger LOG = new LoggerFactory().getLogger(HandoverForwardeeReceiver.class);

    /** Sources for which the "last to subscriber" marker has already been received. */
    private final Set<HandoverForwarder.JmsSource> finishedForwardingSubscriptionStreams =
            new HashSet<HandoverForwarder.JmsSource>();

    /** Sources still awaiting their "last to subscriber" marker; their messages are buffered. */
    private final Set<HandoverForwarder.JmsSource> forwardingSubscriptionStreams =
            new HashSet<HandoverForwarder.JmsSource>();

    /** Messages held back per pending source, in arrival order. */
    private final Map<HandoverForwarder.JmsSource, List<BufferedMessage>> messageBuffers =
            new HashMap<HandoverForwarder.JmsSource, List<BufferedMessage>>();

    private final ActorRef selfRef;
    private final VenueId venueId;
    private final ForwardeeListener delegate;

    /**
     * Receiver of the de-buffered / passed-through messages, additionally notified when no
     * pending source remains.
     */
    public interface ForwardeeListener extends JmsMessageListener {
        /**
         * Called from {@link HandoverForwardeeReceiver#onForwardedMessage} after a
         * "last to subscriber" marker has been processed and no pending source is left.
         */
        void onForwardingComplete();
    }

    /**
     * Registers the sources that must deliver a "last to subscriber" marker before the handover
     * is considered complete: for every broker, one subscription source per entry of
     * {@code collection2} plus one direct source.
     *
     * <p>NOTE(review): if {@code collection} is empty no source is pending, and
     * {@code onForwardingComplete()} is only ever triggered from
     * {@link #onForwardedMessage} - confirm callers handle that case.
     *
     * @param actorRef          reference of the actor being moved in (used for logging only here)
     * @param venueId           venue the actor is moving into (used for logging only here)
     * @param collection        brokers whose streams are being forwarded
     * @param collection2       names passed to {@code JmsSource.newSubscriptionSource}
     *                          (presumably the actor's subscribed topic names)
     * @param forwardeeListener delegate that receives messages and the completion callback
     */
    public HandoverForwardeeReceiver(ActorRef actorRef, VenueId venueId, Collection<BrokerId> collection, Collection<String> collection2, ForwardeeListener forwardeeListener) {
        this.selfRef = actorRef;
        this.venueId = venueId;
        this.delegate = forwardeeListener;
        for (BrokerId brokerId : collection) {
            for (String topicName : collection2) {
                this.forwardingSubscriptionStreams.add(HandoverForwarder.JmsSource.newSubscriptionSource(brokerId, topicName));
            }
            this.forwardingSubscriptionStreams.add(HandoverForwarder.JmsSource.newDirectSource(brokerId));
        }
        LOG.debug("Actor moving-in: actor %s in venue %s, waiting for lastToSubscriber from %s", actorRef, venueId, this.forwardingSubscriptionStreams);
    }

    /**
     * Handles a message that was forwarded to this actor.
     *
     * <p>Ordinary forwarded messages are passed to the delegate immediately. A
     * "last to subscriber" marker (as decided by {@code ControlMessages.isLastToSubscriber}) is
     * not passed on; instead its source is moved from pending to finished, any messages buffered
     * for that source are delivered to the delegate in buffering order, and
     * {@link ForwardeeListener#onForwardingComplete()} is invoked if no pending source remains.
     *
     * @param forwardedMessage the forwarded message or marker
     */
    public void onForwardedMessage(ControlMessages.ForwardedMessage forwardedMessage) {
        if (!ControlMessages.isLastToSubscriber(forwardedMessage.getPayload(), forwardedMessage.getProperties())) {
            forwardedMessage.passTo(this.delegate);
            return;
        }

        // Identify which stream this marker terminates.
        HandoverForwarder.JmsSource source = forwardedMessage.isDirect()
                ? HandoverForwarder.JmsSource.newDirectSource(forwardedMessage.getBrokerId())
                : HandoverForwarder.JmsSource.newSubscriptionSource(forwardedMessage.getBrokerId(), forwardedMessage.getTopicName());

        // Flip the source to "finished" before flushing so that it is no longer treated as pending.
        this.forwardingSubscriptionStreams.remove(source);
        this.finishedForwardingSubscriptionStreams.add(source);
        List<BufferedMessage> buffered = this.messageBuffers.remove(source);

        int bufferedCount = buffered != null ? buffered.size() : 0;
        LOG.debug("Actor emptying buffer (moving-in, received lastToSubscriber): actor %s in venue %s received from %s, buffer size %d; waiting for %d more stream(s) before handover complete",
                this.selfRef, this.venueId, source, bufferedCount, this.forwardingSubscriptionStreams.size());

        if (buffered != null) {
            for (BufferedMessage message : buffered) {
                message.passTo(this.delegate);
            }
        }
        if (this.forwardingSubscriptionStreams.isEmpty()) {
            this.delegate.onForwardingComplete();
        }
    }

    /**
     * Receives a direct message from the given broker and buffers it or passes it through,
     * depending on the state of that broker's direct source.
     */
    @Override
    public void onDirectMessage(BrokerId brokerId, ActorRef actorRef, Serializable serializable, Map<String, ? extends Object> map) {
        onMessage(HandoverForwarder.JmsSource.newDirectSource(brokerId), BufferedMessage.newDirectMessage(brokerId, actorRef, serializable, map));
    }

    /**
     * Receives a published message for topic {@code str} from the given broker and buffers it
     * or passes it through, depending on the state of that subscription source.
     */
    @Override
    public void onPublishedMessage(BrokerId brokerId, ActorRef actorRef, String str, Serializable serializable, Map<String, ? extends Object> map) {
        onMessage(HandoverForwarder.JmsSource.newSubscriptionSource(brokerId, str), BufferedMessage.newSubscriptionMessage(brokerId, actorRef, str, serializable, map));
    }

    /**
     * Buffers the message if its source is still pending; otherwise hands it to the delegate.
     *
     * @param jmsSource       source the message arrived from
     * @param bufferedMessage the message, wrapped so it can be replayed later
     */
    private void onMessage(HandoverForwarder.JmsSource jmsSource, BufferedMessage bufferedMessage) {
        if (!this.forwardingSubscriptionStreams.contains(jmsSource)) {
            // Not pending: only the trace text differs between "finished" and "never registered".
            if (this.finishedForwardingSubscriptionStreams.contains(jmsSource)) {
                LOG.trace("Actor passing through message (moving-in, have received lastToSubscriber): actor %s in venue %s received from %s: %s", this.selfRef, this.venueId, jmsSource, bufferedMessage.getPayload());
            } else {
                LOG.trace("Actor passing through message (moving-in, entirely new subscription): actor %s in venue %s received from %s: %s", this.selfRef, this.venueId, jmsSource, bufferedMessage.getPayload());
            }
            bufferedMessage.passTo(this.delegate);
            return;
        }

        LOG.debug("Actor buffering message (moving-in, waiting for forwarded lastToSubscriber): actor %s in venue %s received from %s: %s", this.selfRef, this.venueId, jmsSource, bufferedMessage.getPayload());
        List<BufferedMessage> buffer = this.messageBuffers.get(jmsSource);
        if (buffer == null) {
            // First message held back for this source: create its buffer lazily.
            buffer = new ArrayList<BufferedMessage>();
            this.messageBuffers.put(jmsSource, buffer);
        }
        buffer.add(bufferedMessage);
    }
}
