/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.state;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Random;
import javax.annotation.Nullable;
import org.apache.commons.io.IOUtils;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.runtime.state.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.DuplicatingCheckpointOutputStream;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestMemoryCheckpointOutputStream;
import org.apache.flink.runtime.testutils.CommonTestUtils;
import org.assertj.core.api.AbstractThrowableAssert;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

class DuplicatingCheckpointOutputStreamTest {
    DuplicatingCheckpointOutputStreamTest() {
    }

    @Test
    void testDuplicatedWrite() throws Exception {
        int streamCapacity = 0x100000;
        TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        TestMemoryCheckpointOutputStream secondaryStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        TestMemoryCheckpointOutputStream referenceStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        DuplicatingCheckpointOutputStream duplicatingStream = new DuplicatingCheckpointOutputStream((CheckpointStateOutputStream)primaryStream, (CheckpointStateOutputStream)secondaryStream, 64);
        Random random = new Random(42L);
        for (int i = 0; i < 500; ++i) {
            int choice = random.nextInt(3);
            if (choice == 0) {
                int val = random.nextInt();
                referenceStream.write(val);
                duplicatingStream.write(val);
            } else {
                byte[] bytes = new byte[random.nextInt(128)];
                random.nextBytes(bytes);
                if (choice == 1) {
                    referenceStream.write(bytes);
                    duplicatingStream.write(bytes);
                } else {
                    int off = bytes.length > 0 ? random.nextInt(bytes.length) : 0;
                    int len = bytes.length > 0 ? random.nextInt(bytes.length - off) : 0;
                    referenceStream.write(bytes, off, len);
                    duplicatingStream.write(bytes, off, len);
                }
            }
            Assertions.assertThat((long)duplicatingStream.getPos()).isEqualTo(referenceStream.getPos());
        }
        StreamStateHandle refStateHandle = referenceStream.closeAndGetHandle();
        StreamStateHandle primaryStateHandle = duplicatingStream.closeAndGetPrimaryHandle();
        StreamStateHandle secondaryStateHandle = duplicatingStream.closeAndGetSecondaryHandle();
        Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)refStateHandle.openInputStream(), (InputStream)primaryStateHandle.openInputStream())).isTrue();
        Assertions.assertThat((boolean)CommonTestUtils.isStreamContentEqual((InputStream)refStateHandle.openInputStream(), (InputStream)secondaryStateHandle.openInputStream())).isTrue();
        refStateHandle.discardState();
        primaryStateHandle.discardState();
        secondaryStateHandle.discardState();
    }

    @Test
    void testSecondaryWriteFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingSecondary();
        this.testFailingSecondaryStream(duplicatingStream, () -> {
            for (int i = 0; i < 128; ++i) {
                duplicatingStream.write(42);
            }
        });
    }

    @Test
    void testFailingSecondaryWriteArrayFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingSecondary();
        this.testFailingSecondaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512]));
    }

    @Test
    void testFailingSecondaryWriteArrayOffsFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingSecondary();
        this.testFailingSecondaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130));
    }

    @Test
    void testFailingSecondaryFlush() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingSecondary();
        this.testFailingSecondaryStream(duplicatingStream, () -> ((DuplicatingCheckpointOutputStream)duplicatingStream).flush());
    }

    @Test
    void testFailingSecondarySync() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingSecondary();
        this.testFailingSecondaryStream(duplicatingStream, () -> ((DuplicatingCheckpointOutputStream)duplicatingStream).sync());
    }

    @Test
    void testPrimaryWriteFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingPrimary();
        this.testFailingPrimaryStream(duplicatingStream, () -> {
            for (int i = 0; i < 128; ++i) {
                duplicatingStream.write(42);
            }
        });
    }

    @Test
    void testFailingPrimaryWriteArrayFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingPrimary();
        this.testFailingPrimaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512]));
    }

    @Test
    void testFailingPrimaryWriteArrayOffsFail() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingPrimary();
        this.testFailingPrimaryStream(duplicatingStream, () -> duplicatingStream.write(new byte[512], 20, 130));
    }

    @Test
    void testFailingPrimaryFlush() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingPrimary();
        this.testFailingPrimaryStream(duplicatingStream, () -> ((DuplicatingCheckpointOutputStream)duplicatingStream).flush());
    }

    @Test
    void testFailingPrimarySync() throws Exception {
        DuplicatingCheckpointOutputStream duplicatingStream = this.createDuplicatingStreamWithFailingPrimary();
        this.testFailingPrimaryStream(duplicatingStream, () -> ((DuplicatingCheckpointOutputStream)duplicatingStream).sync());
    }

    private void testFailingSecondaryStream(DuplicatingCheckpointOutputStream duplicatingStream, StreamTestMethod testMethod) throws Exception {
        testMethod.call();
        duplicatingStream.write(42);
        FailingCheckpointOutStream secondary = (FailingCheckpointOutStream)duplicatingStream.getSecondaryOutputStream();
        Assertions.assertThat((boolean)secondary.isClosed()).isTrue();
        long pos = duplicatingStream.getPos();
        StreamStateHandle primaryHandle = duplicatingStream.closeAndGetPrimaryHandle();
        if (primaryHandle != null) {
            Assertions.assertThat((long)primaryHandle.getStateSize()).isEqualTo(pos);
        }
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((DuplicatingCheckpointOutputStream)duplicatingStream).closeAndGetSecondaryHandle()).isInstanceOf(IOException.class)).hasCause((Throwable)duplicatingStream.getSecondaryStreamException());
    }

    private void testFailingPrimaryStream(DuplicatingCheckpointOutputStream duplicatingStream, StreamTestMethod testMethod) throws Exception {
        try {
            Assertions.assertThatThrownBy(testMethod::call).isInstanceOf(IOException.class);
        }
        finally {
            IOUtils.closeQuietly((OutputStream)duplicatingStream);
        }
    }

    @Test
    void testUnalignedStreamsException() throws IOException {
        int streamCapacity = 0x100000;
        TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        TestMemoryCheckpointOutputStream secondaryStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        primaryStream.write(42);
        DuplicatingCheckpointOutputStream stream = new DuplicatingCheckpointOutputStream((CheckpointStateOutputStream)primaryStream, (CheckpointStateOutputStream)secondaryStream);
        Assertions.assertThat((Throwable)stream.getSecondaryStreamException()).isNotNull();
        Assertions.assertThat((boolean)secondaryStream.isClosed()).isTrue();
        stream.write(23);
        ((AbstractThrowableAssert)Assertions.assertThatThrownBy(() -> ((DuplicatingCheckpointOutputStream)stream).closeAndGetSecondaryHandle()).isInstanceOf(IOException.class)).hasCause((Throwable)stream.getSecondaryStreamException());
        StreamStateHandle primaryHandle = stream.closeAndGetPrimaryHandle();
        try (FSDataInputStream inputStream = primaryHandle.openInputStream();){
            Assertions.assertThat((int)inputStream.read()).isEqualTo(42);
            Assertions.assertThat((int)inputStream.read()).isEqualTo(23);
            Assertions.assertThat((int)inputStream.read()).isEqualTo(-1);
        }
    }

    private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingSecondary() throws IOException {
        int streamCapacity = 0x100000;
        TestMemoryCheckpointOutputStream primaryStream = new TestMemoryCheckpointOutputStream(streamCapacity);
        FailingCheckpointOutStream failSecondaryStream = new FailingCheckpointOutStream();
        return new DuplicatingCheckpointOutputStream((CheckpointStateOutputStream)primaryStream, (CheckpointStateOutputStream)failSecondaryStream, 64);
    }

    private DuplicatingCheckpointOutputStream createDuplicatingStreamWithFailingPrimary() throws IOException {
        int streamCapacity = 0x100000;
        FailingCheckpointOutStream failPrimaryStream = new FailingCheckpointOutStream();
        TestMemoryCheckpointOutputStream secondary = new TestMemoryCheckpointOutputStream(streamCapacity);
        return new DuplicatingCheckpointOutputStream((CheckpointStateOutputStream)failPrimaryStream, (CheckpointStateOutputStream)secondary, 64);
    }

    @FunctionalInterface
    private static interface StreamTestMethod {
        public void call() throws IOException;
    }

    private static class FailingCheckpointOutStream
    extends CheckpointStateOutputStream {
        private boolean closed = false;

        private FailingCheckpointOutStream() {
        }

        @Nullable
        public StreamStateHandle closeAndGetHandle() throws IOException {
            throw new IOException();
        }

        public long getPos() throws IOException {
            return 0L;
        }

        public void write(int b) throws IOException {
            throw new IOException();
        }

        public void flush() throws IOException {
            throw new IOException();
        }

        public void sync() throws IOException {
            throw new IOException();
        }

        public void close() throws IOException {
            this.closed = true;
        }

        public boolean isClosed() {
            return this.closed;
        }
    }
}

