/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.fs.hdfs;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.HashMap;
import java.util.Random;
import java.util.stream.Stream;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.util.MathUtils;
import org.apache.flink.util.StringUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/**
 * Abstract base class for integration tests of the {@link RecoverableWriter} created by the
 * {@link FileSystem} that serves {@link #basePath}.
 *
 * <p>This class never assigns {@link #basePath}, {@link #bigDataChunk} or {@link #tempFolder}
 * itself, and only initializes {@link #skipped} to {@code true}; presumably concrete subclasses
 * set them up before the tests run (TODO confirm against the subclasses).
 *
 * <p>Every test works below a freshly named directory ({@link #basePathForTest}) and is framed by
 * a check that the local temporary directory reported by {@link #getLocalTmpDir()} is empty once
 * the test has finished.
 */
public abstract class AbstractHadoopRecoverableWriterITCase {

    /** Source of randomness for the per-test directory names. */
    private static final Random RND = new Random();

    /** Root path under which all per-test directories are created; not assigned in this class. */
    protected static Path basePath;

    /** Lazily created file system for {@link #basePath}; see {@link #getFileSystem()}. */
    private static FileSystem fileSystem;

    /** Directory used by the currently running test; re-created for each test in {@link #prepare()}. */
    protected Path basePathForTest;

    private static final String testData1 = "THIS IS A TEST 1.";
    private static final String testData2 = "THIS IS A TEST 2.";
    private static final String testData3 = "THIS IS A TEST 3.";

    /** Pattern offered to subclasses, e.g. as input for {@link #createBigDataChunk(String, long)}. */
    protected static final String BIG_CHUNK_DATA_PATTERN = testData1;

    /** Large payload used by the "MultiPart" tests; not assigned in this class. */
    protected static String bigDataChunk;

    /**
     * While {@code true}, {@link #cleanUp()} does not touch {@link #basePath}. Defaults to
     * {@code true}.
     */
    protected static boolean skipped = true;

    @TempDir
    protected static File tempFolder;

    // Keys identifying the persist points recorded by testResumeAfterMultiplePersist.
    private static final String INIT_EMPTY_PERSIST = "EMPTY";
    private static final String INTERM_WITH_STATE_PERSIST = "INTERM-STATE";
    private static final String INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST = "INTERM-IMEDIATE";
    private static final String FINAL_WITH_EXTRA_STATE = "FINAL";

    /**
     * Recursively deletes {@link #basePath} unless the run was {@link #skipped}, then
     * re-initializes {@link FileSystem} with an empty {@link Configuration}.
     */
    @AfterAll
    static void cleanUp() throws Exception {
        if (!skipped) {
            getFileSystem().delete(basePath, true);
        }
        FileSystem.initialize(new Configuration());
    }

    /**
     * Picks a new randomly named directory (16 characters in {@code 'a'..'z'}) below
     * {@link #basePath} for the upcoming test and resets the local temporary directory.
     */
    @BeforeEach
    void prepare() throws Exception {
        basePathForTest = new Path(basePath, StringUtils.getRandomString(RND, 16, 16, 'a', 'z'));
        cleanupLocalDir();
    }

    /** Returns the local temporary directory whose contents are checked around every test. */
    protected abstract String getLocalTmpDir() throws Exception;

    /**
     * Returns the name of the object that holds the not-yet-committed data of the given
     * recoverable. The tests read it via the absolute path {@code "/" + name}.
     */
    protected abstract String getIncompleteObjectName(RecoverableWriter.ResumeRecoverable var1);

    /**
     * Ensures that the local temporary directory exists and tries to remove its direct children.
     * If the directory is missing it is created instead.
     */
    private void cleanupLocalDir() throws Exception {
        final java.nio.file.Path defaultTmpPath = Paths.get(getLocalTmpDir());

        if (!Files.exists(defaultTmpPath)) {
            Files.createDirectory(defaultTmpPath);
            return;
        }

        try (Stream<java.nio.file.Path> files = Files.list(defaultTmpPath)) {
            files.forEach(
                    p -> {
                        try {
                            Files.delete(p);
                        } catch (IOException e) {
                            // Best effort: report and continue with the remaining entries.
                            // Anything still left when the test ends is caught by the
                            // emptiness assertion in cleanupAndCheckTmpCleanup().
                            e.printStackTrace();
                        }
                    });
        }
    }

