package com.hazelcast.jet.hadoop.impl;

import com.hazelcast.cluster.Address;
import com.hazelcast.function.BiFunctionEx;
import com.hazelcast.jet.JetException;
import com.hazelcast.jet.Traverser;
import com.hazelcast.jet.Traversers;
import com.hazelcast.jet.core.AbstractProcessor;
import com.hazelcast.jet.core.Processor;
import com.hazelcast.jet.core.ProcessorMetaSupplier;
import com.hazelcast.jet.core.ProcessorSupplier;
import com.hazelcast.jet.hadoop.HadoopSources;
import com.hazelcast.jet.impl.util.Util;
import com.hazelcast.jet.pipeline.file.impl.FileTraverser;
import com.hazelcast.logging.Logger;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RawLocalFileSystem;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidInputException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

/* loaded from: input_file:com/hazelcast/jet/hadoop/impl/ReadHadoopOldApiP.class */
public final class ReadHadoopOldApiP<K, V, R> extends AbstractProcessor {
    private final HadoopFileTraverser<K, V, R> traverser;

    /* loaded from: input_file:com/hazelcast/jet/hadoop/impl/ReadHadoopOldApiP$HadoopFileTraverser.class */
    private static final class HadoopFileTraverser<K, V, R> implements FileTraverser<R> {
        private final JobConf jobConf;
        private final InputFormat<K, V> inputFormat;
        private final BiFunctionEx<K, V, R> projectionFn;
        private final Traverser<R> delegate;
        private RecordReader<K, V> reader;

        private HadoopFileTraverser(JobConf jobConf, List<InputSplit> list, BiFunctionEx<K, V, R> biFunctionEx) {
            this.jobConf = jobConf;
            this.inputFormat = jobConf.getInputFormat();
            this.projectionFn = biFunctionEx;
            this.delegate = Traversers.traverseIterable(list).flatMap(this::traverseSplit);
        }

        private Traverser<R> traverseSplit(InputSplit inputSplit) {
            this.reader = (RecordReader) Util.uncheckCall(() -> {
                return this.inputFormat.getRecordReader(inputSplit, this.jobConf, Reporter.NULL);
            });
            return () -> {
                return Util.uncheckCall(() -> {
                    Object createKey = this.reader.createKey();
                    Object createValue = this.reader.createValue();
                    while (this.reader.next(createKey, createValue)) {
                        Object apply = this.projectionFn.apply(createKey, createValue);
                        if (apply != null) {
                            return apply;
                        }
                    }
                    this.reader.close();
                    return null;
                });
            };
        }

        public R next() {
            return (R) this.delegate.next();
        }

