package org.deephacks.jobpipe.spark;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.apache.spark.launcher.SparkLauncher;
import org.deephacks.jobpipe.PathSubstitutor;
import org.deephacks.jobpipe.Task;
import org.deephacks.jobpipe.TaskContext;
import org.deephacks.jobpipe.TaskOutput;
import org.joda.time.DateTime;

/* loaded from: input_file:org/deephacks/jobpipe/spark/SparkTask.class */
public class SparkTask implements Task {
    protected SparkTaskBuilder builder;
    protected String appName;

    /* loaded from: input_file:org/deephacks/jobpipe/spark/SparkTask$SparkTaskBuilder.class */
    public static class SparkTaskBuilder {
        Function<String, TaskOutput> output;
        String basePath;
        String outputPathPattern;
        String inputPathPattern;
        Boolean verbose;
        String appName;
        String appResource;
        String deployMode;
        Class<?> mainClass;
        String master;
        String sparkHome;
        protected String propertiesFile;
        final List<String> appArgs;
        final List<String> sparkArgs;
        final List<String> jars;
        final List<String> files;
        final List<String> pyFiles;
        final Map<String, String> conf;

        private SparkTaskBuilder(Class<?> cls) {
            this.outputPathPattern = "${basePath}/${appName}/${year}-${month}-${day}T${hour}";
            this.inputPathPattern = "${basePath}/${appName}/${year}-${month}-${day}T${hour}";
            this.appArgs = new ArrayList();
            this.sparkArgs = new ArrayList();
            this.conf = new HashMap();
            this.files = new ArrayList();
            this.jars = new ArrayList();
            this.pyFiles = new ArrayList();
            this.mainClass = cls;
        }

        public static SparkTaskBuilder newBuilder(Class<?> cls) {
            Objects.nonNull(cls);
            return new SparkTaskBuilder(cls);
        }

        public SparkTaskBuilder setOutputPattern(String str) {
            this.outputPathPattern = str;
            return this;
        }

        public SparkTaskBuilder setInputPattern(String str) {
            this.inputPathPattern = str;
            return this;
        }

        public SparkTaskBuilder setBasePath(String str) {
            this.basePath = str;
            return this;
        }

        public SparkTaskBuilder setConfig(String str, String str2) {
            this.conf.put(str, str2);
            return this;
        }

        public SparkTaskBuilder addAppArgs(String... strArr) {
            this.appArgs.addAll(Arrays.asList(strArr));
            return this;
        }

        public SparkTaskBuilder setPropertiesFile(String str) {
            this.propertiesFile = str;
            return this;
        }

        public SparkTaskBuilder setDeployMode(String str) {
            this.deployMode = str;
            return this;
        }

        public SparkTaskBuilder addSparkArg(String str) {
            this.sparkArgs.add(str);
            return this;
        }

        public SparkTaskBuilder addFile(String str) {
            this.files.add(str);
            return this;
        }

        public SparkTaskBuilder setMaster(String str) {
            this.master = str;
            return this;
        }

        public SparkTaskBuilder addPyFile(String str) {
            this.pyFiles.add(str);
            return this;
        }

        public SparkTaskBuilder setSparkHome(String str) {
            this.sparkHome = str;
            return this;
        }

        public SparkTaskBuilder addJar(String str) {
            this.jars.add(str);
            return this;
        }

        public SparkTaskBuilder setVerbose(boolean z) {
            this.verbose = Boolean.valueOf(z);
            return this;
        }

        public SparkTaskBuilder setAppName(String str) {
            this.appName = str;
            return this;
        }

        public String getAppName() {
            return this.appName == null ? this.mainClass.getSimpleName() : this.appName;
        }

        public String getMaster() {
            return this.master == null ? "local" : this.master;
        }

        public SparkTaskBuilder setAppResource(String str) {
            this.appResource = str;
            return this;
        }

        public SparkTask build() {
            return new SparkTask(this);
        }

