package org.apache.whirr.service.cdh.integration;

import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.util.Map;
import junit.framework.Assert;
import org.apache.commons.configuration.CompositeConfiguration;
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.lib.LongSumReducer;
import org.apache.hadoop.mapred.lib.TokenCountMapper;
import org.apache.whirr.Cluster;
import org.apache.whirr.ClusterController;
import org.apache.whirr.ClusterSpec;
import org.apache.whirr.service.hadoop.HadoopProxy;
import org.jclouds.compute.domain.ExecResponse;
import org.jclouds.compute.domain.NodeMetadata;
import org.jclouds.scriptbuilder.domain.Statement;
import org.jclouds.scriptbuilder.domain.Statements;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/whirr/service/cdh/integration/Cdh3HadoopServiceTest.class */
/**
 * Integration test that launches a cluster described by the properties file
 * returned from {@link #getPropertiesFilename()}, starts a {@link HadoopProxy}
 * to it, and then checks both the installed Hadoop distribution and the
 * ability to run a simple MapReduce job.
 *
 * <p>The cluster, controller and proxy are shared by all tests in the class:
 * they are created once in {@link #setUp()} and released in {@link #tearDown()}.
 */
public class Cdh3HadoopServiceTest {
    private static final Logger LOG = LoggerFactory.getLogger(Cdh3HadoopServiceTest.class);

    /** Predicate that selects every node of the cluster. */
    private static final Predicate<NodeMetadata> ALL = Predicates.alwaysTrue();

    protected static ClusterSpec clusterSpec;
    protected static ClusterController controller;
    protected static HadoopProxy proxy;
    protected static Cluster cluster;

    /**
     * Returns the name of the properties file that describes the cluster to launch.
     *
     * @return the properties file name
     */
    protected static String getPropertiesFilename() {
        return "whirr-hadoop-cdh3-test.properties";
    }

    /**
     * Builds the cluster specification, launches the cluster and starts the proxy.
     *
     * <p>If the {@code config} system property is set, the properties file it names is
     * added to the composite configuration before the file from
     * {@link #getPropertiesFilename()}.
     *
     * @throws Exception if the configuration cannot be loaded or the cluster or proxy
     *     cannot be started
     */
    @BeforeClass
    public static void setUp() throws Exception {
        CompositeConfiguration compositeConfiguration = new CompositeConfiguration();
        String overrideFile = System.getProperty("config");
        if (overrideFile != null) {
            compositeConfiguration.addConfiguration(new PropertiesConfiguration(overrideFile));
        }
        compositeConfiguration.addConfiguration(new PropertiesConfiguration(getPropertiesFilename()));
        clusterSpec = ClusterSpec.withTemporaryKeys(compositeConfiguration);
        controller = new ClusterController();
        cluster = controller.launchCluster(clusterSpec);
        proxy = new HadoopProxy(clusterSpec, cluster);
        proxy.start();
    }

    /**
     * Stops the proxy (if it was started) and destroys the cluster.
     *
     * <p>The cluster is destroyed even when stopping the proxy fails, and nothing is
     * attempted on a controller that {@link #setUp()} never got to create, so a set-up
     * failure is not hidden behind a {@link NullPointerException} raised here.
     *
     * @throws IOException if stopping the proxy or destroying the cluster fails
     * @throws InterruptedException if the calling thread is interrupted while doing so
     */
    @AfterClass
    public static void tearDown() throws IOException, InterruptedException {
        try {
            if (proxy != null) {
                proxy.stop();
            }
        } finally {
            // Always attempt to release the cluster, otherwise it keeps running
            // after a proxy shutdown failure.
            if (controller != null) {
                controller.destroyCluster(clusterSpec);
            }
        }
    }

    /**
     * Lists {@code /etc/alternatives/hadoop-lib} on every node and asserts that each
     * node's output contains {@code cdh3}.
     *
     * @throws Exception if the script cannot be run on the nodes
     */
    @Test
    public void testVersion() throws Exception {
        Statement checkVersion = Statements.exec("ls /etc/alternatives/hadoop-lib");
        Map<? extends NodeMetadata, ExecResponse> responses =
                controller.runScriptOnNodesMatching(clusterSpec, ALL, checkVersion);
        printResponses(checkVersion, responses);
        assertResponsesContain(responses, checkVersion, "cdh3");
    }

