package org.apache.flink.state.changelog.restore;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.common.typeutils.base.MapSerializer;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.RegisteredPriorityQueueStateBackendMetaInfo;
import org.apache.flink.runtime.state.changelog.StateChange;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshotReadersWriters;
import org.apache.flink.state.changelog.ChangelogState;
import org.apache.flink.state.changelog.StateChangeOperation;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier.class */
class ChangelogBackendLogApplier {
    private static final Logger LOG = LoggerFactory.getLogger(ChangelogBackendLogApplier.class);

    /**
     * Decompiler-reconstructed switch-map holder (originally the synthetic class
     * {@code ChangelogBackendLogApplier$1}, per the rename note below).
     *
     * <p>Each array is indexed by an enum constant's {@code ordinal()} and holds the small
     * integer used as the {@code case} label in this file's switch statements:
     * <ul>
     *   <li>{@code StateDescriptor.Type}: VALUE=1, MAP=2, LIST=3, AGGREGATING=4, REDUCING=5</li>
     *   <li>{@code StateMetaInfoSnapshot.BackendStateType}: KEY_VALUE=1, PRIORITY_QUEUE=2</li>
     * </ul>
     * Entries for constants not listed here keep the array default of 0, which matches no
     * {@code case} label and therefore selects the {@code default} branch of those switches.
     */
    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.flink.state.changelog.restore.ChangelogBackendLogApplier$1, reason: invalid class name */
    /* loaded from: input_file:org/apache/flink/state/changelog/restore/ChangelogBackendLogApplier$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        // Assigned inside the static initializer below, after the StateDescriptor.Type map is filled.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType;
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type = new int[StateDescriptor.Type.values().length];

        // Every enum constant is referenced inside its own try block so that a failure to resolve
        // one constant leaves the remaining entries intact.
        // NOTE(review): presumably this tolerates enum constants that are absent from the enum
        // class present at runtime - confirm before changing the structure.
        static {
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.VALUE.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.MAP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.LIST.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.AGGREGATING.ordinal()] = 4;
            } catch (NoSuchFieldError e4) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$state$StateDescriptor$Type[StateDescriptor.Type.REDUCING.ordinal()] = 5;
            } catch (NoSuchFieldError e5) {
            }
            // Second map: backend state type (key/value vs. priority queue).
            $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType = new int[StateMetaInfoSnapshot.BackendStateType.values().length];
            try {
                $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType[StateMetaInfoSnapshot.BackendStateType.KEY_VALUE.ordinal()] = 1;
            } catch (NoSuchFieldError e6) {
            }
            try {
                $SwitchMap$org$apache$flink$runtime$state$metainfo$StateMetaInfoSnapshot$BackendStateType[StateMetaInfoSnapshot.BackendStateType.PRIORITY_QUEUE.ordinal()] = 2;
            } catch (NoSuchFieldError e7) {
            }
        }
    }

    /**
     * Decodes one serialized {@link StateChange} and applies it to the given restore target.
     *
     * <p>The first byte of the change payload is the operation code; the remainder of the
     * payload is handed on to the operation-specific handling.
     *
     * @param stateChange the change whose payload and key group are applied
     * @param changelogRestoreTarget the target receiving metadata and data changes
     * @param classLoader class loader used when reading state meta info snapshots
     * @param map mapping from short state ids to state identifiers; metadata changes add to it,
     *     data changes look states up in it
     * @throws Exception if reading or applying the change fails
     */
    public static void apply(StateChange stateChange, ChangelogRestoreTarget<?> changelogRestoreTarget, ClassLoader classLoader, Map<Short, StateID> map) throws Exception {
        DataInputView in = new DataInputViewStreamWrapper(new ByteArrayInputStream(stateChange.getChange()));
        // The operation code must be consumed before anything else is read from the payload.
        StateChangeOperation operation = StateChangeOperation.byCode(in.readByte());
        int keyGroup = stateChange.getKeyGroup();
        applyOperation(operation, keyGroup, changelogRestoreTarget, in, classLoader, ChangelogApplierFactoryImpl.INSTANCE, map);
    }

