package com.cloudsoftcorp.monterey.network.pubsub;

import com.cloudsoftcorp.monterey.comms.api.Message;
import com.cloudsoftcorp.monterey.control.api.SegmentSummary;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateItem;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateReport;
import com.cloudsoftcorp.monterey.network.basic.Dmn1MessageFactory;
import com.cloudsoftcorp.monterey.network.basic.DmnLoggers;
import com.cloudsoftcorp.monterey.network.control.api.Dmn1NodeType;
import com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor;
import com.cloudsoftcorp.monterey.network.m.AbstractMediationWorkrateItem;
import com.cloudsoftcorp.monterey.network.m.MetricHandoverState;
import com.cloudsoftcorp.monterey.network.m.SegmentProcessorState;
import com.cloudsoftcorp.monterey.network.resilience.lossless.SourceId;
import com.cloudsoftcorp.monterey.node.api.MessageProcessor;
import com.cloudsoftcorp.monterey.node.api.Node;
import com.cloudsoftcorp.monterey.node.api.NodeCommunications;
import com.cloudsoftcorp.monterey.node.api.NodeId;
import com.cloudsoftcorp.monterey.node.api.NodeLoggers;
import com.cloudsoftcorp.monterey.node.api.NodePubSubCommunications;
import com.cloudsoftcorp.monterey.node.api.PropertiesContext;
import com.cloudsoftcorp.monterey.node.basic.BasicControlMessageFactory;
import com.cloudsoftcorp.monterey.node.basic.BasicNode;
import com.cloudsoftcorp.util.Loggers;
import com.cloudsoftcorp.util.exception.ExceptionUtils;
import com.cloudsoftcorp.util.exception.WorkInProgressException;
import com.cloudsoftcorp.util.javalang.ReflectionUtils;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.mortbay.jetty.HttpVersions;

/* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub.class */
public class MediationNodeProcessorForPubSub extends AbstractMediationNodeProcessor {
    /** Class logger; the nested handover handlers use it for FINE tracing and for warnings. */
    private static final Logger LOG = Loggers.getLogger(MediationNodeProcessorForPubSub.class);
    // Transition marker labels.
    // NOTE(review): not referenced in the part of the file visible here; presumably used further down - confirm.
    private static final String MARK_TRANSITION_M_BEGIN = "TRANSITION M BEGIN";
    private static final String MARK_TRANSITION_M_END = "TRANSITION M END";
    /** Count of handovers in progress: incremented by the handlers' doHandover(), decremented by their completed(). */
    private final AtomicInteger segmentsInTransition = new AtomicInteger();
    /** Pub/sub wrapper around the node communications; assigned in InitProcessorForPubSub.doInit. */
    private SimplePubSubCommunications pubSubComms;
    /** Upstream router ids built from the "upstreamRouterAddress" properties in InitProcessorForPubSub.doInit. */
    private List<NodeId> upstreamRouters;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub$HandlerForClientProcessorPubSub.class */
    /**
     * Client-processor segment handler that sends its downstream traffic over pub/sub.
     *
     * <p>Broadcast messages are published on the topic {@code "LPP.broadcast"}; every other
     * message is published on {@code "LPP.private." + destination}, where the destination is
     * taken from the message's destination-address header. While a handover uid is set on this
     * handler, each downstream message is tagged with a {@code "handover.uid"} header.
     */
    public class HandlerForClientProcessorPubSub extends AbstractMediationNodeProcessor.HandlerForClientProcessor {
        /** Uid of the handover currently associated with this handler, or {@code null} when none is set. */
        private String activeHandoverUid;

        /**
         * @param segmentProcessorState state passed straight through to the superclass constructor
         */
        public HandlerForClientProcessorPubSub(SegmentProcessorState segmentProcessorState) {
            super(segmentProcessorState);
        }

        /**
         * Records the uid that {@link #sendDownstream(Message)} attaches to outgoing messages.
         *
         * @param str the handover uid; {@code null} turns the tagging off
         */
        @Override
        public void setActiveHandover(String str) {
            this.activeHandoverUid = str;
        }

        /**
         * @return the uid last passed to {@link #setActiveHandover(String)}, possibly {@code null}
         */
        @Override
        public String getActiveHandoverId() {
            return this.activeHandoverUid;
        }

        /**
         * Adds transition diagnostics and then delegates to the superclass implementation.
         *
         * <p>The diagnostics only fire while at least one segment on this node is in transition
         * and the corresponding logger is at FINE: a "TRANSITION NEAR" log line, and an
         * {@code "MNEAR"} entry appended to the message's tracer event-stamp header.
         *
         * @param message the mediation request to process
         */
        @Override
        public void processMediationRequest(Message message) {
            if (DmnLoggers.MESSAGE_DURING_TRANSITION.isLoggable(Level.FINE) && MediationNodeProcessorForPubSub.this.segmentsInTransition.get() > 0) {
                DmnLoggers.MESSAGE_DURING_TRANSITION.fine("TRANSITION NEAR ON " + this.segmentName + " AT " + MediationNodeProcessorForPubSub.this.node.getAddress() + " FOR " + message.getHeader(Dmn1MessageFactory.USER_HEADER));
            }
            if (DmnLoggers.EVENTSTAMP.isLoggable(Level.FINE) && MediationNodeProcessorForPubSub.this.segmentsInTransition.get() > 0) {
                String existingStamps = message.getHeader(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY);
                // Comma-separate from any earlier stamps; a plain empty prefix is used when this is the first one.
                String prefix = existingStamps != null ? existingStamps + "," : "";
                message.getEnvelope().replaceAtEnd(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY, prefix + "MNEAR");
            }
            super.processMediationRequest(message);
        }

