package org.apache.flink.state.changelog;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nullable;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.changelog.StateChangelogWriter;
import org.apache.flink.runtime.state.heap.InternalKeyContext;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.ThrowingConsumer;

/* loaded from: input_file:org/apache/flink/state/changelog/AbstractStateChangeLogger.class */
/**
 * Base class for loggers that serialize state changes into byte arrays and append them to a
 * {@link StateChangelogWriter}.
 *
 * <p>Every change record is laid out as: operation code (byte), state name (UTF), state type
 * code (byte), the scope written by {@link #serializeScope}, followed by an optional
 * operation-specific payload. Before the first change record is appended, a single metadata
 * record is appended under {@link #COMMON_KEY_GROUP}.
 *
 * <p>NOTE(review): the "metadata already written" flag is a plain, unsynchronized field, so this
 * class appears to expect single-threaded use - confirm against callers.
 *
 * @param <Key> type of the key tracked by the {@link InternalKeyContext}
 * @param <Value> type of the state value being logged
 * @param <Ns> type of the namespace (scope) the change belongs to
 */
abstract class AbstractStateChangeLogger<Key, Value, Ns> implements StateChangeLogger<Value, Ns> {
    /** Key group under which the metadata record is appended. */
    static final int COMMON_KEY_GROUP = -1;

    /**
     * Version number written into the metadata record ahead of the meta info snapshot.
     *
     * <p>NOTE(review): presumably this has to match the format produced by
     * {@code StateMetaInfoSnapshotReadersWriters.getWriter()} - verify when upgrading Flink.
     */
    private static final int META_INFO_SNAPSHOT_VERSION = 6;

    protected final StateChangelogWriter<?> stateChangelogWriter;
    protected final InternalKeyContext<Key> keyContext;
    protected final RegisteredStateMetaInfoBase metaInfo;
    private final StateMetaInfoSnapshot.BackendStateType stateType;

    /** Set once the metadata record has been appended successfully. */
    private boolean metaDataWritten = false;

    /**
     * Creates a logger for the state described by {@code registeredStateMetaInfoBase}.
     *
     * @param stateChangelogWriter writer that receives the serialized changes; must not be null
     * @param internalKeyContext source of the current key group index; must not be null
     * @param registeredStateMetaInfoBase meta info of the logged state; must not be null
     * @throws IllegalArgumentException if the meta info is neither a key-value nor a priority
     *     queue meta info
     */
    public AbstractStateChangeLogger(StateChangelogWriter<?> stateChangelogWriter, InternalKeyContext<Key> internalKeyContext, RegisteredStateMetaInfoBase registeredStateMetaInfoBase) {
        this.stateChangelogWriter = Preconditions.checkNotNull(stateChangelogWriter);
        this.keyContext = Preconditions.checkNotNull(internalKeyContext);
        this.metaInfo = Preconditions.checkNotNull(registeredStateMetaInfoBase);
        if (registeredStateMetaInfoBase instanceof RegisteredKeyValueStateBackendMetaInfo) {
            this.stateType = StateMetaInfoSnapshot.BackendStateType.KEY_VALUE;
        } else if (registeredStateMetaInfoBase instanceof RegisteredPriorityQueueStateBackendMetaInfo) {
            this.stateType = StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE;
        } else {
            throw new IllegalArgumentException("Unsupported state type: " + registeredStateMetaInfoBase);
        }
    }

