package org.apache.flink.state.forst;

import java.io.IOException;
import java.util.function.Supplier;
import org.apache.flink.api.common.state.v2.AggregatingStateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.runtime.asyncprocessing.StateRequest;
import org.apache.flink.runtime.asyncprocessing.StateRequestHandler;
import org.apache.flink.runtime.asyncprocessing.StateRequestType;
import org.apache.flink.runtime.state.SerializedCompositeKeyBuilder;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.v2.AbstractAggregatingState;
import org.apache.flink.util.Preconditions;
import org.forstdb.ColumnFamilyHandle;

/* loaded from: input_file:org/apache/flink/state/forst/ForStAggregatingState.class */
/**
 * Aggregating state whose accumulator is exposed to the ForSt layer through {@link ForStInnerTable}.
 *
 * <p>This class supplies the table-side plumbing: it serializes keys and accumulator values,
 * deserializes accumulator values, and turns {@link StateRequest}s into ForSt get/put requests.
 * Every serialization helper is held in a {@link ThreadLocal}, so each calling thread works on
 * its own instance obtained from the supplier passed to the constructor.
 *
 * @param <K> key type
 * @param <N> namespace type
 * @param <IN> input type of the aggregation
 * @param <ACC> accumulator type; this is the value type of the inner table
 * @param <OUT> output type of the aggregation
 */
public class ForStAggregatingState<K, N, IN, ACC, OUT> extends AbstractAggregatingState<K, N, IN, ACC, OUT> implements ForStInnerTable<K, N, ACC> {

    /** Column family handle returned by {@link #getColumnFamilyHandle()}. */
    private final ColumnFamilyHandle columnFamilyHandle;

    /** Per-thread builder handed to the key serialization utility. */
    private final ThreadLocal<SerializedCompositeKeyBuilder<K>> serializedKeyBuilder;

    /** Per-thread output buffer that accumulator values are serialized into. */
    private final ThreadLocal<DataOutputSerializer> valueSerializerView;

    /** Per-thread input view that accumulator values are deserialized from. */
    private final ThreadLocal<DataInputDeserializer> valueDeserializerView;

    /** Per-thread serializer for the namespace. */
    private final ThreadLocal<TypeSerializer<N>> namespaceSerializer;

    /** Namespace passed to the key serialization utility as the default. */
    private final N defaultNamespace;

    /**
     * Flag forwarded to the key serialization utility; {@code true} only when the default
     * namespace is a {@link VoidNamespace} and the supplied namespace serializer is a
     * {@link VoidNamespaceSerializer}.
     */
    private final boolean enableKeyReuse;

    /**
     * Creates the state and wires up its thread-local serialization helpers.
     *
     * @param aggregatingStateDescriptor descriptor passed on to the super class
     * @param stateRequestHandler handler passed on to the super class
     * @param columnFamilyHandle column family handle exposed by this table
     * @param supplier creates the per-thread {@link SerializedCompositeKeyBuilder}
     * @param n the default namespace
     * @param supplier2 creates the per-thread namespace serializer; it is additionally invoked
     *     once here (only when {@code n} is a {@link VoidNamespace}) to decide key reuse
     * @param supplier3 creates the per-thread {@link DataOutputSerializer} for values
     * @param supplier4 creates the per-thread {@link DataInputDeserializer} for values
     */
    public ForStAggregatingState(AggregatingStateDescriptor<IN, ACC, OUT> aggregatingStateDescriptor, StateRequestHandler stateRequestHandler, ColumnFamilyHandle columnFamilyHandle, Supplier<SerializedCompositeKeyBuilder<K>> supplier, N n, Supplier<TypeSerializer<N>> supplier2, Supplier<DataOutputSerializer> supplier3, Supplier<DataInputDeserializer> supplier4) {
        super(stateRequestHandler, aggregatingStateDescriptor);
        this.columnFamilyHandle = columnFamilyHandle;
        this.serializedKeyBuilder = ThreadLocal.withInitial(supplier);
        this.namespaceSerializer = ThreadLocal.withInitial(supplier2);
        this.defaultNamespace = n;
        this.valueDeserializerView = ThreadLocal.withInitial(supplier4);
        this.valueSerializerView = ThreadLocal.withInitial(supplier3);

        // The serializer supplier is consulted only after the namespace check has passed,
        // so no serializer instance is created for other namespace types.
        boolean reuseKey = false;
        if (n instanceof VoidNamespace) {
            reuseKey = supplier2.get() instanceof VoidNamespaceSerializer;
        }
        this.enableKeyReuse = reuseKey;
    }