        /**
         * Tags the message with the active handover uid (when one is set) and publishes it.
         *
         * @param message the message to send downstream
         */
        @Override
        public void sendDownstream(Message message) {
            if (this.activeHandoverUid != null) {
                message.addHeader("handover.uid", this.activeHandoverUid);
            }
            sendDownstreamImpl(message);
        }

        /**
         * Chooses the pub/sub topic for the message and publishes it. A non-broadcast message
         * without a destination-address header is not published; it is reported as a warning
         * and as a node error instead.
         */
        private void sendDownstreamImpl(Message message) {
            if (Dmn1MessageFactory.MEDIATION_BROADCAST_MESSAGE_TYPE.equals(Dmn1MessageFactory.INSTANCE.getType(message))) {
                MediationNodeProcessorForPubSub.this.pubSubComms.publish("LPP.broadcast", message);
                return;
            }
            String destination = message.getHeader(Dmn1MessageFactory.DESTINATION_ADDRESS_HEADER);
            if (destination != null) {
                MediationNodeProcessorForPubSub.this.pubSubComms.publish("LPP.private." + ((SourceId) MediationNodeProcessorForPubSub.this.instantiate(destination)), message);
            } else {
                String problem = "Required DestinationAddress header missing from message: " + message;
                NodeLoggers.MESSAGES.warning(problem);
                MediationNodeProcessorForPubSub.this.node.error(problem);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub$HandlerForHandoverAtNewMediator.class */
    /**
     * Segment handler installed on the mediator that is receiving a segment, for the duration
     * of the handover.
     *
     * <p>Two sets of upstream routers are tracked, both initialised to the full router collection:
     * <ul>
     *   <li>{@code uncompletedUpstreamRouters} - routers whose direct (non-forwarded) completion
     *       marker has not been seen yet;</li>
     *   <li>{@code forwardedUpstreamRouters} - routers whose forwarded completion marker has not
     *       been seen yet.</li>
     * </ul>
     * When both sets are empty the handover is complete and the wrapped client handler replaces
     * this handler in {@code segmentsHere}.
     */
    public class HandlerForHandoverAtNewMediator extends AbstractMediationNodeProcessor.MediationSegmentHandler {
        /** Handover uid; also used to build the transition id on outgoing handover messages. */
        private final String uid;
        /** Id of the segment being handed over. */
        private final String segment;
        /** Mediator the segment is leaving. */
        private final NodeId mOld;
        /** This node's address, captured at construction. */
        private final NodeId mNew;
        /** Handler that processes the segment's requests here and takes over when the handover completes. */
        private final AbstractMediationNodeProcessor.HandlerForClientProcessor newClientHandler;
        private final Collection<NodeId> upstreamRouters;
        /** Direct messages held per router until that router's forwarded completion marker arrives. */
        private final Map<NodeId, List<Message>> queuedMessages;
        private final Set<NodeId> uncompletedUpstreamRouters;
        private final Set<NodeId> forwardedUpstreamRouters;

        /**
         * @param str handover uid
         * @param str2 segment id
         * @param nodeId address of the old mediator
         * @param handlerForClientProcessor handler that will own the segment once the handover completes
         * @param collection upstream routers taking part in the handover
         */
        public HandlerForHandoverAtNewMediator(String str, String str2, NodeId nodeId, AbstractMediationNodeProcessor.HandlerForClientProcessor handlerForClientProcessor, Collection<NodeId> collection) {
            super();
            this.uid = str;
            this.segment = str2;
            this.mOld = nodeId;
            this.mNew = MediationNodeProcessorForPubSub.this.node.getAddress();
            this.newClientHandler = handlerForClientProcessor;
            this.upstreamRouters = collection;
            this.uncompletedUpstreamRouters = new HashSet<NodeId>(collection);
            this.forwardedUpstreamRouters = new HashSet<NodeId>(collection);
            this.queuedMessages = new HashMap<NodeId, List<Message>>(collection.size());
            for (NodeId router : collection) {
                this.queuedMessages.put(router, new ArrayList<Message>(0));
            }
        }

        @Override
        public String getSegmentId() {
            return this.segment;
        }

        @Override
        public boolean isReallyHere() {
            return true;
        }

        @Override
        public boolean isArriving() {
            return true;
        }

        @Override
        public boolean isDeparting() {
            return false;
        }

        /**
         * Starts the handover on this (new) mediator: registers this handler for the segment,
         * subscribes to the segment topic, notifies the monitor that the segment has arrived,
         * broadcasts the "first from new" marker downstream and publishes a completion marker
         * addressed to each upstream router.
         */
        public void doHandover() {
            MediationNodeProcessorForPubSub.this.segmentsInTransition.incrementAndGet();
            this.newClientHandler.setActiveHandover(this.uid);
            MediationNodeProcessorForPubSub.this.segmentsHere.put(this.segment, this);
            MediationNodeProcessorForPubSub.this.pubSubComms.addUpstreamRouters(this.upstreamRouters);
            MediationNodeProcessorForPubSub.this.pubSubComms.addSubscriptions("segments." + this.segment);
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendControlMessage(Dmn1MessageFactory.INSTANCE.newSegmentArrivedStatusMessage(this.segment, this.mOld, this.mNew), NodeCommunications.ControlDestination.MONITOR);
            MediationNodeProcessorForPubSub.this.pubSubComms.publish("LPP.broadcast", Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverDownstreamFirstFromNew(new BasicControlMessageFactory.TransitionId(this.uid), this.uid, this.segment));
            for (NodeId router : this.upstreamRouters) {
                MediationNodeProcessorForPubSub.this.pubSubComms.publish("segments." + this.segment, router, Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverCompleteForUpstreamRouter(new BasicControlMessageFactory.TransitionId(this.uid), this.uid, this.segment, router));
            }
        }

        /**
         * Routes one incoming message according to its type, its originating router and whether
         * it carries the forwarded flag, then finishes the handover if nothing is outstanding.
         *
         * <ul>
         *   <li>Downstream "last from old" / "first from new" markers are ignored.</li>
         *   <li>Forwarded completion marker: the router's queued messages are replayed and the
         *       router is removed from the forwarded set.</li>
         *   <li>Other forwarded messages are passed straight to the client handler.</li>
         *   <li>Direct completion marker: the router is removed from the uncompleted set.</li>
         *   <li>Other direct messages from a router that is no longer uncompleted are queued
         *       while that router is still in the forwarded set, otherwise passed through.</li>
         *   <li>Other direct messages from a router still in the uncompleted set are dropped.
         *       NOTE(review): presumably because the old mediator forwards that router's
         *       traffic until its completion marker - confirm.</li>
         * </ul>
         *
         * @param message the incoming message
         */
        @Override
        public void processMediationRequest(Message message) {
            String type = Dmn1MessageFactory.INSTANCE.getType(message);
            String routedBy = message.getHeader(Dmn1MessageFactory.ROUTED_BY_PROPERTY);
            NodeId router = routedBy != null ? (NodeId) MediationNodeProcessorForPubSub.this.instantiate(routedBy) : null;
            boolean forwarded = Boolean.parseBoolean(message.getHeader(Dmn1MessageFactory.FORWARDED_PROPERTY));
            if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                MediationNodeProcessorForPubSub.LOG.fine("Received at node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; upstreamRouter=" + router + "; forwarded=" + forwarded + "; type=" + type + "; msg=" + message + "; forwardedUpstreamRouters=" + this.forwardedUpstreamRouters + "; uncompletedUpstreamRouters=" + this.uncompletedUpstreamRouters);
            }
            boolean downstreamMarker = Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_DOWNSTREAM_LAST_FROM_OLD_M_MESSAGE_TYPE.equals(type)
                    || Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_DOWNSTREAM_FIRST_FROM_NEW_M_MESSAGE_TYPE.equals(type);
            if (!downstreamMarker) {
                boolean routerCompletion = Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_COMPLETE_FOR_UPSTREAM_ROUTER.equals(type);
                if (forwarded) {
                    if (routerCompletion) {
                        onLastForwardedFromRouter(router, type, message);
                    } else {
                        passthroughMessage(router, message);
                    }
                } else if (routerCompletion) {
                    this.uncompletedUpstreamRouters.remove(router);
                    if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                        MediationNodeProcessorForPubSub.LOG.fine("Completed for router: node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; upstreamRouter=" + router + "; forwardedUpstreamRouters=" + this.forwardedUpstreamRouters + "; uncompletedUpstreamRouters=" + this.uncompletedUpstreamRouters);
                    }
                } else if (!this.uncompletedUpstreamRouters.contains(router)) {
                    if (this.forwardedUpstreamRouters.contains(router)) {
                        queueMessage(router, message);
                    } else {
                        passthroughMessage(router, message);
                    }
                }
            }
            if (isComplete()) {
                completed();
            }
        }

        /**
         * Handles the forwarded completion marker for one router: replays the messages queued
         * for it and stops tracking it as a forwarding router.
         */
        private void onLastForwardedFromRouter(NodeId router, String type, Message message) {
            if (!this.forwardedUpstreamRouters.contains(router)) {
                MediationNodeProcessorForPubSub.LOG.warning("Received forwarded message from " + router + "; type=" + type + "; msg=" + message + "; forwarders=" + this.forwardedUpstreamRouters);
            }
            // There is no queue for an unknown router, for a repeated marker, or when the
            // routed-by header was missing; in those cases there is simply nothing to replay.
            List<Message> held = this.queuedMessages.get(router);
            if (held != null) {
                for (Message queued : held) {
                    passthroughMessage(null, queued);
                }
            }
            this.queuedMessages.remove(router);
            this.forwardedUpstreamRouters.remove(router);
            if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                MediationNodeProcessorForPubSub.LOG.fine("Last forwarded for router: node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; upstreamRouter=" + router + "; forwardedUpstreamRouters=" + this.forwardedUpstreamRouters + "; uncompletedUpstreamRouters=" + this.uncompletedUpstreamRouters);
            }
        }

