/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.connector.file.sink.committer.FileCommitter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucket;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketStateSerializer;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStateGenerator;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketStatePathResolver;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.util.FileUtils;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(value=Parameterized.class)
public class FileWriterBucketStateSerializerMigrationTest {
    private static final int CURRENT_VERSION = 2;
    @Parameterized.Parameter
    public Integer previousVersion;
    private static final String IN_PROGRESS_CONTENT = "writing";
    private static final String PENDING_CONTENT = "wrote";
    private static final String BUCKET_ID = "test-bucket";
    private static final java.nio.file.Path BASE_PATH = Paths.get("src/test/resources/", new String[0]).resolve("bucket-state-migration-test");
    @ClassRule
    public static TemporaryFolder tempFolder = new TemporaryFolder();
    private final BucketStateGenerator generator = new BucketStateGenerator("test-bucket", "writing", "wrote", BASE_PATH, 2);

    @Parameterized.Parameters(name="Previous Version = {0}")
    public static Collection<Integer> previousVersions() {
        return Arrays.asList(1, 2);
    }

    @Test
    @Ignore
    public void prepareDeserializationEmpty() throws IOException {
        this.generator.prepareDeserializationEmpty();
    }

    @Test
    public void testSerializationEmpty() throws IOException {
        String scenarioName = "empty";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue());
        java.nio.file.Path outputPath = pathResolver.getOutputPath("empty");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketState("empty", this.previousVersion);
        FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
        Assert.assertEquals((Object)testBucketPath, (Object)bucket.getBucketPath());
        Assert.assertNull((Object)bucket.getInProgressPart());
        Assert.assertTrue((boolean)bucket.getPendingFiles().isEmpty());
    }

    @Test
    @Ignore
    public void prepareDeserializationOnlyInProgress() throws IOException {
        this.generator.prepareDeserializationOnlyInProgress();
    }

    @Test
    public void testSerializationOnlyInProgress() throws IOException {
        String scenarioName = "only-in-progress";
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue());
        java.nio.file.Path outputPath = pathResolver.getOutputPath("only-in-progress");
        Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
        FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketState("only-in-progress", this.previousVersion);
        FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
        Assert.assertEquals((Object)testBucketPath, (Object)bucket.getBucketPath());
        Assert.assertEquals((long)8L, (long)bucket.getInProgressPart().getSize());
        long numFiles = Files.list(Paths.get(testBucketPath.toString(), new String[0])).map(file -> {
            Assert.assertThat((Object)file.getFileName().toString(), (Matcher)CoreMatchers.startsWith((String)".part-0-0.inprogress"));
            return 1;
        }).count();
        Assert.assertThat((Object)numFiles, (Matcher)CoreMatchers.is((Object)1L));
    }

    @Test
    @Ignore
    public void prepareDeserializationFull() throws IOException {
        this.generator.prepareDeserializationFull();
    }

    @Test
    public void testSerializationFull() throws IOException {
        this.testDeserializationFull(true, "full");
    }

    @Test
    @Ignore
    public void prepareDeserializationNullInProgress() throws IOException {
        this.generator.prepareDeserializationNullInProgress();
    }

    @Test
    public void testSerializationNullInProgress() throws IOException {
        this.testDeserializationFull(false, "full-no-in-progress");
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void testDeserializationFull(boolean withInProgress, String scenarioName) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, this.previousVersion.intValue());
        try {
            java.nio.file.Path outputPath = pathResolver.getOutputPath(scenarioName);
            Path testBucketPath = new Path(outputPath.resolve(BUCKET_ID).toString());
            FileWriterBucketState recoveredState = FileWriterBucketStateSerializerMigrationTest.readBucketStateFromTemplate(scenarioName, this.previousVersion);
            int noOfPendingCheckpoints = 5;
            Map pendingFileRecoverables = recoveredState.getPendingFileRecoverablesPerCheckpoint();
            Assert.assertEquals((long)5L, (long)pendingFileRecoverables.size());
            Set beforeRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = ".part-0-" + i + ".inprogress";
                Assert.assertThat(beforeRestorePaths, (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.startsWith((String)part)));
            }
            FileWriterBucket<String> bucket = FileWriterBucketStateSerializerMigrationTest.restoreBucket(recoveredState);
            Assert.assertEquals((Object)testBucketPath, (Object)bucket.getBucketPath());
            Assert.assertEquals((long)5L, (long)bucket.getPendingFiles().size());
            bucket.snapshotState();
            List committables = bucket.prepareCommit(false);
            FileCommitter committer = new FileCommitter(FileWriterBucketStateSerializerMigrationTest.createBucketWriter());
            committer.commit(committables);
            Set afterRestorePaths = Files.list(outputPath.resolve(BUCKET_ID)).map(file -> file.getFileName().toString()).collect(Collectors.toSet());
            for (int i = 0; i < 5; ++i) {
                String part = "part-0-" + i;
                Assert.assertThat(afterRestorePaths, (Matcher)CoreMatchers.hasItem((Object)part));
                afterRestorePaths.remove(part);
            }
            if (withInProgress) {
                Assert.assertThat(afterRestorePaths, (Matcher)Matchers.iterableWithSize((int)1));
                Assert.assertThat(afterRestorePaths, (Matcher)CoreMatchers.hasItem((Matcher)CoreMatchers.startsWith((String)".part-0-5.inprogress")));
            } else {
                Assert.assertThat(afterRestorePaths, (Matcher)Matchers.empty());
            }
        }
        finally {
            FileUtils.deleteDirectory((File)pathResolver.getResourcePath(scenarioName).toFile());
        }
    }

    private static FileWriterBucket<String> restoreBucket(FileWriterBucketState bucketState) throws IOException {
        return FileWriterBucket.restore(FileWriterBucketStateSerializerMigrationTest.createBucketWriter(), (RollingPolicy)DefaultRollingPolicy.builder().withMaxPartSize(10L).build(), (FileWriterBucketState)bucketState, (OutputFileConfig)OutputFileConfig.builder().build());
    }

    private static RowWiseBucketWriter<String, String> createBucketWriter() throws IOException {
        return new RowWiseBucketWriter(FileSystem.getLocalFileSystem().createRecoverableWriter(), (Encoder)new SimpleStringEncoder());
    }

    private static SimpleVersionedSerializer<FileWriterBucketState> bucketStateSerializer() throws IOException {
        RowWiseBucketWriter<String, String> bucketWriter = FileWriterBucketStateSerializerMigrationTest.createBucketWriter();
        return new FileWriterBucketStateSerializer(bucketWriter.getProperties().getInProgressFileRecoverableSerializer(), bucketWriter.getProperties().getPendingFileRecoverableSerializer());
    }

    private static FileWriterBucketState readBucketState(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        byte[] bytes = Files.readAllBytes(pathResolver.getSnapshotPath(scenarioName));
        return (FileWriterBucketState)SimpleVersionedSerialization.readVersionAndDeSerialize(FileWriterBucketStateSerializerMigrationTest.bucketStateSerializer(), (byte[])bytes);
    }

    private static FileWriterBucketState readBucketStateFromTemplate(String scenarioName, int version) throws IOException {
        BucketStatePathResolver pathResolver = new BucketStatePathResolver(BASE_PATH, version);
        java.nio.file.Path scenarioPath = pathResolver.getResourcePath(scenarioName);
        FileUtils.deleteDirectory((File)scenarioPath.toFile());
        FileUtils.copy((Path)new Path(scenarioPath.toString() + "-template"), (Path)new Path(scenarioPath.toString()), (boolean)false);
        return FileWriterBucketStateSerializerMigrationTest.readBucketState(scenarioName, version);
    }
}

