/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.checkpoint.channel;

import java.io.IOException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import org.apache.flink.core.memory.HeapMemorySegment;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequest;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.SyncChannelStateWriteRequestExecutor;
import org.apache.flink.runtime.checkpoint.channel.TestException;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.ChannelPersistenceITCase;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.function.BiConsumerWithException;
import org.apache.flink.util.function.RunnableWithException;
import org.junit.Assert;
import org.junit.Test;

/**
 * Unit tests for {@link ChannelStateWriterImpl}.
 *
 * <p>Most tests drive the writer through a {@link SyncChannelStateWriteRequestExecutor} so that
 * queued requests are processed only when the test asks for it (see
 * {@link #runWithSyncWorker(BiConsumerWithException)}).
 */
public class ChannelStateWriterImplTest {
    /** Checkpoint id used by every helper in this class. */
    private static final long CHECKPOINT_ID = 42L;
    private static final String TASK_NAME = "test";

    /**
     * Passing an event buffer to {@code addInputData} must fail with
     * {@link IllegalArgumentException}; the data buffer handed over together with it must be
     * recycled nevertheless.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testAddEventBuffer() throws Exception {
        NetworkBuffer dataBuf = getBuffer();
        NetworkBuffer eventBuf = getBuffer();
        eventBuf.setDataType(Buffer.DataType.EVENT_BUFFER);
        try {
            runWithSyncWorker(writer -> {
                callStart(writer);
                writer.addInputData(
                        CHECKPOINT_ID,
                        new InputChannelInfo(1, 1),
                        1,
                        CloseableIterator.ofElements(Buffer::recycleBuffer, eventBuf, dataBuf));
            });
        } finally {
            Assert.assertTrue(dataBuf.isRecycled());
        }
    }

    /** Both result futures are incomplete while the writer is open and complete once it is closed. */
    @Test
    public void testResultCompletion() throws IOException {
        ChannelStateWriter.ChannelStateWriteResult result;
        try (ChannelStateWriterImpl writer = openWriter()) {
            callStart(writer);
            result = writer.getAndRemoveWriteResult(CHECKPOINT_ID);
            Assert.assertFalse(result.resultSubpartitionStateHandles.isDone());
            Assert.assertFalse(result.inputChannelStateHandles.isDone());
        }
        Assert.assertTrue(result.inputChannelStateHandles.isDone());
        Assert.assertTrue(result.resultSubpartitionStateHandles.isDone());
    }

    /** After an abort has been processed, the result is done and the submitted buffer is recycled. */
    @Test
    public void testAbort() throws Exception {
        NetworkBuffer buffer = getBuffer();
        runWithSyncWorker((writer, worker) -> {
            callStart(writer);
            ChannelStateWriter.ChannelStateWriteResult result =
                    writer.getAndRemoveWriteResult(CHECKPOINT_ID);
            callAddInputData(writer, buffer);
            callAbort(writer);
            worker.processAllRequests();
            Assert.assertTrue(result.isDone());
            Assert.assertTrue(buffer.isRecycled());
        });
    }

    /**
     * After {@code abort(..., true)} the write result can no longer be fetched: the lookup is
     * expected to fail with {@link IllegalArgumentException}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testAbortClearsResults() throws Exception {
        runWithSyncWorker((writer, worker) -> {
            callStart(writer);
            writer.abort(CHECKPOINT_ID, new TestException(), true);
            writer.getAndRemoveWriteResult(CHECKPOINT_ID);
        });
    }

    /** After {@code abort(..., false)} the write result can still be fetched without an error. */
    @Test
    public void testAbortDoesNotClearsResults() throws Exception {
        runWithSyncWorker((writer, worker) -> {
            callStart(writer);
            callAbort(writer);
            worker.processAllRequests();
            writer.getAndRemoveWriteResult(CHECKPOINT_ID);
        });
    }

    /** Aborting a checkpoint that was never started must not fail. */
    @Test
    public void testAbortIgnoresMissing() throws Exception {
        runWithSyncWorker(this::callAbort);
    }

