package org.pentaho.amazon.emr.job;

import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClient;
import com.amazonaws.services.elasticmapreduce.model.AddJobFlowStepsRequest;
import com.amazonaws.services.elasticmapreduce.model.DescribeJobFlowsRequest;
import com.amazonaws.services.elasticmapreduce.model.HadoopJarStepConfig;
import com.amazonaws.services.elasticmapreduce.model.JobFlowDetail;
import com.amazonaws.services.elasticmapreduce.model.JobFlowInstancesConfig;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowRequest;
import com.amazonaws.services.elasticmapreduce.model.RunJobFlowResult;
import com.amazonaws.services.elasticmapreduce.model.StepConfig;
import com.amazonaws.services.s3.AmazonS3Client;
import com.amazonaws.services.s3.model.PutObjectRequest;
import com.amazonaws.services.s3.model.S3Object;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;
import org.apache.commons.io.IOUtils;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemOptions;
import org.apache.commons.vfs2.auth.StaticUserAuthenticator;
import org.apache.commons.vfs2.impl.DefaultFileSystemConfigBuilder;
import org.apache.log4j.Appender;
import org.pentaho.amazon.AbstractAmazonJobEntry;
import org.pentaho.amazon.AbstractAmazonJobExecutorController;
import org.pentaho.di.cluster.SlaveServer;
import org.pentaho.di.core.Const;
import org.pentaho.di.core.Result;
import org.pentaho.di.core.ResultFile;
import org.pentaho.di.core.annotations.JobEntry;
import org.pentaho.di.core.database.DatabaseMeta;
import org.pentaho.di.core.encryption.Encr;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.exception.KettleXMLException;
import org.pentaho.di.core.hadoop.HadoopConfigurationBootstrap;
import org.pentaho.di.core.logging.LogWriter;
import org.pentaho.di.core.util.StringUtil;
import org.pentaho.di.core.vfs.KettleVFS;
import org.pentaho.di.core.xml.XMLHandler;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.job.entries.hadoopjobexecutor.JarUtility;
import org.pentaho.di.job.entry.JobEntryInterface;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.Repository;
import org.pentaho.hadoop.shim.spi.HadoopShim;
import org.pentaho.metastore.api.IMetaStore;
import org.w3c.dom.Node;

@JobEntry(id = "EMRJobExecutorPlugin", image = "EMR.svg", name = "EMRJobExecutorPlugin.Name", description = "EMRJobExecutorPlugin.Description", categoryDescription = "i18n:org.pentaho.di.job:JobCategory.Category.BigData", documentationUrl = "http://wiki.pentaho.com/display/EAI/Amazon+EMR+Job+Executor", i18nPackageName = "org.pentaho.amazon.emr.job")
/* loaded from: input_file:org/pentaho/amazon/emr/job/AmazonElasticMapReduceJobExecutor.class */
public class AmazonElasticMapReduceJobExecutor extends AbstractAmazonJobEntry implements Cloneable, JobEntryInterface {
    private static Class<?> PKG = AmazonElasticMapReduceJobExecutor.class;
    private JarUtility util = new JarUtility();

    /**
     * Determines the name of the class to launch from the given jar.
     *
     * <p>The class named by the jar manifest wins; when the manifest yields nothing, the first class
     * reported by {@code JarUtility.getClassesInJarWithMain} is used instead. Both lookups run against the
     * class loader of the active Hadoop shim.
     *
     * @param url location of the jar to inspect
     * @return the fully qualified name of the selected main class
     * @throws RuntimeException if neither the manifest nor the jar scan yields a main class
     * @throws Exception if the Hadoop configuration or the jar cannot be read
     */
    public String getMainClass(URL url) throws Exception {
        ClassLoader shimClassLoader = HadoopConfigurationBootstrap.getHadoopConfigurationProvider()
                .getActiveConfiguration()
                .getHadoopShim()
                .getClass()
                .getClassLoader();

        Class<?> declaredInManifest = this.util.getMainClassFromManifest(url, shimClassLoader);
        if (declaredInManifest == null) {
            // No usable manifest entry: fall back to scanning the jar for classes with a main method.
            String jarLocation = url.toExternalForm();
            List<Class<?>> candidates = this.util.getClassesInJarWithMain(jarLocation, shimClassLoader);
            if (candidates.isEmpty()) {
                throw new RuntimeException("Could not find main class in: " + jarLocation);
            }
            Class<?> firstCandidate = candidates.get(0);
            return firstCandidate.getName();
        }
        return declaredInManifest.getName();
    }

