/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummaryAssert;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineageAssert;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.operators.sink.SinkV1WriterCommittableSerializer;
import org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperatorFactory;
import org.apache.flink.streaming.runtime.operators.sink.TestSinkV2;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.InstanceOfAssertFactory;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

class SinkV2SinkWriterOperatorTest {
    SinkV2SinkWriterOperatorTest() {
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
        OperatorSubtaskState snapshot;
        OperatorSubtaskState previousSinkState;
        List<String> previousSinkInputs = Arrays.asList("bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth", "cost", "prompt");
        TestSinkV2.DefaultStatefulSinkWriter writer = new TestSinkV2.DefaultStatefulSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWithPostCommitTopology(true).setWriter(writer).setWriterState(stateful).setCompatibleStateNames("compatible_sink_state").build();
        int expectedState = 5;
        try (OneInputStreamOperatorTestHarness previousSink = new OneInputStreamOperatorTestHarness(new CompatibleStateSinkOperator<Integer>(TestSinkV2.WRITER_SERIALIZER, expectedState), (TypeSerializer)StringSerializer.INSTANCE);){
            previousSinkState = TestHarnessUtil.buildSubtaskState((OneInputStreamOperatorTestHarness)previousSink, previousSinkInputs);
        }
        try (OneInputStreamOperatorTestHarness compatibleWriterOperator = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            compatibleWriterOperator.initializeState(previousSinkState);
            Assertions.assertThat((int)writer.getRecordCount()).isEqualTo(stateful ? expectedState : 0);
            compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
            snapshot = compatibleWriterOperator.snapshot(1L, 1L);
        }
        TestSinkV2.DefaultStatefulSinkWriter restoredWriter = new TestSinkV2.DefaultStatefulSinkWriter();
        TestSinkV2 restoredSink = TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWithPostCommitTopology(true).setWriter(restoredWriter).setWriterState(stateful).setCompatibleStateNames("compatible_sink_state").build();
        try (OneInputStreamOperatorTestHarness restoredSinkOperator = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(restoredSink));){
            restoredSinkOperator.initializeState(snapshot);
            Assertions.assertThat((int)restoredWriter.getRecordCount()).isEqualTo(stateful ? expectedState : 0);
        }
    }

    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        TestSinkV2.DefaultSinkWriter writer = new TestSinkV2.DefaultSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.processElement((Object)1, 1L);
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();
            Assertions.assertThat(writer.getRecordsOfCurrentCheckpoint()).containsOnly((Object[])new TestSinkV2.Record[]{new TestSinkV2.Record<Integer>(1, 1L, Long.MIN_VALUE)});
            testHarness.prepareSnapshotPreBarrier(1L);
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();
            Assertions.assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty();
        }
    }

    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        long initialTime = 0L;
        TestSinkV2.DefaultSinkWriter writer = new TestSinkV2.DefaultSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.processWatermark(0L);
            testHarness.processWatermark(1L);
            Assertions.assertThat((Collection)testHarness.getOutput()).containsExactly(new Object[]{new Watermark(0L), new Watermark(1L)});
            Assertions.assertThat(writer.getWatermarks()).containsExactly((Object[])new org.apache.flink.api.common.eventtime.Watermark[]{new org.apache.flink.api.common.eventtime.Watermark(0L), new org.apache.flink.api.common.eventtime.Watermark(1L)});
        }
    }

    @Test
    void testTimeBasedBufferingSinkWriter() throws Exception {
        long initialTime = 0L;
        TimeBasedBufferingSinkWriter writer = new TimeBasedBufferingSinkWriter();
        TestSinkV2<Integer> sink = TestSinkV2.newBuilder().setWriter(writer).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.setProcessingTime(0L);
            testHarness.processElement((Object)1, 1L);
            testHarness.processElement((Object)2, 2L);
            testHarness.prepareSnapshotPreBarrier(1L);
            SinkV2SinkWriterOperatorTest.assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);
            testHarness.getProcessingTimeService().setCurrentTime(2001L);
            testHarness.prepareSnapshotPreBarrier(2L);
            SinkV2SinkWriterOperatorTest.assertBasicOutput(testHarness.extractOutputValues().stream().skip(1L).collect(Collectors.toList()), 2, 2L);
        }
    }

    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        TestSinkV2.DefaultCommittingSinkWriter writer = new TestSinkV2.DefaultCommittingSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();
            testHarness.processElement((Object)1, 1L);
            testHarness.processElement((Object)2, 2L);
            testHarness.prepareSnapshotPreBarrier(1L);
            SinkV2SinkWriterOperatorTest.assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);
        }
    }

    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        TestSinkV2.DefaultCommittingSinkWriter writer = new TestSinkV2.DefaultCommittingSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
        SinkWriterOperatorFactory writerOperatorFactory = new SinkWriterOperatorFactory(sink);
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)writerOperatorFactory);){
            testHarness.open();
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();
            testHarness.processElement((Object)1, 1L);
            testHarness.endInput();
            SinkV2SinkWriterOperatorTest.assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
        }
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testStateRestore(boolean stateful) throws Exception {
        OperatorSubtaskState snapshot;
        long initialTime = 0L;
        TestSinkV2.DefaultStatefulSinkWriter writer = new TestSinkV2.DefaultStatefulSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWithPostCommitTopology(true).setWriter(writer).setWriterState(stateful).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.processWatermark(0L);
            testHarness.processElement((Object)1, 1L);
            testHarness.processElement((Object)2, 2L);
            testHarness.prepareSnapshotPreBarrier(1L);
            snapshot = testHarness.snapshot(1L, 1L);
            Assertions.assertThat((int)writer.getRecordCount()).isEqualTo(2);
            Assertions.assertThat((long)writer.getLastCheckpointId()).isEqualTo(stateful ? 1L : -1L);
        }
        TestSinkV2.DefaultStatefulSinkWriter restoredWriter = new TestSinkV2.DefaultStatefulSinkWriter();
        TestSinkV2 restoredSink = TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWithPostCommitTopology(true).setWriter(restoredWriter).setWriterState(stateful).build();
        try (OneInputStreamOperatorTestHarness restoredTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(restoredSink));){
            restoredTestHarness.initializeState(snapshot);
            restoredTestHarness.open();
            Assertions.assertThat((int)restoredWriter.getRecordCount()).isEqualTo(stateful ? 2 : 0);
        }
    }

    @Test
    void testRestoreCommitterState() throws Exception {
        ListAssert records;
        OperatorSubtaskState committerState;
        List<TestSinkV2.Record> committables = Arrays.asList(new TestSinkV2.Record<Integer>(1, 1L, 1L), new TestSinkV2.Record<Integer>(2, 2L, 2L));
        TestSinkV2.DefaultCommittingSinkWriter writer = new TestSinkV2.DefaultCommittingSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
        try (OneInputStreamOperatorTestHarness committer = new OneInputStreamOperatorTestHarness((OneInputStreamOperator)new TestCommitterOperator(new TestSinkV2.RecordSerializer<Integer>()));){
            committerState = TestHarnessUtil.buildSubtaskState((OneInputStreamOperatorTestHarness)committer, committables);
        }
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.initializeState(committerState);
            testHarness.open();
            testHarness.prepareSnapshotPreBarrier(2L);
            records = (ListAssert)Assertions.assertThat((List)testHarness.extractOutputValues()).hasSize(4);
        }
        ((CommittableSummaryAssert)records.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()))).hasCheckpointId(1L).hasOverallCommittables(committables.size());
        ((CommittableWithLineageAssert)records.element(1, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)committables.get(0)).hasCheckpointId(1L).hasSubtaskId(0);
        ((CommittableWithLineageAssert)records.element(2, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableWithLineage()))).hasCommittable((Object)committables.get(1)).hasCheckpointId(1L).hasSubtaskId(0);
        ((CommittableSummaryAssert)records.element(3, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()))).hasCheckpointId(2L).hasOverallCommittables(0);
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    void testHandleEndInputInStreamingMode(boolean isCheckpointingEnabled) throws Exception {
        TestSinkV2.DefaultCommittingSinkWriter writer = new TestSinkV2.DefaultCommittingSinkWriter();
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(writer).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.processElement((Object)1, 1L);
            Assertions.assertThat((List)testHarness.extractOutputValues()).isEmpty();
            TestSinkV2.Record<Integer> record = new TestSinkV2.Record<Integer>(1, 1L, Long.MIN_VALUE);
            Assertions.assertThat(writer.getRecordsOfCurrentCheckpoint()).containsOnly((Object[])new TestSinkV2.Record[]{record});
            testHarness.endInput();
            if (isCheckpointingEnabled) {
                testHarness.prepareSnapshotPreBarrier(1L);
            }
            List<TestSinkV2.Record<Integer>> committables = Collections.singletonList(record);
            ListAssert records = (ListAssert)Assertions.assertThat((List)testHarness.extractOutputValues()).hasSize(committables.size() + 1);
            ((CommittableSummaryAssert)records.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()))).hasOverallCommittables(committables.size());
            ((ListAssert)records.filteredOn(message -> message instanceof CommittableWithLineage)).map(message -> (TestSinkV2.Record)((CommittableWithLineage)message).getCommittable()).containsExactlyInAnyOrderElementsOf(committables);
            Assertions.assertThat(writer.getRecordsOfCurrentCheckpoint()).isEmpty();
        }
    }

    @Test
    void testDoubleEndOfInput() throws Exception {
        OperatorSubtaskState snapshot;
        TestSinkV2 sink = TestSinkV2.newBuilder().setWriter(new TestSinkV2.DefaultCommittingSinkWriter()).setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWriterState(true).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(sink));){
            testHarness.open();
            testHarness.processElement((Object)1, 1L);
            testHarness.endInput();
            testHarness.prepareSnapshotPreBarrier(1L);
            snapshot = testHarness.snapshot(1L, 1L);
            SinkV2SinkWriterOperatorTest.assertBasicOutput(testHarness.extractOutputValues(), 1, 1L);
        }
        TestSinkV2 restoredSink = TestSinkV2.newBuilder().setCommitter(new TestSinkV2.DefaultCommitter(), TestSinkV2.RecordSerializer::new).setWriter(new TestSinkV2.DefaultStatefulSinkWriter()).setWriterState(true).build();
        try (OneInputStreamOperatorTestHarness restoredTestHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory(restoredSink));){
            restoredTestHarness.setRestoredCheckpointId(1L);
            restoredTestHarness.initializeState(snapshot);
            restoredTestHarness.open();
            restoredTestHarness.processElement((Object)2, 2L);
            restoredTestHarness.endInput();
            restoredTestHarness.prepareSnapshotPreBarrier(3L);
            restoredTestHarness.snapshot(3L, 1L);
            SinkV2SinkWriterOperatorTest.assertBasicOutput(restoredTestHarness.extractOutputValues(), 1, 2L);
        }
    }

    @Test
    void testInitContext() throws Exception {
        AtomicReference initContext = new AtomicReference();
        Sink & Serializable sink = (Sink & Serializable)context -> {
            initContext.set(context);
            return null;
        };
        boolean subtaskId = true;
        int parallelism = 10;
        StringSerializer typeSerializer = StringSerializer.INSTANCE;
        JobID jobID = new JobID();
        MockEnvironment environment = MockEnvironment.builder().setSubtaskIndex(1).setParallelism(10).setMaxParallelism(10).setJobID(jobID).setExecutionConfig(new ExecutionConfig().enableObjectReuse()).build();
        try (OneInputStreamOperatorTestHarness testHarness = new OneInputStreamOperatorTestHarness((OneInputStreamOperatorFactory)new SinkWriterOperatorFactory((Sink)sink), (TypeSerializer)typeSerializer, environment);){
            testHarness.open();
            Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getUserCodeClassLoader()).isNotNull();
            Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getMailboxExecutor()).isNotNull();
            Assertions.assertThat((Object)((WriterInitContext)initContext.get()).getProcessingTimeService()).isNotNull();
            Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getIndexOfThisSubtask()).isEqualTo(1);
            Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(10);
            Assertions.assertThat((int)((WriterInitContext)initContext.get()).getTaskInfo().getAttemptNumber()).isZero();
            Assertions.assertThat((Object)((WriterInitContext)initContext.get()).metricGroup()).isNotNull();
            Assertions.assertThat((OptionalLong)((WriterInitContext)initContext.get()).getRestoredCheckpointId()).isNotPresent();
            Assertions.assertThat((boolean)((WriterInitContext)initContext.get()).isObjectReuseEnabled()).isTrue();
            Assertions.assertThat((Object)((WriterInitContext)initContext.get()).createInputSerializer()).isEqualTo((Object)typeSerializer);
            Assertions.assertThat((Comparable)((WriterInitContext)initContext.get()).getJobInfo().getJobId()).isEqualTo((Object)jobID);
        }
    }

    private static void assertBasicOutput(List<? extends CommittableMessage<?>> output, int numberOfCommittables, long checkpointId) {
        ListAssert records = (ListAssert)Assertions.assertThat(output).hasSize(numberOfCommittables + 1);
        ((CommittableSummaryAssert)records.element(0, Assertions.as((InstanceOfAssertFactory)SinkV2Assertions.committableSummary()))).hasOverallCommittables(numberOfCommittables);
        ((ListAssert)records.filteredOn(r -> r instanceof CommittableWithLineage)).allSatisfy(cl -> SinkV2Assertions.assertThat((CommittableWithLineage)((CommittableWithLineage)cl)).hasCheckpointId(checkpointId).hasSubtaskId(0));
    }

    private static class TestingCommittableSerializer
    extends SinkV1WriterCommittableSerializer<TestSinkV2.Record<Integer>> {
        private final SimpleVersionedSerializer<TestSinkV2.Record<Integer>> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<TestSinkV2.Record<Integer>> committableSerializer) {
            super(committableSerializer);
            this.committableSerializer = committableSerializer;
        }

        public byte[] serialize(List<TestSinkV2.Record<Integer>> obj) throws IOException {
            DataOutputSerializer out = new DataOutputSerializer(256);
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, obj, (DataOutputView)out);
            return out.getCopyOfBuffer();
        }
    }

    private static class CompatibleStateSinkOperator<T>
    extends AbstractStreamOperator<String>
    implements OneInputStreamOperator<String, String> {
        static final String SINK_STATE_NAME = "compatible_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor("compatible_sink_state", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        ListState<T> sinkState;
        private final SimpleVersionedSerializer<T> serializer;
        private final T initialState;

        public CompatibleStateSinkOperator(SimpleVersionedSerializer<T> serializer, T initialState) {
            this.serializer = serializer;
            this.initialState = initialState;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.sinkState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(SINK_STATE_DESC), this.serializer);
            if (!context.isRestored()) {
                this.sinkState.add(this.initialState);
            }
        }

        public void processElement(StreamRecord<String> element) {
        }
    }

    private static class TestCommitterOperator
    extends AbstractStreamOperator<TestSinkV2.Record<Integer>>
    implements OneInputStreamOperator<TestSinkV2.Record<Integer>, TestSinkV2.Record<Integer>> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor("streaming_committer_raw_states", (TypeSerializer)BytePrimitiveArraySerializer.INSTANCE);
        private ListState<List<TestSinkV2.Record<Integer>>> committerState;
        private final List<TestSinkV2.Record<Integer>> buffer = new ArrayList<TestSinkV2.Record<Integer>>();
        private final SimpleVersionedSerializer<TestSinkV2.Record<Integer>> serializer;

        public TestCommitterOperator(SimpleVersionedSerializer<TestSinkV2.Record<Integer>> serializer) {
            this.serializer = serializer;
        }

        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            this.committerState = new SimpleVersionedListState(context.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), (SimpleVersionedSerializer)new TestingCommittableSerializer(this.serializer));
        }

        public void processElement(StreamRecord<TestSinkV2.Record<Integer>> element) {
            this.buffer.add((TestSinkV2.Record)element.getValue());
        }

        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }

    private static class TimeBasedBufferingSinkWriter
    extends TestSinkV2.DefaultCommittingSinkWriter<Integer>
    implements ProcessingTimeService.ProcessingTimeCallback {
        private final List<TestSinkV2.Record<Integer>> cachedCommittables = new ArrayList<TestSinkV2.Record<Integer>>();
        private ProcessingTimeService processingTimeService;

        private TimeBasedBufferingSinkWriter() {
        }

        @Override
        public void write(Integer element, SinkWriter.Context context) {
            this.cachedCommittables.add(new TestSinkV2.Record<Integer>(element, context.timestamp(), context.currentWatermark()));
        }

        public void onProcessingTime(long time) {
            this.elements.addAll(this.cachedCommittables);
            this.cachedCommittables.clear();
            this.processingTimeService.registerTimer(time + 1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }

        @Override
        public void init(WriterInitContext context) {
            this.processingTimeService = context.getProcessingTimeService();
            this.processingTimeService.registerTimer(1000L, (ProcessingTimeService.ProcessingTimeCallback)this);
        }
    }
}

