package com.google.cloud.dataflow.contrib.hadoop;

import com.google.api.client.util.Maps;
import com.google.cloud.dataflow.sdk.coders.Coder;
import com.google.cloud.dataflow.sdk.coders.StringUtf8Coder;
import com.google.cloud.dataflow.sdk.io.Sink;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.values.KV;
import com.google.common.base.Preconditions;
import com.google.common.collect.Sets;
import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;

/**
 * A Dataflow {@link Sink} that writes {@code KV<K, V>} records through a Hadoop
 * {@link FileOutputFormat} into the directory {@code path}.
 *
 * <p>Each {@link HadoopWriter} writes its bundle as its own Hadoop task attempt under a JobID
 * shared with its {@link HadoopWriteOperation}, and commits that task when closed. The write
 * operation's {@code finalize} then commits the job. It checks that the committed files match the
 * writer results and renames them to {@code part-r-00000}, {@code part-r-00001}, ... (keeping any
 * extension).
 *
 * @param <K> output key type
 * @param <V> output value type
 */
public class HadoopFileSink<K, V> extends Sink<KV<K, V>> {
    /** Job-tracker identifier used to build the synthetic Hadoop {@link JobID}. */
    private static final String JT_IDENTIFIER = "scio_job";

    /** Output directory; {@link #validate} requires that it does not exist yet. */
    protected final String path;
    /** Hadoop output format used to create record writers and output committers. */
    protected final Class<? extends FileOutputFormat<K, V>> formatClass;
    // Hadoop configuration captured as plain strings and replayed onto every Job from jobInstance().
    // NOTE(review): presumably a Map rather than a Configuration so the sink stays serializable.
    private final Map<String, String> map;

    /**
     * Write operation that commits all writer outputs as a single Hadoop job. The committed files
     * are then given sequential {@code part-r-NNNNN} names.
     */
    public static class HadoopWriteOperation<K, V> extends Sink.WriteOperation<KV<K, V>, String> {
        /** Accepts every path whose name does not start with '_' or '.', e.g. skips marker files. */
        private static final PathFilter VISIBLE_FILES = new PathFilter() {
            @Override
            public boolean accept(Path candidate) {
                String name = candidate.getName();
                return !(name.startsWith("_") || name.startsWith("."));
            }
        };

        private final Sink<KV<K, V>> sink;
        protected final String path;
        protected final Class<? extends FileOutputFormat<K, V>> formatClass;
        // Fixed once at construction; jobID() derives the Hadoop JobID from it. Writers (via
        // open()) and finalize() all use jobID(), so they agree on the job.
        private final int jobId = (int) (System.currentTimeMillis() / 1000);

        /**
         * @param sink the owning sink; must be a {@link HadoopFileSink}
         * @param path output directory
         * @param formatClass Hadoop output format used by the writers
         */
        public HadoopWriteOperation(
                Sink<KV<K, V>> sink, String path, Class<? extends FileOutputFormat<K, V>> formatClass) {
            this.sink = sink;
            this.path = path;
            this.formatClass = formatClass;
        }

        @Override
        public void initialize(PipelineOptions pipelineOptions) throws Exception {
            // jobInstance() returns a fresh Job on every call, so this setting is not retained.
            // Writers set the output path again in open().
            // NOTE(review): presumably kept only to fail early on an unusable output path.
            FileOutputFormat.setOutputPath(hadoopSink().jobInstance(), new Path(this.path));
        }

        /**
         * Commits the Hadoop job and verifies the committed files against the writer results. It
         * then renames the files to {@code part-r-%05d} plus their original extension.
         *
         * @param writerResults the file-name prefixes returned by each writer's {@code close()}
         * @throws IllegalStateException if the prefixes of the committed files differ from
         *     {@code writerResults}
         * @throws IOException if listing or renaming the output files fails
         */
        @Override
        public void finalize(Iterable<String> writerResults, PipelineOptions pipelineOptions)
                throws Exception {
            Configuration conf = hadoopSink().jobInstance().getConfiguration();
            Path outputDir = new Path(this.path);

            JobContextImpl jobContext = new JobContextImpl(conf, jobID());
            new FileOutputCommitter(outputDir, jobContext).commitJob(jobContext);

            // Resolve the file system from the output path itself. FileSystem.get(conf) would
            // return the default file system, which breaks paths on other schemes/authorities.
            FileSystem fileSystem = outputDir.getFileSystem(conf);
            FileStatus[] outputs = fileSystem.listStatus(outputDir, VISIBLE_FILES);

            Set<String> committedPrefixes = new HashSet<String>();
            for (FileStatus output : outputs) {
                committedPrefixes.add(baseName(output.getPath().getName()));
            }
            Set<String> expectedPrefixes = Sets.newHashSet(writerResults);
            Preconditions.checkState(
                    committedPrefixes.equals(expectedPrefixes),
                    "Writer results and output files do not match");

            for (int i = 0; i < outputs.length; i++) {
                Path source = outputs[i].getPath();
                String targetName = String.format("part-r-%05d%s", i, extension(source.getName()));
                if (targetName.equals(source.getName())) {
                    continue; // already has its final name
                }
                Path target = new Path(source.getParent(), targetName);
                // rename() reports failure via its return value; don't lose output silently.
                if (!fileSystem.rename(source, target)) {
                    throw new IOException("Failed to rename " + source + " to " + target);
                }
            }
        }

        @Override
        public Sink.Writer<KV<K, V>, String> createWriter(PipelineOptions pipelineOptions)
                throws Exception {
            return new HadoopWriter<K, V>(this, this.path, this.formatClass);
        }

        @Override
        public Sink<KV<K, V>> getSink() {
            return this.sink;
        }

