package org.apache.flink.streaming.runtime.operators.sink;

import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink2.Sink;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.CommittableWithLineage;
import org.apache.flink.streaming.api.connector.sink2.SinkV2Assertions;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.TestHarnessUtil;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

/**
 * Shared test cases for the sink writer operator created through {@code SinkWriterOperatorFactory}.
 * Concrete subclasses provide the sinks under test via the abstract {@code sinkWith...} methods.
 */
abstract class SinkWriterOperatorTestBase {

    /**
     * Convenience base for {@link InspectableSink} implementations that wrap an already built sink
     * and hand that same instance back from {@link #getSink()}.
     *
     * @param <S> concrete type of the wrapped sink
     */
    abstract static class AbstractInspectableSink<S extends Sink<Integer>>
            implements InspectableSink {
        private final S sink;

        /**
         * @param sink the sink instance to expose through {@link #getSink()}
         */
        public AbstractInspectableSink(S sink) {
            this.sink = sink;
        }

        /** Returns the wrapped sink exactly as it was passed to the constructor. */
        @Override
        public S getSink() {
            return sink;
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$CompatibleStateSinkOperator.class */
    private static class CompatibleStateSinkOperator<T> extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        static final String SINK_STATE_NAME = "compatible_sink_state";
        static final ListStateDescriptor<byte[]> SINK_STATE_DESC = new ListStateDescriptor<>(SINK_STATE_NAME, BytePrimitiveArraySerializer.INSTANCE);
        ListState<T> sinkState;
        private final SimpleVersionedSerializer<T> serializer;
        private final T initialState;

        public CompatibleStateSinkOperator(SimpleVersionedSerializer<T> simpleVersionedSerializer, T t) {
            this.serializer = simpleVersionedSerializer;
            this.initialState = t;
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.sinkState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(SINK_STATE_DESC), this.serializer);
            if (stateInitializationContext.isRestored()) {
                return;
            }
            this.sinkState.add(this.initialState);
        }

        public void processElement(StreamRecord<String> streamRecord) {
        }
    }

    /**
     * Test-side handle on a sink under test. Besides the {@link Sink} itself (which the tests pass
     * to {@code SinkWriterOperatorFactory}), it exposes what the sink's writer has observed so the
     * tests can assert on it. The exact bookkeeping is up to the implementing subclass.
     */
    interface InspectableSink {
        /**
         * Last checkpoint id seen by the writer. {@code testStateRestore} expects {@code 1} after
         * snapshotting checkpoint 1 with writer state enabled and {@code -1} otherwise.
         */
        long getLastCheckpointId();

        /**
         * Records received since the last flush. The tests expect entries such as
         * {@code "(1,1,-9223372036854775808)"} after processing element {@code 1} with timestamp
         * {@code 1}, and an empty list after {@code prepareSnapshotPreBarrier} — presumably the
         * format is (value, timestamp, watermark); confirm against the implementation.
         */
        List<String> getRecordsOfCurrentCheckpoint();

        /** Watermarks the writer received, in the order the tests sent them to the operator. */
        List<Watermark> getWatermarks();

        /**
         * Record count tracked by the writer, including a count restored from state. The tests
         * expect {@code 0} for sinks created without writer state after a restore.
         */
        int getRecordCountFromState();

        /** The sink to build the writer operator from. */
        Sink<Integer> getSink();
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$TestCommitterOperator.class */
    private static class TestCommitterOperator extends AbstractStreamOperator<String> implements OneInputStreamOperator<String, String> {
        private static final ListStateDescriptor<byte[]> STREAMING_COMMITTER_RAW_STATES_DESC = new ListStateDescriptor<>("streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);
        private ListState<List<String>> committerState;
        private final List<String> buffer = new ArrayList();
        private final SimpleVersionedSerializer<String> serializer;

        public TestCommitterOperator(SimpleVersionedSerializer<String> simpleVersionedSerializer) {
            this.serializer = simpleVersionedSerializer;
        }

        public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
            super.initializeState(stateInitializationContext);
            this.committerState = new SimpleVersionedListState(stateInitializationContext.getOperatorStateStore().getListState(STREAMING_COMMITTER_RAW_STATES_DESC), new TestingCommittableSerializer(this.serializer));
        }

