package org.apache.kafka.streams.state.internals;

import java.io.IOException;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.errors.ProcessorStateException;
import org.apache.kafka.streams.processor.StateStore;
import org.apache.kafka.streams.processor.StateStoreContext;
import org.apache.kafka.streams.processor.api.RecordMetadata;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.KeyQuery;
import org.apache.kafka.streams.query.MultiVersionedKeyQuery;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.Query;
import org.apache.kafka.streams.query.QueryConfig;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.RangeQuery;
import org.apache.kafka.streams.query.ResultOrder;
import org.apache.kafka.streams.query.VersionedKeyQuery;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.SessionStore;
import org.apache.kafka.streams.state.StateSerdes;
import org.apache.kafka.streams.state.VersionedKeyValueStore;
import org.apache.kafka.streams.state.VersionedRecord;
import org.apache.kafka.streams.state.WindowStore;

/* loaded from: input_file:org/apache/kafka/streams/state/internals/StoreQueryUtils.class */
public final class StoreQueryUtils {
    /**
     * Dispatch table from a concrete query class to the static method in this class
     * that executes it. {@link #handleBasicQueries} looks handlers up by
     * {@code query.getClass()}, so only exact class matches are found.
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static final Map<Class, QueryHandler> QUERY_HANDLER_MAP = Utils.mkMap(
        Utils.mkEntry(RangeQuery.class, StoreQueryUtils::runRangeQuery),
        Utils.mkEntry(KeyQuery.class, StoreQueryUtils::runKeyQuery),
        Utils.mkEntry(WindowKeyQuery.class, StoreQueryUtils::runWindowKeyQuery),
        Utils.mkEntry(WindowRangeQuery.class, StoreQueryUtils::runWindowRangeQuery),
        Utils.mkEntry(VersionedKeyQuery.class, StoreQueryUtils::runVersionedKeyQuery),
        Utils.mkEntry(MultiVersionedKeyQuery.class, StoreQueryUtils::runMultiVersionedKeyQuery)
    );

    /**
     * Executes one kind of query against a state store.
     *
     * <p>Implementations receive the query, the caller's position bound, the query
     * configuration and the store to run against, and return the outcome as a
     * {@link QueryResult}.
     */
    @FunctionalInterface
    public interface QueryHandler {
        /**
         * Runs {@code query} against {@code stateStore}.
         *
         * @param query         the query to execute
         * @param positionBound the position bound supplied with the query
         * @param queryConfig   the query configuration
         * @param stateStore    the store to query
         * @return the result of the query
         */
        QueryResult<?> apply(Query<?> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore);
    }

    /** Private so the class cannot be instantiated; every member of this class is static. */
    private StoreQueryUtils() {
    }

    /**
     * Entry point for the query types registered in {@code QUERY_HANDLER_MAP}.
     *
     * <p>While holding the monitor of {@code position}, the outcome is chosen as follows:
     * <ul>
     *   <li>no handler registered for {@code query.getClass()}: an "unknown query type" result;</li>
     *   <li>{@code stateStoreContext} is {@code null}, or {@link #isPermitted} rejects the current
     *       position for the task's partition: a "not up to bound" result;</li>
     *   <li>otherwise: whatever the registered handler returns.</li>
     * </ul>
     * When {@code queryConfig.isCollectExecutionInfo()} is set, an execution-info line with the
     * elapsed nanoseconds is appended. A copy of {@code position} is always attached to the result.
     *
     * @param query             the query to run
     * @param positionBound     the bound the store position must have reached
     * @param queryConfig       the query configuration
     * @param stateStore        the store to query
     * @param position          the store's current position; also used as the lock
     * @param stateStoreContext the store context, may be {@code null}
     * @param <R>               the query's result type
     * @return the query result
     */
    @SuppressWarnings("unchecked")
    public static <R> QueryResult<R> handleBasicQueries(
        Query<R> query,
        PositionBound positionBound,
        QueryConfig queryConfig,
        StateStore stateStore,
        Position position,
        StateStoreContext stateStoreContext
    ) {
        final long startNanos = queryConfig.isCollectExecutionInfo() ? System.nanoTime() : -1L;
        final QueryHandler handler = QUERY_HANDLER_MAP.get(query.getClass());

        synchronized (position) {
            final QueryResult<?> result;
            if (handler == null) {
                result = QueryResult.forUnknownQueryType(query, stateStore);
            } else if (stateStoreContext == null) {
                // Without a context there is no partition to check the bound against.
                result = QueryResult.notUpToBound(position, positionBound, (Integer) null);
            } else if (isPermitted(position, positionBound, stateStoreContext.taskId().partition())) {
                result = handler.apply(query, positionBound, queryConfig, stateStore);
            } else {
                result = QueryResult.notUpToBound(
                    position,
                    positionBound,
                    Integer.valueOf(stateStoreContext.taskId().partition())
                );
            }

            if (queryConfig.isCollectExecutionInfo()) {
                final long elapsedNanos = System.nanoTime() - startNanos;
                result.addExecutionInfo("Handled in " + stateStore.getClass() + " in " + elapsedNanos + "ns");
            }
            result.setPosition(position.copy());
            return (QueryResult<R>) result;
        }
    }