    /**
     * Uploads the configured jar to the S3 staging bucket and runs it on Amazon EMR, either by starting a
     * new job flow (when {@code hadoopJobFlowId} is empty) or by adding a step to the existing job flow.
     *
     * <p>When {@code blocking} is set and the log level is at least basic, the job flow state is polled
     * every {@code loggingInterval} seconds until {@link #isRunning(String)} reports it finished. A
     * {@code FAILED} state marks the result as failed and copies the first step's stdout/stderr from the
     * staging bucket into the error log.
     *
     * <p>Any failure while staging or submitting is logged and reported through the returned result rather
     * than thrown.
     *
     * @param result the result object to update; the same instance is returned
     * @param i not used by this implementation
     * @return {@code result}, with error state and the log file of this run attached when available
     * @throws KettleException declared by the job entry contract; not thrown directly by this method
     */
    public Result execute(Result result, int i) throws KettleException {
        Appender appender = null;
        String logFileName = "pdi-" + getName();
        try {
            appender = LogWriter.createFileAppender(logFileName, true, false);
            LogWriter.getInstance().addAppender(appender);
            this.log.setLogLevel(this.parentJob.getLogLevel());
        } catch (Exception e) {
            logError(BaseMessages.getString(PKG, "AmazonElasticMapReduceJobExecutor.FailedToOpenLogFile", new String[]{logFileName, e.toString()}));
            logError(Const.getStackTracker(e));
        }
        try {
            AmazonElasticMapReduceClient emrClient = new AmazonElasticMapReduceClient(this.awsCredentials);

            // Pull the jar to a local temp file so its main class can be inspected and it can be uploaded.
            FileObject jarFile = KettleVFS.getFileObject(buildFilename(this.jarUrl));
            File localJar = File.createTempFile("customEMR", "jar");
            localJar.deleteOnExit();
            copyJarToLocalFile(jarFile, localJar);
            String mainClass = getMainClass(localJar.toURI().toURL());

            // Resolve the staging bucket and (re)upload the jar into it.
            AmazonS3Client s3Client = new AmazonS3Client(this.awsCredentials);
            FileSystemOptions fsOptions = new FileSystemOptions();
            DefaultFileSystemConfigBuilder.getInstance().setUserAuthenticator(fsOptions, new StaticUserAuthenticator((String) null, this.awsCredentials.getAWSAccessKeyId(), this.awsCredentials.getAWSSecretKey()));
            String bucketName = KettleVFS.getFileObject(this.stagingDir, getVariables(), fsOptions).getName().getBaseName();
            if (!s3Client.doesBucketExist(bucketName)) {
                s3Client.createBucket(bucketName);
            }
            String jarKey = jarFile.getName().getBaseName();
            try {
                // Best effort: a stale copy that cannot be removed is simply overwritten by the upload below.
                s3Client.deleteObject(bucketName, jarKey);
            } catch (Exception deleteFailure) {
                logError(Const.getStackTracker(deleteFailure));
            }
            s3Client.putObject(new PutObjectRequest(bucketName, jarKey, localJar));
            String jarS3Url = "s3://" + bucketName + "/" + jarKey;
            String logUri = "s3://" + bucketName;

            RunJobFlowResult runJobFlowResult = null;
            if (StringUtil.isEmpty(this.hadoopJobFlowId)) {
                runJobFlowResult = emrClient.runJobFlow(createJobFlow(logUri, jarS3Url, mainClass));
            } else {
                addCustomJarStepToJobFlow(emrClient, jarS3Url, mainClass);
            }

            String intervalText = environmentSubstitute(this.loggingInterval);
            int intervalSeconds = 60;
            try {
                intervalSeconds = Integer.parseInt(intervalText);
            } catch (NumberFormatException nfe) {
                logError("Unable to parse logging interval '" + intervalText + "' - using default of 60");
            }
            if (intervalSeconds < 1) {
                // Thread.sleep rejects negative values and zero would poll the service in a tight loop.
                logError("Logging interval '" + intervalText + "' must be at least 1 second - using default of 60");
                intervalSeconds = 60;
            }

            if (this.blocking) {
                try {
                    // NOTE(review): polling only happens when the log level is at least basic; at lower
                    // levels a "blocking" entry returns immediately. Kept as-is - confirm this is intended.
                    if (this.log.isBasic()) {
                        String jobFlowId = StringUtil.isEmpty(this.hadoopJobFlowId)
                                ? runJobFlowResult.getJobFlowId()
                                : this.hadoopJobFlowId;
                        pollJobFlowUntilFinished(emrClient, s3Client, bucketName, jobFlowId, intervalSeconds, result);
                    }
                } catch (Exception pollFailure) {
                    logError(pollFailure.getMessage(), pollFailure);
                }
            }
        } catch (Throwable th) {
            result.setStopped(true);
            result.setNrErrors(1L);
            result.setResult(false);
            logError(th.getMessage(), th);
        }
        if (appender != null) {
            LogWriter.getInstance().removeAppender(appender);
            appender.close();
            ResultFile resultFile = new ResultFile(1, appender.getFile(), this.parentJob.getJobname(), getName());
            result.getResultFiles().put(resultFile.getFile().toString(), resultFile);
        }
        return result;
    }