        public void processElement(StreamRecord<String> streamRecord) throws Exception {
            this.buffer.add((String) streamRecord.getValue());
        }

        public void snapshotState(StateSnapshotContext stateSnapshotContext) throws Exception {
            super.snapshotState(stateSnapshotContext);
            this.committerState.add(this.buffer);
        }
    }

    /* loaded from: input_file:org/apache/flink/streaming/runtime/operators/sink/SinkWriterOperatorTestBase$TestingCommittableSerializer.class */
    private static class TestingCommittableSerializer extends SinkV1WriterCommittableSerializer<String> {
        private final SimpleVersionedSerializer<String> committableSerializer;

        public TestingCommittableSerializer(SimpleVersionedSerializer<String> simpleVersionedSerializer) {
            super(simpleVersionedSerializer);
            this.committableSerializer = simpleVersionedSerializer;
        }

        public byte[] serialize(List<String> list) throws IOException {
            DataOutputSerializer dataOutputSerializer = new DataOutputSerializer(256);
            dataOutputSerializer.writeInt(-1189141204);
            SimpleVersionedSerialization.writeVersionAndSerializeList(this.committableSerializer, list, dataOutputSerializer);
            return dataOutputSerializer.getCopyOfBuffer();
        }
    }

    /**
     * With a sink that has no committer, the operator must not emit anything downstream: neither
     * after processing an element nor after the pre-barrier snapshot hook. The writer is still
     * expected to have seen the record and to have dropped it from its current-checkpoint view
     * once the hook ran.
     */
    @Test
    void testNotEmitCommittablesWithoutCommitter() throws Exception {
        final InspectableSink sink = sinkWithoutCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink.getSink()));
        testHarness.open();

