/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.state.forst.sync;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.ListSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.ListDelimitedSerializer;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshotTransformer;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.state.forst.sync.AbstractForStSyncState;
import org.apache.flink.state.forst.sync.ForStSyncKeyedStateBackend;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
import org.forstdb.ColumnFamilyHandle;
import org.forstdb.RocksDBException;

class ForStSyncListState<K, N, V>
extends AbstractForStSyncState<K, N, List<V>>
implements InternalListState<K, N, V> {
    private TypeSerializer<V> elementSerializer;
    private final ListDelimitedSerializer listSerializer;
    private static final byte DELIMITER = 44;

    private ForStSyncListState(ColumnFamilyHandle columnFamily, TypeSerializer<N> namespaceSerializer, TypeSerializer<List<V>> valueSerializer, List<V> defaultValue, ForStSyncKeyedStateBackend<K> backend) {
        super(columnFamily, namespaceSerializer, valueSerializer, defaultValue, backend);
        ListSerializer castedListSerializer = (ListSerializer)valueSerializer;
        this.elementSerializer = castedListSerializer.getElementSerializer();
        this.listSerializer = new ListDelimitedSerializer();
    }

    public TypeSerializer<K> getKeySerializer() {
        return this.backend.getKeySerializer();
    }

    public TypeSerializer<N> getNamespaceSerializer() {
        return this.namespaceSerializer;
    }

    public TypeSerializer<List<V>> getValueSerializer() {
        return this.valueSerializer;
    }

    public Iterable<V> get() throws IOException, RocksDBException {
        return this.getInternal();
    }

    public List<V> getInternal() throws IOException, RocksDBException {
        byte[] key = this.serializeCurrentKeyWithGroupAndNamespace();
        byte[] valueBytes = this.backend.db.get(this.columnFamily, key);
        return this.listSerializer.deserializeList(valueBytes, this.elementSerializer);
    }

    public void add(V value) throws IOException, RocksDBException {
        Preconditions.checkNotNull(value, (String)"You cannot add null to a ListState.");
        this.backend.db.merge(this.columnFamily, this.writeOptions, this.serializeCurrentKeyWithGroupAndNamespace(), this.serializeValue(value, this.elementSerializer));
    }

    public void mergeNamespaces(N target, Collection<N> sources) {
        if (sources == null || sources.isEmpty()) {
            return;
        }
        try {
            this.setCurrentNamespace(target);
            byte[] targetKey = this.serializeCurrentKeyWithGroupAndNamespace();
            for (N source : sources) {
                if (source == null) continue;
                this.setCurrentNamespace(source);
                byte[] sourceKey = this.serializeCurrentKeyWithGroupAndNamespace();
                byte[] valueBytes = this.backend.db.get(this.columnFamily, sourceKey);
                if (valueBytes == null) continue;
                this.backend.db.delete(this.columnFamily, this.writeOptions, sourceKey);
                this.backend.db.merge(this.columnFamily, this.writeOptions, targetKey, valueBytes);
            }
        }
        catch (Exception e) {
            throw new FlinkRuntimeException("Error while merging state in RocksDB", (Throwable)e);
        }
    }

    public void update(List<V> valueToStore) throws IOException, RocksDBException {
        this.updateInternal(valueToStore);
    }

    public void updateInternal(List<V> values) throws IOException, RocksDBException {
        Preconditions.checkNotNull(values, (String)"List of values to add cannot be null.");
        if (!values.isEmpty()) {
            this.backend.db.put(this.columnFamily, this.writeOptions, this.serializeCurrentKeyWithGroupAndNamespace(), this.listSerializer.serializeList(values, this.elementSerializer));
        } else {
            this.clear();
        }
    }

    public void addAll(List<V> values) throws IOException, RocksDBException {
        Preconditions.checkNotNull(values, (String)"List of values to add cannot be null.");
        if (!values.isEmpty()) {
            this.backend.db.merge(this.columnFamily, this.writeOptions, this.serializeCurrentKeyWithGroupAndNamespace(), this.listSerializer.serializeList(values, this.elementSerializer));
        }
    }

    @Override
    public void migrateSerializedValue(DataInputDeserializer serializedOldValueInput, DataOutputSerializer serializedMigratedValueOutput, TypeSerializer<List<V>> priorSerializer, TypeSerializer<List<V>> newSerializer) throws StateMigrationException {
        Preconditions.checkArgument((boolean)(priorSerializer instanceof ListSerializer));
        Preconditions.checkArgument((boolean)(newSerializer instanceof ListSerializer));
        TypeSerializer priorElementSerializer = ((ListSerializer)priorSerializer).getElementSerializer();
        TypeSerializer newElementSerializer = ((ListSerializer)newSerializer).getElementSerializer();
        try {
            while (serializedOldValueInput.available() > 0) {
                Object element = ListDelimitedSerializer.deserializeNextElement((DataInputDeserializer)serializedOldValueInput, (TypeSerializer)priorElementSerializer);
                newElementSerializer.serialize(element, (DataOutputView)serializedMigratedValueOutput);
                if (serializedOldValueInput.available() <= 0) continue;
                serializedMigratedValueOutput.write(44);
            }
        }
        catch (Exception e) {
            throw new StateMigrationException("Error while trying to migrate RocksDB list state.", (Throwable)e);
        }
    }

    @Override
    protected ForStSyncListState<K, N, V> setValueSerializer(TypeSerializer<List<V>> valueSerializer) {
        super.setValueSerializer(valueSerializer);
        this.elementSerializer = ((ListSerializer)valueSerializer).getElementSerializer();
        return this;
    }

    static <E, K, N, SV, S extends State, IS extends S> IS create(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, ForStSyncKeyedStateBackend<K> backend) {
        return (IS)new ForStSyncListState((ColumnFamilyHandle)registerResult.f0, ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer(), ((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer(), (List)stateDesc.getDefaultValue(), backend);
    }

    static <E, K, N, SV, S extends State, IS extends S> IS update(StateDescriptor<S, SV> stateDesc, Tuple2<ColumnFamilyHandle, RegisteredKeyValueStateBackendMetaInfo<N, SV>> registerResult, IS existingState) {
        return (IS)((ForStSyncListState)existingState).setNamespaceSerializer(((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getNamespaceSerializer()).setValueSerializer(((RegisteredKeyValueStateBackendMetaInfo)registerResult.f1).getStateSerializer()).setDefaultValue((List)stateDesc.getDefaultValue());
    }

    static class StateSnapshotTransformerWrapper<T>
    implements StateSnapshotTransformer<byte[]> {
        private final StateSnapshotTransformer<T> elementTransformer;
        private final TypeSerializer<T> elementSerializer;
        private final StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy transformStrategy;
        private final ListDelimitedSerializer listSerializer;
        private final DataInputDeserializer in = new DataInputDeserializer();

        StateSnapshotTransformerWrapper(StateSnapshotTransformer<T> elementTransformer, TypeSerializer<T> elementSerializer) {
            this.elementTransformer = elementTransformer;
            this.elementSerializer = elementSerializer;
            this.listSerializer = new ListDelimitedSerializer();
            this.transformStrategy = elementTransformer instanceof StateSnapshotTransformer.CollectionStateSnapshotTransformer ? ((StateSnapshotTransformer.CollectionStateSnapshotTransformer)elementTransformer).getFilterStrategy() : StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.TRANSFORM_ALL;
        }

        @Nullable
        public byte[] filterOrTransform(@Nullable byte[] value) {
            if (value == null) {
                return null;
            }
            ArrayList<Object> result = new ArrayList<Object>();
            this.in.setBuffer(value);
            int prevPosition = 0;
            try {
                Object next;
                while ((next = ListDelimitedSerializer.deserializeNextElement((DataInputDeserializer)this.in, this.elementSerializer)) != null) {
                    Object transformedElement = this.elementTransformer.filterOrTransform(next);
                    if (transformedElement != null) {
                        if (this.transformStrategy == StateSnapshotTransformer.CollectionStateSnapshotTransformer.TransformStrategy.STOP_ON_FIRST_INCLUDED) {
                            return Arrays.copyOfRange(value, prevPosition, value.length);
                        }
                        result.add(transformedElement);
                    }
                    prevPosition = this.in.getPosition();
                }
                return result.isEmpty() ? null : this.listSerializer.serializeList(result, this.elementSerializer);
            }
            catch (IOException e) {
                throw new FlinkRuntimeException("Failed to serialize transformed list", (Throwable)e);
            }
        }
    }
}

