package com.cloudsoftcorp.monterey.node.basic;

import com.cloudsoftcorp.monterey.comms.api.CommsException;
import com.cloudsoftcorp.monterey.comms.api.Communications;
import com.cloudsoftcorp.monterey.comms.api.Message;
import com.cloudsoftcorp.monterey.control.basic.CloudsoftSystemProperties;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateContributor;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateItem;
import com.cloudsoftcorp.monterey.control.workrate.api.WorkrateReport;
import com.cloudsoftcorp.monterey.control.workrate.basic.BasicNodeCapacityItem;
import com.cloudsoftcorp.monterey.control.workrate.basic.MachineLoadReporter;
import com.cloudsoftcorp.monterey.control.workrate.basic.WorkrateProcessor;
import com.cloudsoftcorp.monterey.node.api.MessageProcessor;
import com.cloudsoftcorp.monterey.node.api.Node;
import com.cloudsoftcorp.monterey.node.api.NodeAttachable;
import com.cloudsoftcorp.monterey.node.api.NodeCommunications;
import com.cloudsoftcorp.monterey.node.api.NodeErrorHandler;
import com.cloudsoftcorp.monterey.node.api.NodeId;
import com.cloudsoftcorp.monterey.node.api.NodeLoggers;
import com.cloudsoftcorp.monterey.node.api.PropertiesContext;
import com.cloudsoftcorp.monterey.node.basic.BasicControlMessageFactory;
import com.cloudsoftcorp.monterey.node.basic.NodeErrorHandlers;
import com.cloudsoftcorp.util.Loggers;
import com.cloudsoftcorp.util.TimeUtils;
import com.cloudsoftcorp.util.annotation.NonNull;
import com.cloudsoftcorp.util.collections.KeyValuePair;
import com.cloudsoftcorp.util.condition.Consumer;
import com.cloudsoftcorp.util.condition.Filter;
import com.cloudsoftcorp.util.exception.ExceptionUtils;
import com.cloudsoftcorp.util.exception.KnownRuntimeException;
import com.cloudsoftcorp.util.exception.RuntimeInterruptedException;
import com.cloudsoftcorp.util.exception.RuntimeWrappedException;
import com.cloudsoftcorp.util.javalang.ReflectionUtils;
import com.cloudsoftcorp.util.osgi.BundleManager;
import com.cloudsoftcorp.util.proc.CloudsoftThreadFactory;
import com.cloudsoftcorp.util.proc.ResourceUsageUtils;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.apache.commons.io.IOUtils;
import org.mortbay.jetty.HttpVersions;

/* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode.class */
public class BasicNode extends AbstractNode implements WorkrateContributor {
    private static final Logger LOG = Loggers.getLogger(BasicNode.class);
    private static final long MIN_TIME_BETWEEN_DUMPS = 600000;
    private static final long NODE_THREAD_SHUTDOWN_TIMEOUT = 10000;
    private static final int HEALTH_MONITOR_PERIOD = 1000;
    private MessageReader messageReader;
    private PrioritisedWorker mainWorker;
    private PrioritisedWorker replicationWorker;
    private Thread healthMonitor;
    private final List<MessageProcessor> processorRegistry;
    private final List<MessageProcessor> unqueuedProcessorRegistry;
    private final AtomicBoolean toldToStart;
    private final AtomicBoolean toldToShutdown;
    private final AtomicBoolean toldToKill;
    private final Collection<LifecycleListener> lifecycleListeners;
    private boolean emergencyDropNonControl;
    private NodeLifecyclePublisher lifecyclePublisher;
    private WorkrateProcessor workrateProcessor;
    private volatile long lastNodeDumpTime;
    private final SupersededTransitionsRecorder supersededTransitionsRecorder;
    private final BundleManager bundleManager;
    long totalProcessingTime;
    long numProcessed;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$AddProcessorProcessor.class */
    public class AddProcessorProcessor extends ControlMessageProcessorForType {
        public AddProcessorProcessor() {
            super(BasicControlMessageFactory.NODE_ADD_PROCESSOR_MESSAGE_TYPE);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            try {
                String property = BasicNode.this.getProperties().instantiateProperties(message.getPayload()).getProperty(BasicControlMessageFactory.NODE_ADD_PROCESSOR_PROPERTY);
                if (property == null) {
                    NodeLoggers.MESSAGES.warning("Ignoring null message-processor class: node=" + BasicNode.this.getAddress() + "; msg=" + message);
                }
                try {
                    BasicNode.this.addProcessor((MessageProcessor) BasicNode.this.getProperties().instantiate(property));
                } catch (ReflectionUtils.ReflectionNotFoundException e) {
                    BasicNode.this.error("Failed to instantiate message processor '" + property + "': " + e, e);
                }
            } catch (IOException e2) {
                BasicNode.this.error("Couldn't read propreties payload (" + message + "): " + e2, e2);
            } catch (ClassNotFoundException e3) {
                BasicNode.this.error("Couldn't instantiate propreties payload (" + message + "): " + e3, e3);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$DumpDiagnosticsProcessor.class */
    public class DumpDiagnosticsProcessor extends ControlMessageProcessorForType {
        public DumpDiagnosticsProcessor() {
            super(BasicControlMessageFactory.NODE_DUMP_DIAGNOSTICS_MESSAGE_TYPE);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            BasicNode.LOG.warning("Received shutdown request; shutting down node " + BasicNode.this.getAddress());
            BasicNode.this.dumpDiagnostics(Loggers.DIAGNOSTICS, Level.INFO);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$HealthCheckProcessor.class */
    public class HealthCheckProcessor extends ControlMessageProcessorForType {
        public HealthCheckProcessor() {
            super(BasicControlMessageFactory.HEALTH_CHECK_MESSAGE_TYPE);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            String header = message.getHeader("correlationId");
            if (header != null) {
                BasicNode.this.getCommunications().sendControlMessage(BasicControlMessageFactory.INSTANCE.newHealthCheckResponseMessage(header), NodeCommunications.ControlDestination.MANAGER);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$KillProcessor.class */
    public class KillProcessor extends ControlMessageProcessorForType {
        public KillProcessor() {
            super("node.kill");
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            BasicNode.LOG.warning("Received kill request; killing node " + BasicNode.this.getAddress());
            BasicNode.this.kill();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$MessageReader.class */
    public class MessageReader implements Runnable {
        private final Thread thread;

        private MessageReader() {
            this.thread = CloudsoftThreadFactory.createThread(BasicNode.this.getAddress().toString() + "-message-reader", this, false);
        }

        MessageReader start() {
            synchronized (BasicNode.this.lifecyclePublisher.isOnline) {
                this.thread.start();
                try {
                    BasicNode.this.lifecyclePublisher.isOnline.wait(2000L);
                    if (!BasicNode.this.lifecyclePublisher.isOnline[0]) {
                        BasicNode.LOG.warning("node " + BasicNode.this.getAddress() + " did not emit online notification within 2s (may continue to run in background)");
                    }
                } catch (InterruptedException e) {
                    BasicNode.LOG.fine("node " + BasicNode.this.getAddress() + " online notification interrupted: " + e);
                }
            }
            return this;
        }

        void shutdown(long j) throws InterruptedException {
            this.thread.interrupt();
            if (awaitTermination(j)) {
                return;
            }
            BasicNode.LOG.warning("Node " + BasicNode.this.getAddress() + " failed to shutdown message-reader within " + TimeUtils.makeTimeString(j) + "; continuing; " + Arrays.toString(this.thread.getStackTrace()));
        }

        boolean awaitTermination(long j) throws InterruptedException {
            if (j >= 0) {
                this.thread.join(j);
            }
            return !this.thread.isAlive();
        }

        @Override // java.lang.Runnable
        public void run() {
            Exception exc = null;
            try {
                BasicNode.this.lifecyclePublisher.onNodeOnline();
                while (!BasicNode.this.isDisposed()) {
                    if (BasicNode.LOG.isLoggable(Level.FINEST)) {
                        BasicNode.LOG.finest("Dispatch waiting for message: node=" + BasicNode.this.getAddress());
                    }
                    Message nextMessage = BasicNode.this.getCommunications().nextMessage();
                    if (BasicControlMessageFactory.INSTANCE.isType(BasicControlMessageFactory.COMPOSITE_MESSAGE_TYPE, nextMessage)) {
                        Iterator it = ((List) BasicNode.this.getProperties().instantiate(nextMessage.getPayload())).iterator();
                        while (it.hasNext()) {
                            onMessage((Message) it.next());
                        }
                    } else {
                        onMessage(nextMessage);
                    }
                }
            } catch (Exception e) {
                if (e instanceof RuntimeInterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (e instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                }
                if (!BasicNode.this.isDisposed() || !BasicNode.this.isValidErrorDuringShutdown(e)) {
                    exc = e;
                }
            }
            if (exc != null) {
                BasicNode.LOG.log(Level.SEVERE, "Node aborting runtime dispatch loop at " + this + " due to error (rethrowing)", (Throwable) exc);
                ExceptionUtils.throwRuntime(exc);
            }
        }

        private void onMessage(final Message message) {
            final AtomicBoolean atomicBoolean = new AtomicBoolean();
            Runnable runnable = new Runnable() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.MessageReader.1
                @Override // java.lang.Runnable
                public void run() {
                    if (BasicNode.this.handleMessage(message, atomicBoolean.get())) {
                        return;
                    }
                    BasicNode.this.onUnhandledMessage(message);
                }

                public String toString() {
                    return "execute msg=" + message;
                }
            };
            boolean z = message.getHeader(BasicControlMessageFactory.REPLICATION_MESSAGE_HEADER) != null;
            boolean z2 = message.getHeader(BasicControlMessageFactory.CONTROL_MESSAGE_HEADER) != null;
            atomicBoolean.set((z || z2) ? false : true);
            if (z) {
                BasicNode.this.replicationWorker.onPriorityMedWork(runnable);
            } else if (z2) {
                BasicNode.this.mainWorker.onPriorityHighWork(runnable);
            } else {
                BasicNode.this.mainWorker.onPriorityMedWork(runnable);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$NodeLifecyclePublisher.class */
    public static class NodeLifecyclePublisher {
        private final NodeCommunications comms;
        private final NodeId nodeId;
        private final PropertiesContext props;
        private boolean[] isOnline = {false};

        public NodeLifecyclePublisher(NodeId nodeId, NodeCommunications nodeCommunications, PropertiesContext propertiesContext) {
            this.comms = nodeCommunications;
            this.nodeId = nodeId;
            this.props = propertiesContext;
        }

        public void onNodeOnline() {
            sendSafely(BasicControlMessageFactory.INSTANCE.newOnlineMessage(this.nodeId, this.props));
            setOnlineStateAndNotify(true);
        }

        public void onNodeStopping() {
            sendSafely(BasicControlMessageFactory.INSTANCE.newOfflineMessage(this.nodeId));
            setOnlineStateAndNotify(false);
        }

        private void setOnlineStateAndNotify(boolean z) {
            synchronized (this.isOnline) {
                this.isOnline[0] = z;
                this.isOnline.notifyAll();
            }
        }

        private void sendSafely(Message message) {
            try {
                this.comms.sendControlMessage(message, NodeCommunications.ControlDestination.MONITOR);
            } catch (RuntimeInterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                } else {
                    BasicNode.LOG.log(Level.WARNING, "Error sending " + BasicControlMessageFactory.INSTANCE.getType(message) + " message from node " + this.nodeId, (Throwable) e2);
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$NodeQueueMonitoringProcessor.class */
    public class NodeQueueMonitoringProcessor extends ControlMessageProcessorForType implements NodeAttachable {
        ScheduledExecutorService nodeQueueMonitorExecutor;
        int spikeWarning;
        int spikeError;
        int spikeClear;
        int period;
        boolean dropOnSpikeError;
        int lastState;
        boolean detached;

        public NodeQueueMonitoringProcessor() {
            super(BasicControlMessageFactory.NodeQueue.NODE_QUEUE_MONITORING_CONTROL_MESSAGE_TYPE);
            this.nodeQueueMonitorExecutor = null;
            this.spikeWarning = 0;
            this.spikeError = 0;
            this.spikeClear = 0;
            this.period = 0;
            this.dropOnSpikeError = false;
            this.lastState = 0;
            this.detached = false;
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public synchronized void processMessage(Message message) {
            if (this.detached) {
                BasicNode.LOG.warning("not setting period because node " + BasicNode.this.getAddress() + " has been shut down");
                return;
            }
            try {
                PropertiesContext instantiateProperties = BasicNode.this.getProperties().instantiateProperties(message.getPayload());
                this.period = Integer.parseInt(instantiateProperties.getProperty("period"));
                if (this.nodeQueueMonitorExecutor != null) {
                    if (BasicNode.LOG.isLoggable(Level.FINE)) {
                        BasicNode.LOG.log(Level.FINE, "On configuring queue-monitor, shutting down old node-queue-monitor executor at node " + BasicNode.this.getAddress());
                    }
                    this.nodeQueueMonitorExecutor.shutdownNow();
                    this.nodeQueueMonitorExecutor = null;
                }
                if (this.period > 0) {
                    this.spikeWarning = Integer.parseInt(instantiateProperties.getProperty(BasicControlMessageFactory.NodeQueue.SPIKE_WARNING_PROPERTY));
                    this.spikeError = Integer.parseInt(instantiateProperties.getProperty(BasicControlMessageFactory.NodeQueue.SPIKE_ERROR_PROPERTY));
                    this.spikeClear = Integer.parseInt(instantiateProperties.getProperty(BasicControlMessageFactory.NodeQueue.SPIKE_CLEARED_PROPERTY));
                    this.dropOnSpikeError = Boolean.parseBoolean(instantiateProperties.getProperty(BasicControlMessageFactory.NodeQueue.DROP_ON_SPIKE_ERROR_PROPERTY));
                    this.nodeQueueMonitorExecutor = Executors.newSingleThreadScheduledExecutor(CloudsoftThreadFactory.newThreadFactory("node-queue-monitor-for-" + BasicNode.this.getAddress()));
                    this.nodeQueueMonitorExecutor.scheduleAtFixedRate(new Runnable() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.NodeQueueMonitoringProcessor.1
                        @Override // java.lang.Runnable
                        public void run() {
                            try {
                                int combinedQueueLength = BasicNode.this.mainWorker.getCombinedQueueLength() + BasicNode.this.replicationWorker.getCombinedQueueLength();
                                if (NodeQueueMonitoringProcessor.this.lastState > 0) {
                                    BasicNode.LOG.info("node " + BasicNode.this.getAddress() + " queue in " + (BasicNode.this.emergencyDropNonControl ? "error (DROPPING)" : "warning") + " state, queue size now " + combinedQueueLength);
                                }
                                if (combinedQueueLength <= NodeQueueMonitoringProcessor.this.spikeClear && NodeQueueMonitoringProcessor.this.lastState >= 1) {
                                    BasicNode.LOG.info("node " + BasicNode.this.getAddress() + " queue appears cleared, queue size now " + combinedQueueLength + "; forcing garbage collection (" + ResourceUsageUtils.getMemoryUsageString() + ") and recomputing queue length to be sure");
                                    System.gc();
                                    System.gc();
                                    combinedQueueLength = BasicNode.this.getCommunications().getIncomingQueueLength();
                                    BasicNode.LOG.info("node " + BasicNode.this.getAddress() + " queue size now " + combinedQueueLength + " after GC (" + ResourceUsageUtils.getMemoryUsageString() + ")");
                                }
                                if (combinedQueueLength <= NodeQueueMonitoringProcessor.this.spikeClear) {
                                    if (NodeQueueMonitoringProcessor.this.lastState >= 1) {
                                        NodeQueueMonitoringProcessor.this.lastState = 0;
                                        BasicNode.LOG.warning("node " + BasicNode.this.getAddress() + " queue cleared, queue size now " + combinedQueueLength);
                                        NodeQueueMonitoringProcessor.this.sendToMonitor(BasicControlMessageFactory.NodeQueue.newEventMessage(BasicControlMessageFactory.NodeQueue.SEVERITY_INFO, BasicNode.this.getAddress(), "Node queue seems healthy again."));
                                        BasicNode.this.emergencyDropNonControl = false;
                                    }
                                } else if (combinedQueueLength >= NodeQueueMonitoringProcessor.this.spikeWarning) {
                                    if (combinedQueueLength < NodeQueueMonitoringProcessor.this.spikeError) {
                                        if (NodeQueueMonitoringProcessor.this.lastState == 0) {
                                            NodeQueueMonitoringProcessor.this.lastState = 1;
                                            BasicNode.LOG.warning("node " + BasicNode.this.getAddress() + " queue long (warning), queue size " + combinedQueueLength);
                                            NodeQueueMonitoringProcessor.this.sendToMonitor(BasicControlMessageFactory.NodeQueue.newEventMessage(BasicControlMessageFactory.NodeQueue.SEVERITY_WARNING, BasicNode.this.getAddress(), "Node queue is long and growing."));
                                        }
                                    } else if (NodeQueueMonitoringProcessor.this.lastState < 2) {
                                        NodeQueueMonitoringProcessor.this.lastState = 2;
                                        if (NodeQueueMonitoringProcessor.this.dropOnSpikeError) {
                                            BasicNode.LOG.severe("node " + BasicNode.this.getAddress() + " queue too long (error), NOW DROPPING, queue size " + combinedQueueLength);
                                            BasicNode.this.emergencyDropNonControl = true;
                                            NodeQueueMonitoringProcessor.this.sendToMonitor(BasicControlMessageFactory.NodeQueue.newEventMessage("ERROR", BasicNode.this.getAddress(), "Node queue is too long. Messages will likely be dropped."));
                                        } else {
                                            BasicNode.LOG.severe("node " + BasicNode.this.getAddress() + " queue too long (error), MAY FAIL, queue size " + combinedQueueLength);
                                            NodeQueueMonitoringProcessor.this.sendToMonitor(BasicControlMessageFactory.NodeQueue.newEventMessage("ERROR", BasicNode.this.getAddress(), "Node queue is too long. Node may die."));
                                        }
                                    }
                                }
                            } catch (Throwable th) {
                                BasicNode.LOG.log(Level.WARNING, "Error during node-queue-monitor at " + BasicNode.this.getAddress(), th);
                                throw ExceptionUtils.throwRuntime(th);
                            }
                        }
                    }, this.period, this.period, TimeUnit.MILLISECONDS);
                }
            } catch (IOException e) {
                BasicNode.this.error("Couldn't read propreties payload (" + message + "): " + e, e);
            } catch (ClassNotFoundException e2) {
                BasicNode.this.error("Couldn't instantiate propreties payload (" + message + "): " + e2, e2);
            }
        }

        public void sendToMonitor(Message message) {
            try {
                BasicNode.this.getCommunications().sendControlMessage(message, NodeCommunications.ControlDestination.MONITOR);
            } catch (RuntimeInterruptedException e) {
                Thread.currentThread().interrupt();
                throw e;
            } catch (Exception e2) {
                if (e2 instanceof InterruptedException) {
                    Thread.currentThread().interrupt();
                    ExceptionUtils.throwRuntime(e2);
                }
                if (!BasicNode.this.isDisposed()) {
                    ExceptionUtils.throwRuntime(e2);
                } else if (BasicNode.LOG.isLoggable(Level.FINE)) {
                    BasicNode.LOG.log(Level.FINE, "Failed to send message to montior: sender=" + BasicNode.this.getAddress() + "; msg=" + message, (Throwable) e2);
                }
            }
        }

        @Override // com.cloudsoftcorp.monterey.node.api.NodeAttachable
        public synchronized void postDetach(Node node) {
            this.detached = true;
            if (this.nodeQueueMonitorExecutor == null) {
                if (BasicNode.LOG.isLoggable(Level.FINE)) {
                    BasicNode.LOG.log(Level.FINE, "On detach, no node-queue-monitor executor at node " + node.getAddress());
                }
            } else {
                if (BasicNode.LOG.isLoggable(Level.FINE)) {
                    BasicNode.LOG.log(Level.FINE, "On detach, shutting down node-queue-monitor executor at node " + node.getAddress());
                }
                this.nodeQueueMonitorExecutor.shutdownNow();
                this.nodeQueueMonitorExecutor = null;
            }
        }

        @Override // com.cloudsoftcorp.monterey.node.api.NodeAttachable
        public void preAttach(Node node) {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$RevertProcessor.class */
    public class RevertProcessor extends ControlMessageProcessorForType {
        public RevertProcessor() {
            super(BasicControlMessageFactory.NODE_REVERT_MESSAGE_TYPE);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(Message message) {
            BasicNode.this.onRevert(message.getHeader(BasicControlMessageFactory.TRANSITION_ID_HEADER));
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/cloudsoftcorp/monterey/node/basic/BasicNode$ShutdownProcessor.class */
    public class ShutdownProcessor extends ControlMessageProcessorForType {
        public ShutdownProcessor() {
            super(BasicControlMessageFactory.NODE_SHUTDOWN_MESSAGE_TYPE);
        }

        @Override // com.cloudsoftcorp.monterey.node.api.MessageProcessor
        public void processMessage(final Message message) {
            BasicNode.LOG.warning("Received shutdown request; shutting down node " + BasicNode.this.getAddress());
            CloudsoftThreadFactory.createThread(BasicNode.this.getAddress() + "-shutdown", new Runnable() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.ShutdownProcessor.1
                @Override // java.lang.Runnable
                public void run() {
                    BasicNode.this.onShutdown(new BasicControlMessageFactory.TransitionId(message.getHeader(BasicControlMessageFactory.TRANSITION_ID_HEADER)));
                }
            }, true);
        }
    }

    public boolean isControlThread() {
        return this.mainWorker.isCurrentThread() && this.mainWorker.isCurrentHighPriority();
    }

    public boolean isDataplaneThread() {
        return this.mainWorker.isCurrentThread();
    }

    public boolean isMainThread() {
        return this.mainWorker.isCurrentThread();
    }

    public boolean isReplicationThread() {
        return this.replicationWorker.isCurrentThread();
    }

    public Future<Object> executeInResilienceThreadHighPriority(Runnable runnable) {
        return this.replicationWorker.onPriorityHighWork(runnable);
    }

    public BasicNode(PropertiesContext propertiesContext, Communications communications, boolean z, MessageProcessor... messageProcessorArr) {
        this(propertiesContext, communications, z, new BundleManager(propertiesContext.getClassloadingContext()), messageProcessorArr);
    }

    public BasicNode(PropertiesContext propertiesContext, Communications communications, boolean z, BundleManager bundleManager, MessageProcessor... messageProcessorArr) {
        super(propertiesContext, communications);
        this.processorRegistry = new CopyOnWriteArrayList();
        this.unqueuedProcessorRegistry = new CopyOnWriteArrayList();
        this.toldToStart = new AtomicBoolean(false);
        this.toldToShutdown = new AtomicBoolean(false);
        this.toldToKill = new AtomicBoolean(false);
        this.lifecycleListeners = new CopyOnWriteArrayList();
        this.emergencyDropNonControl = false;
        this.supersededTransitionsRecorder = new SupersededTransitionsRecorder();
        this.totalProcessingTime = 0L;
        this.numProcessed = 0L;
        this.bundleManager = bundleManager;
        registerUnqeuedMessageConsumer();
        for (MessageProcessor messageProcessor : messageProcessorArr) {
            addProcessor(messageProcessor);
        }
        if (z) {
            addDefaultProcessors();
        }
        if (LOG.isLoggable(Level.INFO)) {
            LOG.info("Instantiating new node " + this + " with comms " + communications + " and properties: " + Iterables.toString(getProperties()));
        }
        addErrorHandler(new NodeErrorHandlers.ReportingNodeErrorHandler());
        addErrorHandler(new NodeErrorHandlers.MonitoringNodeErrorHandler());
        if (CloudsoftSystemProperties.MAX_MEM_HEALTH_THRESHOLD.isNonEmpty()) {
            startHealthMonitorRequired(Double.parseDouble(CloudsoftSystemProperties.MAX_MEM_HEALTH_THRESHOLD.getValue()));
        }
        this.lifecyclePublisher = new NodeLifecyclePublisher(getAddress(), getCommunications(), getProperties());
    }

    private void registerUnqeuedMessageConsumer() {
        getRawCommunications().addUnqueuedMessageConsumer(new Filter<Message>() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.1
            @Override // com.cloudsoftcorp.util.condition.Filter
            public boolean accept(Message message) {
                Iterator it = BasicNode.this.unqueuedProcessorRegistry.iterator();
                while (it.hasNext()) {
                    if (((MessageProcessor) it.next()).acceptsMessage(message)) {
                        return true;
                    }
                }
                return false;
            }
        }, new Consumer<Message>() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.2
            @Override // com.cloudsoftcorp.util.condition.Consumer
            public void consume(Message message) {
                try {
                    for (MessageProcessor messageProcessor : BasicNode.this.unqueuedProcessorRegistry) {
                        if (messageProcessor.acceptsMessage(message)) {
                            if (BasicNode.LOG.isLoggable(Level.FINER)) {
                                BasicNode.LOG.finer("node " + BasicNode.this.getAddress() + " consuming unqueued message '" + message + "' at " + messageProcessor);
                            }
                            messageProcessor.processMessage(message);
                            return;
                        }
                    }
                    BasicNode.LOG.warning("Node accepted unqueued message '" + message + "', but did not handle it, at " + BasicNode.this.getAddress());
                } catch (RuntimeInterruptedException e) {
                    throw e;
                } catch (RuntimeException e2) {
                    BasicNode.LOG.log(Level.WARNING, "Problem processing message '" + message + "' at node " + BasicNode.this.getAddress(), (Throwable) e2);
                }
            }
        });
    }

    public BundleManager getBundleManager() {
        return this.bundleManager;
    }

    public void addDefaultProcessors() {
        addProcessor(new AddProcessorProcessor());
        addUnqueuedProcessor(new HealthCheckProcessor());
        addProcessor(new ShutdownProcessor());
        addProcessor(new KillProcessor());
        addProcessor(new DumpDiagnosticsProcessor());
        addProcessor(new RevertProcessor());
        addProcessor(new MachineLoadReporter());
        WorkrateProcessor workrateProcessor = new WorkrateProcessor();
        this.workrateProcessor = workrateProcessor;
        addProcessor(workrateProcessor);
        addProcessor(new BundleUpdateProcessor(getProperties().getClassloadingContext()));
        addProcessor(new NodeQueueMonitoringProcessor());
    }

    public void addLifecycleListener(LifecycleListener lifecycleListener) {
        this.lifecycleListeners.add(lifecycleListener);
    }

    public void removeLifecyleListener(LifecycleListener lifecycleListener) {
        this.lifecycleListeners.remove(lifecycleListener);
    }

    public void dumpDiagnosticsIfNotDoneRecently(Logger logger, Level level) {
        dumpDiagnosticsIfNotDoneRecently(logger, level, MIN_TIME_BETWEEN_DUMPS);
    }

    public void dumpDiagnosticsIfNotDoneRecently(Logger logger, Level level, long j) {
        dumpNodeDiagnosticsIfNotDoneRecently(logger, level, j);
        ResourceUsageUtils.dumpJvmDiagnosticsIfNotDoneRecently(logger, level, j);
    }

    public void dumpDiagnostics(Logger logger, Level level) {
        dumpNodeDiagnostics(logger, level);
        ResourceUsageUtils.dumpJvmDiagnostics(logger, level);
    }

    public void dumpNodeDiagnosticsIfNotDoneRecently(Logger logger, Level level, long j) {
        if (System.currentTimeMillis() - this.lastNodeDumpTime > j) {
            dumpNodeDiagnostics(logger, level);
        } else if (LOG.isLoggable(Level.FINE)) {
            LOG.fine("Not dumping node diagnostics because done previously " + TimeUtils.makeTimeString(System.currentTimeMillis() - this.lastNodeDumpTime) + " ago");
        }
    }

    public void dumpNodeDiagnostics(Logger logger, Level level) {
        LOG.log(level, "Dumping node diagnostics to: " + logger.getName());
        this.lastNodeDumpTime = System.currentTimeMillis();
        if (this.workrateProcessor != null) {
            logger.log(level, this.workrateProcessor.peekWorkrateReport().toString());
        }
    }

    public void addProcessor(@NonNull MessageProcessor messageProcessor) {
        if (messageProcessor instanceof NodeAttachable) {
            ((NodeAttachable) messageProcessor).preAttach(this);
        }
        if (this.processorRegistry.contains(messageProcessor)) {
            return;
        }
        this.processorRegistry.add(messageProcessor);
    }

    public void removeProcessor(MessageProcessor messageProcessor) {
        if (this.processorRegistry.remove(messageProcessor)) {
            if (LOG.isLoggable(Level.FINE)) {
                LOG.fine("node " + this + " removed node processor " + messageProcessor);
            }
            if (messageProcessor instanceof NodeAttachable) {
                ((NodeAttachable) messageProcessor).postDetach(this);
            }
        }
    }

    public void addUnqueuedProcessor(@NonNull MessageProcessor messageProcessor) {
        if (messageProcessor instanceof NodeAttachable) {
            ((NodeAttachable) messageProcessor).preAttach(this);
        }
        if (this.unqueuedProcessorRegistry.contains(messageProcessor)) {
            return;
        }
        this.unqueuedProcessorRegistry.add(messageProcessor);
    }

    public void removeUnqueuedProcessor(@NonNull MessageProcessor messageProcessor) {
        if (this.unqueuedProcessorRegistry.remove(messageProcessor) && (messageProcessor instanceof NodeAttachable)) {
            ((NodeAttachable) messageProcessor).postDetach(this);
        }
    }

    public Collection<MessageProcessor> getMessageProcessors() {
        return Collections.unmodifiableList(this.processorRegistry);
    }

    public synchronized BasicNode startThread() {
        return start();
    }

    private BasicNode start() {
        if (this.toldToStart.get()) {
            throw new IllegalStateException("Cannot start multiple times: node=" + getAddress());
        }
        this.replicationWorker = new PrioritisedWorker(getAmalgamatedErrorHandler(), getAddress() + "-replication").start();
        this.mainWorker = new PrioritisedWorker(getAmalgamatedErrorHandler(), getAddress() + "-main").start();
        this.messageReader = new MessageReader().start();
        this.toldToStart.set(true);
        Iterator<LifecycleListener> it = this.lifecycleListeners.iterator();
        while (it.hasNext()) {
            it.next().onStarted();
        }
        return this;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public boolean isValidErrorDuringShutdown(Throwable th) {
        if ((th instanceof InterruptedException) || (th instanceof RuntimeInterruptedException) || (th instanceof IOException) || (th instanceof RejectedExecutionException) || (th instanceof CancellationException)) {
            return true;
        }
        if (th instanceof RuntimeWrappedException) {
            return isValidErrorDuringShutdown(((RuntimeWrappedException) th).getTargetException());
        }
        return false;
    }

    protected boolean handleMessage(Message message, boolean z) {
        if (LOG.isLoggable(Level.FINER)) {
            LOG.finer("node " + getAddress() + " attempting to handle message: node=" + getAddress() + "; msg=" + message);
        }
        String header = message.getHeader(BasicControlMessageFactory.DELIVERY_RECEIPT_REQUEST_HEADER);
        String header2 = message.getHeader(BasicControlMessageFactory.TRANSITION_ID_HEADER);
        String header3 = message.getHeader(BasicControlMessageFactory.SUPERSEDED_TRANSITION_ID_HEADER);
        if (header3 != null) {
            this.supersededTransitionsRecorder.recordSuperseded(header2, header3);
        }
        try {
            if (this.supersededTransitionsRecorder.isSuperseded(header2)) {
                LOG.info("Ignoring message relating to superseded transition " + header2 + "; message=" + message);
                return true;
            }
            try {
                for (MessageProcessor messageProcessor : this.processorRegistry) {
                    if (!this.emergencyDropNonControl || !z || (messageProcessor instanceof MessageProcessor.ControlMessageProcessor)) {
                        if (messageProcessor.acceptsMessage(message)) {
                            if (LOG.isLoggable(Level.FINER)) {
                                LOG.finer("node " + getAddress() + " dispatch loop processing '" + message + "' at " + messageProcessor);
                            }
                            messageProcessor.processMessage(message);
                            if (header != null) {
                                if (LOG.isLoggable(Level.FINE)) {
                                    LOG.fine(new StringBuilder().append("node dispatch loop sending delivery receipt: receipt=").append(header).append("; node=").append(getAddress()).append("; ").append(0 != 0 ? "unprocessed" : "success=" + (0 == 0)).append("; msg=").append(message).toString());
                                }
                                try {
                                    getCommunications().sendControlMessage(0 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, new KnownRuntimeException("Unprocessed message " + BasicControlMessageFactory.INSTANCE.getType(message))) : 0 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, null) : BasicControlMessageFactory.INSTANCE.newDeliveryReceipt(header), NodeCommunications.ControlDestination.MANAGER);
                                } catch (CommsException e) {
                                    Loggers.ERRORS.log(Level.WARNING, "Error sending receipt to manager: receipt=" + header + "; sender=" + getAddress(), (Throwable) e);
                                }
                            }
                            return true;
                        }
                    }
                }
                LOG.warning("Node did NOT process message: node=" + getAddress() + (this.emergencyDropNonControl ? "; emergencyDrop" : HttpVersions.HTTP_0_9) + "; msg=" + message + (LOG.isLoggable(Level.FINER) ? "; processors=" + this.processorRegistry : HttpVersions.HTTP_0_9));
                if (header != null) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine(new StringBuilder().append("node dispatch loop sending delivery receipt: receipt=").append(header).append("; node=").append(getAddress()).append("; ").append(1 != 0 ? "unprocessed" : "success=" + (0 == 0)).append("; msg=").append(message).toString());
                    }
                    try {
                        getCommunications().sendControlMessage(1 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, new KnownRuntimeException("Unprocessed message " + BasicControlMessageFactory.INSTANCE.getType(message))) : 0 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, null) : BasicControlMessageFactory.INSTANCE.newDeliveryReceipt(header), NodeCommunications.ControlDestination.MANAGER);
                    } catch (CommsException e2) {
                        Loggers.ERRORS.log(Level.WARNING, "Error sending receipt to manager: receipt=" + header + "; sender=" + getAddress(), (Throwable) e2);
                    }
                }
            } catch (RuntimeInterruptedException e3) {
                throw e3;
            } catch (Throwable th) {
                if (th instanceof InterruptedException) {
                    throw new RuntimeInterruptedException((InterruptedException) th);
                }
                error("Error when handling message " + message + ": " + th, th);
                if (header != null) {
                    if (LOG.isLoggable(Level.FINE)) {
                        LOG.fine(new StringBuilder().append("node dispatch loop sending delivery receipt: receipt=").append(header).append("; node=").append(getAddress()).append("; ").append(1 != 0 ? "unprocessed" : "success=" + (th == null)).append("; msg=").append(message).toString());
                    }
                    try {
                        getCommunications().sendControlMessage(1 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, new KnownRuntimeException("Unprocessed message " + BasicControlMessageFactory.INSTANCE.getType(message))) : th != null ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, th) : BasicControlMessageFactory.INSTANCE.newDeliveryReceipt(header), NodeCommunications.ControlDestination.MANAGER);
                    } catch (CommsException e4) {
                        Loggers.ERRORS.log(Level.WARNING, "Error sending receipt to manager: receipt=" + header + "; sender=" + getAddress(), (Throwable) e4);
                    }
                }
            }
            return 1 == 0;
        } catch (Throwable th2) {
            if (header != null) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine(new StringBuilder().append("node dispatch loop sending delivery receipt: receipt=").append(header).append("; node=").append(getAddress()).append("; ").append(1 != 0 ? "unprocessed" : "success=" + (0 == 0)).append("; msg=").append(message).toString());
                }
                try {
                    getCommunications().sendControlMessage(1 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, new KnownRuntimeException("Unprocessed message " + BasicControlMessageFactory.INSTANCE.getType(message))) : 0 != 0 ? BasicControlMessageFactory.INSTANCE.newDeliveryReceiptError(header, null) : BasicControlMessageFactory.INSTANCE.newDeliveryReceipt(header), NodeCommunications.ControlDestination.MANAGER);
                } catch (CommsException e5) {
                    Loggers.ERRORS.log(Level.WARNING, "Error sending receipt to manager: receipt=" + header + "; sender=" + getAddress(), (Throwable) e5);
                }
            }
            throw th2;
        }
    }

    protected void onUnhandledMessage(Message message) {
        if (this.emergencyDropNonControl) {
            return;
        }
        if (LOG.isLoggable(Level.INFO)) {
            StringBuilder sb = new StringBuilder();
            sb.append("This node (" + this + ") dispatch doesn't handle '" + message + "', sending error. Message Headers are:\n");
            Iterator it = message.getEnvelope().iterator();
            while (it.hasNext()) {
                KeyValuePair keyValuePair = (KeyValuePair) it.next();
                sb.append("  -- " + ((String) keyValuePair.getKey()) + "=" + ((String) keyValuePair.getValue()) + IOUtils.LINE_SEPARATOR_UNIX);
            }
            LOG.log(Level.INFO, sb.toString());
        }
        error("No handler for message: " + message);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void onRevert(String str) {
        LOG.info("node " + this + " reverting");
        for (MessageProcessor messageProcessor : this.processorRegistry) {
            if (messageProcessor instanceof NodeAttachable) {
                if (LOG.isLoggable(Level.FINE)) {
                    LOG.fine("node " + this + " reverting: detaching " + messageProcessor);
                }
                ((NodeAttachable) messageProcessor).postDetach(this);
            }
        }
        this.processorRegistry.clear();
        addDefaultProcessors();
    }

    protected void onShutdown(BasicControlMessageFactory.TransitionId transitionId) {
        onShutdown(transitionId, 10000L);
    }

    public boolean awaitTermination(long j) throws InterruptedException {
        return (this.messageReader == null || this.messageReader.awaitTermination(j)) && (this.mainWorker == null || this.mainWorker.awaitTermination(j)) && (this.replicationWorker == null || this.replicationWorker.awaitTermination(j));
    }

    protected void onShutdown(BasicControlMessageFactory.TransitionId transitionId, long j) {
        this.toldToShutdown.set(true);
        for (MessageProcessor messageProcessor : this.processorRegistry) {
            if (messageProcessor instanceof NodeAttachable) {
                try {
                    ((NodeAttachable) messageProcessor).postDetach(this);
                } catch (RuntimeInterruptedException e) {
                    Thread.currentThread().interrupt();
                } catch (Exception e2) {
                    if (e2 instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.log(Level.WARNING, "Continuing shutdown after problem detaching message-processor " + messageProcessor, (Throwable) e2);
                }
            }
        }
        for (NodeErrorHandler nodeErrorHandler : getErrorHandlers()) {
            if (nodeErrorHandler instanceof NodeAttachable) {
                try {
                    ((NodeAttachable) nodeErrorHandler).postDetach(this);
                } catch (RuntimeInterruptedException e3) {
                    Thread.currentThread().interrupt();
                } catch (Exception e4) {
                    if (e4 instanceof InterruptedException) {
                        Thread.currentThread().interrupt();
                    }
                    LOG.log(Level.WARNING, "Continuing shutdown after problem detaching error-handler " + nodeErrorHandler, (Throwable) e4);
                }
            }
        }
        stopHealthMonitor();
        shutdownThreads(j);
        this.lifecyclePublisher.onNodeStopping();
        getCommunications().dispose();
        Iterator<LifecycleListener> it = this.lifecycleListeners.iterator();
        while (it.hasNext()) {
            it.next().onStopped();
        }
    }

    public void kill() {
        LOG.info("Killing node " + getAddress());
        this.toldToKill.set(true);
        getCommunications().dispose();
        shutdownThreads(-1L);
        onShutdown(BasicControlMessageFactory.NULL_TRANSITION_ID, -1L);
    }

    private void shutdownThreads(long j) {
        if (this.messageReader != null) {
            try {
                this.messageReader.shutdown(j);
            } catch (RuntimeInterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (InterruptedException e2) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.mainWorker != null) {
            try {
                this.mainWorker.shutdown(j);
            } catch (RuntimeInterruptedException e3) {
                Thread.currentThread().interrupt();
            } catch (InterruptedException e4) {
                Thread.currentThread().interrupt();
            }
        }
        if (this.replicationWorker != null) {
            try {
                this.replicationWorker.shutdown(j);
            } catch (RuntimeInterruptedException e5) {
                Thread.currentThread().interrupt();
            } catch (InterruptedException e6) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public boolean isStarted() {
        return this.toldToStart.get();
    }

    public boolean isDisposed() {
        return this.toldToShutdown.get() || this.toldToKill.get();
    }

    public void shutdown() {
        onShutdown(BasicControlMessageFactory.NULL_TRANSITION_ID);
    }

    @Override // com.cloudsoftcorp.monterey.node.api.Node
    public boolean isRunning() {
        return isStarted() && !isDisposed();
    }

    private void startHealthMonitorRequired(double d) {
        startHealthMonitor(1000L, d);
    }

    private void stopHealthMonitor() {
        if (this.healthMonitor != null) {
            this.healthMonitor.interrupt();
            this.healthMonitor = null;
        }
    }

    private void startHealthMonitor(long j, final double d) {
        if (Loggers.RESOURCE_USAGE.isLoggable(Level.WARNING) && this.healthMonitor == null) {
            this.healthMonitor = CloudsoftThreadFactory.createThreadRunningPeriodically("health monitor", new Runnable() { // from class: com.cloudsoftcorp.monterey.node.basic.BasicNode.3
                @Override // java.lang.Runnable
                public void run() {
                    try {
                        if (ResourceUsageUtils.getFractionUsedOfMax() > d) {
                            BasicNode.this.dumpDiagnosticsIfNotDoneRecently(Loggers.RESOURCE_USAGE, Level.INFO);
                        }
                    } catch (RuntimeInterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }, (int) j, false, false);
            this.healthMonitor.setDaemon(true);
            this.healthMonitor.start();
        }
    }

    @Override // com.cloudsoftcorp.monterey.control.workrate.api.WorkrateContributor
    public void contributeWorkrateItems(WorkrateReport workrateReport) {
        if (getCommunications() != null) {
            getCommunications().contributeWorkrateItems(workrateReport);
        }
        for (MessageProcessor messageProcessor : getMessageProcessors()) {
            if (messageProcessor instanceof WorkrateContributor) {
                ((WorkrateContributor) messageProcessor).contributeWorkrateItems(workrateReport);
            }
        }
        WorkrateItem makeCapacityWorkrateItem = makeCapacityWorkrateItem(true);
        if (makeCapacityWorkrateItem != null) {
            workrateReport.addItem(makeCapacityWorkrateItem);
        }
    }

    @Override // com.cloudsoftcorp.monterey.control.workrate.api.WorkrateContributor
    public Collection<WorkrateItem> peekWorkrateItems() {
        LinkedList linkedList = new LinkedList();
        if (getCommunications() != null) {
            linkedList.addAll(getCommunications().peekWorkrateItems());
        }
        for (MessageProcessor messageProcessor : getMessageProcessors()) {
            if (messageProcessor instanceof WorkrateContributor) {
                linkedList.addAll(((WorkrateContributor) messageProcessor).peekWorkrateItems());
            }
        }
        WorkrateItem makeCapacityWorkrateItem = makeCapacityWorkrateItem(false);
        if (makeCapacityWorkrateItem != null) {
            linkedList.add(makeCapacityWorkrateItem);
        }
        return linkedList;
    }

    private WorkrateItem makeCapacityWorkrateItem(boolean z) {
        BasicNodeCapacityItem basicNodeCapacityItem = null;
        try {
            basicNodeCapacityItem = new BasicNodeCapacityItem(this.mainWorker.getPriorityHighQueueLength(), this.mainWorker.getPriorityMedQueueLength() + this.mainWorker.getPriorityLowQueueLength(), this.replicationWorker.getCombinedQueueLength(), this.mainWorker.getIdleTime(z), this.replicationWorker.getIdleTime(z));
        } catch (RuntimeInterruptedException e) {
            Thread.currentThread().interrupt();
            throw e;
        } catch (Throwable th) {
            if (th instanceof InterruptedException) {
                Thread.currentThread().interrupt();
                throw new RuntimeInterruptedException((InterruptedException) th);
            }
            LOG.log(Level.SEVERE, "exception trying to make workrate item at " + this + ", ignoring: " + th, th);
        }
        return basicNodeCapacityItem;
    }
}