        /** True once every router has delivered both its direct and its forwarded completion marker. */
        private boolean isComplete() {
            return this.uncompletedUpstreamRouters.isEmpty() && this.forwardedUpstreamRouters.isEmpty();
        }

        /**
         * Ends the handover: installs the client handler for the segment in place of this
         * handler and reports cross-stream completion to the monitor.
         */
        private void completed() {
            MediationNodeProcessorForPubSub.this.segmentsInTransition.decrementAndGet();
            if (DmnLoggers.TRANSITIONS.isLoggable(Level.FINE)) {
                DmnLoggers.TRANSITIONS.fine("TRANSITION END FOR " + this.segment + " AT NEW NODE " + this.uid);
            }
            MediationNodeProcessorForPubSub.this.recorder.markTransition("TRANSITION M END NEW " + this.uid + " " + this.segment, Level.FINE, this.uid);
            AbstractMediationNodeProcessor.MediationSegmentHandler previous = (AbstractMediationNodeProcessor.MediationSegmentHandler) MediationNodeProcessorForPubSub.this.segmentsHere.put(this.segment, this.newClientHandler);
            // The handler being replaced is expected to be this one.
            assert equals(previous);
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendControlMessage(Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverCrossStreamCompleteAtNew(new BasicControlMessageFactory.TransitionId(this.uid), this.uid, this.segment, this.mNew), NodeCommunications.ControlDestination.MONITOR);
        }

