package monterey.actor.impl;

import java.util.Arrays;
import java.util.Collection;
import java.util.Iterator;
import monterey.actor.ActorRef;
import monterey.actor.impl.MessageTestUtil;
import monterey.venue.ControlMessages;
import monterey.venue.management.BasicActorRef;
import monterey.venue.management.BrokerId;
import monterey.venue.management.VenueId;
import org.testng.Assert;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;

/* loaded from: input_file:monterey/actor/impl/HandoverForwardeeReceiverTest.class */
/**
 * TestNG tests for {@link HandoverForwardeeReceiver}.
 *
 * <p>Each test feeds the receiver a sequence of direct, published and forwarded
 * messages and then checks what the {@link MessageTestUtil.RecordingForwardeeListener}
 * recorded. A fresh listener and receiver are created before every test method.
 */
public class HandoverForwardeeReceiverTest {
    /** Records everything the receiver under test passes on; recreated per test. */
    private MessageTestUtil.RecordingForwardeeListener listener;
    /** The receiver under test; recreated per test. */
    private HandoverForwardeeReceiver handoverForwardee;

    // Fixed fixture values shared by all tests.
    private final BrokerId broker1 = new BrokerId("broker1");
    private final BrokerId broker2 = new BrokerId("broker2");
    private final ActorRef sender1 = new BasicActorRef("sender1");
    private final String topic1 = "topic1";
    private final String topic2 = "topic2";
    private final ActorRef receiverRef = new BasicActorRef("myactorref");
    private final Collection<BrokerId> upstreamBrokers = Arrays.asList(broker1, broker2);
    private final Collection<String> subscribedTopics = Arrays.asList(topic1, topic2);

    /**
     * Builds a new recording listener and a new receiver wired to it, using the
     * two upstream brokers and two subscribed topics declared above.
     */
    @BeforeMethod(alwaysRun = true)
    public void setUp() throws Exception {
        listener = new MessageTestUtil.RecordingForwardeeListener();
        handoverForwardee = new HandoverForwardeeReceiver(
                receiverRef, new VenueId("myvenueid"), upstreamBrokers, subscribedTopics, listener);
    }

    /**
     * Sends one forwarded direct message and one forwarded subscription message and
     * expects the listener to have recorded broker, sender, (topic,) and message for
     * each, in the order sent.
     */
    @Test
    public void testPassedThroughMessagesPreservesAllFields() throws Exception {
        MessageTestUtil.Message directMsg = MessageTestUtil.newMessage(1, 99);
        MessageTestUtil.Message topicMsg = MessageTestUtil.newMessage(2, 99);

        ControlMessages.ForwardedMessage forwardedDirect =
                MessageTestUtil.newForwardedDirectMessage(broker2, sender1, directMsg);
        onForwardedMessage(forwardedDirect);

        ControlMessages.ForwardedMessage forwardedTopic =
                MessageTestUtil.newForwardedSubscriptionMessage(broker2, sender1, "topic1", topicMsg);
        onForwardedMessage(forwardedTopic);

        Assert.assertEquals(
                listener.msgsReceived,
                Arrays.asList(
                        Arrays.asList(broker2, sender1, directMsg),
                        Arrays.asList(broker2, sender1, "topic1", topicMsg)));
    }