    /**
     * Asserts that the local temporary directory still exists and is empty after the test, then
     * deletes it together with the directory used by the test on the file system under test.
     */
    @AfterEach
    void cleanupAndCheckTmpCleanup() throws Exception {
        final java.nio.file.Path localTmpDir = Paths.get(getLocalTmpDir());

        Assertions.assertThat(Files.exists(localTmpDir)).isTrue();
        try (Stream<java.nio.file.Path> files = Files.list(localTmpDir)) {
            Assertions.assertThat(files).isEmpty();
        }
        Files.delete(localTmpDir);

        getFileSystem().delete(basePathForTest, true);
    }

    /** Returns the file system for {@link #basePath}, creating and caching it on first use. */
    protected static FileSystem getFileSystem() throws Exception {
        if (fileSystem == null) {
            fileSystem = FileSystem.get(basePath.toUri());
        }
        return fileSystem;
    }

    /** Opening a stream and committing it right away, without writing, must not fail. */
    @Test
    void testCloseWithNoData() throws Exception {
        final RecoverableWriter writer = getRecoverableWriter();
        final Path path = new Path(basePathForTest, "part-0");

        final RecoverableFsDataOutputStream stream = writer.open(path);
        stream.closeForCommit().commit();
    }

    /** Data written before {@code closeForCommit().commit()} must be readable afterwards. */
    @Test
    void testCommitAfterNormalClose() throws Exception {
        final RecoverableWriter writer = getRecoverableWriter();
        final Path path = new Path(basePathForTest, "part-0");

        final RecoverableFsDataOutputStream stream = writer.open(path);
        stream.write(bytesOf(testData1));
        stream.closeForCommit().commit();

        Assertions.assertThat(getContentsOfFile(path)).isEqualTo(testData1);
    }

    /** Data written both before and after a {@code persist()} must end up in the committed file. */
    @Test
    void testCommitAfterPersist() throws Exception {
        final RecoverableWriter writer = getRecoverableWriter();
        final Path path = new Path(basePathForTest, "part-0");

        final RecoverableFsDataOutputStream stream = writer.open(path);
        stream.write(bytesOf(testData1));
        stream.persist();
        stream.write(bytesOf(testData2));
        stream.closeForCommit().commit();

        Assertions.assertThat(getContentsOfFile(path)).isEqualTo(testData1 + testData2);
    }

    /**
     * After a commit the incomplete object of an earlier {@code persist()} must still hold the
     * persisted data; once {@code cleanupRecoverableState} reports success, reading that object
     * must fail with a {@link FileNotFoundException}.
     */
    @Test
    void testCleanupRecoverableState() throws Exception {
        final RecoverableWriter writer = getRecoverableWriter();
        final Path path = new Path(basePathForTest, "part-0");

        final RecoverableFsDataOutputStream stream = writer.open(path);
        stream.write(bytesOf(testData1));
        final RecoverableWriter.ResumeRecoverable recoverable = stream.persist();
        stream.closeForCommit().commit();

        // The incomplete object is still there after the commit.
        Assertions.assertThat(getContentsOfFile(incompleteObjectPath(recoverable)))
                .isEqualTo(testData1);

        Assertions.assertThat(writer.cleanupRecoverableState(recoverable)).isTrue();

        // Poll up to 10 times, pausing between reads, until the read fails. If the object
        // never disappears the callable returns normally and assertThatThrownBy fails the test.
        Assertions.assertThatThrownBy(
                        () -> {
                            final long delayMs = 1000L;
                            for (int retryTimes = 10; retryTimes > 0; --retryTimes) {
                                getContentsOfFile(incompleteObjectPath(recoverable));
                                Thread.sleep(delayMs);
                            }
                        })
                .isInstanceOf(FileNotFoundException.class);
    }