    /** A buffer handed to the writer is recycled even if the executor rejects the request. */
    @Test(expected = TestException.class)
    public void testBuffersRecycledOnError() throws Exception {
        unwrappingError(TestException.class, () -> {
            NetworkBuffer buffer = getBuffer();
            try (ChannelStateWriterImpl writer =
                    new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), failingWorker(), 5)) {
                writer.open();
                callAddInputData(writer, buffer);
            } finally {
                Assert.assertTrue(buffer.isRecycled());
            }
        });
    }

    /**
     * A buffer that was submitted but not yet processed stays un-recycled inside the callback and
     * is recycled by the time {@code runWithSyncWorker} has returned.
     */
    @Test
    public void testBuffersRecycledOnClose() throws Exception {
        NetworkBuffer buffer = getBuffer();
        runWithSyncWorker(writer -> {
            callStart(writer);
            callAddInputData(writer, buffer);
            Assert.assertFalse(buffer.isRecycled());
        });
        Assert.assertTrue(buffer.isRecycled());
    }

    /** Adding input data after the checkpoint was finished is expected to be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testNoAddDataAfterFinished() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(writer -> {
            callStart(writer);
            callFinish(writer);
            callAddInputData(writer);
        }));
    }

    /** Adding input data for a checkpoint that was never started is expected to be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testAddDataNotStarted() throws Exception {
        unwrappingError(
                IllegalArgumentException.class,
                () -> runWithSyncWorker(writer -> callAddInputData(writer)));
    }

    /** Finishing a checkpoint that was never started is expected to be rejected. */
    @Test(expected = IllegalArgumentException.class)
    public void testFinishNotStarted() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(this::callFinish));
    }

    /**
     * An error swallowed by the caller must resurface later: the test still expects an
     * {@link IllegalArgumentException} out of {@code runWithSyncWorker}.
     */
    @Test(expected = IllegalArgumentException.class)
    public void testRethrowOnClose() throws Exception {
        unwrappingError(IllegalArgumentException.class, () -> runWithSyncWorker(writer -> {
            try {
                callFinish(writer);
            } catch (IllegalArgumentException ignored) {
                // Deliberately swallowed: the point of the test is that the failure is reported
                // again after this callback returns (request processing / close).
            }
        }));
    }

    /** An exception recorded on the worker is rethrown by the next call made on the writer. */
    @Test(expected = TestException.class)
    public void testRethrowOnNextCall() throws Exception {
        SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor();
        // NOTE(review): this writer is never closed; confirm whether that is intentional before
        // wrapping it in try-with-resources (close() might itself surface the injected error).
        ChannelStateWriterImpl writer =
                new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5);
        writer.open();
        worker.setThrown(new TestException());
        unwrappingError(TestException.class, () -> callStart(writer));
    }

    /** Starting one checkpoint more than the configured maximum must fail. */
    @Test(expected = IllegalStateException.class)
    public void testLimit() throws IOException {
        int maxCheckpoints = 3;
        try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(
                TASK_NAME, ChannelPersistenceITCase.getStreamFactoryFactory(), maxCheckpoints)) {
            writer.open();
            for (int i = 0; i < maxCheckpoints; ++i) {
                writer.start(i, CheckpointOptions.forCheckpointWithDefaultLocation());
            }
            // One start beyond the limit: this is the call expected to throw.
            writer.start(maxCheckpoints, CheckpointOptions.forCheckpointWithDefaultLocation());
        }
    }

    /** Calling {@code start} on a writer that was never opened must fail. */
    @Test(expected = IllegalStateException.class)
    public void testStartNotOpened() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            try (ChannelStateWriterImpl writer = new ChannelStateWriterImpl(
                    TASK_NAME, ChannelPersistenceITCase.getStreamFactoryFactory())) {
                callStart(writer);
            }
        });
    }

    /** Calling {@code start} on a closed writer must fail. */
    @Test(expected = IllegalStateException.class)
    public void testNoStartAfterClose() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            ChannelStateWriterImpl writer = openWriter();
            writer.close();
            writer.start(CHECKPOINT_ID, CheckpointOptions.forCheckpointWithDefaultLocation());
        });
    }

    /** Adding input data to a closed writer must fail. */
    @Test(expected = IllegalStateException.class)
    public void testNoAddDataAfterClose() throws Exception {
        unwrappingError(IllegalStateException.class, () -> {
            ChannelStateWriterImpl writer = openWriter();
            callStart(writer);
            writer.close();
            callAddInputData(writer);
        });
    }

    /**
     * Runs {@code r}; if it throws, rethrows the first throwable of type {@code clazz} found via
     * {@link ExceptionUtils#findThrowable}, or the caught exception itself when none is found.
     *
     * @param clazz exception type to look for
     * @param r action under test
     * @throws Exception the unwrapped exception, or the original one
     */
    private static <T extends Throwable> void unwrappingError(Class<T> clazz, RunnableWithException r)
            throws Exception {
        try {
            r.run();
        } catch (Exception e) {
            throw ExceptionUtils.findThrowable(e, clazz).map(te -> (Exception) te).orElse(e);
        }
    }

    /** Creates a fresh 123-byte unpooled heap buffer using {@link FreeingBufferRecycler}. */
    private NetworkBuffer getBuffer() {
        return new NetworkBuffer(
                HeapMemorySegment.FACTORY.allocateUnpooledSegment(123, null),
                FreeingBufferRecycler.INSTANCE);
    }

    /** Returns an executor whose submit methods always throw {@link TestException}. */
    private ChannelStateWriteRequestExecutor failingWorker() {
        return new ChannelStateWriteRequestExecutor() {

            @Override
            public void close() {
            }

            @Override
            public void submit(ChannelStateWriteRequest e) {
                throw new TestException();
            }

            @Override
            public void submitPriority(ChannelStateWriteRequest e) {
                throw new TestException();
            }

            @Override
            public void start() throws IllegalStateException {
            }
        };
    }

    /** Convenience overload for tests that only need the writer, not the worker. */
    private void runWithSyncWorker(Consumer<ChannelStateWriter> writerConsumer) throws Exception {
        runWithSyncWorker((writer, worker) -> writerConsumer.accept(writer));
    }

    /**
     * Opens a writer backed by a {@link SyncChannelStateWriteRequestExecutor}, runs {@code testFn}
     * against it, processes all remaining requests and finally closes writer and worker.
     *
     * @param testFn test body receiving the writer and its worker
     * @throws Exception anything thrown by the test body, by request processing or on close
     */
    private void runWithSyncWorker(
            BiConsumerWithException<ChannelStateWriter, SyncChannelStateWriteRequestExecutor, Exception> testFn)
            throws Exception {
        try (SyncChannelStateWriteRequestExecutor worker = new SyncChannelStateWriteRequestExecutor();
                ChannelStateWriterImpl writer =
                        new ChannelStateWriterImpl(TASK_NAME, new ConcurrentHashMap<>(), worker, 5)) {
            writer.open();
            testFn.accept(writer, worker);
            worker.processAllRequests();
        }
    }

    /** Creates and opens a writer built from {@code ChannelPersistenceITCase.getStreamFactoryFactory()}. */
    private ChannelStateWriterImpl openWriter() {
        ChannelStateWriterImpl writer =
                new ChannelStateWriterImpl(TASK_NAME, ChannelPersistenceITCase.getStreamFactoryFactory());
        writer.open();
        return writer;
    }

    /** Starts checkpoint {@link #CHECKPOINT_ID} with the default checkpoint location. */
    private void callStart(ChannelStateWriter writer) {
        writer.start(CHECKPOINT_ID, CheckpointOptions.forCheckpointWithDefaultLocation());
    }

    /**
     * Submits {@code buffer} as input data for {@link #CHECKPOINT_ID}; buffers the iterator does
     * not hand out are recycled through {@link Buffer#recycleBuffer()} when it is closed.
     */
    private void callAddInputData(ChannelStateWriter writer, NetworkBuffer... buffer) {
        writer.addInputData(
                CHECKPOINT_ID,
                new InputChannelInfo(1, 1),
                1,
                CloseableIterator.ofElements(Buffer::recycleBuffer, buffer));
    }

    /** Aborts {@link #CHECKPOINT_ID} with a {@link TestException}, passing {@code false} as last argument. */
    private void callAbort(ChannelStateWriter writer) {
        writer.abort(CHECKPOINT_ID, new TestException(), false);
    }

    /** Finishes both input and output of {@link #CHECKPOINT_ID}. */
    private void callFinish(ChannelStateWriter writer) {
        writer.finishInput(CHECKPOINT_ID);
        writer.finishOutput(CHECKPOINT_ID);
    }
}