    /**
     * Returns the column family handle this table was constructed with.
     *
     * @return the column family handle
     */
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public ColumnFamilyHandle getColumnFamilyHandle() {
        return columnFamilyHandle;
    }

    /**
     * Serializes the given context key by delegating to
     * {@code ForStSerializerUtils.serializeKeyAndNamespace}, using the calling thread's key
     * builder and namespace serializer.
     *
     * @param contextKey the key to serialize
     * @return the serialized key bytes produced by the utility
     * @throws IOException if the underlying serialization fails
     */
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public byte[] serializeKey(ContextKey<K, N> contextKey) throws IOException {
        final SerializedCompositeKeyBuilder<K> keyBuilder = serializedKeyBuilder.get();
        final TypeSerializer<N> nsSerializer = namespaceSerializer.get();
        return ForStSerializerUtils.serializeKeyAndNamespace(contextKey, keyBuilder, defaultNamespace, nsSerializer, enableKeyReuse);
    }

    /**
     * Serializes an accumulator with the value serializer of this state.
     *
     * @param acc the accumulator to serialize
     * @return the bytes obtained from the output buffer's {@code getCopyOfBuffer()}
     * @throws IOException if the value serializer fails
     */
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public byte[] serializeValue(ACC acc) throws IOException {
        final DataOutputSerializer out = valueSerializerView.get();
        // The buffer is reused per thread, so it is cleared before each write.
        out.clear();
        getValueSerializer().serialize(acc, out);
        return out.getCopyOfBuffer();
    }

    /**
     * Deserializes an accumulator from the given bytes with the value serializer of this state.
     *
     * @param bArr the serialized accumulator
     * @return the deserialized accumulator
     * @throws IOException if the value serializer fails
     */
    // The cast to ACC is kept exactly as in the original expression.
    @SuppressWarnings("unchecked")
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public ACC deserializeValue(byte[] bArr) throws IOException {
        final DataInputDeserializer in = valueDeserializerView.get();
        // Point the reused per-thread input view at the new bytes before reading.
        in.setBuffer(bArr);
        return (ACC) getValueSerializer().deserialize(in);
    }

    /**
     * Builds the get request for an {@code AGGREGATING_GET} state request.
     *
     * @param stateRequest the request; its type must be {@link StateRequestType#AGGREGATING_GET}
     * @return a single-get request over this table, keyed by the request's record context and
     *     namespace, and carrying the request's future
     * @throws IllegalArgumentException if the request type is anything else
     */
    // Raw construction is kept on purpose: the wildcard-typed request gives no type arguments.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public ForStDBGetRequest<K, N, ACC, ?> buildDBGetRequest(StateRequest<?, ?, ?, ?> stateRequest) {
        final StateRequestType requestType = stateRequest.getRequestType();
        Preconditions.checkArgument(requestType == StateRequestType.AGGREGATING_GET);
        final ContextKey requestKey = new ContextKey(stateRequest.getRecordContext(), stateRequest.getNamespace());
        return new ForStDBSingleGetRequest(requestKey, this, stateRequest.getFuture());
    }

    /**
     * Builds the put request for an {@code AGGREGATING_ADD} or {@code CLEAR} state request.
     *
     * <p>A {@code CLEAR} request is passed to {@code ForStDBPutRequest.of} with a {@code null}
     * value; otherwise the request payload is passed as the value.
     *
     * @param stateRequest the request; its type must be {@link StateRequestType#AGGREGATING_ADD}
     *     or {@link StateRequestType#CLEAR}
     * @return the put request created by {@code ForStDBPutRequest.of}
     * @throws IllegalArgumentException if the request type is anything else
     */
    // Raw construction is kept on purpose: the wildcard-typed request gives no type arguments.
    @SuppressWarnings({"unchecked", "rawtypes"})
    @Override // org.apache.flink.state.forst.ForStInnerTable
    public ForStDBPutRequest<?, ?, ?> buildDBPutRequest(StateRequest<?, ?, ?, ?> stateRequest) {
        final boolean isAdd = stateRequest.getRequestType() == StateRequestType.AGGREGATING_ADD;
        final boolean isClear = stateRequest.getRequestType() == StateRequestType.CLEAR;
        Preconditions.checkArgument(isAdd || isClear);
        return ForStDBPutRequest.of(
                new ContextKey(stateRequest.getRecordContext(), stateRequest.getNamespace()),
                isClear ? null : stateRequest.getPayload(),
                this,
                stateRequest.getFuture());
    }
}