    /**
     * Records the topic, partition and offset of the context's current record in {@code position}.
     *
     * <p>Does nothing when {@code stateStoreContext} is {@code null}, when it has no record
     * metadata, or when the metadata's topic is {@code null}.
     *
     * @param position          the position to update via {@code withComponent}
     * @param stateStoreContext the context supplying the record metadata, may be {@code null}
     */
    public static void updatePosition(Position position, StateStoreContext stateStoreContext) {
        if (stateStoreContext != null && stateStoreContext.recordMetadata().isPresent()) {
            final RecordMetadata metadata = stateStoreContext.recordMetadata().get();
            final String topic = metadata.topic();
            if (topic != null) {
                position.withComponent(topic, metadata.partition(), metadata.offset());
            }
        }
    }

    /**
     * Checks whether {@code position} has reached {@code positionBound} for partition {@code i}.
     *
     * <p>For every topic in the bound that has an entry for partition {@code i}, the position must
     * also have an entry for that topic and partition, and its value must not be smaller than the
     * bound's. Topics whose bound has no entry for partition {@code i} are ignored.
     *
     * @param position      the current position
     * @param positionBound the required bound
     * @param i             the partition to check
     * @return {@code false} if any bounded topic is missing or behind for the partition,
     *         {@code true} otherwise
     */
    public static boolean isPermitted(Position position, PositionBound positionBound, int i) {
        final Position bound = positionBound.position();
        final Integer partition = Integer.valueOf(i);
        for (final String topic : bound.getTopics()) {
            final Map<Integer, Long> requiredOffsets = bound.getPartitionPositions(topic);
            final Map<Integer, Long> seenOffsets = position.getPartitionPositions(topic);
            if (!requiredOffsets.containsKey(partition)) {
                // This topic places no requirement on our partition.
                continue;
            }
            if (!seenOffsets.containsKey(partition)) {
                // A bound exists for a partition we have not seen at all.
                return false;
            }
            if (seenOffsets.get(partition).longValue() < requiredOffsets.get(partition).longValue()) {
                // We have seen the partition, but not far enough.
                return false;
            }
        }
        return true;
    }