    /**
     * Calling {@code cleanupRecoverableState} a second time for the same recoverable must return
     * {@code false} rather than fail.
     */
    @Test
    void testCallingDeleteObjectTwiceDoesNotThroughException() throws Exception {
        final RecoverableWriter writer = getRecoverableWriter();
        final Path path = new Path(basePathForTest, "part-0");

        final RecoverableFsDataOutputStream stream = writer.open(path);
        stream.write(bytesOf(testData1));
        final RecoverableWriter.ResumeRecoverable recoverable = stream.persist();
        stream.closeForCommit().commit();

        // The incomplete object is still there after the commit.
        Assertions.assertThat(getContentsOfFile(incompleteObjectPath(recoverable)))
                .isEqualTo(testData1);

        Assertions.assertThat(writer.cleanupRecoverableState(recoverable)).isTrue();
        Assertions.assertThat(writer.cleanupRecoverableState(recoverable)).isFalse();
    }

    /**
     * A commit recoverable that went through a serialize/deserialize round trip must be
     * committable by a second writer instance.
     */
    @Test
    void testCommitAfterRecovery() throws Exception {
        final Path path = new Path(basePathForTest, "part-0");
        final RecoverableWriter initWriter = getRecoverableWriter();

        final RecoverableFsDataOutputStream stream = initWriter.open(path);
        stream.write(bytesOf(testData1));
        stream.persist();
        stream.persist();
        stream.write(bytesOf(testData2));
        final RecoverableWriter.CommitRecoverable recoverable =
                stream.closeForCommit().getRecoverable();

        final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> serializer =
                initWriter.getCommitRecoverableSerializer();
        final byte[] serializedRecoverable = serializer.serialize(recoverable);

        // Get a new writer and deserialize with the version the bytes were written with.
        final RecoverableWriter newWriter = getRecoverableWriter();
        final SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> deserializer =
                newWriter.getCommitRecoverableSerializer();
        final RecoverableWriter.CommitRecoverable recoveredRecoverable =
                deserializer.deserialize(serializer.getVersion(), serializedRecoverable);

        final RecoverableFsDataOutputStream.Committer committer =
                newWriter.recoverForCommit(recoveredRecoverable);
        committer.commitAfterRecovery();

        Assertions.assertThat(getContentsOfFile(path)).isEqualTo(testData1 + testData2);
    }