        public void close() throws IOException {
            if (this.reader != null) {
                this.reader.close();
            }
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/hadoop/impl/ReadHadoopOldApiP$MetaSupplier.class */
    /**
     * Meta-supplier for the old-API Hadoop source. During {@link #init} it either computes the
     * input splits and assigns them to cluster members, or - when
     * {@link ReadHadoopOldApiP#shouldSplitOnMembers} returns {@code true} - leaves the assignment
     * empty so that each {@link Supplier} computes its own splits.
     *
     * @param <K> key type produced by the record reader
     * @param <V> value type produced by the record reader
     * @param <R> type of the emitted items
     */
    public static class MetaSupplier<K, V, R> extends ReadHdfsMetaSupplierBase<R> {
        static final long serialVersionUID = 1L;

        @SuppressFBWarnings({"SE_BAD_FIELD"})
        private final JobConf jobConf;
        private final BiFunctionEx<K, V, R> projectionFn;
        /** Split assignment per member address; populated in {@link #init}, not serialized. */
        private transient Map<Address, List<IndexedInputSplit>> assigned;

        /**
         * @param jobConf      Hadoop job configuration describing the input
         * @param biFunctionEx maps a key/value pair to the emitted item
         */
        public MetaSupplier(@Nonnull JobConf jobConf, @Nonnull BiFunctionEx<K, V, R> biFunctionEx) {
            this.jobConf = jobConf;
            this.projectionFn = biFunctionEx;
        }

        /**
         * Computes the split-to-member assignment.
         *
         * @throws Exception if the base initialization or the split computation fails
         */
        @Override // com.hazelcast.jet.hadoop.impl.ReadHdfsMetaSupplierBase
        public void init(@Nonnull ProcessorMetaSupplier.Context context) throws Exception {
            super.init(context);
            if (ReadHadoopOldApiP.shouldSplitOnMembers(jobConf)) {
                // Splits are computed by each Supplier; nothing to assign here.
                assigned = new HashMap<>();
                return;
            }
            InputSplit[] splits = ReadHadoopOldApiP.getSplits(jobConf, context.totalParallelism());
            IndexedInputSplit[] indexedSplits = new IndexedInputSplit[splits.length];
            Arrays.setAll(indexedSplits, index -> new IndexedInputSplit(index, splits[index]));

            Address[] memberAddresses = context.hazelcastInstance().getCluster().getMembers().stream()
                    .map(member -> member.getAddress())
                    .toArray(Address[]::new);
            assigned = assignSplitsToMembers(indexedSplits, memberAddresses);
            printAssignments(assigned);
        }

        /**
         * Returns a function creating, for a member address, a {@link Supplier} holding the splits
         * assigned to that address (an empty list when nothing is assigned to it).
         */
        @Nonnull
        @Override
        public Function<Address, ProcessorSupplier> get(@Nonnull List<Address> list) {
            return address -> new Supplier<K, V, R>(
                    jobConf,
                    assigned.getOrDefault(address, Collections.emptyList()),
                    projectionFn);
        }

        /**
         * Creates a traverser over the whole input, requesting the splits with a split-count
         * argument of 1.
         *
         * @throws Exception if the splits cannot be computed
         */
        public FileTraverser<R> traverser() throws Exception {
            List<InputSplit> splits = Arrays.asList(ReadHadoopOldApiP.getSplits(jobConf, 1));
            return new HadoopFileTraverser<K, V, R>(jobConf, splits, projectionFn);
        }

        public boolean closeIsCooperative() {
            return true;
        }
    }

    /* loaded from: input_file:com/hazelcast/jet/hadoop/impl/ReadHadoopOldApiP$Supplier.class */
    private static final class Supplier<K, V, R> implements ProcessorSupplier {
        static final long serialVersionUID = 1;

        @SuppressFBWarnings({"SE_BAD_FIELD"})
        private final JobConf jobConf;
        private final BiFunctionEx<K, V, R> projectionFn;
        private final List<IndexedInputSplit> assignedSplits;

        private Supplier(@Nonnull JobConf jobConf, @Nonnull List<IndexedInputSplit> list, @Nonnull BiFunctionEx<K, V, R> biFunctionEx) {
            this.jobConf = jobConf;
            this.projectionFn = biFunctionEx;
            this.assignedSplits = list;
        }

        @Nonnull
        /* renamed from: get, reason: merged with bridge method [inline-methods] */
        public List<Processor> m5get(int i) {
            return (List) Util.distributeObjects(i, ReadHadoopOldApiP.shouldSplitOnMembers(this.jobConf) ? (List) Util.uncheckCall(() -> {
                return Arrays.asList(ReadHadoopOldApiP.getSplits(this.jobConf, i));
            }) : (List) this.assignedSplits.stream().map((v0) -> {
                return v0.getOldSplit();
            }).collect(Collectors.toList())).values().stream().map(list -> {
                return new ReadHadoopOldApiP(this.jobConf, list, this.projectionFn);
            }).collect(Collectors.toList());
        }
    }

    /**
     * Creates a processor that reads the given input splits.
     *
     * @param jobConf      Hadoop job configuration used to open the record readers
     * @param list         input splits this processor reads
     * @param biFunctionEx maps a key/value pair to the emitted item
     */
    private ReadHadoopOldApiP(@Nonnull JobConf jobConf, @Nonnull List<InputSplit> list, @Nonnull BiFunctionEx<K, V, R> biFunctionEx) {
        HadoopFileTraverser<K, V, R> splitTraverser = new HadoopFileTraverser<K, V, R>(jobConf, list, biFunctionEx);
        this.traverser = splitTraverser;
    }

    /**
     * Declares this processor as non-cooperative.
     * NOTE(review): presumably because reading through the Hadoop record reader may block - confirm.
     *
     * @return always {@code false}
     */
    @Override
    public boolean isCooperative() {
        return false;
    }

    /**
     * Emits items from the split traverser.
     *
     * @return the result of {@code emitFromTraverser} for the split traverser
     */
    @Override
    public boolean complete() {
        boolean done = emitFromTraverser(traverser);
        return done;
    }

    /**
     * Closes the split traverser, which in turn closes the record reader it still holds, if any.
     *
     * @throws Exception if closing the traverser fails
     */
    @Override
    public void close() throws Exception {
        traverser.close();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Asks the job's input format for the input splits.
     *
     * <p>If the input format reports an {@link InvalidInputException}, the outcome depends on
     * the {@link HadoopSources#IGNORE_FILE_NOT_FOUND} setting (treated as {@code true} when
     * absent): when enabled, the condition is logged at fine level and an empty array is
     * returned; otherwise a {@link JetException} is thrown.
     *
     * @param jobConf Hadoop job configuration describing the input
     * @param i       split count passed on to the input format's {@code getSplits}
     * @return the input splits, or an empty array when the invalid input is ignored
     * @throws IOException  if the input format fails for another I/O reason
     * @throws JetException if the input is invalid and must not be ignored
     */
    public static <K, V> InputSplit[] getSplits(JobConf jobConf, int i) throws IOException {
        try {
            return jobConf.getInputFormat().getSplits(jobConf, i);
        } catch (InvalidInputException e) {
            String inputDir = jobConf.get("mapreduce.input.fileinputformat.inputdir", "");
            if (!jobConf.getBoolean(HadoopSources.IGNORE_FILE_NOT_FOUND, true)) {
                // Keep the Hadoop exception as the cause so the offending paths stay visible.
                throw new JetException("The input " + inputDir + " matches no files", e);
            }
            // Log under this class, not under the new-API processor.
            Logger.getLogger(ReadHadoopOldApiP.class)
                    .fine("The directory '" + inputDir + "' does not exist. This source will emit 0 items.");
            return new InputSplit[0];
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Decides whether the input splits are to be computed separately on each member.
     *
     * <p>Returns {@code false} when {@link HadoopSources#SHARED_LOCAL_FS} is set. Otherwise
     * every input path is checked with {@code isLocalFileSystem}, and the result is
     * {@code true} when at least one path is on a local file system.
     *
     * @param jobConf Hadoop job configuration holding the input paths
     * @return {@code true} if some input path is local and the local file system is not
     *         marked as shared
     * @throws IllegalArgumentException if local and non-local input paths are mixed
     */
    public static boolean shouldSplitOnMembers(JobConf jobConf) {
        boolean sharedLocalFs = jobConf.getBoolean(HadoopSources.SHARED_LOCAL_FS, false);
        if (sharedLocalFs) {
            return false;
        }

        boolean sawLocal = false;
        boolean sawRemote = false;
        Path[] inputPaths = FileInputFormat.getInputPaths(jobConf);
        for (int idx = 0; idx < inputPaths.length; idx++) {
            boolean local = isLocalFileSystem(inputPaths[idx], jobConf);
            sawLocal |= local;
            sawRemote |= !local;
        }

        if (sawLocal && sawRemote) {
            throw new IllegalArgumentException("LocalFileSystem should be marked as shared when used with other remote file systems");
        }
        return sawLocal;
    }

    /**
     * Tells whether {@code path} resolves, under {@code jobConf}, to a {@link LocalFileSystem}
     * or a {@link RawLocalFileSystem}. A checked exception raised while resolving the file
     * system is handled by {@code Util.uncheckCall}.
     */
    private static boolean isLocalFileSystem(Path path, JobConf jobConf) {
        FileSystem resolved = Util.uncheckCall(() -> path.getFileSystem(jobConf));
        if (resolved instanceof LocalFileSystem) {
            return true;
        }
        return resolved instanceof RawLocalFileSystem;
    }
}
