package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.shaded.guava32.com.google.common.collect.ImmutableList;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest.class */
class SinkV2SinkWriterOperatorTest extends SinkWriterOperatorTestBase {

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest$InspectableSink.class */
    public static class InspectableSink extends SinkWriterOperatorTestBase.AbstractInspectableSink<TestSinkV2<Integer>> {
        InspectableSink(TestSinkV2<Integer> testSinkV2) {
            super(testSinkV2);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase.InspectableSink
        public long getLastCheckpointId() {
            return getSink().getWriter().lastCheckpointId;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase.InspectableSink
        public List<String> getRecordsOfCurrentCheckpoint() {
            return getSink().getWriter().elements;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase.InspectableSink
        public List<Watermark> getWatermarks() {
            return getSink().getWriter().watermarks;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase.InspectableSink
        public int getRecordCountFromState() {
            return ((TestSinkV2.DefaultStatefulSinkWriter) getSink().getWriter()).getRecordCount();
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest$SnapshottingBufferingSinkWriter.class */
    private static class SnapshottingBufferingSinkWriter extends TestSinkV2.DefaultStatefulSinkWriter {
        public static final int NOT_SNAPSHOTTED = -1;
        long lastCheckpointId = -1;
        boolean endOfInput = false;

        private SnapshottingBufferingSinkWriter() {
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommittingSinkWriter, org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter
        public void flush(boolean z) throws IOException, InterruptedException {
            this.endOfInput = z;
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultStatefulSinkWriter
        public List<String> snapshotState(long j) throws IOException {
            this.lastCheckpointId = j;
            return super.snapshotState(j);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultCommittingSinkWriter
        public Collection<String> prepareCommit() {
            if (!this.endOfInput) {
                return ImmutableList.of();
            }
            List<String> list = this.elements;
            this.elements = new ArrayList();
            return list;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkV2SinkWriterOperatorTest$TimeBasedBufferingSinkWriter.class */
    public static class TimeBasedBufferingSinkWriter extends TestSinkV2.DefaultCommittingSinkWriter<Integer> implements ProcessingTimeService.ProcessingTimeCallback {
        private final List<String> cachedCommittables = new ArrayList();
        private ProcessingTimeService processingTimeService;

        private TimeBasedBufferingSinkWriter() {
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter
        public void write(Integer num, SinkWriter.Context context) {
            this.cachedCommittables.add(Tuple3.of(num, context.timestamp(), Long.valueOf(context.currentWatermark())).toString());
        }

        public void onProcessingTime(long j) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimeService.registerTimer(j + 1000, this);
        }

        @Override // org.apache.flink.streaming.runtime.operators.sink.TestSinkV2.DefaultSinkWriter
        public void init(WriterInitContext writerInitContext) {
            this.processingTimeService = writerInitContext.getProcessingTimeService();
            this.processingTimeService.registerTimer(1000L, this);
        }
    }

    SinkV2SinkWriterOperatorTest() {
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    public InspectableSink sinkWithoutCommitter() {
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(new TestSinkV2.DefaultSinkWriter()).build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    public InspectableSink sinkWithCommitter() {
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(new TestSinkV2.DefaultCommittingSinkWriter()).setDefaultCommitter().build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    public InspectableSink sinkWithTimeBasedWriter() {
        return new InspectableSink(TestSinkV2.newBuilder().setWriter(new TimeBasedBufferingSinkWriter()).setDefaultCommitter().build());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @Override // org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorTestBase
    public InspectableSink sinkWithState(boolean z, String str) {
        TestSinkV2.Builder writer = TestSinkV2.newBuilder().setDefaultCommitter().setWithPostCommitTopology(true).setWriter(new TestSinkV2.DefaultStatefulSinkWriter());
        if (z) {
            writer.setWriterState(true);
        }
        if (str != null) {
            writer.setCompatibleStateNames(str);
        }
        return new InspectableSink(writer.build());
    }
}