    /**
     * Executes a {@link RangeQuery} against a {@link KeyValueStore}.
     *
     * <p>The store method is chosen from the presence of bounds and the result order:
     * {@code all()} for no bounds and non-descending order, {@code range(...)} for bounds and
     * non-descending order, {@code reverseAll()} for no bounds and descending order, and
     * {@code reverseRange(...)} for bounds and descending order. A missing bound is passed to the
     * store as {@code null}.
     *
     * @return the iterator wrapped in a result; an "unknown query type" result if the store is not
     *         a {@code KeyValueStore}; a {@code STORE_EXCEPTION} failure if anything throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runRangeQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (!(stateStore instanceof KeyValueStore)) {
            return QueryResult.forUnknownQueryType(query, stateStore);
        }
        final KeyValueStore<Bytes, byte[]> store = (KeyValueStore<Bytes, byte[]>) stateStore;
        final RangeQuery<Bytes, byte[]> typedQuery = (RangeQuery<Bytes, byte[]>) query;
        final Optional<Bytes> lower = typedQuery.getLowerBound();
        final Optional<Bytes> upper = typedQuery.getUpperBound();
        final ResultOrder order = typedQuery.resultOrder();
        try {
            final boolean descending = order.equals(ResultOrder.DESCENDING);
            final boolean bounded = lower.isPresent() || upper.isPresent();
            if (!bounded && !descending) {
                return (QueryResult<R>) QueryResult.forResult(store.all());
            } else if (!descending) {
                return (QueryResult<R>) QueryResult.forResult(store.range(lower.orElse(null), upper.orElse(null)));
            } else if (bounded) {
                return (QueryResult<R>) QueryResult.forResult(store.reverseRange(lower.orElse(null), upper.orElse(null)));
            } else {
                return (QueryResult<R>) QueryResult.forResult(store.reverseAll());
            }
        } catch (Exception e) {
            final String message = parseStoreException(e, stateStore, query);
            return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
        }
    }

    /**
     * Executes a {@link KeyQuery} by calling {@code get} on a {@link KeyValueStore} with the
     * query's key.
     *
     * @return the looked-up bytes wrapped in a result; an "unknown query type" result if the store
     *         is not a {@code KeyValueStore}; a {@code STORE_EXCEPTION} failure if anything throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runKeyQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (stateStore instanceof KeyValueStore) {
            try {
                final KeyValueStore<Bytes, byte[]> store = (KeyValueStore<Bytes, byte[]>) stateStore;
                final KeyQuery<Bytes, byte[]> typedQuery = (KeyQuery<Bytes, byte[]>) query;
                final byte[] valueBytes = store.get(typedQuery.getKey());
                return (QueryResult<R>) QueryResult.forResult(valueBytes);
            } catch (Exception e) {
                final String message = parseStoreException(e, stateStore, query);
                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
            }
        }
        return QueryResult.forUnknownQueryType(query, stateStore);
    }

    /**
     * Executes a {@link WindowKeyQuery} against a {@link WindowStore}.
     *
     * <p>Only closed time ranges are supported: when both {@code getTimeFrom()} and
     * {@code getTimeTo()} are present, the store's {@code fetch(key, from, to)} is called with the
     * query's key. If either bound is missing, an {@code UNKNOWN_QUERY_TYPE} failure is returned.
     *
     * <p>The key is handed to the store as-is. Casting it to {@code WindowStore} would throw a
     * {@code ClassCastException} on every call, turning each closed-range query into a
     * {@code STORE_EXCEPTION} failure.
     *
     * @param query         the query; expected to be a {@code WindowKeyQuery}
     * @param positionBound unused here
     * @param queryConfig   unused here
     * @param stateStore    the store to query
     * @return the fetched iterator wrapped in a result; an "unknown query type" result if the store
     *         is not a {@code WindowStore}; an {@code UNKNOWN_QUERY_TYPE} failure for open ranges;
     *         a {@code STORE_EXCEPTION} failure if the store throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runWindowKeyQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (!(stateStore instanceof WindowStore)) {
            return QueryResult.forUnknownQueryType(query, stateStore);
        }
        final WindowKeyQuery<Bytes, byte[]> windowKeyQuery = (WindowKeyQuery<Bytes, byte[]>) query;
        final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) stateStore;
        try {
            if (windowKeyQuery.getTimeFrom().isPresent() && windowKeyQuery.getTimeTo().isPresent()) {
                return (QueryResult<R>) QueryResult.forResult(
                    windowStore.fetch(
                        windowKeyQuery.getKey(),
                        windowKeyQuery.getTimeFrom().get(),
                        windowKeyQuery.getTimeTo().get()
                    )
                );
            }
            return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "This store (" + stateStore.getClass() + ") doesn't know how to execute the given query (" + query + ") because it only supports closed-range queries. Contact the store maintainer if you need support for a new query type.");
        } catch (Exception e) {
            return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, parseStoreException(e, stateStore, query));
        }
    }

    /**
     * Executes a {@link WindowRangeQuery} against either a {@link WindowStore} or a
     * {@link SessionStore}.
     *
     * <p>For a {@code WindowStore}, both {@code getTimeFrom()} and {@code getTimeTo()} must be
     * present and {@code fetchAll(from, to)} is called. For a {@code SessionStore}, {@code getKey()}
     * must be present and {@code fetch(key)} is called. When the required parts of the query are
     * missing, an {@code UNKNOWN_QUERY_TYPE} failure is returned. A store that is both a
     * {@code WindowStore} and a {@code SessionStore} is treated as a {@code WindowStore}.
     *
     * @return the iterator wrapped in a result; an "unknown query type" result for any other store
     *         type; a {@code STORE_EXCEPTION} failure if anything throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runWindowRangeQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (stateStore instanceof WindowStore) {
            final WindowRangeQuery<Bytes, byte[]> typedQuery = (WindowRangeQuery<Bytes, byte[]>) query;
            final WindowStore<Bytes, byte[]> windowStore = (WindowStore<Bytes, byte[]>) stateStore;
            try {
                if (typedQuery.getTimeFrom().isPresent() && typedQuery.getTimeTo().isPresent()) {
                    return (QueryResult<R>) QueryResult.forResult(
                        windowStore.fetchAll(typedQuery.getTimeFrom().get(), typedQuery.getTimeTo().get())
                    );
                }
                return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "This store (" + stateStore.getClass() + ") doesn't know how to execute the given query (" + query + ") because WindowStores only supports WindowRangeQuery.withWindowStartRange. Contact the store maintainer if you need support for a new query type.");
            } catch (Exception e) {
                final String message = parseStoreException(e, stateStore, query);
                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
            }
        } else if (stateStore instanceof SessionStore) {
            final WindowRangeQuery<Bytes, byte[]> typedQuery = (WindowRangeQuery<Bytes, byte[]>) query;
            final SessionStore<Bytes, byte[]> sessionStore = (SessionStore<Bytes, byte[]>) stateStore;
            try {
                if (typedQuery.getKey().isPresent()) {
                    return (QueryResult<R>) QueryResult.forResult(sessionStore.fetch(typedQuery.getKey().get()));
                }
                return QueryResult.forFailure(FailureReason.UNKNOWN_QUERY_TYPE, "This store (" + stateStore.getClass() + ") doesn't know how to execute the given query (" + query + ") because SessionStores only support WindowRangeQuery.withKey. Contact the store maintainer if you need support for a new query type.");
            } catch (Exception e) {
                final String message = parseStoreException(e, stateStore, query);
                return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
            }
        } else {
            return QueryResult.forUnknownQueryType(query, stateStore);
        }
    }

    /**
     * Executes a {@link VersionedKeyQuery} against a {@link VersionedKeyValueStore}.
     *
     * <p>If the query carries an as-of timestamp, {@code get(key, asOfEpochMillis)} is called;
     * otherwise {@code get(key)} is called.
     *
     * @return the store's answer wrapped in a result; an "unknown query type" result if the store
     *         is not a {@code VersionedKeyValueStore}; a {@code STORE_EXCEPTION} failure if
     *         anything throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runVersionedKeyQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (!(stateStore instanceof VersionedKeyValueStore)) {
            return QueryResult.forUnknownQueryType(query, stateStore);
        }
        final VersionedKeyValueStore<Bytes, byte[]> store = (VersionedKeyValueStore<Bytes, byte[]>) stateStore;
        final VersionedKeyQuery<Bytes, byte[]> typedQuery = (VersionedKeyQuery<Bytes, byte[]>) query;
        try {
            final VersionedRecord<byte[]> found;
            if (typedQuery.asOfTimestamp().isPresent()) {
                found = store.get(typedQuery.key(), typedQuery.asOfTimestamp().get().toEpochMilli());
            } else {
                found = store.get(typedQuery.key());
            }
            return (QueryResult<R>) QueryResult.forResult(found);
        } catch (Exception e) {
            final String message = parseStoreException(e, stateStore, query);
            return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, message);
        }
    }

    /**
     * Executes a {@link MultiVersionedKeyQuery} against a {@code RocksDBVersionedStore}.
     *
     * <p>The store is queried with the key, the time range in epoch milliseconds and the requested
     * result order. A missing {@code fromTime} is treated as {@link Long#MIN_VALUE} and a missing
     * {@code toTime} as {@link Long#MAX_VALUE}, i.e. as "no bound" on that side.
     *
     * <p>The type guard matches the type the store is actually cast to. Guarding on the broader
     * {@code VersionedKeyValueStore} would let other versioned stores through to a failing cast
     * and report them as a store exception rather than as an unsupported query.
     *
     * @param query         the query; expected to be a {@code MultiVersionedKeyQuery}
     * @param positionBound unused here
     * @param queryConfig   unused here
     * @param stateStore    the store to query
     * @return the record iterator wrapped in a result; an "unknown query type" result if the store
     *         is not a {@code RocksDBVersionedStore}; a {@code STORE_EXCEPTION} failure if the
     *         store throws
     */
    @SuppressWarnings("unchecked")
    private static <R> QueryResult<R> runMultiVersionedKeyQuery(Query<R> query, PositionBound positionBound, QueryConfig queryConfig, StateStore stateStore) {
        if (!(stateStore instanceof RocksDBVersionedStore)) {
            return QueryResult.forUnknownQueryType(query, stateStore);
        }
        final RocksDBVersionedStore versionedStore = (RocksDBVersionedStore) stateStore;
        final MultiVersionedKeyQuery<Bytes, byte[]> multiVersionedKeyQuery = (MultiVersionedKeyQuery<Bytes, byte[]>) query;
        try {
            // An absent bound means "unbounded" on that side rather than an error.
            final long fromMillis = multiVersionedKeyQuery.fromTime().map(t -> t.toEpochMilli()).orElse(Long.MIN_VALUE);
            final long toMillis = multiVersionedKeyQuery.toTime().map(t -> t.toEpochMilli()).orElse(Long.MAX_VALUE);
            return (QueryResult<R>) QueryResult.forResult(
                versionedStore.get(multiVersionedKeyQuery.key(), fromMillis, toMillis, multiVersionedKeyQuery.resultOrder())
            );
        } catch (Exception e) {
            return QueryResult.forFailure(FailureReason.STORE_EXCEPTION, parseStoreException(e, stateStore, query));
        }
    }