    /**
     * Waits until at least one task tracker is reported and then runs the job check
     * in {@link #checkHadoop(Configuration, JobClient, JobConf)}.
     *
     * @throws Exception if the cluster cannot be reached or the job check fails
     */
    @Test
    public void testJobExecution() throws Exception {
        Configuration configuration = getConfiguration();
        JobConf jobConf = new JobConf(configuration, Cdh3HadoopServiceTest.class);
        JobClient jobClient = new JobClient(jobConf);
        waitForTaskTrackers(jobClient);
        checkHadoop(configuration, jobClient, jobConf);
    }

    /**
     * Writes a two-token input file, runs a token-count job over it and asserts that
     * {@code output/part-00000} contains exactly the lines {@code a\t1} and {@code b\t1}.
     *
     * @param configuration configuration used to obtain the file system
     * @param jobClient job client for the cluster; not used by this implementation
     * @param jobConf job configuration that is completed and submitted
     * @throws Exception if file system access or the job itself fails
     */
    protected void checkHadoop(Configuration configuration, JobClient jobClient, JobConf jobConf) throws Exception {
        FileSystem fileSystem = FileSystem.get(configuration);

        // Explicit charset so the bytes written do not depend on the platform default.
        OutputStreamWriter writer = new OutputStreamWriter(fileSystem.create(new Path("input")), "UTF-8");
        try {
            writer.write("b a\n");
        } finally {
            writer.close();
        }

        jobConf.setMapperClass(TokenCountMapper.class);
        jobConf.setReducerClass(LongSumReducer.class);
        jobConf.setOutputKeyClass(Text.class);
        jobConf.setOutputValueClass(LongWritable.class);
        FileInputFormat.setInputPaths(jobConf, new Path[]{new Path("input")});
        FileOutputFormat.setOutputPath(jobConf, new Path("output"));
        JobClient.runJob(jobConf);

        BufferedReader reader = new BufferedReader(
                new InputStreamReader(fileSystem.open(new Path("output/part-00000")), "UTF-8"));
        try {
            Assert.assertEquals("a\t1", reader.readLine());
            Assert.assertEquals("b\t1", reader.readLine());
            Assert.assertNull(reader.readLine());
        } finally {
            // Close the stream even when one of the assertions above fails.
            reader.close();
        }
    }

    /**
     * Copies every entry of the launched cluster's configuration into a new Hadoop
     * {@link Configuration}, using the string form of each key and value.
     *
     * @return the populated configuration
     */
    private Configuration getConfiguration() {
        Configuration configuration = new Configuration();
        for (Map.Entry<?, ?> entry : cluster.getConfiguration().entrySet()) {
            configuration.set(entry.getKey().toString(), entry.getValue().toString());
        }
        return configuration;
    }

    /**
     * Polls the cluster status once per second, printing a dot for each attempt, until
     * at least one task tracker is reported.
     *
     * @param jobClient client used to query the cluster status
     * @throws IOException if the cluster status cannot be read, or if the thread is
     *     interrupted while waiting (the interrupt flag is restored before throwing)
     */
    private static void waitForTaskTrackers(JobClient jobClient) throws IOException {
        while (jobClient.getClusterStatus().getTaskTrackers() <= 0) {
            System.out.print(".");
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                // Returning quietly would let the caller submit a job although no
                // task tracker is available; restore the flag and fail instead.
                Thread.currentThread().interrupt();
                throw new IOException("Interrupted while waiting for task trackers", e);
            }
        }
    }

    /**
     * Fails the test if the output of any node's response does not contain the
     * expected text.
     *
     * @param map responses keyed by the node that produced them
     * @param statement the statement that was executed, used in the failure message
     * @param str text that every response output must contain
     */
    private static void assertResponsesContain(Map<? extends NodeMetadata, ExecResponse> map, Statement statement, String str) {
        for (Map.Entry<? extends NodeMetadata, ExecResponse> entry : map.entrySet()) {
            ExecResponse response = entry.getValue();
            if (!response.getOutput().contains(str)) {
                Assert.failNotEquals("Node: " + entry.getKey().getId() + " failed to execute the command: " + statement + " as could not find expected text", str, response);
            }
        }
    }

    /**
     * Logs the executed statement followed by one line per node with its response.
     *
     * @param statement the statement that was executed
     * @param map responses keyed by the node that produced them
     */
    public static void printResponses(Statement statement, Map<? extends NodeMetadata, ExecResponse> map) {
        LOG.info("Responses for Statement: {}", statement);
        for (Map.Entry<? extends NodeMetadata, ExecResponse> entry : map.entrySet()) {
            LOG.info("Node[{}]: {}", entry.getKey().getId(), entry.getValue());
        }
    }
}