        /** Holds a direct message until the router's forwarded completion marker arrives. */
        private void queueMessage(NodeId router, Message message) {
            if (DmnLoggers.MESSAGE_DURING_TRANSITION.isLoggable(Level.FINE)) {
                DmnLoggers.MESSAGE_DURING_TRANSITION.fine("TRANSITION MQUEUE ON " + this.segment + " AT " + this.uid + " FOR " + message.getHeader(Dmn1MessageFactory.USER_HEADER) + " FROM " + router);
            }
            stampTracerEvent(message, "MQUEUE");
            this.queuedMessages.get(router).add(message);
        }

        /**
         * Hands a message to the client handler.
         *
         * @param router originating router, or {@code null} for a replayed queued message
         *               (logged as "M-OLD")
         */
        private void passthroughMessage(NodeId router, Message message) {
            if (DmnLoggers.MESSAGE_DURING_TRANSITION.isLoggable(Level.FINE)) {
                DmnLoggers.MESSAGE_DURING_TRANSITION.fine("TRANSITION P-THRU ON " + this.segment + " AT " + this.uid + " FOR " + message.getHeader(Dmn1MessageFactory.USER_HEADER) + " FROM " + (router != null ? router : "M-OLD"));
            }
            stampTracerEvent(message, "P-THRU");
            this.newClientHandler.processMediationRequest(message);
        }

        /**
         * Appends {@code stamp} to the message's comma-separated tracer event-stamp header,
         * but only when the EVENTSTAMP logger is at FINE.
         */
        private void stampTracerEvent(Message message, String stamp) {
            if (DmnLoggers.EVENTSTAMP.isLoggable(Level.FINE)) {
                String existing = message.getHeader(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY);
                message.getEnvelope().replaceAtEnd(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY, (existing != null ? existing + "," : "") + stamp);
            }
        }

        /** Delegates to the client handler. */
        @Override
        public AbstractMediationWorkrateItem.BasicSegmentWorkrateItem peekStats() {
            return this.newClientHandler.peekStats();
        }

        /** Delegates to the client handler. */
        @Override
        public AbstractMediationNodeProcessor.MediationWorkrateStats contributeStats(WorkrateReport workrateReport, boolean z) {
            return this.newClientHandler.contributeStats(workrateReport, z);
        }

        /** Delegates to the client handler. */
        @Override
        public void contributeAppMetrics(WorkrateReport workrateReport) {
            this.newClientHandler.contributeAppMetrics(workrateReport);
        }

        /** Delegates to the client handler. */
        @Override
        public Collection<WorkrateItem> peekAppMetrics() {
            return this.newClientHandler.peekAppMetrics();
        }

        /**
         * Not supported on an arriving segment.
         *
         * @throws IllegalStateException always
         */
        @Override
        public void setActiveHandover(String str) {
            throw new IllegalStateException("active handover should not apply to segment moving in");
        }

        /** @return the active handover id held by the client handler */
        @Override
        public String getActiveHandoverId() {
            return this.newClientHandler.getActiveHandoverId();
        }