    /**
     * Builds a function that deserializes raw value bytes using the serdes' topic.
     *
     * <p>The serde's own deserializer is used unless the store is neither timestamped (per
     * {@code WrappedStateStore.isTimestamped}) nor an adapter (per {@link #isAdapter}) while the
     * value serde is a {@code ValueAndTimestampSerde}; in that case the inner
     * {@code valueDeserializer} of the serde's {@code ValueAndTimestampDeserializer} is used
     * instead.
     *
     * @param stateSerdes the serdes providing the value serde and topic
     * @param stateStore  the store whose kind decides which deserializer applies
     * @param <V>         the deserialized value type
     * @return a function from raw bytes to a deserialized value
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    public static <V> Function<byte[], V> getDeserializeValue(StateSerdes<?, V> stateSerdes, StateStore stateStore) {
        final Serde<V> valueSerde = stateSerdes.valueSerde();
        final boolean timestamped = WrappedStateStore.isTimestamped(stateStore) || isAdapter(stateStore);
        final Deserializer<V> deserializer;
        if (!timestamped && valueSerde instanceof ValueAndTimestampSerde) {
            final ValueAndTimestampDeserializer wrapping =
                (ValueAndTimestampDeserializer) ((ValueAndTimestampSerde) valueSerde).deserializer();
            deserializer = (Deserializer<V>) wrapping.valueDeserializer;
        } else {
            deserializer = valueSerde.deserializer();
        }
        return rawValue -> deserializer.deserialize(stateSerdes.topic(), rawValue);
    }

    /**
     * Reports whether {@code stateStore}, or any store reachable from it by repeatedly unwrapping
     * {@code WrappedStateStore} layers, is a {@code KeyValueToTimestampedKeyValueByteStoreAdapter}.
     *
     * @param stateStore the outermost store to inspect
     * @return {@code true} if an adapter is found anywhere in the wrapper chain
     */
    @SuppressWarnings("rawtypes")
    public static boolean isAdapter(StateStore stateStore) {
        StateStore current = stateStore;
        while (true) {
            if (current instanceof KeyValueToTimestampedKeyValueByteStoreAdapter) {
                return true;
            }
            if (!(current instanceof WrappedStateStore)) {
                return false;
            }
            // Peel off one wrapper layer and inspect what it wraps.
            current = ((WrappedStateStore) current).wrapped();
        }
    }