    @Test
    void testRecoverWithEmptyState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(INIT_EMPTY_PERSIST, testData3);
    }

    @Test
    void testRecoverWithState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(
                INTERM_WITH_STATE_PERSIST, testData1 + testData3);
    }

    @Test
    void testRecoverFromIntermWithoutAdditionalState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(
                INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, testData1 + testData3);
    }

    @Test
    void testRecoverAfterMultiplePersistsState() throws Exception {
        testResumeAfterMultiplePersistWithSmallData(
                FINAL_WITH_EXTRA_STATE, testData1 + testData2 + testData3);
    }

    @Test
    void testRecoverWithStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(
                INTERM_WITH_STATE_PERSIST, bigDataChunk + bigDataChunk);
    }

    @Test
    void testRecoverFromIntermWithoutAdditionalStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(
                INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, bigDataChunk + bigDataChunk);
    }

    @Test
    void testRecoverAfterMultiplePersistsStateWithMultiPart() throws Exception {
        testResumeAfterMultiplePersistWithMultiPartUploads(
                FINAL_WITH_EXTRA_STATE, bigDataChunk + bigDataChunk + bigDataChunk);
    }

    /** Runs the resume scenario with the three short test strings as payloads. */
    private void testResumeAfterMultiplePersistWithSmallData(
            String persistName, String expectedFinalContents) throws Exception {
        testResumeAfterMultiplePersist(
                persistName, expectedFinalContents, testData1, testData2, testData3);
    }

    /** Runs the resume scenario with {@link #bigDataChunk} as every payload. */
    private void testResumeAfterMultiplePersistWithMultiPartUploads(
            String persistName, String expectedFinalContents) throws Exception {
        testResumeAfterMultiplePersist(
                persistName, expectedFinalContents, bigDataChunk, bigDataChunk, bigDataChunk);
    }

    /**
     * Shared resume scenario: records four persist points while writing two items, resumes from
     * the one selected by {@code persistName} with a second writer, appends a third item,
     * commits, and compares the resulting file with {@code expectedFinalContents}.
     *
     * @param persistName key of the persist point to resume from
     * @param expectedFinalContents expected content of the committed file
     * @param firstItemToWrite written after the initial, empty persist
     * @param secondItemToWrite written after the two intermediate persists
     * @param thirdItemToWrite written to the recovered stream
     */
    private void testResumeAfterMultiplePersist(
            String persistName,
            String expectedFinalContents,
            String firstItemToWrite,
            String secondItemToWrite,
            String thirdItemToWrite)
            throws Exception {
        final Path path = new Path(basePathForTest, "part-0");
        final RecoverableWriter initWriter = getRecoverableWriter();

        final HashMap<String, RecoverableWriter.ResumeRecoverable> recoverables = new HashMap<>(4);
        try (RecoverableFsDataOutputStream stream = initWriter.open(path)) {
            recoverables.put(INIT_EMPTY_PERSIST, stream.persist());

            stream.write(bytesOf(firstItemToWrite));
            recoverables.put(INTERM_WITH_STATE_PERSIST, stream.persist());
            recoverables.put(INTERM_WITH_NO_ADDITIONAL_STATE_PERSIST, stream.persist());

            stream.write(bytesOf(secondItemToWrite));
            recoverables.put(FINAL_WITH_EXTRA_STATE, stream.persist());
        }

        final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> serializer =
                initWriter.getResumeRecoverableSerializer();
        final byte[] serializedRecoverable = serializer.serialize(recoverables.get(persistName));

        // Get a new writer and deserialize with the version the bytes were written with.
        final RecoverableWriter newWriter = getRecoverableWriter();
        final SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> deserializer =
                newWriter.getResumeRecoverableSerializer();
        final RecoverableWriter.ResumeRecoverable recoveredRecoverable =
                deserializer.deserialize(serializer.getVersion(), serializedRecoverable);

        final RecoverableFsDataOutputStream recoveredStream =
                newWriter.recover(recoveredRecoverable);
        recoveredStream.write(bytesOf(thirdItemToWrite));
        recoveredStream.closeForCommit().commit();

        Assertions.assertThat(getContentsOfFile(path)).isEqualTo(expectedFinalContents);
    }

    /** Builds the absolute path under which the incomplete object of {@code recoverable} is read. */
    private Path incompleteObjectPath(RecoverableWriter.ResumeRecoverable recoverable) {
        return new Path("/" + getIncompleteObjectName(recoverable));
    }

    /**
     * Reads the file at {@code path} from the file system under test and returns its text.
     *
     * <p>The bytes are decoded as UTF-8, matching {@link #bytesOf(String)}. The file is read line
     * by line and the lines are concatenated without their line terminators.
     *
     * @param path file to read
     * @return the concatenated lines of the file
     * @throws Exception if the file cannot be opened or read
     */
    protected String getContentsOfFile(Path path) throws Exception {
        final StringBuilder builder = new StringBuilder();
        try (FSDataInputStream inStream = getFileSystem().open(path);
                BufferedReader reader =
                        new BufferedReader(
                                new InputStreamReader(inStream, StandardCharsets.UTF_8))) {
            String line;
            while ((line = reader.readLine()) != null) {
                builder.append(line);
            }
        }
        return builder.toString();
    }

    /**
     * Builds a string consisting of {@code pattern} repeated
     * {@code size / (UTF-8 byte length of pattern) + 100} times.
     *
     * <p>{@code size} is narrowed to {@code int} with {@link MathUtils#checkedDownCast(long)}. An
     * empty {@code pattern} leads to a division by zero.
     *
     * @param pattern text to repeat
     * @param size byte size used to derive the number of repetitions
     * @return the repeated pattern
     */
    protected static String createBigDataChunk(String pattern, long size) {
        final int sampleLength = bytesOf(pattern).length;
        final int repeats = MathUtils.checkedDownCast(size) / sampleLength + 100;

        final StringBuilder stringBuilder = new StringBuilder();
        for (int i = 0; i < repeats; ++i) {
            stringBuilder.append(pattern);
        }
        return stringBuilder.toString();
    }

    /** Encodes {@code str} as UTF-8 bytes. */
    protected static byte[] bytesOf(String str) {
        return str.getBytes(StandardCharsets.UTF_8);
    }

    /** Creates a new recoverable writer from the file system under test. */
    protected RecoverableWriter getRecoverableWriter() throws Exception {
        return getFileSystem().createRecoverableWriter();
    }
}

