/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.io.Closeable;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.CheckpointableInputFormat;
import org.apache.flink.api.common.io.FileInputFormat;
import org.apache.flink.api.common.io.InputFormat;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class ContinuousFileProcessingRescalingTest {
    private final int maxParallelism = 10;
    private final int sizeOfSplit = 20;

    ContinuousFileProcessingRescalingTest() {
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Test
    void testReaderScalingDown() throws Exception {
        HarnessWithFormat[] beforeRescale = new HarnessWithFormat[]{};
        try {
            beforeRescale = this.buildAndStart(5, 15);
            try (HarnessWithFormat afterRescale = this.buildAndStart(1, 0, 5, this.snapshotAndMergeState(beforeRescale), new FileInputSplit[0]);){
                afterRescale.awaitEverythingProcessed();
                for (HarnessWithFormat i : beforeRescale) {
                    i.getHarness().getOutput().clear();
                    i.awaitEverythingProcessed();
                }
                Assertions.assertThat(this.collectOutput(afterRescale)).isEqualTo(this.collectOutput(beforeRescale));
            }
        }
        finally {
            for (HarnessWithFormat harness : beforeRescale) {
                harness.close();
            }
        }
    }

    @Test
    void testReaderScalingUp() throws Exception {
        try (HarnessWithFormat beforeRescale = this.buildAndStart(1, 0, 5, null, this.buildSplits(2));){
            OperatorSubtaskState snapshot = beforeRescale.getHarness().snapshot(0L, 0L);
            try (HarnessWithFormat afterRescale0 = this.buildAndStart(2, 0, 15, AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 10, 1, 2, 0), new FileInputSplit[0]);
                 HarnessWithFormat afterRescale1 = this.buildAndStart(2, 1, 15, AbstractStreamOperatorTestHarness.repartitionOperatorState(snapshot, 10, 1, 2, 1), new FileInputSplit[0]);){
                beforeRescale.getHarness().getOutput().clear();
                for (HarnessWithFormat harness : Arrays.asList(beforeRescale, afterRescale0, afterRescale1)) {
                    harness.awaitEverythingProcessed();
                }
                Assertions.assertThat(this.collectOutput(afterRescale0, afterRescale1)).isEqualTo(this.collectOutput(beforeRescale));
            }
        }
    }

    private HarnessWithFormat[] buildAndStart(int ... elementsBeforeCheckpoint) throws Exception {
        int count = elementsBeforeCheckpoint.length;
        FileInputSplit[] splits = this.buildSplits(count);
        HarnessWithFormat[] res = new HarnessWithFormat[count];
        for (int i = 0; i < count; ++i) {
            res[i] = this.buildAndStart(2, i, elementsBeforeCheckpoint[i], null, splits[i]);
        }
        return res;
    }

    private HarnessWithFormat buildAndStart(int noOfTasks, int taskIdx, int elementsBeforeCheckpoint, @Nullable OperatorSubtaskState initState, FileInputSplit ... splits) throws Exception {
        BlockingFileInputFormat format = new BlockingFileInputFormat(new Path("test"), 20, elementsBeforeCheckpoint);
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = this.getTestHarness(format, noOfTasks, taskIdx);
        harness.setup();
        if (initState != null) {
            harness.initializeState(initState);
        }
        harness.open();
        if (splits != null) {
            for (int i = 0; i < splits.length; ++i) {
                harness.processElement((StreamRecord<TimestampedFileInputSplit>)new StreamRecord((Object)this.getTimestampedSplit(i, splits[i])));
            }
        }
        HarnessWithFormat harnessWithFormat = new HarnessWithFormat(harness, format);
        while (!format.isFirstChunkProcessed()) {
            harnessWithFormat.mailboxProcessor.runMailboxStep();
        }
        return harnessWithFormat;
    }

    private OperatorSubtaskState snapshotAndMergeState(HarnessWithFormat[] hh) throws Exception {
        OperatorSubtaskState[] oss = new OperatorSubtaskState[hh.length];
        for (int i = 0; i < hh.length; ++i) {
            oss[i] = hh[i].getHarness().snapshot(0L, 0L);
        }
        return AbstractStreamOperatorTestHarness.repartitionOperatorState(AbstractStreamOperatorTestHarness.repackageState(oss), 10, hh.length, 1, 0);
    }

    private FileInputSplit[] buildSplits(int n) {
        return new BlockingFileInputFormat(new Path("test"), 20, 5).createInputSplits(n);
    }

    private OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getTestHarness(BlockingFileInputFormat format, int noOfTasks, int taskIdx) throws Exception {
        OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> testHarness = new OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String>((OneInputStreamOperatorFactory<TimestampedFileInputSplit, String>)new ContinuousFileReaderOperatorFactory((InputFormat)format, TypeExtractor.getInputFormatTypes((InputFormat)format), new ExecutionConfig()), 10, noOfTasks, taskIdx);
        return testHarness;
    }

    private TimestampedFileInputSplit getTimestampedSplit(long modTime, FileInputSplit split) {
        Preconditions.checkNotNull((Object)split);
        return new TimestampedFileInputSplit(modTime, split.getSplitNumber(), split.getPath(), split.getStart(), split.getLength(), split.getHostnames());
    }

    private List<Object> collectOutput(HarnessWithFormat ... in) {
        return Stream.of(in).flatMap(i -> i.getHarness().getOutput().stream()).filter(output -> !(output instanceof Watermark)).collect(Collectors.toList());
    }

    private static final class HarnessWithFormat
    implements Closeable {
        private final OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness;
        private final BlockingFileInputFormat format;
        private final SteppingMailboxProcessor mailboxProcessor;

        HarnessWithFormat(OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness, BlockingFileInputFormat format) {
            this.format = format;
            this.harness = harness;
            this.mailboxProcessor = new SteppingMailboxProcessor(MailboxDefaultAction.Controller::suspendDefaultAction, harness.getTaskMailbox(), StreamTaskActionExecutor.IMMEDIATE);
        }

        public OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> getHarness() {
            return this.harness;
        }

        public BlockingFileInputFormat getFormat() {
            return this.format;
        }

        void awaitEverythingProcessed() throws Exception {
            while (!this.getFormat().isFirstChunkProcessed()) {
                this.mailboxProcessor.runMailboxStep();
            }
            while (!this.getFormat().isLastProcessed()) {
                this.mailboxProcessor.runMailboxStep();
            }
            this.harness.getOperator().finish();
        }

        @Override
        public void close() throws IOException {
            try {
                this.harness.close();
            }
            catch (IOException e) {
                throw e;
            }
            catch (Exception e) {
                throw new IOException(e);
            }
            this.format.close();
            this.mailboxProcessor.close();
        }
    }

    private static class BlockingFileInputFormat
    extends FileInputFormat<String>
    implements CheckpointableInputFormat<FileInputSplit, Integer> {
        private boolean firstChunkTrigger = false;
        private boolean endTrigger = false;
        private final int elementsBeforeCheckpoint;
        private final int linesPerSplit;
        private FileInputSplit split;
        private int state = 0;

        BlockingFileInputFormat(Path filePath, int sizeOfSplit, int elementsBeforeCheckpoint) {
            super(filePath);
            this.elementsBeforeCheckpoint = elementsBeforeCheckpoint;
            this.linesPerSplit = sizeOfSplit;
        }

        public FileInputSplit[] createInputSplits(int minNumSplits) {
            FileInputSplit[] splits = new FileInputSplit[minNumSplits];
            for (int i = 0; i < minNumSplits; ++i) {
                splits[i] = new FileInputSplit(i, this.getFilePaths()[0], (long)(i * this.linesPerSplit + 1), (long)this.linesPerSplit, null);
            }
            return splits;
        }

        public void open(FileInputSplit fileSplit) throws IOException {
            this.split = fileSplit;
            this.state = 0;
        }

        public void reopen(FileInputSplit split, Integer state) {
            this.split = split;
            this.state = state;
        }

        public Integer getCurrentState() {
            return this.state;
        }

        public boolean reachedEnd() {
            if (this.state == this.elementsBeforeCheckpoint) {
                this.firstChunkTrigger = true;
            }
            this.endTrigger = this.state == this.linesPerSplit;
            return this.endTrigger;
        }

        public String nextRecord(String reuse) {
            return this.reachedEnd() ? null : this.split.getSplitNumber() + ": test line " + this.state++;
        }

        public boolean isFirstChunkProcessed() {
            return this.firstChunkTrigger;
        }

        public boolean isLastProcessed() {
            return this.endTrigger;
        }
    }
}