        public SparkTaskBuilder setOutput(Function<String, TaskOutput> function) {
            this.output = function;
            return this;
        }
    }

    private SparkTask(SparkTaskBuilder sparkTaskBuilder) {
        this.builder = sparkTaskBuilder;
        this.appName = sparkTaskBuilder.getAppName();
    }

    public static SparkTaskBuilder newBuilder(Class<?> cls) {
        return new SparkTaskBuilder(cls);
    }

    public void execute(TaskContext taskContext) {
        try {
            SparkLauncher createLauncher = createLauncher();
            List list = (List) taskContext.getDependecyOutput().stream().map(taskOutput -> {
                return taskOutput.get().toString();
            }).collect(Collectors.toList());
            DateTime from = taskContext.getTimeRange().from();
            createLauncher.addAppArgs(new SparkArgs(this.builder.getAppName(), this.builder.master, getInputPath(from), getOutputPath(from), list).toArgs(taskContext.getArgs()));
            Process launch = createLauncher.launch();
            new Thread(new InputStreamThread(launch.getInputStream(), System.out)).start();
            new Thread(new InputStreamThread(launch.getErrorStream(), System.out)).start();
            launch.waitFor();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public TaskOutput getOutput(TaskContext taskContext) {
        return this.builder.output.apply(getOutputPath(taskContext.getTimeRange().from()));
    }

    private String getOutputPath(DateTime dateTime) {
        return PathSubstitutor.newBuilder(dateTime).pattern(this.builder.outputPathPattern).basePath(this.builder.basePath).sub("appName", this.appName).replace();
    }

    private String getInputPath(DateTime dateTime) {
        return PathSubstitutor.newBuilder(dateTime).pattern(this.builder.inputPathPattern).basePath(this.builder.basePath).sub("appName", this.appName).replace();
    }

    SparkLauncher createLauncher() {
        SparkLauncher sparkLauncher = new SparkLauncher();
        sparkLauncher.setMainClass(this.builder.mainClass.getName());
        if (this.builder.verbose != null) {
            sparkLauncher.setVerbose(this.builder.verbose.booleanValue());
        }
        sparkLauncher.setAppName(this.builder.getAppName());
        if (this.builder.appResource != null) {
            sparkLauncher.setAppResource(this.builder.appResource);
        } else {
            sparkLauncher.setAppResource(getJarAbsolutePath(this.builder.mainClass));
        }
        if (this.builder.deployMode != null) {
            sparkLauncher.setDeployMode(this.builder.deployMode);
        }
        if (this.builder.master != null) {
            sparkLauncher.setMaster(this.builder.master);
        }
        if (this.builder.propertiesFile != null) {
            sparkLauncher.setPropertiesFile(this.builder.propertiesFile);
        }
        if (this.builder.sparkHome != null) {
            sparkLauncher.setSparkHome(this.builder.sparkHome);
        }
        sparkLauncher.addJar(getJarAbsolutePath(Task.class));
        sparkLauncher.addJar(getJarAbsolutePath(SparkTask.class));
        this.builder.appArgs.forEach(str -> {
            sparkLauncher.addAppArgs(new String[]{str});
        });
        this.builder.files.forEach(str2 -> {
            sparkLauncher.addFile(str2);
        });
        this.builder.pyFiles.forEach(str3 -> {
            sparkLauncher.addPyFile(str3);
        });
        this.builder.jars.stream().forEach(str4 -> {
            sparkLauncher.addJar(str4);
        });
        this.builder.conf.forEach((str5, str6) -> {
            sparkLauncher.setConf(str5, str6);
        });
        return sparkLauncher;
    }

    private static String getJarAbsolutePath(Class<?> cls) {
        return getJarPath(cls).getAbsolutePath();
    }

    private static File getJarPath(Class<?> cls) {
        return new File(cls.getProtectionDomain().getCodeSource().getLocation().getPath());
    }
}