    /** Copies the content of {@code source} into {@code target}, closing both streams afterwards. */
    private void copyJarToLocalFile(FileObject source, File target) throws IOException {
        InputStream in = null;
        OutputStream out = null;
        try {
            in = source.getContent().getInputStream();
            out = new FileOutputStream(target);
            IOUtils.copy(in, out);
            // Close explicitly first so a failure while finishing the write is not silently dropped.
            out.close();
        } finally {
            IOUtils.closeQuietly(out);
            IOUtils.closeQuietly(in);
        }
    }

    /** Adds a single custom-jar step, built from the configured command line arguments, to the existing job flow. */
    private void addCustomJarStepToJobFlow(AmazonElasticMapReduceClient emrClient, String jarS3Url, String mainClass) {
        List<String> stepArgs = new ArrayList<String>();
        if (!StringUtil.isEmpty(this.cmdLineArgs)) {
            StringTokenizer tokens = new StringTokenizer(this.cmdLineArgs, " ");
            while (tokens.hasMoreTokens()) {
                String arg = tokens.nextToken();
                logBasic("adding args: " + arg);
                stepArgs.add(arg);
            }
        }
        HadoopJarStepConfig jarStep = new HadoopJarStepConfig();
        jarStep.setJar(jarS3Url);
        jarStep.setMainClass(mainClass);
        jarStep.setArgs(stepArgs);

        StepConfig step = new StepConfig();
        step.setName("custom jar: " + this.jarUrl);
        step.setHadoopJarStep(jarStep);

        List<StepConfig> steps = new ArrayList<StepConfig>();
        steps.add(step);

        AddJobFlowStepsRequest request = new AddJobFlowStepsRequest();
        request.setJobFlowId(this.hadoopJobFlowId);
        request.setSteps(steps);
        emrClient.addJobFlowSteps(request);
    }