        testHarness.processElement(1, 1L);
        Assertions.assertThat(testHarness.extractOutputValues()).isEmpty();
        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint())
                .containsOnly("(1,1,-9223372036854775808)");

        testHarness.prepareSnapshotPreBarrier(1L);
        Assertions.assertThat(testHarness.extractOutputValues()).isEmpty();
        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();

        testHarness.close();
    }

    /**
     * Watermarks sent to the operator must show up both in the operator output (as streaming
     * watermarks) and at the sink writer (as event-time watermarks), in the order they were sent.
     */
    @Test
    void testWatermarkPropagatedToSinkWriter() throws Exception {
        final InspectableSink sink = sinkWithoutCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink.getSink()));
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processWatermark(1L);

        // downstream side: the harness output holds the runtime watermark type
        Assertions.assertThat(testHarness.getOutput())
                .containsExactly(
                        new org.apache.flink.streaming.api.watermark.Watermark(0L),
                        new org.apache.flink.streaming.api.watermark.Watermark(1L));
        // writer side: the sink reports the API-level watermark type
        Assertions.assertThat(sink.getWatermarks())
                .containsExactly(new Watermark(0L), new Watermark(1L));

        testHarness.close();
    }

    /**
     * With the time-based writer, two processed elements are expected to produce no committables
     * at the first checkpoint (summary only), and two committables at the second checkpoint once
     * processing time has been advanced to 2001.
     */
    @Test
    void testTimeBasedBufferingSinkWriter() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkWithTimeBasedWriter().getSink()));
        testHarness.open();
        testHarness.setProcessingTime(0L);

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        // checkpoint 1: nothing has been released by the writer yet
        testHarness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(testHarness.extractOutputValues(), 0, 1L);

        // move the clock forward, then take checkpoint 2
        testHarness.getProcessingTimeService().setCurrentTime(2001L);
        testHarness.prepareSnapshotPreBarrier(2L);

        // the first output element is the summary of checkpoint 1 already checked above
        final List<CommittableMessage<Integer>> secondCheckpointOutput =
                testHarness.extractOutputValues().stream().skip(1L).collect(Collectors.toList());
        assertBasicOutput(secondCheckpointOutput, 2, 2L);

        testHarness.close();
    }

    /**
     * With a committer, nothing is emitted on open; after two elements the pre-barrier hook of
     * checkpoint 1 is expected to emit a summary plus two committables for that checkpoint.
     */
    @Test
    void testEmitOnFlushWithCommitter() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
        testHarness.open();
        Assertions.assertThat(testHarness.extractOutputValues()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        testHarness.prepareSnapshotPreBarrier(1L);
        assertBasicOutput(testHarness.extractOutputValues(), 2, 1L);

        testHarness.close();
    }

    /**
     * Without any checkpoint, ending the input is expected to emit a summary plus one committable
     * that carry {@link Long#MAX_VALUE} as checkpoint id.
     */
    @Test
    void testEmitOnEndOfInputInBatchMode() throws Exception {
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sinkWithCommitter().getSink()));
        testHarness.open();
        Assertions.assertThat(testHarness.extractOutputValues()).isEmpty();

        testHarness.processElement(1, 1L);
        testHarness.endInput();
        assertBasicOutput(testHarness.extractOutputValues(), 1, Long.MAX_VALUE);

        // Release the operator like every other test in this class does; previously the harness
        // was left open.
        testHarness.close();
    }

    /**
     * Snapshots a writer after two elements and restores a fresh operator from that snapshot.
     *
     * @param z passed as first argument to {@code sinkWithState}; when {@code true} the test
     *     expects the checkpoint id to be recorded and the record count to survive the restore,
     *     when {@code false} it expects {@code -1} and a restored count of {@code 0}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testStateRestore(boolean z) throws Exception {
        final long checkpointId = 1L;

        final InspectableSink sink = sinkWithState(z, null);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink.getSink()));
        testHarness.open();

        testHarness.processWatermark(0L);
        testHarness.processElement(1, 1L);
        testHarness.processElement(2, 2L);

        testHarness.prepareSnapshotPreBarrier(checkpointId);
        final OperatorSubtaskState snapshot = testHarness.snapshot(checkpointId, 1L);

        Assertions.assertThat(sink.getRecordCountFromState()).isEqualTo(2);
        final long expectedLastCheckpointId = z ? checkpointId : -1L;
        Assertions.assertThat(sink.getLastCheckpointId()).isEqualTo(expectedLastCheckpointId);
        testHarness.close();

        // bring up a second operator on a new sink instance from the snapshot taken above
        final InspectableSink restoredSink = sinkWithState(z, null);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                restoredHarness =
                        new OneInputStreamOperatorTestHarness<>(
                                new SinkWriterOperatorFactory<>(restoredSink.getSink()));
        restoredHarness.initializeState(snapshot);
        restoredHarness.open();

        final int expectedRestoredCount = z ? 2 : 0;
        Assertions.assertThat(restoredSink.getRecordCountFromState())
                .isEqualTo(expectedRestoredCount);
        restoredHarness.close();
    }

    /**
     * Verifies that the writer operator can load state written by another operator under the
     * compatible state name, and that a snapshot taken afterwards can be restored by a further
     * writer operator.
     *
     * @param z passed as first argument to {@code sinkWithState}; when {@code true} the restored
     *     record count is expected to equal the value stored by the previous operator, otherwise
     *     {@code 0}
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    public void testLoadPreviousSinkState(boolean z) throws Exception {
        final int expectedState = 5;
        final List<String> previousSinkInputs =
                Arrays.asList(
                        "bit", "mention", "thick", "stick", "stir", "easy", "sleep", "forth",
                        "cost", "prompt");

        // 1. Build the state of the previous sink.
        final InspectableSink sink =
                sinkWithState(z, CompatibleStateSinkOperator.SINK_STATE_NAME);
        final OneInputStreamOperatorTestHarness<String, String> previousSink =
                new OneInputStreamOperatorTestHarness<>(
                        new CompatibleStateSinkOperator<>(
                                TestSinkV2.WRITER_SERIALIZER, expectedState),
                        StringSerializer.INSTANCE);
        final OperatorSubtaskState previousSinkState =
                TestHarnessUtil.buildSubtaskState(previousSink, previousSinkInputs);

        // 2. Load the previous sink's state into the writer operator and verify it.
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                compatibleWriterOperator =
                        new OneInputStreamOperatorTestHarness<>(
                                new SinkWriterOperatorFactory<>(sink.getSink()));
        compatibleWriterOperator.initializeState(previousSinkState);
        Assertions.assertThat(sink.getRecordCountFromState()).isEqualTo(z ? expectedState : 0);

        // 3. Take another snapshot from the writer operator itself.
        compatibleWriterOperator.prepareSnapshotPreBarrier(1L);
        final OperatorSubtaskState snapshot = compatibleWriterOperator.snapshot(1L, 1L);
        compatibleWriterOperator.close();

        // 4. Restore a new writer operator from that snapshot.
        final InspectableSink restoredSink =
                sinkWithState(z, CompatibleStateSinkOperator.SINK_STATE_NAME);
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<Integer>>
                restoredSinkOperator =
                        new OneInputStreamOperatorTestHarness<>(
                                new SinkWriterOperatorFactory<>(restoredSink.getSink()));
        restoredSinkOperator.initializeState(snapshot);
        // Assert on the sink that was just restored. Checking the first sink again (as before)
        // would pass regardless of whether this second restore worked.
        Assertions.assertThat(restoredSink.getRecordCountFromState())
                .isEqualTo(z ? expectedState : 0);

        restoredSinkOperator.close();
    }

    /**
     * Feeds state written by {@link TestCommitterOperator} into the writer operator and expects
     * the restored committables to be re-emitted under checkpoint id 1 (summary followed by both
     * committables), followed by an empty summary for checkpoint 2.
     */
    @Test
    void testRestoreCommitterState() throws Exception {
        final List<String> committables = Arrays.asList("state1", "state2");

        final InspectableSink sink = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<String, String> committer =
                new OneInputStreamOperatorTestHarness<>(
                        new TestCommitterOperator(TestSinkV2.COMMITTABLE_SERIALIZER),
                        StringSerializer.INSTANCE);
        final OperatorSubtaskState committerState =
                TestHarnessUtil.buildSubtaskState(committer, committables);

        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink.getSink()));
        testHarness.initializeState(committerState);
        testHarness.open();

        testHarness.prepareSnapshotPreBarrier(2L);

        // A typed ListAssert is required: on a raw ListAssert, element(int, factory) is erased
        // to AbstractAssert and the sink-specific assertions below are not available.
        final ListAssert<CommittableMessage<String>> records =
                Assertions.assertThat(testHarness.extractOutputValues()).hasSize(4);

        records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                .hasCheckpointId(1L)
                .hasOverallCommittables(committables.size());
        records.element(1, Assertions.as(SinkV2Assertions.<String>committableWithLineage()))
                .hasCommittable(committables.get(0))
                .hasCheckpointId(1L)
                .hasSubtaskId(0);
        records.element(2, Assertions.as(SinkV2Assertions.<String>committableWithLineage()))
                .hasCommittable(committables.get(1))
                .hasCheckpointId(1L)
                .hasSubtaskId(0);
        records.element(3, Assertions.as(SinkV2Assertions.committableSummary()))
                .hasCheckpointId(2L)
                .hasOverallCommittables(0);

        // Release the operator like the other tests in this class; previously it was left open.
        testHarness.close();
    }

    /**
     * After end of input, the pending record is expected to be emitted as a committable (preceded
     * by a summary) and the writer's current-checkpoint view is expected to be empty.
     *
     * @param z whether {@code prepareSnapshotPreBarrier(1)} is additionally invoked after the
     *     input ended; the expected output is the same in both cases
     */
    @ValueSource(booleans = {true, false})
    @ParameterizedTest
    void testHandleEndInputInStreamingMode(boolean z) throws Exception {
        final InspectableSink sink = sinkWithCommitter();
        final OneInputStreamOperatorTestHarness<Integer, CommittableMessage<String>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink.getSink()));
        testHarness.open();
        testHarness.processElement(1, 1L);

        Assertions.assertThat(testHarness.extractOutputValues()).isEmpty();
        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint())
                .containsOnly("(1,1,-9223372036854775808)");

        testHarness.endInput();
        if (z) {
            testHarness.prepareSnapshotPreBarrier(1L);
        }

        final List<String> committables = Collections.singletonList("(1,1,-9223372036854775808)");

        // A typed ListAssert is required: on a raw ListAssert, element(int, factory) is erased
        // to AbstractAssert and hasOverallCommittables is not available.
        final ListAssert<CommittableMessage<String>> records =
                Assertions.assertThat(testHarness.extractOutputValues())
                        .hasSize(committables.size() + 1);
        records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                .hasOverallCommittables(committables.size());
        records.filteredOn(message -> message instanceof CommittableWithLineage)
                .map(message -> ((CommittableWithLineage<String>) message).getCommittable())
                .containsExactlyInAnyOrderElementsOf(committables);

        Assertions.assertThat(sink.getRecordsOfCurrentCheckpoint()).isEmpty();

        testHarness.close();
    }

    /**
     * Captures the {@link WriterInitContext} handed to {@code Sink#createWriter} and checks that
     * it reflects the environment the operator was started with (subtask index, parallelism, job
     * id, object reuse, input serializer) and exposes non-null runtime services.
     */
    @Test
    void testInitContext() throws Exception {
        final AtomicReference<WriterInitContext> capturedContext = new AtomicReference<>();
        final Sink<String> sink =
                initContext -> {
                    capturedContext.set(initContext);
                    return null;
                };

        final StringSerializer inputSerializer = StringSerializer.INSTANCE;
        final JobID jobId = new JobID();
        final MockEnvironment environment =
                MockEnvironment.builder()
                        .setSubtaskIndex(1)
                        .setParallelism(10)
                        .setMaxParallelism(10)
                        .setJobID(jobId)
                        .setExecutionConfig(new ExecutionConfig().enableObjectReuse())
                        .build();

        final OneInputStreamOperatorTestHarness<String, CommittableMessage<String>> testHarness =
                new OneInputStreamOperatorTestHarness<>(
                        new SinkWriterOperatorFactory<>(sink), inputSerializer, environment);
        testHarness.open();

        // the context is set by the lambda above once the operator creates its writer
        final WriterInitContext context = capturedContext.get();
        Assertions.assertThat(context.getUserCodeClassLoader()).isNotNull();
        Assertions.assertThat(context.getMailboxExecutor()).isNotNull();
        Assertions.assertThat(context.getProcessingTimeService()).isNotNull();
        Assertions.assertThat(context.getTaskInfo().getIndexOfThisSubtask()).isEqualTo(1);
        Assertions.assertThat(context.getTaskInfo().getNumberOfParallelSubtasks()).isEqualTo(10);
        Assertions.assertThat(context.getTaskInfo().getAttemptNumber()).isZero();
        Assertions.assertThat(context.metricGroup()).isNotNull();
        Assertions.assertThat(context.getRestoredCheckpointId()).isNotPresent();
        Assertions.assertThat(context.isObjectReuseEnabled()).isTrue();
        Assertions.assertThat(context.createInputSerializer()).isEqualTo(inputSerializer);
        Assertions.assertThat(context.getJobInfo().getJobId()).isEqualTo(jobId);

        testHarness.close();
    }

    /**
     * Asserts that two writer init contexts agree on every property compared below: class loader,
     * mailbox executor, processing time service, task info (subtask index, parallelism, attempt
     * number), metric group, restored checkpoint id, object-reuse flag, input serializer, job id
     * and metadata consumer.
     *
     * @param actual the context under inspection
     * @param expected the context to compare against
     */
    private static void assertContextsEqual(WriterInitContext actual, WriterInitContext expected) {
        Assertions.assertThat(actual.getUserCodeClassLoader().asClassLoader())
                .isEqualTo(expected.getUserCodeClassLoader().asClassLoader());
        Assertions.assertThat(actual.getMailboxExecutor())
                .isEqualTo(expected.getMailboxExecutor());
        Assertions.assertThat(actual.getProcessingTimeService())
                .isEqualTo(expected.getProcessingTimeService());

        // task info, compared field by field
        Assertions.assertThat(actual.getTaskInfo().getIndexOfThisSubtask())
                .isEqualTo(expected.getTaskInfo().getIndexOfThisSubtask());
        Assertions.assertThat(actual.getTaskInfo().getNumberOfParallelSubtasks())
                .isEqualTo(expected.getTaskInfo().getNumberOfParallelSubtasks());
        Assertions.assertThat(actual.getTaskInfo().getAttemptNumber())
                .isEqualTo(expected.getTaskInfo().getAttemptNumber());

        Assertions.assertThat(actual.metricGroup()).isEqualTo(expected.metricGroup());
        Assertions.assertThat(actual.getRestoredCheckpointId())
                .isEqualTo(expected.getRestoredCheckpointId());
        Assertions.assertThat(actual.isObjectReuseEnabled())
                .isEqualTo(expected.isObjectReuseEnabled());
        Assertions.assertThat(actual.createInputSerializer())
                .isEqualTo(expected.createInputSerializer());
        Assertions.assertThat(actual.getJobInfo().getJobId())
                .isEqualTo(expected.getJobInfo().getJobId());
        Assertions.assertThat(actual.metadataConsumer()).isEqualTo(expected.metadataConsumer());
    }

    /**
     * Asserts the common shape of the operator output for one checkpoint: a leading summary
     * announcing {@code numberOfCommittables}, followed by exactly that many further messages,
     * where every {@link CommittableWithLineage} among them carries the given checkpoint id and
     * subtask id 0.
     *
     * @param output the messages emitted by the operator
     * @param numberOfCommittables expected number of committables after the summary
     * @param checkpointId checkpoint id every committable must carry
     */
    private static void assertBasicOutput(
            List<CommittableMessage<Integer>> output, int numberOfCommittables, long checkpointId) {
        // A typed ListAssert is required: on a raw ListAssert, element(int, factory) is erased
        // to AbstractAssert and hasOverallCommittables is not available.
        final ListAssert<CommittableMessage<Integer>> records =
                Assertions.assertThat(output).hasSize(numberOfCommittables + 1);
        records.element(0, Assertions.as(SinkV2Assertions.committableSummary()))
                .hasOverallCommittables(numberOfCommittables);
        records.filteredOn(message -> message instanceof CommittableWithLineage)
                .allSatisfy(
                        message ->
                                SinkV2Assertions.assertThat((CommittableWithLineage<?>) message)
                                        .hasCheckpointId(checkpointId)
                                        .hasSubtaskId(0));
    }

    /**
     * Creates the sink used by the tests that expect no committables to be emitted downstream.
     */
    abstract InspectableSink sinkWithoutCommitter();

    /**
     * Creates the sink used by {@code testTimeBasedBufferingSinkWriter}; that test expects its
     * writer to hold records back until processing time has advanced — the exact trigger is
     * defined by the implementation.
     */
    abstract InspectableSink sinkWithTimeBasedWriter();

    /**
     * Creates the sink used by the state restore tests.
     *
     * @param z the tests expect writer state to be snapshotted/restored only when this is
     *     {@code true} — presumably toggles writer state; confirm in the implementation
     * @param str either {@code null} or a state name ({@code "compatible_sink_state"} in
     *     {@code testLoadPreviousSinkState}) — presumably a compatible state name to load from
     */
    abstract InspectableSink sinkWithState(boolean z, String str);

    /**
     * Creates the sink used by the tests that expect committables to be emitted downstream.
     */
    abstract InspectableSink sinkWithCommitter();

    // NOTE: the decompiled `$deserializeLambda$(SerializedLambda)` method was removed here on
    // purpose. It is the synthetic method the Java compiler generates on its own for classes that
    // contain a serializable lambda (here: the Sink lambda in testInitContext), so keeping a
    // hand-written copy would duplicate the generated one. The decompiler output was also not
    // valid Java (`boolean z = -1`, a `switch` over a boolean, a lambda returned as `Object`).
    // Nothing in the source calls this method; only the serialization machinery does.
}
