package org.apache.flink.runtime.jobmaster.event;

import java.io.File;
import java.io.IOException;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

/* loaded from: input_file:org/apache/flink/runtime/jobmaster/event/FileSystemJobEventStoreTest.class */
class FileSystemJobEventStoreTest {

    @TempDir
    private Path temporaryFolder;

    FileSystemJobEventStoreTest() {
    }

    @Test
    void testReadAndWriteEvent() throws Exception {
        FileSystemJobEventStore fileSystemJobEventStore = new FileSystemJobEventStore(new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath()), new Configuration());
        fileSystemJobEventStore.registerJobEventSerializer(TestingJobEvent.TYPE_ID, new GenericJobEventSerializer());
        fileSystemJobEventStore.start();
        TestingJobEvent testingJobEvent = new TestingJobEvent(0);
        TestingJobEvent testingJobEvent2 = new TestingJobEvent(1);
        TestingJobEvent testingJobEvent3 = new TestingJobEvent(2);
        TestingJobEvent testingJobEvent4 = new TestingJobEvent(3);
        TestingJobEvent testingJobEvent5 = new TestingJobEvent(4);
        fileSystemJobEventStore.writeEvent(testingJobEvent);
        fileSystemJobEventStore.writeEvent(testingJobEvent2);
        fileSystemJobEventStore.writeEvent(testingJobEvent3);
        fileSystemJobEventStore.writeEvent(testingJobEvent4);
        fileSystemJobEventStore.writeEvent(testingJobEvent5);
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent2);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent3);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent4);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent5);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isNull();
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        fileSystemJobEventStore.writeEvent(testingJobEvent);
        fileSystemJobEventStore.writeEvent(testingJobEvent2);
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent2);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent3);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent4);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent5);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isEqualTo(testingJobEvent2);
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isNull();
        fileSystemJobEventStore.stop(true);
        fileSystemJobEventStore.start();
        Assertions.assertThat(fileSystemJobEventStore.readEvent()).isNull();
        fileSystemJobEventStore.stop(true);
    }

    @Test
    void testMultiThreadWriteEvent() throws Exception {
        FileSystemJobEventStore fileSystemJobEventStore = new FileSystemJobEventStore(new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath()), new Configuration());
        fileSystemJobEventStore.registerJobEventSerializer(TestingJobEvent.TYPE_ID, new GenericJobEventSerializer());
        ScheduledExecutorService newSingleThreadScheduledExecutor = Executors.newSingleThreadScheduledExecutor();
        fileSystemJobEventStore.start();
        TestingJobEvent testingJobEvent = new TestingJobEvent(0);
        TestingJobEvent testingJobEvent2 = new TestingJobEvent(1);
        TestingJobEvent testingJobEvent3 = new TestingJobEvent(2);
        TestingJobEvent testingJobEvent4 = new TestingJobEvent(3);
        TestingJobEvent testingJobEvent5 = new TestingJobEvent(4);
        TestingJobEvent testingJobEvent6 = new TestingJobEvent(5);
        TestingJobEvent testingJobEvent7 = new TestingJobEvent(6);
        TestingJobEvent testingJobEvent8 = new TestingJobEvent(7);
        TestingJobEvent testingJobEvent9 = new TestingJobEvent(8);
        TestingJobEvent testingJobEvent10 = new TestingJobEvent(9);
        newSingleThreadScheduledExecutor.schedule(() -> {
            fileSystemJobEventStore.writeEvent(testingJobEvent6);
            fileSystemJobEventStore.writeEvent(testingJobEvent7);
            fileSystemJobEventStore.writeEvent(testingJobEvent8);
            fileSystemJobEventStore.writeEvent(testingJobEvent9);
            fileSystemJobEventStore.writeEvent(testingJobEvent10);
        }, 0L, TimeUnit.MILLISECONDS);
        fileSystemJobEventStore.writeEvent(testingJobEvent);
        fileSystemJobEventStore.writeEvent(testingJobEvent2);
        fileSystemJobEventStore.writeEvent(testingJobEvent3);
        fileSystemJobEventStore.writeEvent(testingJobEvent4);
        fileSystemJobEventStore.writeEvent(testingJobEvent5);
        newSingleThreadScheduledExecutor.shutdown();
        newSingleThreadScheduledExecutor.awaitTermination(10000L, TimeUnit.MILLISECONDS);
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        ArrayList arrayList = new ArrayList();
        while (true) {
            JobEvent readEvent = fileSystemJobEventStore.readEvent();
            if (readEvent == null) {
                Assertions.assertThat(arrayList).hasSize(10);
                Assertions.assertThat(arrayList).containsExactlyInAnyOrder(new JobEvent[]{testingJobEvent, testingJobEvent2, testingJobEvent3, testingJobEvent4, testingJobEvent5, testingJobEvent6, testingJobEvent7, testingJobEvent8, testingJobEvent9, testingJobEvent10});
                fileSystemJobEventStore.stop(true);
                return;
            }
            arrayList.add(readEvent);
        }
    }

    @Test
    void testStopWithClear() throws IOException {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath());
        FileSystemJobEventStore fileSystemJobEventStore = new FileSystemJobEventStore(path, new Configuration());
        fileSystemJobEventStore.registerJobEventSerializer(TestingJobEvent.TYPE_ID, new GenericJobEventSerializer());
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isZero();
        fileSystemJobEventStore.start();
        fileSystemJobEventStore.writeEvent(new TestingJobEvent(0));
        fileSystemJobEventStore.stop(false);
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isEqualTo(1);
        fileSystemJobEventStore.start();
        fileSystemJobEventStore.stop(true);
        Assertions.assertThat(new File(path.getPath()).exists()).isFalse();
    }

    @Test
    void testReadUnexpectedLengthEvent() throws Exception {
        FileSystemJobEventStore fileSystemJobEventStore = new FileSystemJobEventStore(new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath()), new Configuration());
        fileSystemJobEventStore.registerJobEventSerializer(TestingJobEvent.TYPE_ID, new GenericJobEventSerializer());
        fileSystemJobEventStore.start();
        TestingJobEvent testingJobEvent = new TestingJobEvent(0);
        TestingJobEvent testingJobEvent2 = new TestingJobEvent(1);
        TestingJobEvent testingJobEvent3 = new TestingJobEvent(2);
        TestingJobEvent testingJobEvent4 = new TestingJobEvent(3);
        TestingJobEvent testingJobEvent5 = new TestingJobEvent(4);
        fileSystemJobEventStore.writeEvent(testingJobEvent);
        fileSystemJobEventStore.writeEvent(testingJobEvent2);
        fileSystemJobEventStore.writeEvent(testingJobEvent3);
        fileSystemJobEventStore.writeEvent(testingJobEvent4);
        fileSystemJobEventStore.writeEvent(testingJobEvent5);
        fileSystemJobEventStore.getEventWriterExecutor().submit(() -> {
            try {
                fileSystemJobEventStore.getOutputStream().writeInt(TestingJobEvent.TYPE_ID);
                fileSystemJobEventStore.getOutputStream().writeInt(5000);
                fileSystemJobEventStore.writeEvent(new TestingJobEvent(100));
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }).get();
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        TestingJobEvent testingJobEvent6 = new TestingJobEvent(5);
        TestingJobEvent testingJobEvent7 = new TestingJobEvent(6);
        TestingJobEvent testingJobEvent8 = new TestingJobEvent(7);
        TestingJobEvent testingJobEvent9 = new TestingJobEvent(8);
        TestingJobEvent testingJobEvent10 = new TestingJobEvent(9);
        fileSystemJobEventStore.writeEvent(testingJobEvent6);
        fileSystemJobEventStore.writeEvent(testingJobEvent7);
        fileSystemJobEventStore.writeEvent(testingJobEvent8);
        fileSystemJobEventStore.writeEvent(testingJobEvent9);
        fileSystemJobEventStore.writeEvent(testingJobEvent10);
        fileSystemJobEventStore.stop(false);
        fileSystemJobEventStore.start();
        ArrayList arrayList = new ArrayList();
        while (true) {
            JobEvent readEvent = fileSystemJobEventStore.readEvent();
            if (readEvent == null) {
                Assertions.assertThat(arrayList).hasSize(10);
                Assertions.assertThat(arrayList).containsExactlyInAnyOrder(new JobEvent[]{testingJobEvent, testingJobEvent2, testingJobEvent3, testingJobEvent4, testingJobEvent5, testingJobEvent6, testingJobEvent7, testingJobEvent8, testingJobEvent9, testingJobEvent10});
                fileSystemJobEventStore.stop(true);
                return;
            }
            arrayList.add(readEvent);
        }
    }

    @Test
    void testGenerateWorkingDirCorrectly() throws IOException {
        Configuration configuration = new Configuration();
        configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///tmp/flink");
        configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "cluster_id");
        JobID jobID = new JobID();
        Assertions.assertThat(new FileSystemJobEventStore(jobID, configuration).getWorkingDir().getPath()).isEqualTo(String.format("/tmp/flink/cluster_id/%s/job-events", jobID));
    }

    @Test
    void testCutBlock() throws Exception {
        org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(TempDirUtils.newFolder(this.temporaryFolder).getAbsolutePath());
        FileSystemJobEventStore fileSystemJobEventStore = new FileSystemJobEventStore(path, new Configuration());
        fileSystemJobEventStore.registerJobEventSerializer(TestingJobEvent.TYPE_ID, new GenericJobEventSerializer());
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isZero();
        fileSystemJobEventStore.start();
        fileSystemJobEventStore.writeEvent(new TestingJobEvent(0));
        tryFlushOutputStream(fileSystemJobEventStore);
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isEqualTo(1);
        fileSystemJobEventStore.writeEvent(new TestingJobEvent(1), true);
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isEqualTo(1);
        fileSystemJobEventStore.writeEvent(new TestingJobEvent(2));
        tryFlushOutputStream(fileSystemJobEventStore);
        Assertions.assertThat(new File(path.getPath()).listFiles().length).isEqualTo(2);
        fileSystemJobEventStore.stop(true);
    }

    private void tryFlushOutputStream(FileSystemJobEventStore fileSystemJobEventStore) throws Exception {
        runInEventWriterExecutor(fileSystemJobEventStore, () -> {
            try {
                if (fileSystemJobEventStore.getOutputStream() != null) {
                    fileSystemJobEventStore.getOutputStream().flush();
                }
            } catch (IOException e) {
            }
        });
    }

    private void runInEventWriterExecutor(FileSystemJobEventStore fileSystemJobEventStore, Runnable runnable) throws Exception {
        fileSystemJobEventStore.getEventWriterExecutor().submit(runnable).get();
    }
}