    /**
     * Builds a function that converts a versioned record of raw bytes into a versioned record of
     * deserialized values, using the value serde's deserializer and the serdes' topic.
     *
     * <p>The timestamp is copied over; {@code validTo} is copied only when the raw record has one.
     *
     * @param stateSerdes the serdes providing the value deserializer and topic
     * @param <V>         the deserialized value type
     * @return a function from raw versioned records to typed versioned records
     */
    public static <V> Function<VersionedRecord<byte[]>, VersionedRecord<V>> getDeserializeValue(StateSerdes<?, V> stateSerdes) {
        final Deserializer<V> deserializer = stateSerdes.valueSerde().deserializer();
        return rawRecord -> {
            if (rawRecord.validTo().isPresent()) {
                final V value = deserializer.deserialize(stateSerdes.topic(), rawRecord.value());
                return new VersionedRecord<>(value, rawRecord.timestamp(), rawRecord.validTo().get().longValue());
            }
            final V value = deserializer.deserialize(stateSerdes.topic(), rawRecord.value());
            return new VersionedRecord<>(value, rawRecord.timestamp());
        };
    }

    /**
     * Deserializes the value of a raw versioned record with {@code stateSerdes.valueDeserializer()}
     * and the serdes' topic, and returns a new record carrying the typed value.
     *
     * <p>The timestamp is copied over; {@code validTo} is copied only when the raw record has one.
     *
     * @param stateSerdes     the serdes providing the value deserializer and topic
     * @param versionedRecord the raw record to convert
     * @param <V>             the deserialized value type
     * @return the typed versioned record
     */
    public static <V> VersionedRecord<V> deserializeVersionedRecord(StateSerdes<?, V> stateSerdes, VersionedRecord<byte[]> versionedRecord) {
        final V value = stateSerdes.valueDeserializer().deserialize(stateSerdes.topic(), versionedRecord.value());
        if (versionedRecord.validTo().isPresent()) {
            return new VersionedRecord<>(value, versionedRecord.timestamp(), versionedRecord.validTo().get().longValue());
        }
        return new VersionedRecord<>(value, versionedRecord.timestamp());
    }

