package org.apache.flink.streaming.connectors.kafka.table;

import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import org.apache.flink.api.common.operators.ProcessingTimeService;
import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.StatefulSink;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.connectors.kafka.shuffle.FlinkKafkaShuffleProducer;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:org/apache/flink/streaming/connectors/kafka/table/ReducingUpsertWriter.class */
/**
 * A {@link StatefulSink.StatefulSinkWriter} decorator that buffers upsert rows and reduces them by
 * key before handing them to the wrapped writer.
 *
 * <p>Incoming rows are stored in an in-memory map keyed by the projected key columns, so a later
 * row with the same key replaces an earlier one. The buffer is drained into the wrapped writer
 * when it holds at least {@code batchMaxRowNums} distinct keys, when the periodic processing-time
 * timer fires after {@code batchIntervalMs} without a flush, or when {@link #flush(boolean)} is
 * invoked.
 *
 * @param <WriterState> state type of the wrapped writer
 */
class ReducingUpsertWriter<WriterState> implements StatefulSink.StatefulSinkWriter<RowData, WriterState> {
    private final StatefulSink.StatefulSinkWriter<RowData, WriterState> wrappedWriter;
    private final int batchMaxRowNums;
    private final Function<RowData, RowData> valueCopyFunction;
    private final Function<RowData, RowData> keyExtractor;
    private final ProcessingTimeService timeService;
    private final long batchIntervalMs;
    private final WrappedContext wrappedContext = new WrappedContext();
    /** Latest value (and the timestamp it arrived with) per key; the timestamp may be null. */
    private final Map<RowData, Tuple2<RowData, Long>> reduceBuffer = new HashMap<>();
    private boolean closed = false;
    private long lastFlush = System.currentTimeMillis();

    /**
     * Context handed to the wrapped writer while the buffer is drained.
     *
     * <p>The watermark is delegated to the most recently seen upstream context, while the
     * timestamp is the one memorized together with the buffered row.
     */
    private static final class WrappedContext implements SinkWriter.Context {
        /** Timestamp of the row currently being emitted; null if the row carried none. */
        private Long timestamp;
        private SinkWriter.Context context;

        @Override
        public long currentWatermark() {
            Preconditions.checkNotNull(this.context, "context must be set before retrieving it.");
            return this.context.currentWatermark();
        }

        @Override
        public Long timestamp() {
            // A null timestamp is forwarded as-is: the buffered row simply had none.
            return this.timestamp;
        }

        void setTimestamp(Long timestamp) {
            this.timestamp = timestamp;
        }

        void setContext(SinkWriter.Context context) {
            this.context = context;
        }
    }

    /**
     * Creates the reducing writer and registers the first periodic flush timer.
     *
     * @param wrappedWriter writer that receives the reduced rows; must not be null
     * @param physicalDataType row type whose children describe the physical columns
     * @param keyProjection indices of the key columns within the physical row
     * @param bufferFlushMode buffering configuration; must be non-null and enabled
     * @param timeService service used to schedule the periodic flush; must not be null
     * @param valueCopyFunction function applied to every row before it is buffered
     * @throws IllegalArgumentException if {@code bufferFlushMode} is null or disabled
     * @throws NullPointerException if {@code wrappedWriter} or {@code timeService} is null
     */
    ReducingUpsertWriter(
            StatefulSink.StatefulSinkWriter<RowData, WriterState> wrappedWriter,
            DataType physicalDataType,
            int[] keyProjection,
            SinkBufferFlushMode bufferFlushMode,
            ProcessingTimeService timeService,
            Function<RowData, RowData> valueCopyFunction) {
        Preconditions.checkArgument(bufferFlushMode != null && bufferFlushMode.isEnabled());
        this.wrappedWriter = Preconditions.checkNotNull(wrappedWriter);
        this.timeService = Preconditions.checkNotNull(timeService);
        this.batchMaxRowNums = bufferFlushMode.getBatchSize();
        this.batchIntervalMs = bufferFlushMode.getBatchIntervalMs();
        // The timer callback only touches fields that are already assigned at this point
        // (wrappedWriter, wrappedContext, reduceBuffer, lastFlush, batchIntervalMs).
        registerFlush();

        final List<LogicalType> fieldTypes = physicalDataType.getLogicalType().getChildren();
        final RowData.FieldGetter[] keyFieldGetters =
                Arrays.stream(keyProjection)
                        .mapToObj(pos -> RowData.createFieldGetter(fieldTypes.get(pos), pos))
                        .toArray(RowData.FieldGetter[]::new);
        this.keyExtractor =
                row ->
                        DynamicKafkaRecordSerializationSchema.createProjectedRow(
                                row, RowKind.INSERT, keyFieldGetters);
        this.valueCopyFunction = valueCopyFunction;
    }

