/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.filesystem.stream;

import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.filesystem.stream.PartitionCommitInfo;
import org.apache.flink.table.filesystem.stream.StreamingFileWriter;
import org.junit.Assert;
import org.junit.Before;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

/**
 * Tests for {@link StreamingFileWriter}.
 *
 * <p>Each test drives the writer through a {@link OneInputStreamOperatorTestHarness}: rows are
 * pushed in, checkpoints are snapshotted and acknowledged, and the partitions reported in the
 * emitted {@link PartitionCommitInfo} records are compared against the expected list.
 */
public class StreamingFileWriterTest {
    @ClassRule
    public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();

    /** Base path handed to the file sink; re-created before every test. */
    private Path path;

    /**
     * Picks a fresh base path for the sink.
     *
     * <p>A temporary file is created only to obtain a unique name and is deleted right away, so
     * that nothing exists at {@link #path} when the test starts.
     */
    @Before
    public void before() throws IOException {
        File file = TEMPORARY_FOLDER.newFile();
        file.delete();
        this.path = new Path(file.toURI());
    }

    /**
     * Runs the writer through an initial attempt followed by three restores, each one starting
     * from the most recently captured snapshot, and checks the partitions emitted after a
     * checkpoint-complete notification in every attempt.
     */
    @Test
    public void testFailover() throws Exception {
        OperatorSubtaskState state;

        // Initial attempt: rows "3" and "4" arrive after snapshot 1, so completing checkpoint 1
        // is expected to report only partitions "1" and "2".
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();
            harness.processElement(row("1"), 0L);
            harness.processElement(row("2"), 0L);
            harness.processElement(row("2"), 0L);
            state = harness.snapshot(1L, 1L);
            harness.processElement(row("3"), 0L);
            harness.processElement(row("4"), 0L);
            harness.notifyOfCompletedCheckpoint(1L);
            List<String> partitions = collect(harness);
            Assert.assertEquals(Arrays.asList("1", "2"), partitions);
        }

        // First restore (from snapshot 1): no rows for "1" or "2" are sent in this attempt, yet
        // both are expected in the output next to "3" and "4".
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(row("3"), 0L);
            harness.processElement(row("4"), 0L);
            state = harness.snapshot(2L, 2L);
            harness.notifyOfCompletedCheckpoint(2L);
            List<String> partitions = collect(harness);
            Assert.assertEquals(Arrays.asList("1", "2", "3", "4"), partitions);
        }

        // Second restore (from snapshot 2): partition "4" receives a row again.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(row("4"), 0L);
            harness.processElement(row("5"), 0L);
            state = harness.snapshot(3L, 3L);
            harness.notifyOfCompletedCheckpoint(3L);
            List<String> partitions = collect(harness);
            Assert.assertEquals(Arrays.asList("3", "4", "5"), partitions);
        }

        // Third restore (from snapshot 3): three snapshots are taken but only checkpoint 5 is
        // acknowledged, so "9" (written after snapshot 5) is not expected in the output.
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
            harness.setup();
            harness.initializeState(state);
            harness.open();
            harness.processElement(row("6"), 0L);
            harness.processElement(row("7"), 0L);
            harness.snapshot(4L, 4L);
            harness.processElement(row("8"), 0L);
            harness.snapshot(5L, 5L);
            harness.processElement(row("9"), 0L);
            harness.snapshot(6L, 6L);
            harness.notifyOfCompletedCheckpoint(5L);
            List<String> partitions = collect(harness);
            Assert.assertEquals(Arrays.asList("4", "5", "6", "7", "8"), partitions);
        }
    }

    /**
     * Checks that completing checkpoint 1 reports exactly partitions "1" and "2", even though
     * partition "1" receives another row (and "3", "4" their first rows) after the snapshot.
     */
    @Test
    public void testCommitImmediately() throws Exception {
        try (OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = create()) {
            harness.setup();
            harness.initializeEmptyState();
            harness.open();
            harness.processElement(row("1"), 0L);
            harness.processElement(row("2"), 0L);
            harness.processElement(row("2"), 0L);
            harness.snapshot(1L, 1L);
            harness.processElement(row("1"), 0L);
            harness.processElement(row("3"), 0L);
            harness.processElement(row("4"), 0L);
            harness.notifyOfCompletedCheckpoint(1L);
            List<String> partitions = collect(harness);
            Assert.assertEquals(Arrays.asList("1", "2"), partitions);
        }
    }

    /**
     * Builds a single-field row holding {@code s}.
     *
     * @param s value of the only field; it is also used as the bucket id by {@link #create()}
     * @return a row with one string field
     */
    private static RowData row(String s) {
        return GenericRowData.of(StringData.fromString(s));
    }

    /**
     * Flattens the partitions of every record emitted by the harness so far into one list,
     * keeping emission order.
     *
     * @param harness harness whose output is inspected
     * @return all partitions found in the emitted {@link PartitionCommitInfo} records
     */
    private static List<String> collect(
            OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness) {
        List<String> parts = new ArrayList<>();
        harness.extractOutputValues().forEach(info -> parts.addAll(info.getPartitions()));
        return parts;
    }

    /**
     * Creates a harness around a new {@link StreamingFileWriter} that writes below {@link #path}.
     *
     * <p>The sink encodes each row as its first field followed by a newline (UTF-8), uses the
     * first field as the bucket id and is configured with {@link OnCheckpointRollingPolicy}.
     *
     * @return a harness that has been constructed but not yet set up or opened
     */
    private OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> create()
            throws Exception {
        // Typed explicitly so the lambda parameter is a RowData rather than Object.
        Encoder<RowData> encoder =
                (element, stream) ->
                        stream.write(
                                (element.getString(0) + "\n").getBytes(StandardCharsets.UTF_8));

        BucketAssigner<RowData, String> bucketAssigner =
                new BucketAssigner<RowData, String>() {

                    @Override
                    public String getBucketId(RowData element, BucketAssigner.Context context) {
                        return element.getString(0).toString();
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                };

        StreamingFileWriter writer =
                new StreamingFileWriter(
                        1000L,
                        StreamingFileSink.forRowFormat(this.path, encoder)
                                .withBucketAssigner(bucketAssigner)
                                .withRollingPolicy(OnCheckpointRollingPolicy.build()));

        OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness =
                new OneInputStreamOperatorTestHarness<>(writer, 1, 1, 0);
        harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        return harness;
    }
}