        /** Writer results are file-name prefixes, encoded as UTF-8 strings. */
        @Override
        public Coder<String> getWriterResultCoder() {
            return StringUtf8Coder.of();
        }

        /** Returns the synthetic Hadoop job id used by this operation's writers and finalize(). */
        public JobID jobID() {
            return new JobID(HadoopFileSink.JT_IDENTIFIER, this.jobId);
        }

        /** Returns the owning sink; throws ClassCastException if it is not a HadoopFileSink. */
        private HadoopFileSink<?, ?> hadoopSink() {
            return (HadoopFileSink<?, ?>) getSink();
        }

        /** Returns {@code name} up to its first '.' (at index > 0), or the whole name otherwise. */
        private static String baseName(String name) {
            int dot = name.indexOf('.');
            return dot > 0 ? name.substring(0, dot) : name;
        }

        /** Returns {@code name} from its first '.' (at index > 0) onward, or "" if there is none. */
        private static String extension(String name) {
            int dot = name.indexOf('.');
            return dot > 0 ? name.substring(dot) : "";
        }
    }

    /**
     * Writes one bundle as a single Hadoop task attempt. The task id is derived from the writer's
     * unique id, and the task is committed on {@link #close()}.
     */
    public static class HadoopWriter<K, V> extends Sink.Writer<KV<K, V>, String> {
        private final HadoopWriteOperation<K, V> writeOperation;
        private final String path;
        private final Class<? extends FileOutputFormat<K, V>> formatClass;
        private int hash;
        private TaskAttemptContext context;
        private RecordWriter<K, V> recordWriter;
        // FileOutputFormat.getOutputCommitter is declared to return OutputCommitter.
        private OutputCommitter outputCommitter;

        /**
         * @param writeOperation the owning write operation (supplies the shared JobID)
         * @param path output directory
         * @param formatClass Hadoop output format to instantiate in {@link #open}
         */
        public HadoopWriter(
                HadoopWriteOperation<K, V> writeOperation,
                String path,
                Class<? extends FileOutputFormat<K, V>> formatClass) {
            this.writeOperation = writeOperation;
            this.path = path;
            this.formatClass = formatClass;
        }

        /**
         * Creates the task attempt context, record writer and output committer for this bundle.
         *
         * @param uId unique writer id; its hash becomes the Hadoop task (partition) number
         */
        @Override
        public void open(String uId) throws Exception {
            // Keep the partition non-negative so that the "%05d" result returned by close() is
            // formatted the same way as the file name Hadoop generates.
            // NOTE(review): assumes Hadoop's default task file naming (part-r-<5-digit partition>).
            this.hash = uId.hashCode() & Integer.MAX_VALUE;
            Job job = ((HadoopFileSink<?, ?>) this.writeOperation.getSink()).jobInstance();
            FileOutputFormat.setOutputPath(job, new Path(this.path));
            TaskID taskId = new TaskID(this.writeOperation.jobID(), TaskType.REDUCE, this.hash);
            this.context =
                    new TaskAttemptContextImpl(job.getConfiguration(), new TaskAttemptID(taskId, 0));
            FileOutputFormat<K, V> format = this.formatClass.getDeclaredConstructor().newInstance();
            this.recordWriter = format.getRecordWriter(this.context);
            this.outputCommitter = format.getOutputCommitter(this.context);
        }

        @Override
        public void write(KV<K, V> kv) throws Exception {
            this.recordWriter.write(kv.getKey(), kv.getValue());
        }

        /**
         * Closes the record writer and commits the task.
         *
         * @return the file-name prefix of this task's output, e.g. {@code part-r-01234}
         */
        @Override
        public String close() throws Exception {
            this.recordWriter.close(this.context);
            this.outputCommitter.commitTask(this.context);
            return String.format("part-r-%05d", this.hash);
        }

        @Override
        public Sink.WriteOperation<KV<K, V>, String> getWriteOperation() {
            return this.writeOperation;
        }
    }

    /**
     * @param path output directory; must not exist yet
     * @param formatClass Hadoop output format used to write records
     */
    public HadoopFileSink(String path, Class<? extends FileOutputFormat<K, V>> formatClass) {
        this.path = path;
        this.formatClass = formatClass;
        this.map = Maps.newHashMap();
    }

    /**
     * Like {@link #HadoopFileSink(String, Class)}, additionally copying every entry of
     * {@code configuration} so it is applied to each Hadoop job this sink creates.
     */
    public HadoopFileSink(
            String path, Class<? extends FileOutputFormat<K, V>> formatClass, Configuration configuration) {
        this(path, formatClass);
        for (Map.Entry<String, String> entry : configuration) {
            this.map.put(entry.getKey(), entry.getValue());
        }
    }

    /**
     * Fails if the output path already exists.
     *
     * @throws IllegalStateException if {@code path} exists
     * @throws RuntimeException wrapping any IOException from the file system
     */
    @Override
    public void validate(PipelineOptions pipelineOptions) {
        try {
            Path outputDir = new Path(this.path);
            // Use the path's own file system, not the default one from the configuration.
            FileSystem fileSystem = outputDir.getFileSystem(jobInstance().getConfiguration());
            Preconditions.checkState(
                    !fileSystem.exists(outputDir), "Output path %s already exists", this.path);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public Sink.WriteOperation<KV<K, V>, ?> createWriteOperation(PipelineOptions pipelineOptions) {
        return new HadoopWriteOperation<K, V>(this, this.path, this.formatClass);
    }

    /** Creates a new Hadoop {@link Job} with every captured configuration entry applied. */
    public Job jobInstance() throws IOException {
        Job job = Job.getInstance();
        Configuration configuration = job.getConfiguration();
        for (Map.Entry<String, String> entry : this.map.entrySet()) {
            configuration.set(entry.getKey(), entry.getValue());
        }
        return job;
    }
}