    /**
     * Buffers the row under its key, remembering the upstream context and the row's timestamp.
     * May trigger a buffer flush when the size threshold is reached.
     */
    @Override
    public void write(RowData element, SinkWriter.Context context) throws IOException, InterruptedException {
        this.wrappedContext.setContext(context);
        addToBuffer(element, context.timestamp());
    }

    /**
     * Drains the reduce buffer into the wrapped writer and then flushes the wrapped writer itself,
     * so that buffered rows are not left pending inside the wrapped writer.
     */
    @Override
    public void flush(boolean endOfInput) throws IOException, InterruptedException {
        flush();
        this.wrappedWriter.flush(endOfInput);
    }

    /** Delegates to the wrapped writer; this class keeps no checkpointed state of its own. */
    @Override
    public List<WriterState> snapshotState(long checkpointId) throws IOException {
        return this.wrappedWriter.snapshotState(checkpointId);
    }

    /** Closes the wrapped writer once; repeated calls are no-ops. */
    @Override
    public void close() throws Exception {
        if (this.closed) {
            return;
        }
        this.closed = true;
        this.wrappedWriter.close();
    }

    /** Stores the (copied, flag-normalized) row under its key and flushes when the buffer is full. */
    private void addToBuffer(RowData row, Long timestamp) throws IOException, InterruptedException {
        final RowData key = this.keyExtractor.apply(row);
        final RowData value = changeFlag(this.valueCopyFunction.apply(row));
        this.reduceBuffer.put(key, new Tuple2<>(value, timestamp));
        if (this.reduceBuffer.size() >= this.batchMaxRowNums) {
            flush();
        }
    }

    /**
     * Schedules the next periodic flush relative to the last flush time. The callback re-registers
     * itself, and registration stops once the writer has been closed.
     */
    private void registerFlush() {
        if (this.closed) {
            return;
        }
        this.timeService.registerTimer(
                this.lastFlush + this.batchIntervalMs,
                firedAt -> {
                    // Skip the flush if a size-triggered flush already happened within the interval.
                    if (firedAt >= this.lastFlush + this.batchIntervalMs) {
                        flush();
                    }
                    registerFlush();
                });
    }

    /**
     * Normalizes the row kind in place: INSERT and UPDATE_AFTER become UPDATE_AFTER,
     * UPDATE_BEFORE and DELETE become DELETE.
     */
    private RowData changeFlag(RowData value) {
        switch (value.getRowKind()) {
            case INSERT:
            case UPDATE_AFTER:
                value.setRowKind(RowKind.UPDATE_AFTER);
                break;
            case UPDATE_BEFORE:
            case DELETE:
                value.setRowKind(RowKind.DELETE);
                break;
            default:
                break;
        }
        return value;
    }

    /** Writes every buffered row to the wrapped writer with its memorized timestamp, then clears the buffer. */
    private void flush() throws IOException, InterruptedException {
        for (Tuple2<RowData, Long> entry : this.reduceBuffer.values()) {
            // f1 may be null when the original record had no timestamp; pass it through unboxed-free.
            this.wrappedContext.setTimestamp(entry.f1);
            this.wrappedWriter.write(entry.f0, this.wrappedContext);
        }
        this.lastFlush = System.currentTimeMillis();
        this.reduceBuffer.clear();
    }
}