    /**
     * Writes {@code position} to {@code offsetCheckpoint} as a topic-partition-to-offset map.
     *
     * @param offsetCheckpoint the checkpoint to write to
     * @param position         the position to persist
     * @throws ProcessorStateException if writing the checkpoint fails with an {@link IOException}
     */
    public static void checkpointPosition(OffsetCheckpoint offsetCheckpoint, Position position) {
        try {
            final Map<TopicPartition, Long> offsets = positionToTopicPartitionMap(position);
            offsetCheckpoint.write(offsets);
        } catch (IOException e) {
            throw new ProcessorStateException("Error writing checkpoint file", e);
        }
    }

    /**
     * Reads the topic-partition-to-offset map from {@code offsetCheckpoint} and converts it into a
     * {@link Position}.
     *
     * @param offsetCheckpoint the checkpoint to read from
     * @return the position built from the checkpoint contents
     * @throws ProcessorStateException if reading the checkpoint fails with an {@link IOException}
     */
    public static Position readPositionFromCheckpoint(OffsetCheckpoint offsetCheckpoint) {
        try {
            final Map<TopicPartition, Long> offsets = offsetCheckpoint.read();
            return topicPartitionMapToPosition(offsets);
        } catch (IOException e) {
            throw new ProcessorStateException("Error reading checkpoint file", e);
        }
    }

    /**
     * Flattens a {@link Position} into a map with one entry per (topic, partition) pair, keyed by
     * {@link TopicPartition} and valued with the position stored for that pair.
     *
     * @param position the position to flatten
     * @return a new mutable map of topic partitions to their stored values
     */
    private static Map<TopicPartition, Long> positionToTopicPartitionMap(Position position) {
        final Map<TopicPartition, Long> offsets = new HashMap<>();
        for (final String topic : position.getTopics()) {
            position.getPartitionPositions(topic).forEach(
                (partition, offset) -> offsets.put(new TopicPartition(topic, partition.intValue()), offset)
            );
        }
        return offsets;
    }

    /**
     * Groups a topic-partition-to-offset map by topic and builds a {@link Position} from the
     * resulting topic -> (partition -> offset) structure via {@code Position.fromMap}.
     *
     * @param map the flat map of topic partitions to offsets
     * @return the position built from the grouped map
     */
    private static Position topicPartitionMapToPosition(Map<TopicPartition, Long> map) {
        final Map<String, Map<Integer, Long>> offsetsByTopic = new HashMap<>();
        map.forEach((topicPartition, offset) ->
            offsetsByTopic
                .computeIfAbsent(topicPartition.topic(), topic -> new HashMap<>())
                .put(Integer.valueOf(topicPartition.partition()), offset)
        );
        return Position.fromMap(offsetsByTopic);
    }

    /**
     * Renders a store failure as text: a header line naming the store class and the query,
     * followed by the exception's full stack trace.
     *
     * @param exc        the exception thrown while handling the query
     * @param stateStore the store that was being queried
     * @param query      the query that failed
     * @return the header line plus the printed stack trace
     */
    private static <R> String parseStoreException(Exception exc, StateStore stateStore, Query<R> query) {
        final String header = stateStore.getClass() + " failed to handle query " + query + ":";
        final StringWriter buffer = new StringWriter();
        final PrintWriter out = new PrintWriter(buffer);
        out.println(header);
        exc.printStackTrace(out);
        // Flush so that everything printed so far is visible in the buffer.
        out.flush();
        return buffer.toString();
    }
}
