/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.compactor;

import java.io.BufferedWriter;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import org.apache.flink.connector.file.sink.FileSinkCommittable;
import org.apache.flink.connector.file.sink.FileSinkCommittableSerializer;
import org.apache.flink.connector.file.sink.compactor.AbstractCompactTestBase;
import org.apache.flink.connector.file.sink.compactor.DecoderBasedReader;
import org.apache.flink.connector.file.sink.compactor.FileCompactStrategy;
import org.apache.flink.connector.file.sink.compactor.FileCompactor;
import org.apache.flink.connector.file.sink.compactor.RecordWiseFileCompactor;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperator;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorOperatorStateHandler;
import org.apache.flink.connector.file.sink.compactor.operator.CompactorRequest;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.connector.file.sink.utils.IntegerFileSinkTestDataUtils;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.CompactingFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedCompactingFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.WriterProperties;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.types.Either;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactory;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;

class CompactorOperatorTest
extends AbstractCompactTestBase {
    // Package-private no-argument constructor; it has an empty body and performs no setup itself.
    CompactorOperatorTest() {
    }

    /**
     * Sends one compaction request (files ".0" and ".1" of bucket "0", size 5 each) through a
     * {@link CompactorOperator} and checks at which point results appear in the harness output.
     *
     * <p>The output must be empty right after the request is processed, and still empty once
     * checkpoint 1 has been snapshotted, completed and all compaction tasks have been joined.
     * After the pre-barrier call for checkpoint 2 exactly four values are expected; the values
     * at positions 1 to 3 must be the compacted file (size 10) and the two cleanup committables.
     */
    @Test
    void testCompact() throws Exception {
        RecordWiseFileCompactor recordCompactor = new RecordWiseFileCompactor((RecordWiseFileCompactor.Reader.Factory)new DecoderBasedReader.Factory(IntegerFileSinkTestDataUtils.IntDecoder::new));
        CompactorOperator operator = this.createTestOperator((FileCompactor)recordCompactor);
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator)) {
            testHarness.setup();
            testHarness.open();

            // Submit the two input files as a single request for bucket "0".
            List<FileSinkCommittable> inputFiles = Arrays.asList(this.committable("0", ".0", 5), this.committable("0", ".1", 5));
            testHarness.processElement(this.request("0", inputFiles, null));
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();

            // Run checkpoint 1 to completion and wait for the compaction tasks.
            testHarness.prepareSnapshotPreBarrier(1L);
            testHarness.snapshot(1L, 1L);
            testHarness.notifyOfCompletedCheckpoint(1L);
            operator.getAllTasksFuture().join();
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();

            // The results are only expected once the next pre-barrier call has happened.
            testHarness.prepareSnapshotPreBarrier(2L);
            ListAssert outputs = (ListAssert)Assertions.assertThat((List)testHarness.extractOutputValues()).hasSize(4);

            CommittableWithLineageAssert compactedFile = (CommittableWithLineageAssert)outputs.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            compactedFile.hasCommittable((Object)this.committable("0", "compacted-0", 10));

            CommittableWithLineageAssert firstCleanup = (CommittableWithLineageAssert)outputs.element(2, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            firstCleanup.hasCommittable((Object)this.cleanupPath("0", ".0"));

            CommittableWithLineageAssert secondCleanup = (CommittableWithLineageAssert)outputs.element(3, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            secondCleanup.hasCommittable((Object)this.cleanupPath("0", ".1"));
        }
    }

    /**
     * Checks that committables handed over in the pass-through list of a request come out of the
     * {@link CompactorOperator} unchanged.
     *
     * <p>Two requests are processed, each carrying a single pass-through committable (one
     * in-progress cleanup, one path cleanup). The output must stay empty until the pre-barrier
     * call for checkpoint 2, after which three values are expected, with the two committables
     * at positions 1 and 2.
     */
    @Test
    void testPassthrough() throws Exception {
        RecordWiseFileCompactor recordCompactor = new RecordWiseFileCompactor((RecordWiseFileCompactor.Reader.Factory)new DecoderBasedReader.Factory(IntegerFileSinkTestDataUtils.IntDecoder::new));
        CompactorOperator operator = this.createTestOperator((FileCompactor)recordCompactor);
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator)) {
            testHarness.setup();
            testHarness.open();

            FileSinkCommittable inProgressCleanup = this.cleanupInprogress("0", "0", 1);
            FileSinkCommittable pathCleanup = this.cleanupPath("0", "1");

            // Each committable travels in its own request, in the pass-through slot.
            List<FileSinkCommittable> firstPassthrough = Collections.singletonList(inProgressCleanup);
            testHarness.processElement(this.request("0", null, firstPassthrough));
            List<FileSinkCommittable> secondPassthrough = Collections.singletonList(pathCleanup);
            testHarness.processElement(this.request("0", null, secondPassthrough));
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();

            // Run checkpoint 1 to completion and wait for outstanding tasks.
            testHarness.prepareSnapshotPreBarrier(1L);
            testHarness.snapshot(1L, 1L);
            testHarness.notifyOfCompletedCheckpoint(1L);
            operator.getAllTasksFuture().join();
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();

            testHarness.prepareSnapshotPreBarrier(2L);
            ListAssert outputs = (ListAssert)Assertions.assertThat((List)testHarness.extractOutputValues()).hasSize(3);

            CommittableWithLineageAssert firstForwarded = (CommittableWithLineageAssert)outputs.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            firstForwarded.hasCommittable((Object)inProgressCleanup);

            CommittableWithLineageAssert secondForwarded = (CommittableWithLineageAssert)outputs.element(2, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            secondForwarded.hasCommittable((Object)pathCleanup);
        }
    }

    /**
     * Checks that compaction requests stored in operator state are resumed by a new
     * {@link CompactorOperator} that is initialized from that state.
     *
     * <p>The first operator receives two requests (files ".0"/".1" and ".2"/".3" of bucket "0")
     * and its state is captured by the snapshot of checkpoint 2. A second operator is restored
     * from that snapshot; four output values are expected after the pre-barrier call for
     * checkpoint 3 and eight after the one for checkpoint 4. The eight values are checked as two
     * groups of: one summary, one compacted file (size 10) and two cleanup committables.
     */
    @Test
    void testRestore() throws Exception {
        OperatorSubtaskState state;
        RecordWiseFileCompactor fileCompactor = new RecordWiseFileCompactor((RecordWiseFileCompactor.Reader.Factory)new DecoderBasedReader.Factory(IntegerFileSinkTestDataUtils.IntDecoder::new));
        CompactorOperator compactor = this.createTestOperator((FileCompactor)fileCompactor);
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)compactor)) {
            harness.setup();
            harness.open();
            harness.processElement(this.request("0", Arrays.asList(this.committable("0", ".0", 5), this.committable("0", ".1", 5)), null));
            harness.snapshot(1L, 1L);
            harness.processElement(this.request("0", Arrays.asList(this.committable("0", ".2", 5), this.committable("0", ".3", 5)), null));
            harness.notifyOfCompletedCheckpoint(1L);
            state = harness.snapshot(2L, 2L);
        }

        compactor = this.createTestOperator((FileCompactor)fileCompactor);
        // The restored run needs its own harness declaration: the variable of the first
        // try-with-resources statement is not in scope here.
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)compactor)) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            compactor.getAllTasksFuture().join();

            harness.prepareSnapshotPreBarrier(3L);
            Assertions.assertThat((List)harness.extractOutputValues()).hasSize(4);

            harness.snapshot(3L, 3L);
            harness.notifyOfCompletedCheckpoint(3L);
            compactor.getAllTasksFuture().join();

            harness.prepareSnapshotPreBarrier(4L);
            // The size is asserted once; the same check used to be repeated on the line before.
            ListAssert results = (ListAssert)Assertions.assertThat((List)harness.extractOutputValues()).hasSize(8);

            // First group: the request made of files ".0" and ".1".
            results.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
            ((CommittableWithLineageAssert)results.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.committable("0", "compacted-0", 10));
            ((CommittableWithLineageAssert)results.element(2, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".0"));
            ((CommittableWithLineageAssert)results.element(3, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".1"));

            // Second group: the request made of files ".2" and ".3".
            results.element(4, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
            ((CommittableWithLineageAssert)results.element(5, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.committable("0", "compacted-2", 10));
            ((CommittableWithLineageAssert)results.element(6, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".2"));
            ((CommittableWithLineageAssert)results.element(7, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".3"));
        }
    }

    /**
     * Checks that a {@link CompactorOperatorStateHandler} initialized from the state of a
     * {@link CompactorOperator} emits the expected committables.
     *
     * <p>A compactor operator first receives two requests (files ".0" to ".3") and is
     * snapshotted. The handler is then initialized from that snapshot and fed two further
     * requests (".4", ".5"), two committables for checkpoint 3 (".6" and "7") and three
     * summaries. In total 18 output values are expected: a summary, the 14 committables listed
     * in the body, a summary, the cleanup of ".6" and a final summary.
     */
    @Test
    void testStateHandler() throws Exception {
        OperatorSubtaskState snapshotState;
        RecordWiseFileCompactor recordCompactor = new RecordWiseFileCompactor((RecordWiseFileCompactor.Reader.Factory)new DecoderBasedReader.Factory(IntegerFileSinkTestDataUtils.IntDecoder::new));
        CompactorOperator operator = this.createTestOperator((FileCompactor)recordCompactor);
        try (OneInputStreamOperatorTestHarness operatorHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)operator)) {
            operatorHarness.setup();
            operatorHarness.open();

            List<FileSinkCommittable> firstBatch = Arrays.asList(this.committable("0", ".0", 1), this.committable("0", ".1", 2));
            operatorHarness.processElement(this.request("0", firstBatch, null));
            operatorHarness.snapshot(1L, 1L);

            List<FileSinkCommittable> secondBatch = Arrays.asList(this.committable("0", ".2", 3), this.committable("0", ".3", 4));
            operatorHarness.processElement(this.request("0", secondBatch, null));
            operatorHarness.notifyOfCompletedCheckpoint(1L);

            snapshotState = operatorHarness.snapshot(2L, 2L);
        }

        CompactorOperatorStateHandler stateHandler = new CompactorOperatorStateHandler(null, this.getTestCommittableSerializer(), this.createTestBucketWriter());
        try (OneInputStreamOperatorTestHarness handlerHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)stateHandler)) {
            handlerHarness.setup();
            handlerHarness.initializeState(snapshotState);
            handlerHarness.open();

            // Two more compaction requests arrive on the Right side of the Either input.
            CompactorRequest requestForFile4 = (CompactorRequest)this.request("0", Collections.singletonList(this.committable("0", ".4", 5)), null).getValue();
            handlerHarness.processElement(new StreamRecord((Object)Either.Right((Object)requestForFile4)));
            CompactorRequest requestForFile5 = (CompactorRequest)this.request("0", Collections.singletonList(this.committable("0", ".5", 6)), null).getValue();
            handlerHarness.processElement(new StreamRecord((Object)Either.Right((Object)requestForFile5)));

            // Committable messages arrive on the Left side: a summary and two committables for
            // checkpoint 3, followed by summaries for checkpoints 4 and 5.
            handlerHarness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableSummary(0, 1, 3L, 2, 2, 0))));
            handlerHarness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableWithLineage((Object)this.committable("0", ".6", 7), 3L, 0))));
            handlerHarness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableWithLineage((Object)this.committable("0", "7", 8), 3L, 0))));
            handlerHarness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableSummary(0, 1, 4L, 0, 0, 0))));
            handlerHarness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableSummary(0, 1, 5L, 3, 3, 0))));

            ListAssert outputs = (ListAssert)Assertions.assertThat((List)handlerHarness.extractOutputValues()).hasSize(18);
            outputs.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));

            List<FileSinkCommittable> expectedCommittables = Arrays.asList(
                    this.committable("0", "compacted-0", 1),
                    this.cleanupPath("0", ".0"),
                    this.committable("0", "compacted-1", 2),
                    this.cleanupPath("0", ".1"),
                    this.committable("0", "compacted-2", 3),
                    this.cleanupPath("0", ".2"),
                    this.committable("0", "compacted-3", 4),
                    this.cleanupPath("0", ".3"),
                    this.committable("0", "compacted-4", 5),
                    this.cleanupPath("0", ".4"),
                    this.committable("0", "compacted-5", 6),
                    this.cleanupPath("0", ".5"),
                    this.committable("0", "compacted-6", 7),
                    this.committable("0", "7", 8));

            // The expected committables occupy output positions 1 through 14.
            int position = 1;
            for (FileSinkCommittable expected : expectedCommittables) {
                CommittableWithLineageAssert actual = (CommittableWithLineageAssert)outputs.element(position, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
                actual.hasCommittable((Object)expected);
                ++position;
            }

            outputs.element(15, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
            CommittableWithLineageAssert trailingCleanup = (CommittableWithLineageAssert)outputs.element(16, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()));
            trailingCleanup.hasCommittable((Object)this.cleanupPath("0", ".6"));
            outputs.element(17, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
        }
    }

    /**
     * Checks that a {@link CompactorOperatorStateHandler} can itself be snapshotted and restored,
     * across three consecutive handler instances.
     *
     * <ol>
     *   <li>The first handler receives a request for file ".1" and a summary for checkpoint 1,
     *       is snapshotted, and must have emitted three values: a summary, the committable
     *       "compacted-1" and the cleanup of ".1".
     *   <li>The second handler is initialized from that snapshot, receives committables ".2"
     *       and "3" for checkpoint 1, is snapshotted again, and must have emitted the
     *       committables "2" and "3".
     *   <li>The third handler is initialized from the second snapshot, receives a summary for
     *       checkpoint 2, and must have emitted a summary followed by the cleanup of ".2".
     * </ol>
     */
    @Test
    void testStateHandlerRestore() throws Exception {
        ListAssert results;
        OperatorSubtaskState state;

        // Phase 1: fresh handler without any prior state.
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new CompactorOperatorStateHandler(null, this.getTestCommittableSerializer(), this.createTestBucketWriter()))) {
            harness.setup();
            harness.open();
            harness.processElement(new StreamRecord((Object)Either.Right((Object)((CompactorRequest)this.request("0", Collections.singletonList(this.committable("0", ".1", 1)), null).getValue()))));
            harness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableSummary(0, 1, 1L, 2, 2, 0))));
            state = harness.snapshot(1L, 1L);
            results = (ListAssert)Assertions.assertThat((List)harness.extractOutputValues()).hasSize(3);
            results.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
            ((CommittableWithLineageAssert)results.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.committable("0", "compacted-1", 1));
            ((CommittableWithLineageAssert)results.element(2, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".1"));
        }

        // Phase 2: a new handler restored from the first snapshot. Each phase declares its own
        // harness because a try-with-resources variable is not visible after its statement.
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new CompactorOperatorStateHandler(null, this.getTestCommittableSerializer(), this.createTestBucketWriter()))) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableWithLineage((Object)this.committable("0", ".2", 2), 1L, 0))));
            harness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableWithLineage((Object)this.committable("0", "3", 3), 1L, 0))));
            state = harness.snapshot(2L, 2L);
            results = (ListAssert)Assertions.assertThat((List)harness.extractOutputValues()).hasSize(2);
            ((CommittableWithLineageAssert)results.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.committable("0", "2", 2));
            ((CommittableWithLineageAssert)results.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.committable("0", "3", 3));
        }

        // Phase 3: a new handler restored from the second snapshot.
        try (OneInputStreamOperatorTestHarness harness = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new CompactorOperatorStateHandler(null, this.getTestCommittableSerializer(), this.createTestBucketWriter()))) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(new StreamRecord((Object)Either.Left((Object)new CommittableSummary(0, 1, 2L, 0, 0, 0))));
            results = (ListAssert)Assertions.assertThat((List)harness.extractOutputValues()).hasSize(2);
            results.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()));
            ((CommittableWithLineageAssert)results.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)this.cleanupPath("0", ".2"));
        }
    }

    /**
     * Wraps a new {@link CompactorRequest} in a stream record with timestamp 0.
     *
     * @param bucketId bucket id passed to the request
     * @param toCompact committables to compact; {@code null} is replaced by a new empty list
     * @param toPassthrough committables to pass through; {@code null} is replaced by a new empty
     *     list
     * @return the stream record holding the request
     */
    private StreamRecord<CompactorRequest> request(String bucketId, List<FileSinkCommittable> toCompact, List<FileSinkCommittable> toPassthrough) {
        List<FileSinkCommittable> filesToCompact = toCompact;
        if (filesToCompact == null) {
            filesToCompact = new ArrayList<FileSinkCommittable>();
        }
        List<FileSinkCommittable> filesToPassthrough = toPassthrough;
        if (filesToPassthrough == null) {
            filesToPassthrough = new ArrayList<FileSinkCommittable>();
        }
        CompactorRequest compactorRequest = new CompactorRequest(bucketId, filesToCompact, filesToPassthrough);
        return new StreamRecord<CompactorRequest>(compactorRequest, 0L);
    }

    /**
     * Creates a committable for bucket {@code bucketId} that wraps a pending-file recoverable.
     *
     * <p>The backing file is requested from {@code newFile} under the name
     * {@code name + "_" + bucketId}. A non-positive {@code size} is raised to 1 for the file
     * request only; the recoverable itself still receives the unmodified {@code size}.
     *
     * @param bucketId bucket id of the committable
     * @param name file name prefix
     * @param size size recorded in the recoverable
     * @return the new committable
     * @throws IOException declared because {@code newFile} is invoked
     */
    private FileSinkCommittable committable(String bucketId, String name, int size) throws IOException {
        int requestedFileSize = Math.max(size, 1);
        Path pendingFile = this.newFile(name + "_" + bucketId, requestedFileSize);
        InProgressFileWriter.PendingFileRecoverable recoverable = new FileSinkTestUtils.TestPendingFileRecoverable(pendingFile, size);
        return new FileSinkCommittable(bucketId, recoverable);
    }

    /**
     * Creates a committable for bucket {@code bucketId} that wraps an in-progress-file
     * recoverable pointing at a file obtained from {@code newFile} under the name
     * {@code name + "_" + bucketId}.
     *
     * @param bucketId bucket id of the committable
     * @param name file name prefix
     * @param size size used both for the file request and for the recoverable
     * @return the new committable
     * @throws IOException declared because {@code newFile} is invoked
     */
    private FileSinkCommittable cleanupInprogress(String bucketId, String name, int size) throws IOException {
        String fileName = name + "_" + bucketId;
        Path inProgressFile = this.newFile(fileName, size);
        InProgressFileWriter.InProgressFileRecoverable recoverable = new FileSinkTestUtils.TestInProgressFileRecoverable(inProgressFile, size);
        return new FileSinkCommittable(bucketId, recoverable);
    }

    /**
     * Creates a committable for bucket {@code bucketId} that carries only a path, namely that of
     * a file obtained from {@code newFile} under the name {@code name + "_" + bucketId} with
     * size 1.
     *
     * @param bucketId bucket id of the committable
     * @param name file name prefix
     * @return the new committable
     * @throws IOException declared because {@code newFile} is invoked
     */
    private FileSinkCommittable cleanupPath(String bucketId, String name) throws IOException {
        String fileName = name + "_" + bucketId;
        Path fileToRemove = this.newFile(fileName, 1);
        return new FileSinkCommittable(bucketId, fileToRemove);
    }

    /**
     * Builds the committable serializer used by the operators under test, composed of one
     * wrapper serializer for the test pending-file recoverable and one for the test
     * in-progress-file recoverable.
     *
     * @return a new {@link FileSinkCommittableSerializer}
     */
    private SimpleVersionedSerializer<FileSinkCommittable> getTestCommittableSerializer() {
        FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileSerializer =
                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.PendingFileRecoverable>(FileSinkTestUtils.TestPendingFileRecoverable::new);
        FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileSerializer =
                new FileSinkTestUtils.SimpleVersionedWrapperSerializer<InProgressFileWriter.InProgressFileRecoverable>(FileSinkTestUtils.TestInProgressFileRecoverable::new);
        return new FileSinkCommittableSerializer(pendingFileSerializer, inProgressFileSerializer);
    }

    /**
     * Creates the {@link CompactorOperator} under test.
     *
     * <p>The strategy is built with {@code setNumCompactThreads(2)} and
     * {@code enableCompactionOnCheckpoint(1)}. The first constructor argument is {@code null};
     * serializer and bucket writer come from the helpers of this class.
     *
     * @param compactor file compactor handed to the operator
     * @return a new operator instance
     */
    private CompactorOperator createTestOperator(FileCompactor compactor) {
        FileCompactStrategy strategy = FileCompactStrategy.Builder.newBuilder()
                .setNumCompactThreads(2)
                .enableCompactionOnCheckpoint(1)
                .build();
        return new CompactorOperator(
                null,
                strategy,
                this.getTestCommittableSerializer(),
                compactor,
                this.createTestBucketWriter());
    }

    /**
     * Creates a minimal file-backed {@link BucketWriter} for the operators under test.
     *
     * <p>Record-wise compacting files are served by the in-progress writer, which writes each
     * element through a lazily opened {@link BufferedWriter} and counts the elements as its
     * size. Any other compacting type gets a writer that exposes a {@link FileOutputStream}.
     * {@code resumeInProgressFileFrom}, {@code getProperties} and
     * {@code cleanupInProgressFileRecoverable} are stubs returning {@code null}, {@code null}
     * and {@code false}.
     *
     * <p>Both writers close their underlying file in {@code closeForCommit}, so buffered data
     * reaches the file and no handle stays open once the file is handed over for commit.
     *
     * @return a new bucket writer instance
     */
    private BucketWriter<?, String> createTestBucketWriter() {
        return new BucketWriter<Integer, String>(){

            @Override
            public InProgressFileWriter<Integer, String> openNewInProgressFile(final String bucketId, final Path path, long creationTime) throws IOException {
                return new InProgressFileWriter<Integer, String>(){
                    // Opened on the first write; stays null when nothing is ever written.
                    BufferedWriter writer;
                    // Number of elements written so far.
                    long size = 0L;

                    @Override
                    public void write(Integer element, long currentTime) throws IOException {
                        if (this.writer == null) {
                            this.writer = new BufferedWriter(new FileWriter(path.toString()));
                        }
                        // The Integer is unboxed, so this resolves to Writer.write(int) and
                        // writes a single character.
                        this.writer.write(element);
                        ++this.size;
                    }

                    @Override
                    public InProgressFileWriter.InProgressFileRecoverable persist() throws IOException {
                        return new FileSinkTestUtils.TestInProgressFileRecoverable(path, this.size);
                    }

                    @Override
                    public InProgressFileWriter.PendingFileRecoverable closeForCommit() throws IOException {
                        // Flush buffered elements and release the file handle before the file
                        // is handed over; closing an already closed writer has no effect.
                        if (this.writer != null) {
                            this.writer.close();
                        }
                        return new FileSinkTestUtils.TestPendingFileRecoverable(path, this.size);
                    }

                    @Override
                    public void dispose() {
                        if (this.writer != null) {
                            try {
                                this.writer.close();
                            }
                            catch (IOException ignored) {
                                // Best-effort cleanup: dispose() cannot report a checked exception.
                            }
                        }
                    }

                    @Override
                    public String getBucketId() {
                        return bucketId;
                    }

                    @Override
                    public long getCreationTime() {
                        return 0L;
                    }

                    @Override
                    public long getSize() throws IOException {
                        return this.size;
                    }

                    @Override
                    public long getLastUpdateTime() {
                        return 0L;
                    }
                };
            }

            @Override
            public InProgressFileWriter<Integer, String> resumeInProgressFileFrom(String s, InProgressFileWriter.InProgressFileRecoverable inProgressFileSnapshot, long creationTime) throws IOException {
                return null;
            }

            @Override
            public WriterProperties getProperties() {
                return null;
            }

            @Override
            public BucketWriter.PendingFile recoverPendingFile(final InProgressFileWriter.PendingFileRecoverable pendingFileRecoverable) throws IOException {
                return new BucketWriter.PendingFile(){

                    @Override
                    public void commit() throws IOException {
                        FileSinkTestUtils.TestPendingFileRecoverable testRecoverable = (FileSinkTestUtils.TestPendingFileRecoverable)pendingFileRecoverable;
                        // Rename only when a target path is set and differs from the
                        // uncommitted path.
                        if (testRecoverable.getPath() != null && !testRecoverable.getPath().equals((Object)testRecoverable.getUncommittedPath())) {
                            testRecoverable.getPath().getFileSystem().rename(testRecoverable.getUncommittedPath(), testRecoverable.getPath());
                        }
                    }

                    @Override
                    public void commitAfterRecovery() throws IOException {
                        this.commit();
                    }
                };
            }

            @Override
            public boolean cleanupInProgressFileRecoverable(InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable) throws IOException {
                return false;
            }

            @Override
            public CompactingFileWriter openNewCompactingFile(CompactingFileWriter.Type type, String bucketId, final Path path, long creationTime) throws IOException {
                if (type == CompactingFileWriter.Type.RECORD_WISE) {
                    return this.openNewInProgressFile(bucketId, path, creationTime);
                }
                final FileOutputStream fileOutputStream = new FileOutputStream(path.toString());
                return new OutputStreamBasedCompactingFileWriter(){

                    @Override
                    public OutputStream asOutputStream() throws IOException {
                        return fileOutputStream;
                    }

                    @Override
                    public InProgressFileWriter.PendingFileRecoverable closeForCommit() throws IOException {
                        try {
                            fileOutputStream.flush();
                            // The channel position must be read while the stream is still open.
                            return new FileSinkTestUtils.TestPendingFileRecoverable(path, fileOutputStream.getChannel().position());
                        }
                        finally {
                            // Release the file handle once the size has been captured.
                            fileOutputStream.close();
                        }
                    }
                };
            }
        };
    }
}