    /**
     * Routes a decoded operation either to metadata handling or to data-change handling.
     *
     * <p>Metadata changes are always applied. Any other operation is applied only when the key
     * group {@code i} lies inside the restore target's key group range; otherwise it is skipped.
     *
     * @param stateChangeOperation the decoded operation
     * @param i the key group the change belongs to
     * @param changelogRestoreTarget the target receiving the change
     * @param dataInputView the remaining payload, positioned just after the operation code
     * @param classLoader class loader used for metadata deserialization
     * @param changelogApplierFactory factory passed to the state to obtain its change applier
     * @param map mapping from short state ids to state identifiers
     * @throws Exception if applying the change fails
     */
    private static void applyOperation(StateChangeOperation stateChangeOperation, int i, ChangelogRestoreTarget<?> changelogRestoreTarget, DataInputView dataInputView, ClassLoader classLoader, ChangelogApplierFactory changelogApplierFactory, Map<Short, StateID> map) throws Exception {
        LOG.debug("apply {} in key group {}", stateChangeOperation, i);

        if (stateChangeOperation == StateChangeOperation.METADATA) {
            applyMetaDataChange(dataInputView, changelogRestoreTarget, classLoader, map);
            return;
        }

        // Data changes for key groups outside the target's range are ignored.
        if (!changelogRestoreTarget.getKeyGroupRange().contains(i)) {
            return;
        }
        applyDataChange(dataInputView, changelogApplierFactory, changelogRestoreTarget, stateChangeOperation, map);
    }

    /**
     * Applies a metadata change: reads a state meta info snapshot, creates the corresponding
     * state on the restore target, and registers the state's short id in {@code map}.
     *
     * <p>Read order on {@code dataInputView}: the meta info snapshot, then (for key/value state
     * only) the TTL config and optional default value, then the short state id, then the
     * backend state type code.
     *
     * @param dataInputView the payload, positioned just after the operation code
     * @param changelogRestoreTarget the target on which the state is created
     * @param classLoader class loader used to read the meta info snapshot
     * @param map receives the mapping from the short state id to the state's name and type
     * @throws RuntimeException if the snapshot's backend state type is neither key/value nor
     *     priority queue
     * @throws Exception if reading the metadata or creating the state fails
     */
    private static void applyMetaDataChange(DataInputView dataInputView, ChangelogRestoreTarget<?> changelogRestoreTarget, ClassLoader classLoader, Map<Short, StateID> map) throws Exception {
        StateMetaInfoSnapshot snapshot = readStateMetaInfoSnapshot(dataInputView, classLoader);
        StateMetaInfoSnapshot.BackendStateType backendStateType = snapshot.getBackendStateType();

        // Only the state name is needed afterwards, so keep that instead of a meta-info variable
        // that would have to fit both the key/value and the priority-queue meta info types.
        final String stateName;
        switch (backendStateType) {
            case KEY_VALUE:
                stateName = restoreKvMetaData(changelogRestoreTarget, snapshot, dataInputView).getName();
                break;
            case PRIORITY_QUEUE:
                stateName = restorePqMetaData(changelogRestoreTarget, snapshot).getName();
                break;
            default:
                throw new RuntimeException("Unsupported state type: " + backendStateType + ", state: " + snapshot.getName());
        }

        // The short id precedes the type code in the stream; read them in that order.
        short stateId = dataInputView.readShort();
        StateMetaInfoSnapshot.BackendStateType stateType = StateMetaInfoSnapshot.BackendStateType.byCode(dataInputView.readByte());
        map.put(stateId, new StateID(stateName, stateType));
    }

    /**
     * Reads the optional TTL configuration from the stream.
     *
     * <p>A leading boolean tells whether a configuration follows. When it is {@code false},
     * {@link StateTtlConfig#DISABLED} is returned; otherwise a Java-serialized
     * {@link StateTtlConfig} is read.
     *
     * <p>NOTE(review): this uses Java native deserialization; it assumes the changelog contents
     * are trusted - confirm against the deployment's threat model.
     *
     * @param dataInputView the stream to read from
     * @return the deserialized TTL configuration, or {@link StateTtlConfig#DISABLED}
     * @throws IOException if reading fails, or wrapping a {@link ClassNotFoundException} raised
     *     during deserialization
     */
    private static StateTtlConfig readTtlConfig(DataInputView dataInputView) throws IOException {
        if (!dataInputView.readBoolean()) {
            return StateTtlConfig.DISABLED;
        }
        // try-with-resources closes the object stream on both the success and the failure path.
        try (ObjectInputStream objectInputStream = new ObjectInputStream(new DataInputViewStream(dataInputView))) {
            return (StateTtlConfig) objectInputStream.readObject();
        } catch (ClassNotFoundException e) {
            throw new IOException(e);
        }
    }