    /**
     * Polls the state of {@code jobFlowId} until it is no longer running, then reports a failed run.
     * Polling stops early when the job flow is not returned by the service or the thread is interrupted.
     */
    private void pollJobFlowUntilFinished(AmazonElasticMapReduceClient emrClient, AmazonS3Client s3Client, String bucketName, String jobFlowId, int intervalSeconds, Result result) throws IOException {
        // Always restrict the query to the job flow being watched, whether it was just created or
        // supplied by the user; an empty id list would make the service return unrelated job flows.
        List<String> jobFlowIds = new ArrayList<String>();
        jobFlowIds.add(jobFlowId);

        // NOTE(review): isRunning treats every state other than COMPLETED/FAILED/TERMINATED as running,
        // so a job flow that settles in any other state keeps this loop polling - confirm.
        String state = "RUNNING";
        while (isRunning(state)) {
            DescribeJobFlowsRequest describeRequest = new DescribeJobFlowsRequest();
            describeRequest.setJobFlowIds(jobFlowIds);
            boolean found = false;
            for (JobFlowDetail detail : emrClient.describeJobFlows(describeRequest).getJobFlows()) {
                if (detail.getJobFlowId().equals(jobFlowId)) {
                    state = detail.getExecutionStatusDetail().getState();
                    found = true;
                }
            }
            if (!found) {
                break;
            }
            logBasic(this.hadoopJobName + " execution status: " + state);
            if (isRunning(state)) {
                try {
                    // Long arithmetic: large second counts must not overflow int.
                    Thread.sleep(intervalSeconds * 1000L);
                } catch (InterruptedException interrupted) {
                    // Preserve the interrupt for the caller and stop waiting.
                    Thread.currentThread().interrupt();
                    logBasic("Interrupted while waiting for " + this.hadoopJobName + " - no longer monitoring the job flow");
                    break;
                }
            }
        }

        if ("FAILED".equalsIgnoreCase(state)) {
            result.setStopped(true);
            result.setNrErrors(1L);
            result.setResult(false);
            logS3ObjectAsError(s3Client, bucketName, jobFlowId + "/steps/1/stdout");
            logS3ObjectAsError(s3Client, bucketName, jobFlowId + "/steps/1/stderr");
        }
    }

    /** Reads the given S3 object fully and writes its content to the error log, closing the stream afterwards. */
    private void logS3ObjectAsError(AmazonS3Client s3Client, String bucketName, String key) throws IOException {
        InputStream content = null;
        try {
            S3Object s3Object = s3Client.getObject(bucketName, key);
            content = s3Object.getObjectContent();
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            IOUtils.copy(content, buffer);
            logError(buffer.toString());
        } finally {
            IOUtils.closeQuietly(content);
        }
    }

    /**
     * Builds the request that starts a new job flow containing a single custom-jar step.
     *
     * <p>The step arguments come from {@code cmdLineArgs}, split on spaces. The instance count is read
     * from {@code numInstances}; when that value is not a valid integer an error is logged and 2 is used.
     *
     * @param str the URI the job flow writes its logs to
     * @param str2 the location of the jar to run
     * @param str3 the main class to invoke inside the jar
     * @return the populated request; nothing is submitted by this method
     */
    public RunJobFlowRequest createJobFlow(String str, String str2, String str3) {
        // --- step arguments ---
        List<String> stepArguments = new ArrayList<String>();
        if (!StringUtil.isEmpty(this.cmdLineArgs)) {
            for (StringTokenizer tokens = new StringTokenizer(this.cmdLineArgs, " "); tokens.hasMoreTokens(); ) {
                String argument = tokens.nextToken();
                logBasic("adding args: " + argument);
                stepArguments.add(argument);
            }
        }

        // --- the single custom-jar step ---
        HadoopJarStepConfig jarStep = new HadoopJarStepConfig();
        jarStep.setJar(str2);
        jarStep.setMainClass(str3);
        jarStep.setArgs(stepArguments);

        StepConfig customJarStep = new StepConfig();
        customJarStep.setName("custom jar: " + this.jarUrl);
        customJarStep.setHadoopJarStep(jarStep);

        List<StepConfig> steps = new ArrayList<StepConfig>();
        steps.add(customJarStep);

        // --- cluster sizing ---
        String instanceCountText = environmentSubstitute(this.numInstances);
        int instanceCount;
        try {
            instanceCount = Integer.parseInt(instanceCountText);
        } catch (NumberFormatException e) {
            logError("Unable to parse number of instances to use '" + instanceCountText + "' - using 2 instances...");
            instanceCount = 2;
        }

        JobFlowInstancesConfig instances = new JobFlowInstancesConfig();
        instances.setInstanceCount(Integer.valueOf(instanceCount));
        instances.setMasterInstanceType(getInstanceType(this.masterInstanceType));
        instances.setSlaveInstanceType(getInstanceType(this.slaveInstanceType));
        instances.setHadoopVersion("0.20");

        // --- assemble the request ---
        RunJobFlowRequest request = new RunJobFlowRequest();
        request.setSteps(steps);
        request.setLogUri(str);
        request.setName(this.hadoopJobName);
        request.setInstances(instances);
        return request;
    }

