/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.runtime.operators.sink;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.runtime.generated.GeneratedHashFunction;
import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
import org.apache.flink.table.runtime.operators.TableStreamOperator;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetState;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateConfig;
import org.apache.flink.table.runtime.sequencedmultisetstate.SequencedMultiSetStateContext;
import org.apache.flink.table.runtime.typeutils.InternalSerializers;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.typeutils.RowTypeUtils;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Internal
public class SinkUpsertMaterializerV2
extends TableStreamOperator<RowData>
implements OneInputStreamOperator<RowData, RowData> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(SinkUpsertMaterializerV2.class);
    private final SequencedMultiSetStateContext stateParameters;
    private transient TimestampedCollector<RowData> collector;
    private transient SequencedMultiSetState<RowData> orderedMultiSetState;
    private final boolean hasUpsertKey;

    public SinkUpsertMaterializerV2(boolean hasUpsertKey, SequencedMultiSetStateContext stateParameters) {
        this.hasUpsertKey = hasUpsertKey;
        this.stateParameters = stateParameters;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.orderedMultiSetState = SequencedMultiSetState.create(this.stateParameters, (RuntimeContext)this.getRuntimeContext(), this.getKeyedStateStore().getBackendTypeIdentifier());
        this.collector = new TimestampedCollector(this.output);
        LOG.info("Opened {} with upsert key: {}", (Object)((Object)((Object)this)).getClass().getSimpleName(), (Object)this.hasUpsertKey);
    }

    public void processElement(StreamRecord<RowData> element) throws Exception {
        RowData row = (RowData)element.getValue();
        long timestamp = element.getTimestamp();
        block0 : switch (row.getRowKind()) {
            case INSERT: 
            case UPDATE_AFTER: {
                if (this.hasUpsertKey) {
                    this.collect(row, this.orderedMultiSetState.add(row, timestamp).wasEmpty());
                    break;
                }
                this.collect(row, this.orderedMultiSetState.append(row, timestamp).wasEmpty());
                break;
            }
            case UPDATE_BEFORE: 
            case DELETE: {
                SequencedMultiSetState.StateChangeInfo<RowData> removalResult = this.orderedMultiSetState.remove(row);
                switch (removalResult.getChangeType()) {
                    case REMOVAL_OTHER: {
                        break block0;
                    }
                    case REMOVAL_NOT_FOUND: {
                        LOG.warn("Not found record to retract");
                        break block0;
                    }
                    case REMOVAL_ALL: {
                        this.collect(removalResult.getPayload().get(), RowKind.DELETE);
                        break block0;
                    }
                    case REMOVAL_LAST_ADDED: {
                        this.collect(removalResult.getPayload().get(), RowKind.UPDATE_AFTER);
                        break block0;
                    }
                }
                throw new IllegalArgumentException("Unexpected removal result type: " + removalResult.getChangeType());
            }
        }
    }

    private void collect(RowData row, boolean notExisted) {
        this.collect(row, this.getRowKind(notExisted));
    }

    private RowKind getRowKind(boolean notExisted) {
        return notExisted ? RowKind.INSERT : RowKind.UPDATE_AFTER;
    }

    private void collect(RowData row, RowKind withKind) {
        RowKind orig = row.getRowKind();
        row.setRowKind(withKind);
        this.collector.collect((Object)row);
        row.setRowKind(orig);
    }

    public static SinkUpsertMaterializerV2 create(RowType physicalRowType, GeneratedRecordEqualiser rowEqualiser, GeneratedRecordEqualiser upsertKeyEqualiser, GeneratedHashFunction rowHashFunction, GeneratedHashFunction upsertKeyHashFunction, int[] inputUpsertKey, SequencedMultiSetStateConfig stateSettings) {
        boolean hasUpsertKey = inputUpsertKey != null && inputUpsertKey.length > 0;
        return new SinkUpsertMaterializerV2(hasUpsertKey, new SequencedMultiSetStateContext((TypeSerializer<RowData>)((TypeSerializer)Preconditions.checkNotNull((Object)((Object)(hasUpsertKey ? InternalSerializers.create(RowTypeUtils.projectRowType((RowType)physicalRowType, (int[])inputUpsertKey)) : InternalSerializers.create(physicalRowType))))), (GeneratedRecordEqualiser)Preconditions.checkNotNull((Object)(hasUpsertKey ? upsertKeyEqualiser : rowEqualiser)), (GeneratedHashFunction)Preconditions.checkNotNull((Object)(hasUpsertKey ? upsertKeyHashFunction : rowHashFunction)), InternalSerializers.create(physicalRowType), row -> hasUpsertKey ? ProjectedRowData.from((int[])inputUpsertKey, (boolean)true).replaceRow(row) : row, stateSettings));
    }
}

