/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.sink.Sink;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.connector.file.sink.writer.DefaultFileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriter;
import org.apache.flink.connector.file.sink.writer.FileWriterBucket;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketFactory;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
import org.apache.flink.util.ExceptionUtils;
import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class FileWriterTest {
    /**
     * Class-level JUnit rule supplying the temporary directory; every test in this class calls
     * {@code newFolder()} on it to obtain its own output directory.
     */
    @ClassRule
    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();

    /**
     * Writes five records carrying three distinct values and verifies that
     * {@code prepareCommit} returns exactly three committables.
     *
     * @throws Exception if the temporary folder cannot be created or the writer fails
     */
    @Test
    public void testPreCommit() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        FileWriter<String> fileWriter =
                createWriter(
                        path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));

        fileWriter.write("test1", new ContextImpl());
        fileWriter.write("test1", new ContextImpl());
        fileWriter.write("test2", new ContextImpl());
        fileWriter.write("test2", new ContextImpl());
        fileWriter.write("test3", new ContextImpl());

        // Only the number of committables matters here, so the element type is left open.
        List<?> committables = fileWriter.prepareCommit(false);
        Assert.assertEquals(3, committables.size());
    }

    /**
     * Snapshots a writer holding three active buckets and verifies that a writer initialized
     * from that snapshot exposes the same bucket ids, each with an in-progress part.
     *
     * @throws Exception if the temporary folder cannot be created or the writer fails
     */
    @Test
    public void testSnapshotAndRestore() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        FileWriter<String> fileWriter =
                createWriter(
                        path,
                        DefaultRollingPolicy.builder().build(),
                        new OutputFileConfig("part-", ""));

        fileWriter.write("test1", new ContextImpl());
        fileWriter.write("test2", new ContextImpl());
        fileWriter.write("test3", new ContextImpl());
        Assert.assertEquals(3, fileWriter.getActiveBuckets().size());

        fileWriter.prepareCommit(false);
        List<FileWriterBucketState> states = fileWriter.snapshotState();
        Assert.assertEquals(3, states.size());

        // Note: the restoring writer deliberately uses a different rolling policy than the
        // writer that produced the snapshot, exactly as in the original test.
        fileWriter =
                restoreWriter(
                        states,
                        path,
                        OnCheckpointRollingPolicy.build(),
                        new OutputFileConfig("part-", ""));

        // Expected value first, so a failure message reports the two sides correctly.
        Assert.assertEquals(
                new HashSet<>(Arrays.asList("test1", "test2", "test3")),
                fileWriter.getActiveBuckets().keySet());

        for (FileWriterBucket<String> bucket : fileWriter.getActiveBuckets().values()) {
            Assert.assertNotNull(
                    "The in-progress file should be recovered", bucket.getInProgressPart());
        }
    }

    /**
     * Restores a single writer from the concatenated snapshots of two writers that share the
     * same base path and verifies how the per-bucket states are combined.
     *
     * <p>Buckets {@code test1} and {@code test2} appear in both snapshots and are expected to end
     * up with an in-progress part plus one pending file; {@code test3} appears only in the first
     * snapshot and is expected to have an in-progress part and no pending files.
     *
     * @throws Exception if the temporary folder cannot be created or a writer fails
     */
    @Test
    public void testMergingForRescaling() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        FileWriter<String> firstFileWriter =
                createWriter(
                        path,
                        DefaultRollingPolicy.builder().build(),
                        new OutputFileConfig("part-", ""));
        firstFileWriter.write("test1", new ContextImpl());
        firstFileWriter.write("test2", new ContextImpl());
        firstFileWriter.write("test3", new ContextImpl());
        firstFileWriter.prepareCommit(false);
        List<FileWriterBucketState> firstState = firstFileWriter.snapshotState();

        FileWriter<String> secondFileWriter =
                createWriter(
                        path,
                        DefaultRollingPolicy.builder().build(),
                        new OutputFileConfig("part-", ""));
        secondFileWriter.write("test1", new ContextImpl());
        secondFileWriter.write("test2", new ContextImpl());
        secondFileWriter.prepareCommit(false);
        List<FileWriterBucketState> secondState = secondFileWriter.snapshotState();

        // The restored writer receives the states of both writers, first writer's states first.
        List<FileWriterBucketState> mergedState = new ArrayList<>(firstState);
        mergedState.addAll(secondState);

        FileWriter<String> restoredWriter =
                restoreWriter(
                        mergedState,
                        path,
                        DefaultRollingPolicy.builder().build(),
                        new OutputFileConfig("part-", ""));
        Assert.assertEquals(3, restoredWriter.getActiveBuckets().size());

        // Buckets written by both writers.
        for (String bucketId : Arrays.asList("test1", "test2")) {
            FileWriterBucket<String> bucket = restoredWriter.getActiveBuckets().get(bucketId);
            Assert.assertNotNull(
                    "The in-progress file should be recovered", bucket.getInProgressPart());
            Assert.assertEquals(1, bucket.getPendingFiles().size());
        }

        // Only the first writer has a record for test3.
        FileWriterBucket<String> test3Bucket = restoredWriter.getActiveBuckets().get("test3");
        Assert.assertNotNull(
                "The in-progress file should be recovered", test3Bucket.getInProgressPart());
        Assert.assertEquals(0, test3Bucket.getPendingFiles().size());
    }

    /**
     * Verifies that the writer holds no active buckets after a second {@code prepareCommit}
     * that follows a commit/snapshot cycle with no further writes.
     *
     * @throws Exception if the temporary folder cannot be created or the writer fails
     */
    @Test
    public void testBucketIsRemovedWhenNotActive() throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        FileWriter<String> fileWriter =
                createWriter(
                        path, OnCheckpointRollingPolicy.build(), new OutputFileConfig("part-", ""));

        fileWriter.write("test", new ContextImpl());

        // First cycle: commit and snapshot the single bucket that was written to.
        fileWriter.prepareCommit(false);
        fileWriter.snapshotState();

        // Second commit without any new record in between.
        fileWriter.prepareCommit(false);

        Assert.assertTrue(fileWriter.getActiveBuckets().isEmpty());
    }

    /**
     * Drives the writer with a manually advanced clock (rollover interval 10, bucket check
     * interval 5) and verifies which buckets have rolled their in-progress part.
     *
     * <p>The same write/advance pattern is run twice, separated by a {@code prepareCommit}, and
     * both rounds are expected to yield the same per-bucket result.
     *
     * @throws IOException if the temporary folder cannot be created or the writer fails
     */
    @Test
    public void testOnProcessingTime() throws IOException {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        ManuallyTriggeredProcessingTimeService processingTimeService =
                new ManuallyTriggeredProcessingTimeService();
        processingTimeService.advanceTo(10L);

        FileWriter<String> fileWriter =
                createWriter(
                        path,
                        new FileSinkTestUtils.StringIdentityBucketAssigner(),
                        DefaultRollingPolicy.builder().withRolloverInterval(10L).build(),
                        new OutputFileConfig("part-", ""),
                        processingTimeService,
                        5L);
        fileWriter.initializeState(Collections.emptyList());

        // Round one: test1 is written at time 10, test2 at time 15; the clock then moves to 20.
        fileWriter.write("test1", new ContextImpl());
        processingTimeService.advanceTo(15L);
        fileWriter.write("test2", new ContextImpl());
        processingTimeService.advanceTo(20L);

        FileWriterBucket<String> test1Bucket = fileWriter.getActiveBuckets().get("test1");
        Assert.assertNull(
                "The in-progress part of test1 should be rolled",
                test1Bucket.getInProgressPart());
        Assert.assertEquals(1, test1Bucket.getPendingFiles().size());

        FileWriterBucket<String> test2Bucket = fileWriter.getActiveBuckets().get("test2");
        Assert.assertNotNull(
                "The in-progress part of test2 should not be rolled",
                test2Bucket.getInProgressPart());
        Assert.assertEquals(0, test2Bucket.getPendingFiles().size());

        // Round two: commit at time 30, then test1 at 30, test2 at 35, and the clock moves to 40.
        processingTimeService.advanceTo(30L);
        fileWriter.prepareCommit(false);
        fileWriter.write("test1", new ContextImpl());
        processingTimeService.advanceTo(35L);
        fileWriter.write("test2", new ContextImpl());
        processingTimeService.advanceTo(40L);

        // Buckets are looked up again rather than reusing the references from round one.
        test1Bucket = fileWriter.getActiveBuckets().get("test1");
        Assert.assertNull(
                "The in-progress part of test1 should be rolled",
                test1Bucket.getInProgressPart());
        Assert.assertEquals(1, test1Bucket.getPendingFiles().size());

        test2Bucket = fileWriter.getActiveBuckets().get("test2");
        Assert.assertNotNull(
                "The in-progress part of test2 should not be rolled",
                test2Bucket.getInProgressPart());
        Assert.assertEquals(0, test2Bucket.getPendingFiles().size());
    }

    /** Context check with a non-null element timestamp (timestamp 1, watermark 2, time 3). */
    @Test
    public void testContextPassingNormalExecution() throws Exception {
        testCorrectTimestampPassingInContext(1L, 2L, 3L);
    }

    /** Context check for an element without a timestamp (watermark 4, processing time 5). */
    @Test
    public void testContextPassingNullTimestamp() throws Exception {
        testCorrectTimestampPassingInContext(null, 4L, 5L);
    }

    /**
     * Writes a single record through a writer whose bucket assigner asserts that the context
     * it receives carries the given timestamp, watermark and processing time.
     *
     * @param timestamp element timestamp placed in the sink context; may be {@code null}
     * @param watermark watermark placed in the sink context
     * @param processingTime time the manual processing-time service is advanced to before writing
     * @throws Exception if the temporary folder cannot be created or the writer fails
     */
    private void testCorrectTimestampPassingInContext(
            Long timestamp, long watermark, long processingTime) throws Exception {
        File outDir = TEMP_FOLDER.newFolder();
        Path path = new Path(outDir.toURI());

        ManuallyTriggeredProcessingTimeService processingTimeService =
                new ManuallyTriggeredProcessingTimeService();
        processingTimeService.advanceTo(processingTime);

        FileWriter<String> fileWriter =
                createWriter(
                        path,
                        // The assertions of this test live inside the assigner.
                        new VerifyingBucketAssigner(timestamp, watermark, processingTime),
                        DefaultRollingPolicy.builder().withRolloverInterval(10L).build(),
                        new OutputFileConfig("part-", ""),
                        processingTimeService,
                        5L);
        fileWriter.initializeState(Collections.emptyList());

        fileWriter.write("test", new ContextImpl(watermark, timestamp));
    }

    /**
     * Creates a writer with the test defaults: the string-identity bucket assigner, a fresh
     * manually triggered processing-time service and a bucket check interval of 10.
     *
     * @param basePath directory the writer writes into
     * @param rollingPolicy rolling policy handed to the writer
     * @param outputFileConfig part-file prefix/suffix configuration
     * @return a new writer; {@code initializeState} has not been called on it
     * @throws IOException if the file system or its recoverable writer cannot be obtained
     */
    private static FileWriter<String> createWriter(
            Path basePath,
            RollingPolicy<String, String> rollingPolicy,
            OutputFileConfig outputFileConfig)
            throws IOException {
        // Delegate to the fully parameterized overload so the writer is assembled in one place.
        return createWriter(
                basePath,
                new FileSinkTestUtils.StringIdentityBucketAssigner(),
                rollingPolicy,
                outputFileConfig,
                new ManuallyTriggeredProcessingTimeService(),
                10L);
    }

    /**
     * Creates a writer that uses the default bucket factory and a row-wise bucket writer
     * backed by the recoverable writer of {@code basePath}'s file system, encoding records with
     * {@code SimpleStringEncoder}.
     *
     * @param basePath directory the writer writes into
     * @param bucketAssigner assigner mapping each record to a bucket id
     * @param rollingPolicy rolling policy handed to the writer
     * @param outputFileConfig part-file prefix/suffix configuration
     * @param processingTimeService time service handed to the writer
     * @param bucketCheckInterval bucket check interval handed to the writer
     * @return a new writer; {@code initializeState} has not been called on it
     * @throws IOException if the file system or its recoverable writer cannot be obtained
     */
    private static FileWriter<String> createWriter(
            Path basePath,
            BucketAssigner<String, String> bucketAssigner,
            RollingPolicy<String, String> rollingPolicy,
            OutputFileConfig outputFileConfig,
            Sink.ProcessingTimeService processingTimeService,
            long bucketCheckInterval)
            throws IOException {
        return new FileWriter<>(
                basePath,
                bucketAssigner,
                new DefaultFileWriterBucketFactory<>(),
                new RowWiseBucketWriter<>(
                        FileSystem.get(basePath.toUri()).createRecoverableWriter(),
                        new SimpleStringEncoder<>()),
                rollingPolicy,
                outputFileConfig,
                processingTimeService,
                bucketCheckInterval);
    }

    /**
     * Builds a writer with the test defaults and initializes it from the given bucket states.
     *
     * @param states bucket states to restore from
     * @param basePath directory the writer writes into
     * @param rollingPolicy rolling policy handed to the writer
     * @param outputFileConfig part-file prefix/suffix configuration
     * @return the writer after {@code initializeState(states)} has been called
     * @throws IOException if creating or initializing the writer fails
     */
    private static FileWriter<String> restoreWriter(
            List<FileWriterBucketState> states,
            Path basePath,
            RollingPolicy<String, String> rollingPolicy,
            OutputFileConfig outputFileConfig)
            throws IOException {
        FileWriter<String> restored = createWriter(basePath, rollingPolicy, outputFileConfig);
        restored.initializeState(states);
        return restored;
    }

    private static class VerifyingBucketAssigner
    implements BucketAssigner<String, String> {
        private static final long serialVersionUID = 7729086510972377578L;
        private final Long expectedTimestamp;
        private final long expectedWatermark;
        private final long expectedProcessingTime;

        VerifyingBucketAssigner(Long expectedTimestamp, long expectedWatermark, long expectedProcessingTime) {
            this.expectedTimestamp = expectedTimestamp;
            this.expectedWatermark = expectedWatermark;
            this.expectedProcessingTime = expectedProcessingTime;
        }

        public String getBucketId(String element, BucketAssigner.Context context) {
            Long elementTimestamp = context.timestamp();
            long watermark = context.currentWatermark();
            long processingTime = context.currentProcessingTime();
            Assert.assertEquals((Object)this.expectedTimestamp, (Object)elementTimestamp);
            Assert.assertEquals((long)this.expectedProcessingTime, (long)processingTime);
            Assert.assertEquals((long)this.expectedWatermark, (long)watermark);
            return element;
        }

        public SimpleVersionedSerializer<String> getSerializer() {
            return SimpleVersionedStringSerializer.INSTANCE;
        }
    }

    private static class ManuallyTriggeredProcessingTimeService
    implements Sink.ProcessingTimeService {
        private long now;
        private final Queue<Tuple2<Long, Sink.ProcessingTimeService.ProcessingTimeCallback>> timers = new PriorityQueue<Tuple2>(Comparator.comparingLong(o -> (Long)o.f0));

        private ManuallyTriggeredProcessingTimeService() {
        }

        public long getCurrentProcessingTime() {
            return this.now;
        }

        public void registerProcessingTimer(long time, Sink.ProcessingTimeService.ProcessingTimeCallback processingTimeCallback) {
            if (time <= this.now) {
                try {
                    processingTimeCallback.onProcessingTime(this.now);
                }
                catch (IOException e) {
                    ExceptionUtils.rethrow((Throwable)e);
                }
            } else {
                this.timers.add((Tuple2<Long, Sink.ProcessingTimeService.ProcessingTimeCallback>)new Tuple2((Object)time, (Object)processingTimeCallback));
            }
        }

        public void advanceTo(long time) throws IOException {
            if (time > this.now) {
                Tuple2<Long, Sink.ProcessingTimeService.ProcessingTimeCallback> timer;
                this.now = time;
                while ((timer = this.timers.peek()) != null && (Long)timer.f0 <= this.now) {
                    ((Sink.ProcessingTimeService.ProcessingTimeCallback)timer.f1).onProcessingTime(this.now);
                    this.timers.poll();
                }
            }
        }
    }

    private static class ContextImpl
    implements SinkWriter.Context {
        private final long watermark;
        private final Long timestamp;

        public ContextImpl() {
            this(0L, 0L);
        }

        private ContextImpl(long watermark, Long timestamp) {
            this.watermark = watermark;
            this.timestamp = timestamp;
        }

        public long currentWatermark() {
            return this.watermark;
        }

        public Long timestamp() {
            return this.timestamp;
        }
    }
}