    /**
     * Reads the optional default value of a key/value state.
     *
     * <p>A leading boolean tells whether a value follows; when present it is deserialized with
     * the state serializer of the given meta info.
     *
     * @param dataInputView the stream to read from
     * @param registeredKeyValueStateBackendMetaInfo meta info supplying the state serializer
     * @return the deserialized default value, or {@code null} when none was written
     * @throws IOException if reading fails
     */
    @Nullable
    private static Object readDefaultValue(DataInputView dataInputView, RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo) throws IOException {
        boolean hasDefaultValue = dataInputView.readBoolean();
        if (!hasDefaultValue) {
            return null;
        }
        return registeredKeyValueStateBackendMetaInfo.getStateSerializer().deserialize(dataInputView);
    }

    /**
     * Restores a key/value state from its meta info snapshot and creates it on the target.
     *
     * <p>After the snapshot, the stream carries the TTL config followed by the optional default
     * value; both are read here, in that order. The state is then created through the target's
     * {@code createKeyedState} using a descriptor rebuilt from the meta info.
     *
     * @param changelogRestoreTarget the target on which the keyed state is created
     * @param stateMetaInfoSnapshot the snapshot describing the state
     * @param dataInputView the stream positioned just after the snapshot
     * @return the meta info built from the snapshot
     * @throws Exception if reading the metadata or creating the state fails
     */
    // Raw types are required: the state and value types are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static RegisteredKeyValueStateBackendMetaInfo restoreKvMetaData(ChangelogRestoreTarget<?> changelogRestoreTarget, StateMetaInfoSnapshot stateMetaInfoSnapshot, DataInputView dataInputView) throws Exception {
        RegisteredKeyValueStateBackendMetaInfo meta = new RegisteredKeyValueStateBackendMetaInfo(stateMetaInfoSnapshot);

        // Stream order matters: the TTL config is written before the optional default value.
        StateTtlConfig ttlConfig = readTtlConfig(dataInputView);
        Object defaultValue = readDefaultValue(dataInputView, meta);

