package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.LongSupplier;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableSummary;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ThrowingConsumer;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase.class */
abstract class SinkWriterOperatorTestBase {

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$DummySinkOperator.class */
    /**
     * Test operator that appends every incoming string to an operator list state registered
     * under {@link #DUMMY_SINK_STATE_NAME}. The raw state holds {@code byte[]} entries; the
     * supplied serializer converts the strings to and from that form.
     */
    private static class DummySinkOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        /** Name under which the list state is registered. */
        static final String DUMMY_SINK_STATE_NAME = "dummy_sink_state";

        /** Descriptor of the raw (byte array) list state backing {@link #sinkState}. */
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC =
                new ListStateDescriptor<>(
                        DUMMY_SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);

        /** String view over the raw list state; assigned in {@code initializeState}. */
        ListState<String> sinkState;

        private final SimpleVersionedSerializer<String> serializer;

        public DummySinkOperator(SimpleVersionedSerializer<String> serializer) {
            this.serializer = serializer;
        }

        /** Looks up the raw list state and wraps it with the configured serializer. */
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            ListState<byte[]> rawState =
                    context.getOperatorStateStore().getListState(SINK_STATE_DESC);
            this.sinkState = new SimpleVersionedListState<>(rawState, this.serializer);
        }

        /** Stores the value of the record in the list state; nothing is emitted. */
        public void processElement(StreamRecord<String> element) throws Exception {
            String value = element.getValue();
            this.sinkState.add(value);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$SinkAndSuppliers.class */
    /**
     * Bundles a sink under test with the suppliers the tests use to inspect what the sink's
     * writer has observed.
     */
    static class SinkAndSuppliers {
        /** The sink handed to the operator factory. */
        Sink<Integer> sink;
        /** Yields the string form of the elements currently reported by the sink. */
        Supplier<List<String>> elementSupplier;
        /** Yields the watermarks reported by the sink. */
        Supplier<List<Watermark>> watermarkSupplier;
        /** Yields a checkpoint id; the tests compare it against 1 or -1 after a snapshot. */
        LongSupplier lastCheckpointSupplier;
        /** Yields the serializer used to build pre-existing operator state in the tests. */
        Supplier<SimpleVersionedSerializer<String>> serializerSupplier;

        public SinkAndSuppliers(
                Sink<Integer> sink,
                Supplier<List<String>> elementSupplier,
                Supplier<List<Watermark>> watermarkSupplier,
                LongSupplier lastCheckpointSupplier,
                Supplier<SimpleVersionedSerializer<String>> serializerSupplier) {
            this.sink = sink;
            this.elementSupplier = elementSupplier;
            this.watermarkSupplier = watermarkSupplier;
            this.lastCheckpointSupplier = lastCheckpointSupplier;
            this.serializerSupplier = serializerSupplier;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$TestCommitterOperator.class */
    /**
     * Test operator that collects incoming strings in memory and, on every snapshot, appends the
     * collected list as one entry to the list state named
     * {@code "streaming_committer_raw_states"}.
     */
    private static class TestCommitterOperator extends AbstractStreamOperator<String>
            implements OneInputStreamOperator<String, String> {

        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC =
                new ListStateDescriptor<>(
                        "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

        /** View over the raw state whose entries are whole lists of committables. */
        private ListState<List<String>> committerState;

        /** Values received so far; this class never clears it. */
        private final List<String> buffer = new ArrayList<>();

        private final SimpleVersionedSerializer<String> serializer;

        public TestCommitterOperator(SimpleVersionedSerializer<String> serializer) {
            this.serializer = serializer;
        }

        /** Wraps the raw list state with a {@link TestingCommittableSerializer}. */
        public void initializeState(StateInitializationContext context) throws Exception {
            super.initializeState(context);
            ListState<byte[]> rawState =
                    context.getOperatorStateStore()
                            .getListState(STREAMING_COMMITTER_RAW_STATES_DESC);
            this.committerState =
                    new SimpleVersionedListState<>(
                            rawState, new TestingCommittableSerializer(this.serializer));
        }

        /** Buffers the record's value; nothing is emitted. */
        public void processElement(StreamRecord<String> element) throws Exception {
            String value = element.getValue();
            this.buffer.add(value);
        }

        /** Adds the buffered values as a single state entry. */
        public void snapshotState(StateSnapshotContext context) throws Exception {
            super.snapshotState(context);
            this.committerState.add(this.buffer);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$TestingCommittableSerializer.class */
    /**
     * Serializer for lists of string committables that writes a fixed integer header followed by
     * the versioned, serialized list.
     */
    private static class TestingCommittableSerializer
            extends SinkV1WriterCommittableSerializer<String> {

        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(
                SimpleVersionedSerializer<String> committableSerializer) {
            super(committableSerializer);
            this.committableSerializer = committableSerializer;
        }

        /**
         * Serializes the given committables.
         *
         * @param list committables to write
         * @return a copy of the bytes written
         * @throws IOException if writing to the output fails
         */
        public byte[] serialize(List<String> list) throws IOException {
            final DataOutputSerializer out = new DataOutputSerializer(256);
            // NOTE(review): presumably the magic number the restoring side expects for
            // V1 committer state -- confirm against the deserializer.
            out.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(
                    this.committableSerializer, list, out);
            return out.getCopyOfBuffer();
        }
    }

    /**
     * With a sink that has no committer, the operator must not emit anything downstream, neither
     * after processing an element nor after the pre-barrier snapshot preparation.
     */
    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        final SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();

        // The element reaches the sink but produces no output.
        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get())
                .containsOnly("(1,1,-9223372036854775808)");

        // After the pre-barrier step there is still no output and no reported elements.
        testHarness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();

        testHarness.close();
    }

    /**
     * Watermarks fed into the operator must be forwarded downstream and also be reported by the
     * sink, in the order they were received.
     */
    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        final long firstTimestamp = 0L;
        final long secondTimestamp = 1L;
        final SinkAndSuppliers sinkAndSuppliers = sinkWithoutCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();

        testHarness.processWatermark(firstTimestamp);
        testHarness.processWatermark(secondTimestamp);

        // Downstream sees the streaming watermark type ...
        Assertions.assertThat(testHarness.getOutput())
                .containsExactly(
                        new org.apache.flink.streaming.api.watermark.Watermark(firstTimestamp),
                        new org.apache.flink.streaming.api.watermark.Watermark(secondTimestamp));
        // ... while the sink reports the event-time API watermark type.
        Assertions.assertThat(sinkAndSuppliers.watermarkSupplier.get())
                .containsExactly(new Watermark(firstTimestamp), new Watermark(secondTimestamp));

        testHarness.close();
    }

    /**
     * With the time-based writer, two elements processed at processing time 0 yield no
     * committables at the first pre-barrier step; after the processing time is advanced to 2001
     * the next pre-barrier step yields both committables.
     */
    @Test
    public void testTimeBasedBufferingSinkWriter() throws Exception {
        final SinkAndSuppliers sinkAndSuppliers = sinkWithTimeBasedWriter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // First checkpoint: only a summary announcing zero committables.
        testHarness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(testHarness.getOutput(), 0, 1L);

        // Drop that summary so the next assertion only sees the second checkpoint's output.
        testHarness.getOutput().poll();

        testHarness.getProcessingTimeService().setCurrentTime(2001L);

        // Second checkpoint: both committables are emitted.
        testHarness.prepareSnapshotPreBarrier(2L);
        assertBasicOutput(testHarness.getOutput(), 2, 2L);

        testHarness.close();
    }

    /**
     * With a committer, nothing is emitted before the pre-barrier step; afterwards the output
     * holds a summary plus one committable per processed element for checkpoint 1.
     */
    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        final SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        testHarness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(testHarness.getOutput(), 2, 1L);

        testHarness.close();
    }

    /**
     * Ending the input without any checkpoint must emit the pending committable, tagged with
     * checkpoint id {@code Long.MAX_VALUE}.
     */
    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        final SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();
        Assertions.assertThat(testHarness.getOutput()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.endInput();
        assertBasicOutput(testHarness.getOutput(), 1, Long.MAX_VALUE);

        // Release the harness like the other tests do; it was previously left open.
        testHarness.close();
    }

    /**
     * Snapshots the operator after two elements and restores a fresh operator from that
     * snapshot.
     *
     * <p>When {@code stateful} is true the sink reports checkpoint 1 and the restored operator
     * emits two committables at end of input; otherwise the sink reports -1 and the restored
     * operator emits only an empty summary.
     *
     * @param stateful forwarded as the first argument of {@code sinkWithSnapshottingWriter}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testStateRestore(boolean stateful) throws Exception {
        final SinkAndSuppliers sinkAndSuppliers = sinkWithSnapshottingWriter(stateful, null);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        testHarness.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = testHarness.snapshot(1L, 1L);

        // Two output entries so far, one of them being the forwarded watermark.
        Assertions.assertThat(testHarness.getOutput())
                .hasSize(2)
                .contains(new org.apache.flink.streaming.api.watermark.Watermark(0L));

        final long expectedLastCheckpoint;
        if (stateful) {
            expectedLastCheckpoint = 1L;
        } else {
            expectedLastCheckpoint = -1L;
        }
        Assertions.assertThat(sinkAndSuppliers.lastCheckpointSupplier.getAsLong())
                .isEqualTo(expectedLastCheckpoint);

        testHarness.close();

        // Restore into a brand-new sink instance.
        final SinkAndSuppliers restoredSinkAndSuppliers =
                sinkWithSnapshottingWriter(stateful, null);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                restoredHarness =
                        createTestHarnessWithBufferingSinkWriter(restoredSinkAndSuppliers.sink);
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        // Ending the input makes the restored writer flush whatever it holds.
        restoredHarness.endInput();
        restoredHarness.prepareSnapshotPreBarrier(2L);
        restoredHarness.notifyOfCompletedCheckpoint(2L);

        if (!stateful) {
            final Object firstEmitted =
                    SinkTestUtil.fromOutput(restoredHarness.getOutput())
                            .get(0)
                            .asRecord()
                            .getValue();
            Assertions.assertThat(firstEmitted)
                    .isInstanceOf(CommittableSummary.class)
                    .satisfies(
                            summary ->
                                    SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                                            .hasOverallCommittables(0)
                                            .hasPendingCommittables(0)
                                            .hasFailedCommittables(0));
        } else {
            assertBasicOutput(restoredHarness.getOutput(), 2, Long.MAX_VALUE);
        }

        restoredHarness.close();
    }

    /**
     * Builds operator state with a {@link DummySinkOperator}, then starts the sink writer
     * operator from that state, with the sink created for the same state name.
     *
     * <p>When {@code stateful} is true the first run must emit the ten previously stored strings
     * plus the newly processed element; otherwise only the new element. A second run restored
     * from the first run's snapshot must emit only the elements processed in that second run.
     *
     * @param stateful forwarded as the first argument of {@code sinkWithSnapshottingWriter}
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    public void testLoadPreviousSinkState(boolean stateful) throws Exception {
        // 1. Build the state left behind by a previous sink.
        final List<String> previousSinkInputs =
                Arrays.asList(
                        "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth",
                        "cost", "prompt");

        // Use the operator's own constant so the state name cannot drift from the one the
        // dummy operator actually registers.
        final SinkAndSuppliers sinkAndSuppliers =
                sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME);
        final OneInputStreamOperatorTestHarness<String, String> previousSink =
                new OneInputStreamOperatorTestHarness<>(
                        new DummySinkOperator(sinkAndSuppliers.serializerSupplier.get()),
                        StringSerializer.INSTANCE);
        final OperatorSubtaskState previousSinkState =
                TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);

        // 2. Start the writer operator from that state and check that it is picked up.
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                compatibleWriterOperator =
                        createTestHarnessWithBufferingSinkWriter(sinkAndSuppliers.sink);

        final List<String> expectedOutput1 =
                stateful ? new ArrayList<>(previousSinkInputs) : new ArrayList<>();
        expectedOutput1.add(Tuple3.of(1, 1, Long.MIN_VALUE).toString());

        compatibleWriterOperator.initializeState(previousSinkState);
        compatibleWriterOperator.open();
        compatibleWriterOperator.processElement(1, 1L);

        // Ending the input flushes the writer before the snapshot is taken.
        compatibleWriterOperator.endInput();
        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState operatorStateWithoutPreviousState =
                compatibleWriterOperator.snapshot(1L, 1L);

        compatibleWriterOperator.close();

        assertEmitted(expectedOutput1, compatibleWriterOperator.getOutput());

        // 3. Restore a second operator from the new snapshot; the previous sink's state must
        // not show up again.
        final SinkAndSuppliers restoredSinkAndSuppliers =
                sinkWithSnapshottingWriter(stateful, DummySinkOperator.DUMMY_SINK_STATE_NAME);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                restoredSinkOperator =
                        createTestHarnessWithBufferingSinkWriter(restoredSinkAndSuppliers.sink);
        final List<String> expectedOutput2 =
                Arrays.asList(
                        Tuple3.of(2, 2, Long.MIN_VALUE).toString(),
                        Tuple3.of(3, 3, Long.MIN_VALUE).toString());

        restoredSinkOperator.initializeState(operatorStateWithoutPreviousState);
        restoredSinkOperator.open();
        restoredSinkOperator.processElement(2, 2L);
        restoredSinkOperator.processElement(3, 3L);

        restoredSinkOperator.endInput();
        restoredSinkOperator.prepareSnapshotPreBarrier(2L);

        assertEmitted(expectedOutput2, restoredSinkOperator.getOutput());
        restoredSinkOperator.close();
    }

    /**
     * State written by a {@link TestCommitterOperator} must be re-emitted by the sink writer
     * operator on its first pre-barrier step: a summary and the two restored committables under
     * checkpoint id 1, followed by an empty summary for checkpoint 2.
     */
    @Test
    void testRestoreCommitterState() throws Exception {
        final List<String> committables = Arrays.asList("state1", "state2");

        final SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<String, String> committer =
                new OneInputStreamOperatorTestHarness<>(
                        new TestCommitterOperator(sinkAndSuppliers.serializerSupplier.get()),
                        StringSerializer.INSTANCE);
        final OperatorSubtaskState committerState =
                TestHarnessUtil.buildSubtaskState(committer, committables);

        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.initializeState(committerState);
        testHarness.open();
        testHarness.prepareSnapshotPreBarrier(2L);

        final List<StreamElement> output = SinkTestUtil.fromOutput(testHarness.getOutput());
        Assertions.assertThat(output).hasSize(4);

        // Entry 0: summary for the restored committables.
        Assertions.assertThat(output.get(0).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        summary ->
                                SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                                        .hasPendingCommittables(committables.size())
                                        .hasCheckpointId(1L)
                                        .hasOverallCommittables(committables.size())
                                        .hasFailedCommittables(0));

        // Entries 1 and 2: the restored committables themselves, in order.
        assertRestoredCommitterCommittable(
                output.get(1).asRecord().getValue(), committables.get(0));
        assertRestoredCommitterCommittable(
                output.get(2).asRecord().getValue(), committables.get(1));

        // Entry 3: empty summary for the checkpoint that was just prepared.
        Assertions.assertThat(output.get(3).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        summary ->
                                SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                                        .hasPendingCommittables(0)
                                        .hasCheckpointId(2L)
                                        .hasOverallCommittables(0)
                                        .hasFailedCommittables(0));

        // Release the harness like the other tests do; it was previously left open.
        testHarness.close();
    }

    /**
     * After end of input the pending element must be emitted as a committable, whether or not a
     * pre-barrier snapshot preparation follows.
     *
     * @param prepareSnapshotAfterEndInput whether {@code prepareSnapshotPreBarrier(1)} is invoked
     *     after the input has ended
     */
    @ParameterizedTest
    @ValueSource(booleans = {true, false})
    void testHandleEndInputInStreamingMode(boolean prepareSnapshotAfterEndInput)
            throws Exception {
        final String element = "(1,1,-9223372036854775808)";
        final SinkAndSuppliers sinkAndSuppliers = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkAndSuppliers.sink));
        testHarness.open();

        // The element is reported by the sink but nothing is emitted yet.
        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.getOutput()).isEmpty();
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).containsOnly(element);

        testHarness.endInput();
        if (prepareSnapshotAfterEndInput) {
            testHarness.prepareSnapshotPreBarrier(1L);
        }

        assertEmitted(Collections.singletonList(element), testHarness.getOutput());
        Assertions.assertThat(sinkAndSuppliers.elementSupplier.get()).isEmpty();

        testHarness.close();
    }

    /**
     * Captures the init context passed to {@code createWriter} and checks that it reflects the
     * environment the harness was built with (subtask index 1, parallelism 10, object reuse
     * enabled, the given job id and input serializer).
     */
    @Test
    void testInitContext() throws Exception {
        final AtomicReference<Sink.InitContext> capturedContext = new AtomicReference<>();
        final Sink<String> sink =
                context -> {
                    capturedContext.set(context);
                    return null;
                };

        final int subtaskId = 1;
        final int parallelism = 10;
        final StringSerializer typeSerializer = StringSerializer.INSTANCE;
        final JobID jobID = new JobID();

        final MockEnvironment environment =
                MockEnvironment.builder()
                        .setSubtaskIndex(subtaskId)
                        .setParallelism(parallelism)
                        .setMaxParallelism(parallelism)
                        .setJobID(jobID)
                        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
                        .build();

        final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink), typeSerializer, environment);
        testHarness.open();

        final Sink.InitContext context = capturedContext.get();

        // Services must be available.
        Assertions.assertThat(context.getUserCodeClassLoader()).isNotNull();
        Assertions.assertThat(context.getMailboxExecutor()).isNotNull();
        Assertions.assertThat(context.getProcessingTimeService()).isNotNull();

        // Task information must mirror the mock environment.
        Assertions.assertThat(context.getTaskInfo().getIndexOfThisSubtask())
                .isEqualTo(subtaskId);
        Assertions.assertThat(context.getTaskInfo().getNumberOfParallelSubtasks())
                .isEqualTo(parallelism);
        Assertions.assertThat(context.getTaskInfo().getAttemptNumber()).isEqualTo(0);

        Assertions.assertThat(context.metricGroup()).isNotNull();
        // No state was restored, so there is no restored checkpoint id.
        Assertions.assertThat(context.getRestoredCheckpointId()).isNotPresent();
        Assertions.assertThat(context.isObjectReuseEnabled()).isTrue();
        Assertions.assertThat(context.createInputSerializer()).isEqualTo(typeSerializer);
        Assertions.assertThat(context.getJobInfo().getJobId()).isEqualTo(jobID);

        testHarness.close();
    }

    /**
     * Verifies the context hand-over between the two {@code createWriter} variants.
     *
     * <p>The sink decorates the incoming {@link WriterInitContext} with a dynamic proxy that
     * answers {@code metadataConsumer} with a custom consumer and forwards every other call. It
     * then delegates to the interface's default {@code createWriter(WriterInitContext)}. The
     * {@code Sink.InitContext} that reaches the other overload must agree with the decorated
     * context, including the custom metadata consumer.
     */
    @Test
    void testInitContextWrapper() throws Exception {
        final AtomicReference<Sink.InitContext> originalContext = new AtomicReference<>();
        final AtomicReference<WriterInitContext> contextWrapper = new AtomicReference<>();
        final AtomicBoolean consumerInvoked = new AtomicBoolean(false);

        // Typed consumer: a raw Consumer would make the lambda parameter an Object.
        final Consumer<AtomicBoolean> metadataConsumer = invoked -> invoked.set(true);

        final Sink<String> sink =
                new Sink<String>() {
                    public SinkWriter<String> createWriter(WriterInitContext context)
                            throws IOException {
                        WriterInitContext decoratedContext =
                                (WriterInitContext)
                                        Proxy.newProxyInstance(
                                                WriterInitContext.class.getClassLoader(),
                                                new Class[] {WriterInitContext.class},
                                                (proxy, method, args) -> {
                                                    if (method.getName()
                                                            .equals("metadataConsumer")) {
                                                        return Optional.of(metadataConsumer);
                                                    }
                                                    return method.invoke(context, args);
                                                });
                        contextWrapper.set(decoratedContext);
                        // An interface default method must be reached through Sink.super;
                        // a plain super call would resolve against Object here.
                        return Sink.super.createWriter(decoratedContext);
                    }

                    public SinkWriter<String> createWriter(Sink.InitContext context) {
                        originalContext.set(context);
                        return null;
                    }
                };

        final MockEnvironment environment =
                MockEnvironment.builder()
                        .setSubtaskIndex(1)
                        .setParallelism(10)
                        .setMaxParallelism(10)
                        .setJobID(new JobID())
                        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
                        .build();

        final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink),
                        StringSerializer.INSTANCE,
                        environment);
        testHarness.open();

        assertContextsEqual(originalContext.get(), contextWrapper.get());
        Assertions.assertThat(originalContext.get().metadataConsumer())
                .isPresent()
                .hasValueSatisfying(
                        consumer -> {
                            consumer.accept(consumerInvoked);
                            Assertions.assertThat(consumerInvoked).isTrue();
                        });

        testHarness.close();
    }

    /**
     * Asserts that both contexts expose equal values for every accessor compared below.
     *
     * @param sinkContext the context whose values are checked
     * @param writerContext the context providing the expected values
     */
    private static void assertContextsEqual(
            Sink.InitContext sinkContext, WriterInitContext writerContext) {
        // Class loaders are compared through their plain ClassLoader view.
        Assertions.assertThat(sinkContext.getUserCodeClassLoader().asClassLoader())
                .isEqualTo(writerContext.getUserCodeClassLoader().asClassLoader());
        Assertions.assertThat(sinkContext.getMailboxExecutor())
                .isEqualTo(writerContext.getMailboxExecutor());
        Assertions.assertThat(sinkContext.getProcessingTimeService())
                .isEqualTo(writerContext.getProcessingTimeService());

        // Task information.
        Assertions.assertThat(sinkContext.getTaskInfo().getIndexOfThisSubtask())
                .isEqualTo(writerContext.getTaskInfo().getIndexOfThisSubtask());
        Assertions.assertThat(sinkContext.getTaskInfo().getNumberOfParallelSubtasks())
                .isEqualTo(writerContext.getTaskInfo().getNumberOfParallelSubtasks());
        Assertions.assertThat(sinkContext.getTaskInfo().getAttemptNumber())
                .isEqualTo(writerContext.getTaskInfo().getAttemptNumber());

        // Remaining accessors.
        Assertions.assertThat(sinkContext.metricGroup()).isEqualTo(writerContext.metricGroup());
        Assertions.assertThat(sinkContext.getRestoredCheckpointId())
                .isEqualTo(writerContext.getRestoredCheckpointId());
        Assertions.assertThat(sinkContext.isObjectReuseEnabled())
                .isEqualTo(writerContext.isObjectReuseEnabled());
        Assertions.assertThat(sinkContext.createInputSerializer())
                .isEqualTo(writerContext.createInputSerializer());
        Assertions.assertThat(sinkContext.getJobInfo().getJobId())
                .isEqualTo(writerContext.getJobInfo().getJobId());
        Assertions.assertThat(sinkContext.metadataConsumer())
                .isEqualTo(writerContext.metadataConsumer());
    }

    /**
     * Asserts that the given output value is a {@link CommittableWithLineage} carrying the
     * expected committable for checkpoint 1 and subtask 0.
     *
     * @param obj the emitted value to inspect
     * @param str the committable it is expected to carry
     */
    private static void assertRestoredCommitterCommittable(Object obj, String str) {
        Assertions.assertThat(obj)
                .isInstanceOf(CommittableWithLineage.class)
                .satisfies(
                        emitted ->
                                SinkV2Assertions.assertThat((CommittableWithLineage<?>) emitted)
                                        .hasCommittable(str)
                                        .hasCheckpointId(1L)
                                        .hasSubtaskId(0));
    }

    /**
     * Asserts that the output consists of one {@link CommittableSummary} announcing
     * {@code list.size()} pending committables, followed by exactly the expected committables in
     * any order.
     *
     * @param list the expected committables
     * @param queue the raw operator output
     */
    private static void assertEmitted(List<String> list, Queue<Object> queue) {
        final List<StreamElement> elements = SinkTestUtil.fromOutput(queue);
        final int expectedCount = list.size();

        // One summary plus one entry per committable.
        Assertions.assertThat(elements).hasSize(expectedCount + 1);
        Assertions.assertThat(elements.get(0).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        summary ->
                                SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                                        .hasPendingCommittables(list.size())
                                        .hasOverallCommittables(list.size())
                                        .hasFailedCommittables(0));

        // Everything after the summary must be a committable; gather their payloads.
        final List<Object> committables = new ArrayList<>();
        for (StreamElement element : elements.subList(1, elements.size())) {
            final Object emitted = element.asRecord().getValue();
            Assertions.assertThat(emitted).isInstanceOf(CommittableWithLineage.class);
            committables.add(((CommittableWithLineage<?>) emitted).getCommittable());
        }
        Assertions.assertThat(committables).containsExactlyInAnyOrderElementsOf(list);
    }

    /**
     * Creates a test harness around a {@code SinkWriterOperatorFactory} built for the given
     * sink.
     *
     * @param sink the sink to wrap
     * @return a harness that has not been opened yet
     */
    private static OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> createTestHarnessWithBufferingSinkWriter(Sink sink) throws Exception {
        // The sink arrives as a raw type, so the factory is kept raw as well.
        final OneInputStreamOperatorFactory writerOperatorFactory =
                new SinkWriterOperatorFactory(sink);
        return new OneInputStreamOperatorTestHarness<>(writerOperatorFactory);
    }

    /**
     * Asserts that the output is a {@link CommittableSummary} announcing {@code i} committables,
     * followed by {@code i} {@link CommittableWithLineage} entries for the given checkpoint and
     * subtask 0.
     *
     * @param collection the raw operator output
     * @param i the expected number of committables
     * @param l the checkpoint id every committable must carry
     */
    private static void assertBasicOutput(Collection<Object> collection, int i, @Nullable Long l) {
        final List<StreamElement> elements = SinkTestUtil.fromOutput(collection);

        // One summary plus one entry per committable.
        Assertions.assertThat(elements).hasSize(i + 1);
        Assertions.assertThat(elements.get(0).asRecord().getValue())
                .isInstanceOf(CommittableSummary.class)
                .satisfies(
                        summary ->
                                SinkV2Assertions.assertThat((CommittableSummary<?>) summary)
                                        .hasOverallCommittables(i)
                                        .hasPendingCommittables(i)
                                        .hasFailedCommittables(0));

        // Every entry after the summary is checked individually.
        for (StreamElement element : elements.subList(1, elements.size())) {
            Assertions.assertThat(element.asRecord().getValue())
                    .isInstanceOf(CommittableWithLineage.class)
                    .satisfies(
                            committable ->
                                    SinkV2Assertions.assertThat(
                                                    (CommittableWithLineage<?>) committable)
                                            .hasCheckpointId(l)
                                            .hasSubtaskId(0));
        }
    }

    /**
     * Provides the sink used by the tests that expect no committables to be emitted, together
     * with the suppliers for inspecting the elements and watermarks it received.
     */
    abstract SinkAndSuppliers sinkWithoutCommitter();

    /**
     * Provides the sink used by {@code testTimeBasedBufferingSinkWriter}, where committables only
     * appear after the processing time has been advanced.
     */
    abstract SinkAndSuppliers sinkWithTimeBasedWriter();

    /**
     * Provides the sink used by the state restore tests.
     *
     * @param z when true the tests expect snapshotted writer state to be restored and re-emitted;
     *     when false they expect no restored committables
     * @param str either {@code null} or the name of a state written by an earlier sink
     *     (presumably the state the writer should also read on restore -- confirm in subclasses)
     */
    abstract SinkAndSuppliers sinkWithSnapshottingWriter(boolean z, String str);

    /**
     * Provides the sink used by the tests that expect a committable summary and committables to
     * be emitted downstream.
     */
    abstract SinkAndSuppliers sinkWithCommitter();

    /**
     * Recreates the serializable lambda defined in {@code testInitContext} from its serialized
     * form.
     *
     * <p>NOTE(review): this looks like the compiler-generated lambda deserialization hook as
     * recovered by a decompiler; confirm whether it should be kept in source at all.
     *
     * @param serializedLambda the serialized form of the lambda
     * @return a sink lambda storing the received init context in the captured reference
     * @throws IllegalArgumentException if the serialized form does not describe that lambda
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        // 6 is the method handle kind for a static method (MethodHandleInfo.REF_invokeStatic).
        final boolean isTestInitContextSink =
                "lambda$testInitContext$5266c777$1".equals(serializedLambda.getImplMethodName())
                        && serializedLambda.getImplMethodKind() == 6
                        && serializedLambda
                                .getFunctionalInterfaceClass()
                                .equals("org/apache/flink/api/connector/sink2/Sink")
                        && serializedLambda.getFunctionalInterfaceMethodName().equals("createWriter")
                        && serializedLambda
                                .getFunctionalInterfaceMethodSignature()
                                .equals("(Lorg/apache/flink/api/connector/sink2/Sink$InitContext;)Lorg/apache/flink/api/connector/sink2/SinkWriter;")
                        && serializedLambda
                                .getImplClass()
                                .equals("org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase")
                        && serializedLambda
                                .getImplMethodSignature()
                                .equals("(Ljava/util/concurrent/atomic/AtomicReference;Lorg/apache/flink/api/connector/sink2/Sink$InitContext;)Lorg/apache/flink/api/connector/sink2/SinkWriter;");

        if (isTestInitContextSink) {
            @SuppressWarnings("unchecked")
            final AtomicReference<Object> atomicReference =
                    (AtomicReference<Object>) serializedLambda.getCapturedArg(0);
            // The cast gives the lambda a functional-interface target; Object alone has none.
            return (Sink<Object>)
                    initContext -> {
                        atomicReference.set(initContext);
                        return null;
                    };
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