    /**
     * For every upstream broker, forwards an empty last-to-subscriber message for each
     * subscribed topic and then one as a direct message. After each message the
     * listener's {@code forwardingComplete} flag is expected to be {@code true} only
     * once the full count (brokers x (topics + 1)) has been delivered.
     */
    @Test
    public void testDetectsForwardingCompletion() throws Exception {
        final int expectedTotal = upstreamBrokers.size() * (subscribedTopics.size() + 1);
        int delivered = 0;

        for (BrokerId upstream : upstreamBrokers) {
            for (String topic : subscribedTopics) {
                onForwardedMessage(MessageTestUtil.newForwardedSubscriptionMessage(
                        upstream, null, topic, MessageTestUtil.newEmptyLastToSubscriber()));
                delivered++;
                Assert.assertEquals(
                        listener.forwardingComplete.get(), delivered == expectedTotal, "count=" + delivered);
            }

            onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(
                    upstream, null, MessageTestUtil.newEmptyLastToSubscriber()));
            delivered++;
            Assert.assertEquals(
                    listener.forwardingComplete.get(), delivered == expectedTotal, "count=" + delivered);
        }
    }

    /**
     * Forwards the empty last-to-subscriber direct message for broker1 first, then
     * delivers a direct message from broker1, and expects payload 1 to be recorded.
     */
    @Test
    public void testNewStreamPassedThroughWhenLastToSubscriberArrivesFirst() throws Exception {
        MessageTestUtil.Message lastToSubscriber = MessageTestUtil.newEmptyLastToSubscriber();
        MessageTestUtil.Message directMsg = MessageTestUtil.newMessage(1, 2);

        onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(broker1, sender1, lastToSubscriber));
        onDirectMessage(broker1, directMsg);

        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1));
    }

    /**
     * Sends a single forwarded direct message with nothing before it and expects its
     * payload to be recorded.
     */
    @Test
    public void testForwardedMessagesPassedThroughWhenArriveFirst() throws Exception {
        MessageTestUtil.Message forwardedPayload = MessageTestUtil.newMessage(1, 1);
        onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(broker1, sender1, forwardedPayload));

        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1));
    }

    /**
     * Interleaves direct/published messages with forwarded empty last-to-subscriber
     * messages and checks the recorded payloads after every step: a direct or published
     * message is expected to appear only after the matching (direct, or same-topic)
     * last-to-subscriber message has been forwarded.
     */
    @Test
    public void testBuffersNewStreamUntilLastToSubscriberForwarded() throws Exception {
        MessageTestUtil.Message lastDirect = MessageTestUtil.newEmptyLastToSubscriber();
        MessageTestUtil.Message lastForTopic = MessageTestUtil.newEmptyLastToSubscriber();
        MessageTestUtil.Message firstDirect = MessageTestUtil.newMessage(1, 1);
        MessageTestUtil.Message secondDirect = MessageTestUtil.newMessage(2, 1);
        MessageTestUtil.Message published = MessageTestUtil.newMessage(3, 1);

        // Direct message before the direct last-to-subscriber: nothing recorded yet.
        onDirectMessage(broker1, firstDirect);
        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(new Object[0]));

        // Forwarding the direct last-to-subscriber makes payload 1 appear.
        onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(broker1, sender1, lastDirect));
        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1));

        // A later direct message is recorded straight away.
        onDirectMessage(broker1, secondDirect);
        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1, 2));

        // Published message on topic1 is not recorded until topic1's last-to-subscriber arrives.
        onPublishedMessage(broker1, "topic1", published);
        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1, 2));

        onForwardedMessage(
                MessageTestUtil.newForwardedSubscriptionMessage(broker1, sender1, "topic1", lastForTopic));
        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1, 2, 3));
    }

    /**
     * Delivers a direct message (payload 2), then a forwarded direct message
     * (payload 1), then the forwarded empty last-to-subscriber message, and expects
     * the recorded payload order to be 1 followed by 2.
     */
    @Test
    public void testForwardedMessagesPassedThroughImmediatelyWhenBufferingNewStream() throws Exception {
        MessageTestUtil.Message lastToSubscriber = MessageTestUtil.newEmptyLastToSubscriber();
        MessageTestUtil.Message forwardedPayload = MessageTestUtil.newMessage(1, 1);

        onDirectMessage(broker1, MessageTestUtil.newMessage(2, 1));
        onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(broker1, sender1, forwardedPayload));
        onForwardedMessage(MessageTestUtil.newForwardedDirectMessage(broker1, sender1, lastToSubscriber));

        Assert.assertEquals(listener.payloadsReceived, Arrays.asList(1, 2));
    }

    /** Delivers {@code message} to the receiver as a direct message from {@code sender1} via the given broker. */
    private void onDirectMessage(BrokerId brokerId, MessageTestUtil.Message message) {
        handoverForwardee.onDirectMessage(brokerId, sender1, message.payload, message.properties);
    }

    /** Delivers {@code message} to the receiver as a message published by {@code sender1} on topic {@code str}. */
    private void onPublishedMessage(BrokerId brokerId, String str, MessageTestUtil.Message message) {
        handoverForwardee.onPublishedMessage(brokerId, sender1, str, message.payload, message.properties);
    }

    /** Hands an already-built forwarded message to the receiver. */
    private void onForwardedMessage(ControlMessages.ForwardedMessage forwardedMessage) {
        handoverForwardee.onForwardedMessage(forwardedMessage);
    }
}