    /**
     * Extracts the instance type enclosed in the last pair of square brackets of a label, e.g.
     * {@code "Small [m1.small]"} yields {@code "m1.small"}.
     *
     * <p>The result is the text between the character after the last {@code '['} (or the start of the
     * string when there is none) and the last {@code ']'}. When no such range exists - for example when
     * the value contains no brackets at all - the input is returned unchanged instead of failing, so a
     * plain instance type can be passed straight through.
     *
     * @param str the label to parse; may be {@code null}
     * @return the extracted instance type, the unchanged input when nothing can be extracted, or
     *         {@code null} when {@code str} is {@code null}
     */
    public static String getInstanceType(String str) {
        if (str == null) {
            return null;
        }
        int start = str.lastIndexOf('[') + 1;
        int end = str.lastIndexOf(']');
        if (end < start) {
            // No closing bracket after the opening one: nothing to extract.
            return str;
        }
        return str.substring(start, end);
    }

    /**
     * Tells whether a job flow state still counts as in progress.
     *
     * <p>Only {@code COMPLETED}, {@code FAILED} and {@code TERMINATED} (compared case-insensitively) are
     * treated as finished; every other value, including {@code null}, is reported as running.
     *
     * @param str the job flow state to classify
     * @return {@code false} for one of the three finished states, {@code true} otherwise
     */
    public static boolean isRunning(String str) {
        if ("COMPLETED".equalsIgnoreCase(str)) {
            return false;
        }
        if ("FAILED".equalsIgnoreCase(str)) {
            return false;
        }
        if ("TERMINATED".equalsIgnoreCase(str)) {
            return false;
        }
        return true;
    }

    /**
     * Restores this entry's settings from its XML node.
     *
     * <p>The access and secret keys are passed through
     * {@code Encr.decryptPasswordOptionallyEncrypted}; the blocking flag is true only when the stored
     * value equals {@code "Y"} (case-insensitively).
     *
     * @param node the XML node holding this entry's tags
     * @param list database definitions, forwarded to the superclass
     * @param list2 slave server definitions, forwarded to the superclass
     * @param repository not used by this implementation
     * @param iMetaStore not used by this implementation
     * @throws KettleXMLException if the superclass fails to load its part
     */
    public void loadXML(Node node, List<DatabaseMeta> list, List<SlaveServer> list2, Repository repository, IMetaStore iMetaStore) throws KettleXMLException {
        super.loadXML(node, list, list2);

        // Job identification and jar location.
        hadoopJobName = XMLHandler.getTagValue(node, "hadoop_job_name");
        hadoopJobFlowId = XMLHandler.getTagValue(node, "hadoop_job_flow_id");
        jarUrl = XMLHandler.getTagValue(node, "jar_url");

        // Credentials may be stored encrypted.
        String storedAccessKey = XMLHandler.getTagValue(node, "access_key");
        accessKey = Encr.decryptPasswordOptionallyEncrypted(storedAccessKey);
        String storedSecretKey = XMLHandler.getTagValue(node, "secret_key");
        secretKey = Encr.decryptPasswordOptionallyEncrypted(storedSecretKey);

        // Staging and cluster settings.
        stagingDir = XMLHandler.getTagValue(node, "staging_dir");
        numInstances = XMLHandler.getTagValue(node, "num_instances");
        masterInstanceType = XMLHandler.getTagValue(node, "master_instance_type");
        slaveInstanceType = XMLHandler.getTagValue(node, "slave_instance_type");
        cmdLineArgs = XMLHandler.getTagValue(node, "command_line_args");

        // Execution behaviour.
        String blockingFlag = XMLHandler.getTagValue(node, AbstractAmazonJobExecutorController.BLOCKING);
        blocking = "Y".equalsIgnoreCase(blockingFlag);
        loggingInterval = XMLHandler.getTagValue(node, "logging_interval");
    }