        StateDescriptor stateDescriptor = toStateDescriptor(meta, defaultValue);
        if (ttlConfig.isEnabled()) {
            stateDescriptor.enableTimeToLive(ttlConfig);
        }
        changelogRestoreTarget.createKeyedState(meta.getNamespaceSerializer(), stateDescriptor);
        return meta;
    }

    /**
     * Rebuilds a state descriptor from restored key/value meta info.
     *
     * <p>The descriptor kind follows the meta info's state type. Map and list descriptors are
     * built from the component serializers of the state serializer, so the state serializer is
     * downcast to {@link MapSerializer} or {@link ListSerializer} respectively. Aggregating and
     * reducing descriptors receive delegating functions from {@code FunctionDelegationHelper}.
     *
     * @param registeredKeyValueStateBackendMetaInfo meta info providing name, state type and
     *     state serializer
     * @param obj default value, used only for value state; may be {@code null}
     * @return a descriptor matching the meta info's state type
     * @throws IllegalArgumentException if the state type is not one of VALUE, MAP, LIST,
     *     AGGREGATING or REDUCING
     */
    // Raw types are required: the concrete state and value types are only known at runtime.
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static StateDescriptor toStateDescriptor(RegisteredKeyValueStateBackendMetaInfo registeredKeyValueStateBackendMetaInfo, @Nullable Object obj) {
        String name = registeredKeyValueStateBackendMetaInfo.getName();
        switch (registeredKeyValueStateBackendMetaInfo.getStateType()) {
            case VALUE:
                return new ValueStateDescriptor(name, registeredKeyValueStateBackendMetaInfo.getStateSerializer(), obj);
            case MAP:
                MapSerializer mapSerializer = (MapSerializer) registeredKeyValueStateBackendMetaInfo.getStateSerializer();
                return new MapStateDescriptor(name, mapSerializer.getKeySerializer(), mapSerializer.getValueSerializer());
            case LIST:
                ListSerializer listSerializer = (ListSerializer) registeredKeyValueStateBackendMetaInfo.getStateSerializer();
                return new ListStateDescriptor(name, listSerializer.getElementSerializer());
            case AGGREGATING:
                return new AggregatingStateDescriptor(name, FunctionDelegationHelper.delegateAggregateFunction(), registeredKeyValueStateBackendMetaInfo.getStateSerializer());
            case REDUCING:
                return new ReducingStateDescriptor(name, FunctionDelegationHelper.delegateReduceFunction(), registeredKeyValueStateBackendMetaInfo.getStateSerializer());
            default:
                throw new IllegalArgumentException(registeredKeyValueStateBackendMetaInfo.getStateType().toString());
        }
    }

    /**
     * Restores a priority-queue state from its meta info snapshot and creates it on the target.
     *
     * @param changelogRestoreTarget the target on which the priority-queue state is created
     * @param stateMetaInfoSnapshot the snapshot describing the state
     * @return the meta info built from the snapshot
     */
    private static RegisteredPriorityQueueStateBackendMetaInfo restorePqMetaData(ChangelogRestoreTarget<?> changelogRestoreTarget, StateMetaInfoSnapshot stateMetaInfoSnapshot) {
        RegisteredPriorityQueueStateBackendMetaInfo pqMeta = new RegisteredPriorityQueueStateBackendMetaInfo(stateMetaInfoSnapshot);
        String stateName = pqMeta.getName();
        changelogRestoreTarget.createPqState(stateName, pqMeta.getElementSerializer());
        return pqMeta;
    }

    /**
     * Reads a state meta info snapshot from the stream.
     *
     * <p>The stream starts with an int that selects the snapshot reader version; the reader
     * obtained for that version (with the keyed-state hint) then reads the snapshot itself.
     *
     * @param dataInputView the stream to read from
     * @param classLoader class loader passed to the snapshot reader
     * @return the snapshot read from the stream
     * @throws IOException if reading fails
     */
    private static StateMetaInfoSnapshot readStateMetaInfoSnapshot(DataInputView dataInputView, ClassLoader classLoader) throws IOException {
        int readerVersion = dataInputView.readInt();
        return StateMetaInfoSnapshotReadersWriters
                .getReader(readerVersion, StateMetaInfoSnapshotReadersWriters.StateTypeHint.KEYED_STATE)
                .readStateMetaInfoSnapshot(dataInputView, classLoader);
    }

    /**
     * Applies a data change to an already registered state.
     *
     * <p>The payload starts with the short state id, which is resolved through {@code map} to
     * the state's name and type. The matching state is fetched from the restore target and its
     * change applier consumes the rest of the payload.
     *
     * @param dataInputView the payload, positioned just after the operation code
     * @param changelogApplierFactory factory handed to the state to obtain its change applier
     * @param changelogRestoreTarget the target holding the existing states
     * @param stateChangeOperation the operation to apply
     * @param map mapping from short state ids to state identifiers
     * @throws NullPointerException if the state id read from the payload is not in {@code map}
     * @throws IllegalStateException if the restore target has no state for the resolved id
     * @throws Exception if applying the change fails
     */
    private static void applyDataChange(DataInputView dataInputView, ChangelogApplierFactory changelogApplierFactory, ChangelogRestoreTarget<?> changelogRestoreTarget, StateChangeOperation stateChangeOperation, Map<Short, StateID> map) throws Exception {
        short stateId = dataInputView.readShort();
        StateID stateID = Preconditions.checkNotNull(map.get(stateId), "No state registered for id %s", stateId);
        ChangelogState existingState = changelogRestoreTarget.getExistingState(stateID.stateName, stateID.stateType);
        // Template overload: the message is only formatted when the check actually fails, which
        // keeps String.format off the per-record path.
        Preconditions.checkState(existingState != null, "%s state %s not found", stateID.stateType, stateID.stateName);
        existingState.getChangeApplier(changelogApplierFactory).apply(stateChangeOperation, dataInputView);
    }

    /** Private constructor: this class exposes only static methods and is not instantiated. */
    private ChangelogBackendLogApplier() {
    }
}
