package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import javax.annotation.Nullable;
import org.apache.beam.repackaged.direct_java.runners.core.construction.ReadTranslation;
import org.apache.beam.runners.direct.StepTransformResult;
import org.apache.beam.runners.direct.UnboundedReadDeduplicator;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.io.UnboundedSource;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.joda.time.Instant;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory.class */
public class UnboundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    /** Reuse chance applied when the two-argument constructor is used. */
    private static final double DEFAULT_READER_REUSE_CHANCE = 0.95d;
    /** Context handed to every evaluator this factory creates. */
    private final EvaluationContext evaluationContext;
    /** Pipeline options handed to every evaluator this factory creates. */
    private final PipelineOptions options;
    /**
     * Threshold handed to every evaluator; the evaluator compares a random draw in [0, 1) against
     * it to decide whether to close its reader after a read.
     */
    private final double readerReuseChance;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$InputProvider.class */
    /**
     * Provides the initial root bundles for an unbounded read transform: the source is split and
     * each split becomes one committed bundle containing a single unstarted
     * {@link UnboundedSourceShard}.
     *
     * @param <T> element type produced by the source
     */
    public static class InputProvider<T> implements RootInputProvider<T, UnboundedSourceShard<T, ?>, PBegin> {
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;

        /* JADX INFO: Access modifiers changed from: package-private */
        public InputProvider(EvaluationContext evaluationContext, PipelineOptions pipelineOptions) {
            this.evaluationContext = evaluationContext;
            this.options = pipelineOptions;
        }

        /**
         * Splits the transform's unbounded source and wraps every split in its own root bundle.
         *
         * <p>All shards share one deduplicator instance; which kind is chosen depends on
         * {@code requiresDeduping()} of the unsplit source. Each bundle is committed at
         * {@link BoundedWindow#TIMESTAMP_MAX_VALUE}.
         *
         * @param appliedPTransform the read transform whose source is extracted
         * @param i forwarded as the first argument of {@code UnboundedSource.split}
         *     (presumably the desired number of splits; confirm against the caller)
         * @return one committed bundle per split, in split order
         * @throws Exception if extracting or splitting the source fails
         */
        @Override // org.apache.beam.runners.direct.RootInputProvider
        public Collection<CommittedBundle<UnboundedSourceShard<T, ?>>> getInitialInputs(AppliedPTransform<PBegin, PCollection<T>, PTransform<PBegin, PCollection<T>>> appliedPTransform, int i) throws Exception {
            UnboundedSource<T, ?> source = ReadTranslation.unboundedSourceFromTransform(appliedPTransform);
            List<? extends UnboundedSource<T, ?>> splits = source.split(i, this.options);
            UnboundedReadDeduplicator deduplicator = source.requiresDeduping()
                    ? UnboundedReadDeduplicator.CachedIdDeduplicator.create()
                    : UnboundedReadDeduplicator.NeverDeduplicator.create();
            ImmutableList.Builder<CommittedBundle<UnboundedSourceShard<T, ?>>> initialShards = ImmutableList.builder();
            for (UnboundedSource<T, ?> split : splits) {
                UnboundedSourceShard<T, ?> shard = UnboundedSourceShard.unstarted(split, deduplicator);
                initialShards.add(
                        this.evaluationContext
                                .<UnboundedSourceShard<T, ?>>createRootBundle()
                                .add(WindowedValue.valueInGlobalWindow(shard))
                                .commit(BoundedWindow.TIMESTAMP_MAX_VALUE));
            }
            return initialShards.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$UnboundedReadEvaluator.class */
    public static class UnboundedReadEvaluator<OutputT, CheckpointMarkT extends UnboundedSource.CheckpointMark> implements TransformEvaluator<UnboundedSourceShard<OutputT, CheckpointMarkT>> {
        private static final int ARBITRARY_MAX_ELEMENTS = 10;
        private final AppliedPTransform<?, PCollection<OutputT>, ?> transform;
        private final EvaluationContext evaluationContext;
        private final PipelineOptions options;
        private final double readerReuseChance;
        private final StepTransformResult.Builder resultBuilder;

        public UnboundedReadEvaluator(AppliedPTransform<?, PCollection<OutputT>, ?> appliedPTransform, EvaluationContext evaluationContext, PipelineOptions pipelineOptions, double d) {
            this.transform = appliedPTransform;
            this.evaluationContext = evaluationContext;
            this.options = pipelineOptions;
            this.readerReuseChance = d;
            this.resultBuilder = StepTransformResult.withoutHold(appliedPTransform);
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public void processElement(WindowedValue<UnboundedSourceShard<OutputT, CheckpointMarkT>> windowedValue) throws IOException {
            UncommittedBundle<?> createBundle = this.evaluationContext.createBundle((PCollection) Iterables.getOnlyElement(this.transform.getOutputs().values()));
            UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard = (UnboundedSourceShard) windowedValue.getValue();
            UnboundedSource.UnboundedReader<OutputT> unboundedReader = null;
            try {
                unboundedReader = getReader(unboundedSourceShard);
                if (startReader(unboundedReader, unboundedSourceShard)) {
                    UnboundedReadDeduplicator deduplicator = unboundedSourceShard.getDeduplicator();
                    int i = 0;
                    do {
                        if (deduplicator.shouldOutput(unboundedReader.getCurrentRecordId())) {
                            createBundle.add(WindowedValue.timestampedValueInGlobalWindow(unboundedReader.getCurrent(), unboundedReader.getCurrentTimestamp()));
                        }
                        i++;
                        if (i >= 10) {
                            break;
                        }
                    } while (unboundedReader.advance());
                    Instant watermark = unboundedReader.getWatermark();
                    CheckpointMarkT finishRead = finishRead(unboundedReader, watermark, unboundedSourceShard);
                    if (ThreadLocalRandom.current().nextDouble(1.0d) >= this.readerReuseChance) {
                        unboundedReader = null;
                        unboundedReader.close();
                    }
                    this.resultBuilder.addOutput(createBundle, new UncommittedBundle[0]).addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(UnboundedSourceShard.of(unboundedSourceShard.mo166getSource(), unboundedSourceShard.getDeduplicator(), unboundedReader, finishRead), watermark)));
                } else {
                    Instant watermark2 = unboundedReader.getWatermark();
                    if (watermark2.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                        this.resultBuilder.addUnprocessedElements(Collections.singleton(WindowedValue.timestampedValueInGlobalWindow(UnboundedSourceShard.of(unboundedSourceShard.mo166getSource(), unboundedSourceShard.getDeduplicator(), unboundedReader, unboundedSourceShard.getCheckpoint()), watermark2)));
                    } else {
                        CheckpointMarkT checkpoint = unboundedSourceShard.getCheckpoint();
                        IOException iOException = null;
                        if (checkpoint != null) {
                            try {
                                checkpoint.finalizeCheckpoint();
                            } catch (IOException e) {
                                iOException = e;
                                try {
                                    unboundedReader = null;
                                    unboundedReader.close();
                                } catch (IOException e2) {
                                    if (iOException == null) {
                                        throw e2;
                                    }
                                    iOException.addSuppressed(e2);
                                }
                            } catch (Throwable th) {
                                try {
                                    unboundedReader = null;
                                    unboundedReader.close();
                                } catch (IOException e3) {
                                    if (0 == 0) {
                                        throw e3;
                                    }
                                    iOException.addSuppressed(e3);
                                }
                                throw th;
                            }
                        }
                        try {
                            unboundedReader = null;
                            unboundedReader.close();
                        } catch (IOException e4) {
                            if (0 == 0) {
                                throw e4;
                            }
                            iOException.addSuppressed(e4);
                        }
                        if (iOException != null) {
                            throw iOException;
                        }
                    }
                }
            } catch (IOException e5) {
                if (unboundedReader != null) {
                    unboundedReader.close();
                }
                throw e5;
            }
        }

        /* JADX WARN: Multi-variable type inference failed */
        /* JADX WARN: Type inference failed for: r0v14, types: [org.apache.beam.sdk.io.UnboundedSource$CheckpointMark] */
        private UnboundedSource.UnboundedReader<OutputT> getReader(UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            UnboundedSource.UnboundedReader<OutputT> existingReader = unboundedSourceShard.getExistingReader();
            if (existingReader != null) {
                return existingReader;
            }
            CheckpointMarkT checkpoint = unboundedSourceShard.getCheckpoint();
            if (checkpoint != null) {
                checkpoint = (UnboundedSource.CheckpointMark) CoderUtils.clone(unboundedSourceShard.mo166getSource().getCheckpointMarkCoder(), checkpoint);
            }
            return unboundedSourceShard.mo166getSource().createReader(this.options, checkpoint);
        }

        private boolean startReader(UnboundedSource.UnboundedReader<OutputT> unboundedReader, UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            return unboundedSourceShard.getExistingReader() == null ? unboundedReader.start() : unboundedSourceShard.getExistingReader().advance();
        }

        private CheckpointMarkT finishRead(UnboundedSource.UnboundedReader<OutputT> unboundedReader, Instant instant, UnboundedSourceShard<OutputT, CheckpointMarkT> unboundedSourceShard) throws IOException {
            CheckpointMarkT checkpoint = unboundedSourceShard.getCheckpoint();
            CheckpointMarkT checkpointmarkt = (CheckpointMarkT) unboundedReader.getCheckpointMark();
            if (checkpoint != null) {
                checkpoint.finalizeCheckpoint();
            }
            if (!instant.isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE)) {
                PCollection<?> pCollection = (PCollection) Iterables.getOnlyElement(this.transform.getOutputs().values());
                this.evaluationContext.scheduleAfterOutputWouldBeProduced(pCollection, (BoundedWindow) GlobalWindow.INSTANCE, pCollection.getWindowingStrategy(), () -> {
                    try {
                        checkpointmarkt.finalizeCheckpoint();
                    } catch (IOException e) {
                        throw new RuntimeException("Couldn't finalize checkpoint after the end of the Global Window", e);
                    }
                });
            }
            return checkpointmarkt;
        }

        @Override // org.apache.beam.runners.direct.TransformEvaluator
        public TransformResult<UnboundedSourceShard<OutputT, CheckpointMarkT>> finishBundle() throws IOException {
            return this.resultBuilder.build();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Value type describing one shard of an unbounded read: its source, the deduplicator applied
     * to its records, and optionally an already-open reader and the last checkpoint taken.
     *
     * @param <T> element type produced by the source
     * @param <CheckpointT> checkpoint type of the source
     */
    @AutoValue
    /* loaded from: input_file:org/apache/beam/runners/direct/UnboundedReadEvaluatorFactory$UnboundedSourceShard.class */
    public static abstract class UnboundedSourceShard<T, CheckpointT extends UnboundedSource.CheckpointMark> implements SourceShard<T> {
        /** Creates a shard that has neither a reader nor a checkpoint yet. */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> unstarted(UnboundedSource<T, CheckpointT> unboundedSource, UnboundedReadDeduplicator unboundedReadDeduplicator) {
            return of(unboundedSource, unboundedReadDeduplicator, null, null);
        }

        /**
         * Creates a shard from its four components.
         *
         * @param unboundedReader an open reader to reuse, or {@code null}
         * @param checkpointt the last checkpoint taken, or {@code null}
         */
        static <T, CheckpointT extends UnboundedSource.CheckpointMark> UnboundedSourceShard<T, CheckpointT> of(UnboundedSource<T, CheckpointT> unboundedSource, UnboundedReadDeduplicator unboundedReadDeduplicator, @Nullable UnboundedSource.UnboundedReader<T> unboundedReader, @Nullable CheckpointT checkpointt) {
            // Diamond instead of a raw instantiation so the factory is checked by the compiler.
            return new AutoValue_UnboundedReadEvaluatorFactory_UnboundedSourceShard<>(unboundedSource, unboundedReadDeduplicator, unboundedReader, checkpointt);
        }

        /** Returns the source this shard reads from. */
        @Override // org.apache.beam.runners.direct.SourceShard
        /* renamed from: getSource */
        public abstract UnboundedSource<T, CheckpointT> mo166getSource();

        /** Returns the deduplicator consulted for each record read from this shard. */
        /* JADX INFO: Access modifiers changed from: package-private */
        public abstract UnboundedReadDeduplicator getDeduplicator();

        /** Returns the reader left open by a previous read, or {@code null} if there is none. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract UnboundedSource.UnboundedReader<T> getExistingReader();

        /** Returns the last checkpoint taken for this shard, or {@code null} if there is none. */
        /* JADX INFO: Access modifiers changed from: package-private */
        @Nullable
        public abstract CheckpointT getCheckpoint();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Creates a factory that uses {@code DEFAULT_READER_REUSE_CHANCE} as its reader reuse chance.
     *
     * @param evaluationContext context passed on to created evaluators
     * @param pipelineOptions options passed on to created evaluators
     */
    public UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions pipelineOptions) {
        this(evaluationContext, pipelineOptions, DEFAULT_READER_REUSE_CHANCE);
    }

    /**
     * Creates a factory with an explicit reader reuse chance.
     *
     * @param evaluationContext context passed on to created evaluators
     * @param pipelineOptions options passed on to created evaluators
     * @param d reader reuse chance stored for, and passed on to, created evaluators
     */
    @VisibleForTesting
    UnboundedReadEvaluatorFactory(EvaluationContext evaluationContext, PipelineOptions pipelineOptions, double d) {
        this.readerReuseChance = d;
        this.options = pipelineOptions;
        this.evaluationContext = evaluationContext;
    }

    /**
     * Creates an unbounded-read evaluator for the given application.
     *
     * <p>The wildcard-typed application is passed through a raw cast because
     * {@code createEvaluator} requires concrete type arguments that cannot be proven from
     * {@code AppliedPTransform<?, ?, ?>}; callers are expected to pass only unbounded read
     * applications.
     *
     * @param appliedPTransform the transform application to evaluate
     * @param committedBundle not used by this factory
     * @return a new evaluator for {@code appliedPTransform}
     */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    @Nullable
    @SuppressWarnings({"unchecked", "rawtypes"})
    public <InputT> TransformEvaluator<InputT> forApplication(AppliedPTransform<?, ?, ?> appliedPTransform, CommittedBundle<?> committedBundle) {
        return (TransformEvaluator<InputT>) createEvaluator((AppliedPTransform) appliedPTransform);
    }

    /**
     * Builds a new {@link UnboundedReadEvaluator} for the application, configured with this
     * factory's evaluation context, options and reader reuse chance.
     */
    private <OutputT> TransformEvaluator<?> createEvaluator(AppliedPTransform<PBegin, PCollection<OutputT>, Read.Unbounded<OutputT>> appliedPTransform) {
        // Diamond rather than a raw type so the constructor arguments are type-checked.
        return new UnboundedReadEvaluator<>(appliedPTransform, this.evaluationContext, this.options, this.readerReuseChance);
    }

    /** Does nothing; this implementation performs no cleanup work. */
    @Override // org.apache.beam.runners.direct.TransformEvaluatorFactory
    public void cleanup() {
    }
}