    /**
     * Serializes this entry's settings as XML tags appended to the superclass output.
     *
     * <p>The access and secret keys are written through
     * {@code Encr.encryptPasswordIfNotUsingVariables}. Each setting is emitted exactly once, under the
     * same tag name that {@code loadXML} reads.
     *
     * @return the XML fragment describing this entry
     */
    public String getXML() {
        final String indent = "      ";
        // Local, single-threaded buffer: StringBuilder avoids StringBuffer's needless synchronization.
        StringBuilder xml = new StringBuilder(1024);
        xml.append(super.getXML());
        xml.append(indent).append(XMLHandler.addTagValue("hadoop_job_name", this.hadoopJobName));
        xml.append(indent).append(XMLHandler.addTagValue("hadoop_job_flow_id", this.hadoopJobFlowId));
        xml.append(indent).append(XMLHandler.addTagValue("jar_url", this.jarUrl));
        xml.append(indent).append(XMLHandler.addTagValue("access_key", Encr.encryptPasswordIfNotUsingVariables(this.accessKey)));
        xml.append(indent).append(XMLHandler.addTagValue("secret_key", Encr.encryptPasswordIfNotUsingVariables(this.secretKey)));
        xml.append(indent).append(XMLHandler.addTagValue("staging_dir", this.stagingDir));
        xml.append(indent).append(XMLHandler.addTagValue("num_instances", this.numInstances));
        xml.append(indent).append(XMLHandler.addTagValue("master_instance_type", this.masterInstanceType));
        xml.append(indent).append(XMLHandler.addTagValue("slave_instance_type", this.slaveInstanceType));
        xml.append(indent).append(XMLHandler.addTagValue("command_line_args", this.cmdLineArgs));
        xml.append(indent).append(XMLHandler.addTagValue(AbstractAmazonJobExecutorController.BLOCKING, this.blocking));
        xml.append(indent).append(XMLHandler.addTagValue("logging_interval", this.loggingInterval));
        // "hadoop_job_name" was previously written a second time here; the duplicate tag is dropped.
        return xml.toString();
    }

    /**
     * Restores this entry's settings from a repository.
     *
     * <p>The access and secret keys are passed through
     * {@code Encr.decryptPasswordOptionallyEncrypted} before being stored on this entry.
     *
     * @param repository the repository to read from; must not be {@code null}
     * @param iMetaStore forwarded to the superclass
     * @param objectId the id of the job entry whose attributes are read
     * @param list database definitions, forwarded to the superclass
     * @param list2 slave server definitions, forwarded to the superclass
     * @throws KettleException if {@code repository} is {@code null} or reading an attribute fails
     */
    public void loadRep(Repository repository, IMetaStore iMetaStore, ObjectId objectId, List<DatabaseMeta> list, List<SlaveServer> list2) throws KettleException {
        if (repository == null) {
            // This is the load path, so the message must not talk about saving.
            throw new KettleException("Unable to load from a repository. The repository is null.");
        }
        super.loadRep(repository, iMetaStore, objectId, list, list2);
        setHadoopJobName(repository.getJobEntryAttributeString(objectId, "hadoop_job_name"));
        setHadoopJobFlowId(repository.getJobEntryAttributeString(objectId, "hadoop_job_flow_id"));
        setJarUrl(repository.getJobEntryAttributeString(objectId, "jar_url"));
        setAccessKey(Encr.decryptPasswordOptionallyEncrypted(repository.getJobEntryAttributeString(objectId, "access_key")));
        setSecretKey(Encr.decryptPasswordOptionallyEncrypted(repository.getJobEntryAttributeString(objectId, "secret_key")));
        setStagingDir(repository.getJobEntryAttributeString(objectId, "staging_dir"));
        setNumInstances(repository.getJobEntryAttributeString(objectId, "num_instances"));
        setMasterInstanceType(repository.getJobEntryAttributeString(objectId, "master_instance_type"));
        setSlaveInstanceType(repository.getJobEntryAttributeString(objectId, "slave_instance_type"));
        setCmdLineArgs(repository.getJobEntryAttributeString(objectId, "command_line_args"));
        setBlocking(repository.getJobEntryAttributeBoolean(objectId, AbstractAmazonJobExecutorController.BLOCKING));
        setLoggingInterval(repository.getJobEntryAttributeString(objectId, "logging_interval"));
    }