    /**
     * Logs a {@code SET} of {@code value}; a null value is logged as a {@code CLEAR} instead.
     *
     * @param value the new value, or null to record a clear
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueUpdated(Value value, Ns ns) throws IOException {
        if (value == null) {
            valueCleared(ns);
            return;
        }
        log(StateChangeOperation.SET, out -> serializeValue(value, out), ns);
    }

    /**
     * Logs a {@code SET_INTERNAL} of {@code value}; a null value is logged as a {@code CLEAR}
     * instead.
     *
     * @param value the new value, or null to record a clear
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueUpdatedInternal(Value value, Ns ns) throws IOException {
        if (value == null) {
            valueCleared(ns);
            return;
        }
        log(StateChangeOperation.SET_INTERNAL, out -> serializeValue(value, out), ns);
    }

    /**
     * Writes {@code value} to the given output as the payload of a change record.
     *
     * @param value the value to serialize
     * @param dataOutputViewStreamWrapper output to write to
     * @throws IOException if writing fails
     */
    protected abstract void serializeValue(Value value, DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException;

    /**
     * Logs an {@code ADD} of {@code value}. Unlike the update methods, no null check is performed
     * here; the value is passed to {@link #serializeValue} as is.
     *
     * @param value the value to record
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueAdded(Value value, Ns ns) throws IOException {
        log(StateChangeOperation.ADD, out -> serializeValue(value, out), ns);
    }

    /**
     * Logs a {@code CLEAR} for the given namespace (a record without payload).
     *
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueCleared(Ns ns) throws IOException {
        log(StateChangeOperation.CLEAR, ns);
    }

    /**
     * Logs an {@code ADD_ELEMENT} whose payload is produced by {@code throwingConsumer}.
     *
     * @param throwingConsumer writes the element payload to the record
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueElementAdded(ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer, Ns ns) throws IOException {
        log(StateChangeOperation.ADD_ELEMENT, throwingConsumer, ns);
    }

    /**
     * Logs an {@code ADD_OR_UPDATE_ELEMENT} whose payload is produced by {@code throwingConsumer}.
     *
     * @param throwingConsumer writes the element payload to the record
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueElementAddedOrUpdated(ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer, Ns ns) throws IOException {
        log(StateChangeOperation.ADD_OR_UPDATE_ELEMENT, throwingConsumer, ns);
    }

    /**
     * Logs a {@code REMOVE_ELEMENT} whose payload is produced by {@code throwingConsumer}.
     *
     * @param throwingConsumer writes the element payload to the record
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending the change fails
     */
    @Override
    public void valueElementRemoved(ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer, Ns ns) throws IOException {
        log(StateChangeOperation.REMOVE_ELEMENT, throwingConsumer, ns);
    }

    /**
     * Appends a payload-free change record under the current key group, writing the metadata
     * record first if that has not happened yet.
     *
     * @param stateChangeOperation the operation to record
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending fails
     */
    public void log(StateChangeOperation stateChangeOperation, Ns ns) throws IOException {
        logMetaIfNeeded();
        byte[] change = serialize(stateChangeOperation, ns, null);
        this.stateChangelogWriter.append(this.keyContext.getCurrentKeyGroupIndex(), change);
    }

    /**
     * Appends a change record under the current key group, writing the metadata record first if
     * that has not happened yet.
     *
     * @param stateChangeOperation the operation to record
     * @param throwingConsumer writes the operation payload; null means no payload
     * @param ns namespace the change applies to
     * @throws IOException if serializing or appending fails
     */
    public void log(StateChangeOperation stateChangeOperation, @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer, Ns ns) throws IOException {
        logMetaIfNeeded();
        byte[] change = serialize(stateChangeOperation, ns, throwingConsumer);
        this.stateChangelogWriter.append(this.keyContext.getCurrentKeyGroupIndex(), change);
    }

    /**
     * Appends the metadata record (operation code, version, meta info snapshot, then whatever
     * {@link #writeDefaultValueAndTtl} adds) under {@link #COMMON_KEY_GROUP}, at most once per
     * successful write.
     */
    private void logMetaIfNeeded() throws IOException {
        if (this.metaDataWritten) {
            return;
        }
        byte[] metadata = serializeRaw(out -> {
            out.writeByte(StateChangeOperation.METADATA.getCode());
            out.writeInt(META_INFO_SNAPSHOT_VERSION);
            StateMetaInfoSnapshotReadersWriters.getWriter().writeStateMetaInfoSnapshot(this.metaInfo.snapshot(), out);
            writeDefaultValueAndTtl(out);
        });
        this.stateChangelogWriter.append(COMMON_KEY_GROUP, metadata);
        // Flip the flag only after the append returned, so a failed attempt is retried next time.
        this.metaDataWritten = true;
    }

    /**
     * Hook invoked at the end of the metadata record; the default implementation writes nothing.
     *
     * @param dataOutputViewStreamWrapper output of the metadata record
     * @throws IOException if writing fails
     */
    protected void writeDefaultValueAndTtl(DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException {
    }

    /**
     * Serializes one change record: operation code, state name, state type code, scope, and the
     * payload from {@code throwingConsumer} when it is non-null.
     */
    private byte[] serialize(StateChangeOperation stateChangeOperation, Ns ns, @Nullable ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer) throws IOException {
        return serializeRaw(out -> {
            out.writeByte(stateChangeOperation.getCode());
            out.writeUTF(this.metaInfo.getName());
            out.writeByte(this.stateType.getCode());
            serializeScope(ns, out);
            if (throwingConsumer != null) {
                throwingConsumer.accept(out);
            }
        });
    }

    /**
     * Writes the scope (namespace) part of a change record.
     *
     * @param ns namespace to serialize
     * @param dataOutputViewStreamWrapper output to write to
     * @throws IOException if writing fails
     */
    protected abstract void serializeScope(Ns ns, DataOutputViewStreamWrapper dataOutputViewStreamWrapper) throws IOException;

    /**
     * Runs {@code throwingConsumer} against a fresh in-memory output and returns the bytes it
     * produced.
     *
     * <p>Both streams are managed by try-with-resources, so an exception thrown while closing
     * never replaces an exception thrown by the consumer; it is attached as suppressed instead.
     */
    private byte[] serializeRaw(ThrowingConsumer<DataOutputViewStreamWrapper, IOException> throwingConsumer) throws IOException {
        try (ByteArrayOutputStream bytes = new ByteArrayOutputStream();
                DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(bytes)) {
            throwingConsumer.accept(out);
            return bytes.toByteArray();
        }
    }
}
