/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.utils.NoOpCommitter;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverable;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverableFsDataOutputStream;
import org.apache.flink.connector.file.sink.utils.NoOpRecoverableWriter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucket;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.connector.file.table.FileSystemTableSink;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.fs.local.LocalFileSystem;
import org.apache.flink.core.fs.local.LocalRecoverableWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.PartFileInfo;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.util.Preconditions;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.Fail;
import org.assertj.core.api.ObjectAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

class FileWriterBucketTest {
    private static final String BUCKET_ID = "testing-bucket";
    private static final Encoder<String> ENCODER = new SimpleStringEncoder();
    private static final Encoder<RowData> rowDataENCODER = new SimpleStringEncoder();
    private static final RollingPolicy<String, String> DEFAULT_ROLLING_POLICY = DefaultRollingPolicy.builder().build();
    private static final RollingPolicy<String, String> ON_CHECKPOING_ROLLING_POLICY = OnCheckpointRollingPolicy.build();
    private static final EachElementRollingPolicy EACH_ELEMENT_ROLLING_POLICY = new EachElementRollingPolicy();

    FileWriterBucketTest() {
    }

    @Test
    void testOnCheckpointNoPendingRecoverable(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, DEFAULT_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 0L);
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 0, 0);
        Assertions.assertThat((String)bucketState.getBucketId()).isEqualTo(BUCKET_ID);
        Assertions.assertThat((Object)bucketState.getBucketPath()).isEqualTo((Object)path);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should have in-progress recoverable", new Object[0])).isNotNull();
    }

    @Test
    void testOnCheckpointRollingOnCheckpoint(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, ON_CHECKPOING_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 0L);
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 1, 0);
        Assertions.assertThat((String)bucketState.getBucketId()).isEqualTo(BUCKET_ID);
        Assertions.assertThat((Object)bucketState.getBucketPath()).isEqualTo((Object)path);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should not have in-progress recoverable", new Object[0])).isNull();
    }

    @Test
    void testOnCheckpointMultiplePendingFiles(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, EACH_ELEMENT_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 0L);
        bucket.write((Object)"test-element", 0L);
        bucket.write((Object)"test-element", 0L);
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 2, 0);
        Assertions.assertThat((String)bucketState.getBucketId()).isEqualTo(BUCKET_ID);
        Assertions.assertThat((Object)bucketState.getBucketPath()).isEqualTo((Object)path);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should not have in-progress recoverable", new Object[0])).isNotNull();
    }

    @Test
    void testOnCheckpointWithInProgressFileToCleanup(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, DEFAULT_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 0L);
        bucket.prepareCommit(false);
        bucket.snapshotState();
        bucket.write((Object)"test-element", 0L);
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 0, 1);
        Assertions.assertThat((String)bucketState.getBucketId()).isEqualTo(BUCKET_ID);
        Assertions.assertThat((Object)bucketState.getBucketPath()).isEqualTo((Object)path);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should not have in-progress recoverable", new Object[0])).isNotNull();
    }

    @Test
    void testFlush(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, DEFAULT_ROLLING_POLICY, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 0L);
        List fileSinkCommittables = bucket.prepareCommit(true);
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 1, 0);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should not have in-progress part after flushed", new Object[0])).isNull();
    }

    @Test
    void testRollingOnProcessingTime(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        DefaultRollingPolicy onProcessingTimeRollingPolicy = DefaultRollingPolicy.builder().withRolloverInterval(Duration.ofMillis(10L)).build();
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<String> bucket = FileWriterBucketTest.createBucket((RecoverableWriter)recoverableWriter, path, (RollingPolicy<String, String>)onProcessingTimeRollingPolicy, OutputFileConfig.builder().build());
        bucket.write((Object)"test-element", 11L);
        bucket.write((Object)"test-element", 12L);
        bucket.onProcessingTime(20L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should not roll since interval is not reached", new Object[0])).isNotNull();
        bucket.write((Object)"test-element", 21L);
        bucket.onProcessingTime(21L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should roll since interval is reached", new Object[0])).isNull();
        List fileSinkCommittables = bucket.prepareCommit(false);
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 1, 0);
    }

    @Test
    void testTableRollingOnProcessingTime(@TempDir java.nio.file.Path tmpDir) throws IOException {
        File outDir = tmpDir.toFile();
        Path path = new Path(outDir.toURI());
        FileSystemTableSink.TableRollingPolicy tableRollingPolicy = new FileSystemTableSink.TableRollingPolicy(false, Long.MAX_VALUE, 20L, 10L);
        TestRecoverableWriter recoverableWriter = FileWriterBucketTest.getRecoverableWriter(path);
        FileWriterBucket<RowData> bucket = FileWriterBucketTest.createRowDataBucket((RecoverableWriter)recoverableWriter, path, (RollingPolicy<RowData, String>)tableRollingPolicy, OutputFileConfig.builder().build());
        bucket.write((Object)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"test-element")}), 11L);
        bucket.write((Object)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"test-element")}), 12L);
        bucket.onProcessingTime(21L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should not roll since interval and inactivity not reached", new Object[0])).isNotNull();
        bucket.onProcessingTime(22L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should roll since inactivity is reached", new Object[0])).isNull();
        bucket.write((Object)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"test-element")}), 11L);
        bucket.write((Object)GenericRowData.of((Object[])new Object[]{StringData.fromString((String)"test-element")}), 21L);
        bucket.onProcessingTime(30L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should not roll since interval and inactivity not reached", new Object[0])).isNotNull();
        bucket.onProcessingTime(31L);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The bucket should roll since interval is reached", new Object[0])).isNull();
        List fileSinkCommittables = bucket.prepareCommit(false);
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 2, 0);
    }

    @Test
    void testRestoreWithInprogressFileNotSupportResume() throws IOException {
        StubNonResumableWriter nonResumableWriter = new StubNonResumableWriter();
        FileWriterBucket<String> bucket = this.getRestoredBucketWithOnlyInProgressPart(nonResumableWriter, DEFAULT_ROLLING_POLICY);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The in-progress file should be pre-committed", new Object[0])).isNull();
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 1, 0);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should not have in-progress recoverable", new Object[0])).isNull();
    }

    @Test
    void testRestoreWithInprogressFileSupportResume() throws IOException {
        StubResumableWriter resumableWriter = new StubResumableWriter();
        FileWriterBucket<String> bucket = this.getRestoredBucketWithOnlyInProgressPart(resumableWriter, DEFAULT_ROLLING_POLICY);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("The in-progress file should be recovered", new Object[0])).isNotNull();
        List fileSinkCommittables = bucket.prepareCommit(false);
        FileWriterBucketState bucketState = bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 0, 0);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should have in-progress recoverable", new Object[0])).isNotNull();
    }

    @Test
    void testRestoringWithOnlyPendingFiles() throws IOException {
        int noOfPendingFileCheckpoints = 4;
        StubResumableWriter resumableWriter = new StubResumableWriter();
        FileWriterBucket<String> bucket = this.getRestoredBucketWithOnlyPendingFiles(resumableWriter, DEFAULT_ROLLING_POLICY, 4);
        ((ObjectAssert)Assertions.assertThat((Object)bucket.getInProgressPart()).as("There should be no in-progress file", new Object[0])).isNull();
        Assertions.assertThat((List)bucket.getPendingFiles()).hasSize(4);
        List fileSinkCommittables = bucket.prepareCommit(false);
        bucket.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 4, 0);
    }

    @Test
    void testMergeWithInprogressFileNotSupportResume() throws IOException {
        FileWriterBucket<String> bucket1 = this.getRestoredBucketWithOnlyInProgressPart(new StubNonResumableWriter(), DEFAULT_ROLLING_POLICY);
        FileWriterBucket<String> bucket2 = this.getRestoredBucketWithOnlyInProgressPart(new StubNonResumableWriter(), DEFAULT_ROLLING_POLICY);
        bucket1.merge(bucket2);
        ((ObjectAssert)Assertions.assertThat((Object)bucket1.getInProgressPart()).as("The in-progress file should be pre-committed", new Object[0])).isNull();
        List fileSinkCommittables = bucket1.prepareCommit(false);
        FileWriterBucketState bucketState = bucket1.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 2, 0);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should have in-progress recoverable", new Object[0])).isNull();
    }

    @Test
    void testMergeWithInprogressFileSupportResume() throws IOException {
        FileWriterBucket<String> bucket1 = this.getRestoredBucketWithOnlyInProgressPart(new StubResumableWriter(), DEFAULT_ROLLING_POLICY);
        FileWriterBucket<String> bucket2 = this.getRestoredBucketWithOnlyInProgressPart(new StubResumableWriter(), DEFAULT_ROLLING_POLICY);
        bucket1.merge(bucket2);
        ((ObjectAssert)Assertions.assertThat((Object)bucket1.getInProgressPart()).as("The in-progress file should be recovered", new Object[0])).isNotNull();
        List fileSinkCommittables = bucket1.prepareCommit(false);
        FileWriterBucketState bucketState = bucket1.snapshotState();
        this.compareNumberOfPendingAndInProgress(fileSinkCommittables, 1, 0);
        ((ObjectAssert)Assertions.assertThat((Object)bucketState.getInProgressFileRecoverable()).as("The bucket should not have in-progress recoverable", new Object[0])).isNotNull();
    }

    private static FileWriterBucket<String> createBucket(RecoverableWriter writer, Path bucketPath, RollingPolicy<String, String> rollingPolicy, OutputFileConfig outputFileConfig) {
        return FileWriterBucket.getNew((String)BUCKET_ID, (Path)bucketPath, (BucketWriter)new RowWiseBucketWriter(writer, ENCODER), rollingPolicy, (OutputFileConfig)outputFileConfig);
    }

    private static FileWriterBucket<RowData> createRowDataBucket(RecoverableWriter writer, Path bucketPath, RollingPolicy<RowData, String> rollingPolicy, OutputFileConfig outputFileConfig) {
        return FileWriterBucket.getNew((String)BUCKET_ID, (Path)bucketPath, (BucketWriter)new RowWiseBucketWriter(writer, rowDataENCODER), rollingPolicy, (OutputFileConfig)outputFileConfig);
    }

    private static TestRecoverableWriter getRecoverableWriter(Path path) {
        try {
            FileSystem fs = FileSystem.get((URI)path.toUri());
            if (!(fs instanceof LocalFileSystem)) {
                Fail.fail((String)("Expected Local FS but got a " + fs.getClass().getName() + " for path: " + path));
            }
            return new TestRecoverableWriter((LocalFileSystem)fs);
        }
        catch (IOException e) {
            Fail.fail((String)"Test failed");
            return null;
        }
    }

    private void compareNumberOfPendingAndInProgress(List<FileSinkCommittable> fileSinkCommittables, int expectedPendingFiles, int expectedInProgressFiles) {
        int numPendingFiles = 0;
        int numInProgressFiles = 0;
        for (FileSinkCommittable committable : fileSinkCommittables) {
            if (committable.getPendingFile() != null) {
                ++numPendingFiles;
            }
            if (committable.getInProgressFileToCleanup() == null) continue;
            ++numInProgressFiles;
        }
        Assertions.assertThat((int)numPendingFiles).isEqualTo(expectedPendingFiles);
        Assertions.assertThat((int)numInProgressFiles).isEqualTo(expectedInProgressFiles);
    }

    private FileWriterBucket<String> getRestoredBucketWithOnlyInProgressPart(BaseStubWriter writer, RollingPolicy<String, String> rollingPolicy) throws IOException {
        FileWriterBucketState stateWithOnlyInProgressFile = new FileWriterBucketState("test", new Path("file:///fake/fakefile"), 12345L, (InProgressFileWriter.InProgressFileRecoverable)new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable((RecoverableWriter.ResumeRecoverable)new NoOpRecoverable()));
        return FileWriterBucket.restore((BucketWriter)new RowWiseBucketWriter((RecoverableWriter)writer, ENCODER), rollingPolicy, (FileWriterBucketState)stateWithOnlyInProgressFile, (OutputFileConfig)OutputFileConfig.builder().build());
    }

    private FileWriterBucket<String> getRestoredBucketWithOnlyPendingFiles(BaseStubWriter writer, RollingPolicy<String, String> rollingPolicy, int numberOfPendingParts) throws IOException {
        Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> completePartsPerCheckpoint = this.createPendingPartsPerCheckpoint(numberOfPendingParts);
        FileWriterBucketState state = new FileWriterBucketState("test", new Path("file:///fake/fakefile"), 12345L, null, completePartsPerCheckpoint);
        return FileWriterBucket.restore((BucketWriter)new RowWiseBucketWriter((RecoverableWriter)writer, ENCODER), rollingPolicy, (FileWriterBucketState)state, (OutputFileConfig)OutputFileConfig.builder().build());
    }

    private Map<Long, List<InProgressFileWriter.PendingFileRecoverable>> createPendingPartsPerCheckpoint(int noOfCheckpoints) {
        HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingCommittablesPerCheckpoint = new HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>>();
        for (int checkpointId = 0; checkpointId < noOfCheckpoints; ++checkpointId) {
            ArrayList<OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable> pending = new ArrayList<OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable>();
            pending.add(new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable((RecoverableWriter.CommitRecoverable)new NoOpRecoverable()));
            pendingCommittablesPerCheckpoint.put(Long.valueOf(checkpointId), pending);
        }
        return pendingCommittablesPerCheckpoint;
    }

    private static class EachElementRollingPolicy
    implements RollingPolicy<String, String> {
        private EachElementRollingPolicy() {
        }

        public boolean shouldRollOnCheckpoint(PartFileInfo<String> partFileState) throws IOException {
            return false;
        }

        public boolean shouldRollOnEvent(PartFileInfo<String> partFileState, String element) throws IOException {
            return true;
        }

        public boolean shouldRollOnProcessingTime(PartFileInfo<String> partFileState, long currentTime) throws IOException {
            return false;
        }
    }

    private static class BaseStubWriter
    extends NoOpRecoverableWriter {
        private final boolean supportsResume;

        private BaseStubWriter(boolean supportsResume) {
            this.supportsResume = supportsResume;
        }

        @Override
        public RecoverableFsDataOutputStream recover(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
            return new NoOpRecoverableFsDataOutputStream(){

                @Override
                public RecoverableFsDataOutputStream.Committer closeForCommit() throws IOException {
                    return new NoOpCommitter();
                }
            };
        }

        @Override
        public RecoverableFsDataOutputStream.Committer recoverForCommit(RecoverableWriter.CommitRecoverable resumable) throws IOException {
            Preconditions.checkArgument((boolean)(resumable instanceof NoOpRecoverable));
            return new NoOpCommitter();
        }

        @Override
        public boolean supportsResume() {
            return this.supportsResume;
        }
    }

    private static class StubNonResumableWriter
    extends BaseStubWriter {
        StubNonResumableWriter() {
            super(false);
        }
    }

    private static class StubResumableWriter
    extends BaseStubWriter {
        StubResumableWriter() {
            super(true);
        }
    }

    private static class TestRecoverableWriter
    extends LocalRecoverableWriter {
        private int cleanupCallCounter = 0;

        TestRecoverableWriter(LocalFileSystem fs) {
            super(fs);
        }

        int getCleanupCallCounter() {
            return this.cleanupCallCounter;
        }

        public boolean requiresCleanupOfRecoverableState() {
            return true;
        }

        public boolean cleanupRecoverableState(RecoverableWriter.ResumeRecoverable resumable) throws IOException {
            ++this.cleanupCallCounter;
            return false;
        }

        public String toString() {
            return "TestRecoverableWriter has called discardRecoverableState() " + this.cleanupCallCounter + " times.";
        }
    }
}