    /**
     * Stores this entry's settings as job entry attributes in a repository.
     *
     * <p>The secret and access keys are written through
     * {@code Encr.encryptPasswordIfNotUsingVariables}.
     *
     * @param repository the repository to write to; must not be {@code null}
     * @param iMetaStore forwarded to the superclass
     * @param objectId the id passed as first argument to every attribute save
     * @throws KettleException if {@code repository} is {@code null} or saving an attribute fails
     */
    public void saveRep(Repository repository, IMetaStore iMetaStore, ObjectId objectId) throws KettleException {
        if (null == repository) {
            throw new KettleException("Unable to save to a repository. The repository is null.");
        }
        super.saveRep(repository, iMetaStore, objectId);

        // Every attribute is stored against this entry's own id.
        ObjectId entryId = getObjectId();

        repository.saveJobEntryAttribute(objectId, entryId, "hadoop_job_name", this.hadoopJobName);
        repository.saveJobEntryAttribute(objectId, entryId, "hadoop_job_flow_id", this.hadoopJobFlowId);
        repository.saveJobEntryAttribute(objectId, entryId, "jar_url", this.jarUrl);

        String storedSecretKey = Encr.encryptPasswordIfNotUsingVariables(this.secretKey);
        repository.saveJobEntryAttribute(objectId, entryId, "secret_key", storedSecretKey);
        String storedAccessKey = Encr.encryptPasswordIfNotUsingVariables(this.accessKey);
        repository.saveJobEntryAttribute(objectId, entryId, "access_key", storedAccessKey);

        repository.saveJobEntryAttribute(objectId, entryId, "staging_dir", this.stagingDir);
        repository.saveJobEntryAttribute(objectId, entryId, "num_instances", this.numInstances);
        repository.saveJobEntryAttribute(objectId, entryId, "master_instance_type", this.masterInstanceType);
        repository.saveJobEntryAttribute(objectId, entryId, "slave_instance_type", this.slaveInstanceType);
        repository.saveJobEntryAttribute(objectId, entryId, "command_line_args", this.cmdLineArgs);
        repository.saveJobEntryAttribute(objectId, entryId, AbstractAmazonJobExecutorController.BLOCKING, this.blocking);
        repository.saveJobEntryAttribute(objectId, entryId, "logging_interval", this.loggingInterval);
    }

    /**
     * Resolves variables in a file name by delegating to {@code environmentSubstitute}.
     *
     * @param str the file name, possibly containing variable references
     * @return whatever {@code environmentSubstitute} returns for {@code str}
     */
    public String buildFilename(String str) {
        String resolvedFilename = environmentSubstitute(str);
        return resolvedFilename;
    }

    /**
     * Always answers {@code true}.
     *
     * <p>NOTE(review): presumably tells the job framework that this entry produces a success/failure
     * outcome - confirm against the job entry interface.
     *
     * @return {@code true}
     */
    public boolean evaluates() {
        return true;
    }

    /**
     * Always answers {@code true}.
     *
     * <p>NOTE(review): presumably tells the job framework that an unconditional hop may follow this
     * entry - confirm against the job entry interface.
     *
     * @return {@code true}
     */
    public boolean isUnconditional() {
        return true;
    }

    /**
     * Derives the name of the dialog class belonging to this entry.
     *
     * <p>The name is this object's canonical class name with the first {@code ".job."} package segment
     * replaced by {@code ".ui."} and the suffix {@code "Dialog"} appended.
     *
     * @return the derived dialog class name
     */
    public String getDialogClassName() {
        String ownClassName = getClass().getCanonicalName();
        String uiClassName = ownClassName.replaceFirst("\\.job\\.", ".ui.");
        return uiClassName + "Dialog";
    }
}