        /**
         * Not supported while the handover is in progress.
         *
         * @throws IllegalStateException always
         */
        @Override
        public Serializable shutdown() {
            throw new IllegalStateException("attempt to shutdown a handover");
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub$HandlerForHandoverAtOldMediator.class */
    public class HandlerForHandoverAtOldMediator extends AbstractMediationNodeProcessor.MediationSegmentHandler {
        private final String uid;
        private final NodeId mNew;
        private final NodeId mOld;
        private final String segment;
        private final Collection<NodeId> upstreamRouters;
        private final Collection<NodeId> uncompletedUpstreamRouters;
        private final AbstractMediationNodeProcessor.HandlerForClientProcessor oldHandler;
        static final /* synthetic */ boolean $assertionsDisabled;

        HandlerForHandoverAtOldMediator(String str, NodeId nodeId, String str2, Collection<NodeId> collection) {
            super();
            this.uid = str;
            this.mNew = nodeId;
            this.mOld = MediationNodeProcessorForPubSub.this.node.getAddress();
            this.segment = str2;
            this.upstreamRouters = collection;
            this.uncompletedUpstreamRouters = new HashSet(collection);
            synchronized (MediationNodeProcessorForPubSub.this.segmentsHere) {
                if (!(MediationNodeProcessorForPubSub.this.segmentsHere.get(str2) instanceof AbstractMediationNodeProcessor.HandlerForClientProcessor)) {
                    throw new IllegalStateException("Attempt to migrate segment when not mastered here: " + toString());
                }
                this.oldHandler = (AbstractMediationNodeProcessor.HandlerForClientProcessor) MediationNodeProcessorForPubSub.this.segmentsHere.get(str2);
            }
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public String getSegmentId() {
            return this.segment;
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public boolean isReallyHere() {
            return false;
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public boolean isArriving() {
            return false;
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public boolean isDeparting() {
            return true;
        }

        public void doHandover() {
            MediationNodeProcessorForPubSub.this.segmentsInTransition.incrementAndGet();
            SegmentSummary segmentSummary = this.oldHandler.getSegmentSummary();
            Serializable shutdown = this.oldHandler.shutdown();
            MediationNodeProcessorForPubSub.this.resilience.onStoppedMasteringSegment(this.segment);
            MetricHandoverState metricHandoverState = this.oldHandler.getMetricHandoverState();
            MediationNodeProcessorForPubSub.this.segmentsHere.put(this.segment, this);
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendControlMessage(Dmn1MessageFactory.INSTANCE.newSegmentDepartedStatusMessage(this.segment, MediationNodeProcessorForPubSub.this.node.getAddress(), this.mNew), NodeCommunications.ControlDestination.MONITOR);
            MediationNodeProcessorForPubSub.this.pubSubComms.publish("LPP.broadcast", Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverDownstreamLastFromOld(new BasicControlMessageFactory.TransitionId(this.uid), this.uid, this.segment));
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendMessage(Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverPrepareForAcceptingSegment(new BasicControlMessageFactory.TransitionId(this.uid), this.mOld, this.mNew, this.upstreamRouters, shutdown, metricHandoverState, segmentSummary), this.mNew);
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public void processMediationRequest(Message message) {
            String type = Dmn1MessageFactory.INSTANCE.getType(message);
            String header = message.getHeader(Dmn1MessageFactory.ROUTED_BY_PROPERTY);
            NodeId nodeId = header != null ? (NodeId) MediationNodeProcessorForPubSub.this.instantiate(header) : null;
            if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                MediationNodeProcessorForPubSub.LOG.fine("Received at node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; upstreamRouter=" + nodeId + "; type=" + type + "; msg=" + message + "; uncompletedUpstreamRouters=" + this.uncompletedUpstreamRouters);
            }
            if (nodeId == null) {
                String str = "Unexpected message received by old mediator during migration, not from an upstream router: node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; segment=" + this.segment + "; msg=" + message;
                NodeLoggers.MESSAGES.warning(str);
                MediationNodeProcessorForPubSub.this.node.error(str);
                return;
            }
            if (this.uncompletedUpstreamRouters.contains(nodeId)) {
                forwardMessage(message);
                if (Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_COMPLETE_FOR_UPSTREAM_ROUTER.equals(type)) {
                    if (!$assertionsDisabled && !this.uid.equals(MediationNodeProcessorForPubSub.this.instantiateProperties(message).getProperty("handover.uid"))) {
                        throw new AssertionError();
                    }
                    if (!$assertionsDisabled && !this.segment.equals(MediationNodeProcessorForPubSub.this.instantiateProperties(message).getProperty(Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PROPERTY_SEGMENT))) {
                        throw new AssertionError();
                    }
                    if (!$assertionsDisabled && !nodeId.equals(MediationNodeProcessorForPubSub.this.instantiateProperty(MediationNodeProcessorForPubSub.this.instantiateProperties(message), Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PROPERTY_UPSTREAM_ROUTER))) {
                        throw new AssertionError();
                    }
                    this.uncompletedUpstreamRouters.remove(nodeId);
                    MediationNodeProcessorForPubSub.this.pubSubComms.removeSubscriptions("segments." + this.segment, nodeId);
                    if (this.uncompletedUpstreamRouters.isEmpty()) {
                        if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                            MediationNodeProcessorForPubSub.LOG.fine("All upstream routers complete at node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; segment=" + this.segment + "; uid=" + this.uid);
                        }
                        completed();
                    } else if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                        MediationNodeProcessorForPubSub.LOG.fine("Upstream router complete at node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; segment=" + this.segment + "; uid=" + this.uid + "; router=" + nodeId + "; uncompleted=" + this.uncompletedUpstreamRouters);
                    }
                }
            }
        }

        private void forwardMessage(Message message) {
            if (DmnLoggers.TRANSITIONS.isLoggable(Level.FINE)) {
                DmnLoggers.TRANSITIONS.fine("TRANSITION HIT-FORWARD ON " + this.segment + " AT " + this.uid + " FOR " + message.getHeader(Dmn1MessageFactory.USER_HEADER));
            }
            if (DmnLoggers.EVENTSTAMP.isLoggable(Level.FINE)) {
                String header = message.getHeader(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY);
                message.getEnvelope().replaceAtEnd(BasicControlMessageFactory.TRACER_EVENTSTAMP_PROPERTY, (header != null ? header + "," : HttpVersions.HTTP_0_9) + "HIT-FORWARD");
            }
            message.getEnvelope().add(Dmn1MessageFactory.FORWARDED_PROPERTY, Boolean.TRUE.toString());
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendMessage(message, this.mNew);
        }

        private void completed() {
            if (DmnLoggers.TRANSITIONS.isLoggable(Level.FINE)) {
                DmnLoggers.TRANSITIONS.fine("TRANSITION END FOR " + this.segment + " AT OLD NODE " + MediationNodeProcessorForPubSub.this.node.getAddress() + ", " + this.uid);
            }
            DmnLoggers.TRANSITIONS.fine("trans " + this.uid + " segment handover complete at mediator: segment=" + this.segment + "; old=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; new=" + this.mNew);
            MediationNodeProcessorForPubSub.this.recorder.markTransition("TRANSITION M END OLD " + this.uid + " " + this.segment, Level.FINE, this.uid);
            MediationNodeProcessorForPubSub.this.segmentsInTransition.decrementAndGet();
            AbstractMediationNodeProcessor.MediationSegmentHandler mediationSegmentHandler = (AbstractMediationNodeProcessor.MediationSegmentHandler) MediationNodeProcessorForPubSub.this.segmentsHere.put(this.segment, new AbstractMediationNodeProcessor.WorkrateHolderOnly(this.segment, this.oldHandler));
            if (!$assertionsDisabled && !equals(mediationSegmentHandler)) {
                throw new AssertionError();
            }
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendControlMessage(Dmn1MessageFactory.INSTANCE.newMediationSegmentHandoverCrossStreamCompleteAtOld(new BasicControlMessageFactory.TransitionId(this.uid), this.uid, this.segment, this.mOld), NodeCommunications.ControlDestination.MONITOR);
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public AbstractMediationWorkrateItem.BasicSegmentWorkrateItem peekStats() {
            return this.oldHandler.peekStats();
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public AbstractMediationNodeProcessor.MediationWorkrateStats contributeStats(WorkrateReport workrateReport, boolean z) {
            return this.oldHandler.contributeStats(workrateReport, false);
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public void contributeAppMetrics(WorkrateReport workrateReport) {
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public Collection<WorkrateItem> peekAppMetrics() {
            return Collections.emptyList();
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public void setActiveHandover(String str) {
            throw new IllegalStateException("active handover should not apply to segment moving out");
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public String getActiveHandoverId() {
            throw new IllegalStateException("active handover should not apply to segment moving out");
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.MediationSegmentHandler
        public Serializable shutdown() {
            throw new IllegalStateException("attempt to shutdown a handover");
        }

        public String toString() {
            return "MediationSegmentHandover/OldM[" + this.uid + "; " + this.segment + "; " + this.mOld + "->" + this.mNew + "]";
        }

        static {
            $assertionsDisabled = !MediationNodeProcessorForPubSub.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub$InitProcessorForPubSub.class */
    public class InitProcessorForPubSub extends AbstractMediationNodeProcessor.InitProcessor implements MessageProcessor.ControlMessageProcessor {
        public InitProcessorForPubSub() {
            super();
        }

        @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor.InitProcessor
        protected void doInit(PropertiesContext propertiesContext) {
            MediationNodeProcessorForPubSub.this.upstreamRouters = new ArrayList();
            Iterator<String> it = propertiesContext.getProperties().getAll("upstreamRouterAddress").iterator();
            while (it.hasNext()) {
                MediationNodeProcessorForPubSub.this.upstreamRouters.add((NodeId) MediationNodeProcessorForPubSub.this.instantiate(it.next()));
            }
            MediationNodeProcessorForPubSub.this.pubSubComms = new SimplePubSubCommunications((NodePubSubCommunications) MediationNodeProcessorForPubSub.this.node.getCommunications());
            MediationNodeProcessorForPubSub.this.pubSubComms.setDefaultDownstreamRouter(MediationNodeProcessorForPubSub.this.getDownstreamRouter());
            MediationNodeProcessorForPubSub.this.pubSubComms.addUpstreamRouters((NodeId[]) MediationNodeProcessorForPubSub.this.upstreamRouters.toArray(new NodeId[MediationNodeProcessorForPubSub.this.upstreamRouters.size()]));
            MediationNodeProcessorForPubSub.this.node.getCommunications().sendControlMessage(Dmn1MessageFactory.INSTANCE.newNodeTypeGainedStatusMessage(Dmn1NodeType.M, MediationNodeProcessorForPubSub.this.node.getAddress()), NodeCommunications.ControlDestination.MONITOR);
        }
    }

    /* loaded from: input_file:com/cloudsoftcorp/monterey/network/pubsub/MediationNodeProcessorForPubSub$SegmentHandoverProcessor.class */
    private class SegmentHandoverProcessor implements MessageProcessor.ControlMessageProcessor {
        private SegmentHandoverProcessor() {
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public boolean acceptsMessage(Message message) {
            String type = Dmn1MessageFactory.INSTANCE.getType(message);
            return type != null && type.startsWith(Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PREFIX);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            String type = Dmn1MessageFactory.INSTANCE.getType(message);
            if (Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_BEGIN_MESSAGE_TYPE.equals(type)) {
                onHandoverBeginAtOld(message);
                return;
            }
            if (Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_STATE_MESSAGE_TYPE.equals(type)) {
                onHandoverBeginAtNew(message);
                return;
            }
            if (Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_COMPLETE_FOR_UPSTREAM_ROUTER.equals(type)) {
                MediationNodeProcessorForPubSub.this.processMessage(message);
            } else {
                if (Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_DOWNSTREAM_LAST_FROM_OLD_M_MESSAGE_TYPE.equals(type) || Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_DOWNSTREAM_FIRST_FROM_NEW_M_MESSAGE_TYPE.equals(type)) {
                    return;
                }
                MediationNodeProcessorForPubSub.this.node.error("unexpected message type at " + MediationNodeProcessorForPubSub.this.node + ": " + message);
            }
        }

        private void onHandoverBeginAtOld(Message message) {
            PropertiesContext instantiateProperties = MediationNodeProcessorForPubSub.this.instantiateProperties(message);
            String property = instantiateProperties.getProperty("handover.uid");
            String property2 = instantiateProperties.getProperty(Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PROPERTY_SEGMENT);
            NodeId nodeId = (NodeId) MediationNodeProcessorForPubSub.this.instantiateProperty(instantiateProperties, Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PROPERTY_M_OLD);
            NodeId nodeId2 = (NodeId) MediationNodeProcessorForPubSub.this.instantiateProperty(instantiateProperties, Dmn1MessageFactory.MEDIATION_SEGMENT_HANDOVER_PROPERTY_M_NEW);
            AbstractMediationNodeProcessor.MediationSegmentHandler mediationSegmentHandler = (AbstractMediationNodeProcessor.MediationSegmentHandler) MediationNodeProcessorForPubSub.this.segmentsHere.get(property2);
            Collection<NodeId> upstreamRouters = MediationNodeProcessorForPubSub.this.pubSubComms.getUpstreamRouters();
            if (MediationNodeProcessorForPubSub.LOG.isLoggable(Level.FINE)) {
                MediationNodeProcessorForPubSub.LOG.fine("Segment handover request: node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; segment=" + property2 + "; old=" + nodeId + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (!MediationNodeProcessorForPubSub.this.node.getAddress().equals(nodeId)) {
                throw new IllegalStateException("Invalid segment handover request - old address does not match: segment=" + property2 + "; node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; old=" + nodeId + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (mediationSegmentHandler == null || (mediationSegmentHandler instanceof AbstractMediationNodeProcessor.WorkrateHolderOnly)) {
                throw new IllegalStateException("Invalid segment handover request - segment not at node " + MediationNodeProcessorForPubSub.this.node.getAddress() + "; cannot handover: segment=" + property2 + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (mediationSegmentHandler instanceof HandlerForHandoverAtOldMediator) {
                throw new IllegalStateException("Invalid segment handover request - segment already migrating " + MediationNodeProcessorForPubSub.this.node.getAddress() + "->" + ((HandlerForHandoverAtOldMediator) mediationSegmentHandler).mNew + ": segment=" + property2 + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (mediationSegmentHandler instanceof HandlerForHandoverAtNewMediator) {
                throw new IllegalStateException("Invalid segment handover request - segment still incoming " + ((HandlerForHandoverAtNewMediator) mediationSegmentHandler).mOld + "->" + MediationNodeProcessorForPubSub.this.node.getAddress() + ": segment=" + property2 + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (!(mediationSegmentHandler instanceof HandlerForClientProcessorPubSub)) {
                throw new IllegalStateException("Invalid segment handover request - unexpected handler type " + mediationSegmentHandler.getClass() + " (" + mediationSegmentHandler + "):: segment=" + property2 + "; new=" + nodeId2 + "; uid=" + property);
            }
            if (DmnLoggers.TRANSITIONS.isLoggable(Level.FINE)) {
                DmnLoggers.TRANSITIONS.fine("TRANSITION BEGIN AT OLD NODE: uid=" + property + "; segment=" + property2 + "; old=" + nodeId + "; new=" + nodeId2);
            }
            MediationNodeProcessorForPubSub.this.recorder.markTransition("TRANSITION M BEGIN OLD " + property + " " + property2, Level.FINE, property);
            new HandlerForHandoverAtOldMediator(property, nodeId2, property2, upstreamRouters).doHandover();
        }

        protected void onHandoverBeginAtNew(Message message) {
            Dmn1MessageFactory.StateTransferRecord stateTransferRecord = (Dmn1MessageFactory.StateTransferRecord) MediationNodeProcessorForPubSub.this.instantiatePayload(message);
            int length = message.getPayload().getLength();
            String id = stateTransferRecord.getId();
            String uid = stateTransferRecord.getSegmentSummary().getUid();
            NodeId sourceNode = stateTransferRecord.getSourceNode();
            NodeId destinationNode = stateTransferRecord.getDestinationNode();
            SegmentProcessorState handoverState = SegmentProcessorState.handoverState(stateTransferRecord);
            AbstractMediationNodeProcessor.MediationSegmentHandler mediationSegmentHandler = (AbstractMediationNodeProcessor.MediationSegmentHandler) MediationNodeProcessorForPubSub.this.segmentsHere.get(uid);
            Collection<NodeId> upstreamRouters = stateTransferRecord.getUpstreamRouters();
            if (!MediationNodeProcessorForPubSub.this.node.getAddress().equals(destinationNode)) {
                throw new IllegalStateException("Invalid segment handover request - new address does not match: segment=" + uid + "; node=" + MediationNodeProcessorForPubSub.this.node.getAddress() + "; old=" + sourceNode + "; new=" + destinationNode + "; uid=" + id);
            }
            if (mediationSegmentHandler != null && !(mediationSegmentHandler instanceof AbstractMediationNodeProcessor.WorkrateHolderOnly)) {
                String str = mediationSegmentHandler instanceof HandlerForHandoverAtOldMediator ? "outbound migration " + MediationNodeProcessorForPubSub.this.node.getAddress() + "->" + ((HandlerForHandoverAtOldMediator) mediationSegmentHandler).mNew : mediationSegmentHandler instanceof HandlerForHandoverAtNewMediator ? "inbound migration " + ((HandlerForHandoverAtOldMediator) mediationSegmentHandler).mNew + "->" + MediationNodeProcessorForPubSub.this.node.getAddress() : mediationSegmentHandler instanceof HandlerForClientProcessorPubSub ? null : "unexpected handler type " + mediationSegmentHandler.getClass();
                throw new IllegalStateException("Invalid segment handover request - segment already at new node " + MediationNodeProcessorForPubSub.this.node.getAddress() + (str != null ? "(" + str + ")" : HttpVersions.HTTP_0_9) + ": segment=" + uid + "; old=" + sourceNode + "; new=" + destinationNode + "; uid=" + id);
            }
            if (DmnLoggers.TRANSITIONS.isLoggable(Level.FINE)) {
                DmnLoggers.TRANSITIONS.fine("TRANSITION BEGIN AT NEW NODE: uid=" + id + "; segment=" + uid + "; old=" + sourceNode + "; new=" + destinationNode + "; payloadSize=" + length + "b" + (DmnLoggers.TRANSITIONS.isLoggable(Level.FINER) ? "; upstreamRouters=" + upstreamRouters : HttpVersions.HTTP_0_9));
            }
            MediationNodeProcessorForPubSub.this.recorder.markTransition("TRANSITION M BEGIN NEW " + id + " " + uid, Level.FINE, id);
            try {
                new HandlerForHandoverAtNewMediator(id, uid, sourceNode, MediationNodeProcessorForPubSub.this.newClientProcessor(handoverState), upstreamRouters).doHandover();
            } catch (ReflectionUtils.ReflectionNotFoundException e) {
                throw ExceptionUtils.throwRuntime(e);
            }
        }
    }

    /**
     * Runs the superclass pre-attach step and then registers a {@code SegmentHandoverProcessor}
     * on the node.
     *
     * @param node the node being attached to; it is cast to {@link BasicNode}
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor, com.cloudsoftcorp.monterey.node.api.NodeAttachable
    public void preAttach(Node node) {
        super.preAttach(node);
        final BasicNode basicNode = (BasicNode) node;
        final SegmentHandoverProcessor handoverProcessor = new SegmentHandoverProcessor();
        basicNode.addProcessor(handoverProcessor);
    }

    /**
     * Runs the superclass post-detach step and then closes the pub-sub communications wrapper,
     * if one has been created.
     *
     * @param node the node that was detached from
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor, com.cloudsoftcorp.monterey.node.api.NodeAttachable
    public void postDetach(Node node) {
        super.postDetach(node);
        if (this.pubSubComms == null) {
            // No wrapper was ever set up, so there is nothing to close.
            return;
        }
        this.pubSubComms.close();
    }

    /**
     * Adds a {@code "segments.<uid>"} subscription for every given segment, then hands the
     * segments to the superclass.
     *
     * @param collection the segments to add
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor
    public void addSegments(Collection<SegmentSummary> collection) {
        final List<String> subscriptions = new ArrayList<String>(collection.size());
        for (SegmentSummary segment : collection) {
            subscriptions.add("segments." + segment.getUid());
        }
        // Subscriptions are registered before the superclass sees the segments.
        this.pubSubComms.addSubscriptions(subscriptions);
        super.addSegments(collection);
    }

    /**
     * Creates the pub-sub flavour of the client-processor handler for a segment.
     *
     * @param segmentProcessorState the state handed to the new handler
     * @return a new {@code HandlerForClientProcessorPubSub}
     * @throws ReflectionUtils.ReflectionNotFoundException declared by the overridden method; thrown
     *         only if the handler's constructor raises it
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor
    protected AbstractMediationNodeProcessor.HandlerForClientProcessor newClientProcessor(SegmentProcessorState segmentProcessorState) throws ReflectionUtils.ReflectionNotFoundException {
        final HandlerForClientProcessorPubSub clientHandler = new HandlerForClientProcessorPubSub(segmentProcessorState);
        return clientHandler;
    }

    /**
     * Supplies the init processor used by this mediator.
     *
     * @return a new {@link InitProcessorForPubSub}
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor
    protected AbstractMediationNodeProcessor.InitProcessor newInitProcessor() {
        final InitProcessorForPubSub initProcessor = new InitProcessorForPubSub();
        return initProcessor;
    }

    /**
     * Not implemented for the pub-sub mediator: always throws.
     *
     * @param str unused
     * @param segmentSummary unused
     * @throws WorkInProgressException always
     */
    @Override // com.cloudsoftcorp.monterey.network.m.AbstractMediationNodeProcessor
    public void becomeMasterImmediately(String str, SegmentSummary segmentSummary) {
        final WorkInProgressException notImplemented = new WorkInProgressException();
        throw notImplemented;
    }
}
