package io.confluent.ksql.execution.streams.materialization.ks;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Range;
import com.google.common.collect.Streams;
import io.confluent.ksql.GenericKey;
import io.confluent.ksql.GenericRow;
import io.confluent.ksql.execution.streams.materialization.MaterializationException;
import io.confluent.ksql.execution.streams.materialization.StreamsMaterializedWindowedTable;
import io.confluent.ksql.execution.streams.materialization.WindowedRow;
import io.confluent.ksql.util.IteratorUtil;
import java.time.Duration;
import java.time.Instant;
import java.time.temporal.TemporalAmount;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.kstream.internals.TimeWindow;
import org.apache.kafka.streams.query.FailureReason;
import org.apache.kafka.streams.query.Position;
import org.apache.kafka.streams.query.PositionBound;
import org.apache.kafka.streams.query.QueryResult;
import org.apache.kafka.streams.query.StateQueryRequest;
import org.apache.kafka.streams.query.WindowKeyQuery;
import org.apache.kafka.streams.query.WindowRangeQuery;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStoreIterator;

/* loaded from: input_file:io/confluent/ksql/execution/streams/materialization/ks/KsMaterializedWindowTableIQv2.class */
/**
 * {@link StreamsMaterializedWindowedTable} backed by Kafka Streams interactive queries
 * ({@link StateQueryRequest}) against a windowed state store.
 *
 * <p>Windows are treated as fixed-size: for keyed lookups the window end is always derived as
 * {@code windowStart + windowSize}.
 */
class KsMaterializedWindowTableIQv2 implements StreamsMaterializedWindowedTable {

    private static final String FAILURE_MESSAGE = "Failed to get value from materialized table";

    private final KsStateStore stateStore;
    private final Duration windowSize;

    /**
     * @param store the state store wrapper providing the store name, schema and streams instance.
     * @param windowSize the size of every window in the store.
     * @throws NullPointerException if either argument is {@code null}.
     */
    KsMaterializedWindowTableIQv2(final KsStateStore store, final Duration windowSize) {
        this.stateStore = Objects.requireNonNull(store, "store");
        this.windowSize = Objects.requireNonNull(windowSize, "windowSize");
    }

    /**
     * Looks up all windows of a single key on one partition.
     *
     * <p>The result is fully materialized into an immutable list before returning, and the
     * underlying store iterator is closed before this method returns.
     *
     * @param key the key to look up.
     * @param partition the partition whose result should be read.
     * @param windowStartBounds only windows whose start time lies in this range are returned.
     * @param windowEndBounds only windows whose end time lies in this range are returned.
     * @param position optional position bound to apply to the query.
     * @return the matching rows together with the position reported by the query result.
     * @throws NotUpToBoundException if the query failed with
     *     {@link FailureReason#NOT_UP_TO_BOUND}; it is propagated unwrapped.
     * @throws MaterializationException on any other failure.
     */
    @Override
    public KsMaterializedQueryResult<WindowedRow> get(
            final GenericKey key,
            final int partition,
            final Range<Instant> windowStartBounds,
            final Range<Instant> windowEndBounds,
            final Optional<Position> position) {
        try {
            final WindowKeyQuery<GenericKey, ValueAndTimestamp<GenericRow>> query =
                    WindowKeyQuery.withKeyAndWindowStartRange(
                            key,
                            calculateLowerBound(windowStartBounds, windowEndBounds),
                            calculateUpperBound(windowStartBounds, windowEndBounds));

            final QueryResult<WindowStoreIterator<ValueAndTimestamp<GenericRow>>> queryResult =
                    queryPartition(
                            StateQueryRequest.inStore(stateStore.getStateStoreName()).withQuery(query),
                            partition,
                            position);

            if (queryResult.getResult() == null) {
                return KsMaterializedQueryResult.rowIteratorWithPosition(
                        Collections.<WindowedRow>emptyIterator(), queryResult.getPosition());
            }

            try (WindowStoreIterator<ValueAndTimestamp<GenericRow>> windows = queryResult.getResult()) {
                final ImmutableList.Builder<WindowedRow> rows = ImmutableList.builder();

                while (windows.hasNext()) {
                    final KeyValue<Long, ValueAndTimestamp<GenericRow>> next = windows.next();

                    // The query bounds are inclusive instants; re-check against the caller's
                    // ranges so that open/closed endpoints are honoured exactly.
                    final Instant windowStart = Instant.ofEpochMilli(next.key);
                    if (!windowStartBounds.contains(windowStart)) {
                        continue;
                    }

                    final Instant windowEnd = windowStart.plus(windowSize);
                    if (!windowEndBounds.contains(windowEnd)) {
                        continue;
                    }

                    rows.add(toRow(key, windowStart, windowEnd, next.value));
                }

                return KsMaterializedQueryResult.rowIteratorWithPosition(
                        rows.build().iterator(), queryResult.getPosition());
            }
        } catch (final MaterializationException | NotUpToBoundException e) {
            // Already meaningful to callers: propagate as-is rather than wrapping again.
            throw e;
        } catch (final Exception e) {
            throw new MaterializationException(FAILURE_MESSAGE, e);
        }
    }

    /**
     * Scans all keys on one partition for windows within the given bounds.
     *
     * <p>The returned iterator is lazy: rows are filtered and converted as it is consumed, and
     * the underlying store iterator is closed via {@link IteratorUtil#onComplete}.
     *
     * @param partition the partition whose result should be read.
     * @param windowStartBounds only windows whose start time lies in this range are returned.
     * @param windowEndBounds only windows whose end time lies in this range are returned.
     * @param position optional position bound to apply to the query.
     * @return the matching rows together with the position reported by the query result.
     * @throws NotUpToBoundException if the query failed with
     *     {@link FailureReason#NOT_UP_TO_BOUND}; it is propagated unwrapped.
     * @throws MaterializationException on any other failure.
     */
    @Override
    public KsMaterializedQueryResult<WindowedRow> get(
            final int partition,
            final Range<Instant> windowStartBounds,
            final Range<Instant> windowEndBounds,
            final Optional<Position> position) {
        try {
            final WindowRangeQuery<GenericKey, ValueAndTimestamp<GenericRow>> query =
                    WindowRangeQuery.withWindowStartRange(
                            calculateLowerBound(windowStartBounds, windowEndBounds),
                            calculateUpperBound(windowStartBounds, windowEndBounds));

            final QueryResult<KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>>>
                    queryResult = queryPartition(
                            StateQueryRequest.inStore(stateStore.getStateStoreName()).withQuery(query),
                            partition,
                            position);

            final KeyValueIterator<Windowed<GenericKey>, ValueAndTimestamp<GenericRow>> iterator =
                    queryResult.getResult();

            return KsMaterializedQueryResult.rowIteratorWithPosition(
                    Streams.stream(IteratorUtil.onComplete(iterator, iterator::close))
                            .filter(next ->
                                    windowStartBounds.contains(next.key.window().startTime())
                                            && windowEndBounds.contains(next.key.window().endTime()))
                            .map(next -> toRow(
                                    next.key.key(),
                                    next.key.window().startTime(),
                                    next.key.window().endTime(),
                                    next.value))
                            .iterator(),
                    queryResult.getPosition());
        } catch (final MaterializationException | NotUpToBoundException e) {
            throw e;
        } catch (final Exception e) {
            throw new MaterializationException(FAILURE_MESSAGE, e);
        }
    }

    /**
     * Applies the optional position bound, runs the request and returns the successful result
     * for the requested partition.
     *
     * @throws NotUpToBoundException or MaterializationException if the partition result is a
     *     failure, and MaterializationException if there is no result for the partition.
     */
    private <R> QueryResult<R> queryPartition(
            final StateQueryRequest<R> baseRequest,
            final int partition,
            final Optional<Position> position) {
        final StateQueryRequest<R> request = position.isPresent()
                ? baseRequest.withPositionBound(PositionBound.at(position.get()))
                : baseRequest;

        final QueryResult<R> queryResult =
                stateStore.getKafkaStreams().query(request).getPartitionResults().get(partition);

        if (queryResult == null) {
            throw new MaterializationException(
                    FAILURE_MESSAGE + ": no result returned for partition " + partition);
        }
        if (queryResult.isFailure()) {
            throw failedQueryException(queryResult);
        }
        return queryResult;
    }

    /** Builds a {@link WindowedRow} for the given key, window bounds and stored value. */
    private WindowedRow toRow(
            final GenericKey key,
            final Instant windowStart,
            final Instant windowEnd,
            final ValueAndTimestamp<GenericRow> value) {
        final TimeWindow window =
                new TimeWindow(windowStart.toEpochMilli(), windowEnd.toEpochMilli());
        return WindowedRow.of(
                stateStore.schema(), new Windowed<>(key, window), value.value(), value.timestamp());
    }

    /**
     * Returns the latest window start worth querying: the smaller of the window-start upper
     * endpoint and the window-end upper endpoint shifted back by the window size. A missing
     * endpoint is treated as {@code Instant.ofEpochMilli(Long.MAX_VALUE)}.
     */
    private Instant calculateUpperBound(
            final Range<Instant> windowStartBounds,
            final Range<Instant> windowEndBounds) {
        final Instant startUpper = windowStartBounds.hasUpperBound()
                ? windowStartBounds.upperEndpoint()
                : Instant.ofEpochMilli(Long.MAX_VALUE);
        final Instant endUpper = windowEndBounds.hasUpperBound()
                ? windowEndBounds.upperEndpoint().minus(windowSize)
                : Instant.ofEpochMilli(Long.MAX_VALUE);
        return startUpper.compareTo(endUpper) < 0 ? startUpper : endUpper;
    }

    /**
     * Returns the earliest window start worth querying: the larger of the window-start lower
     * endpoint and the window-end lower endpoint shifted back by the window size. A missing
     * endpoint is treated as {@code Instant.ofEpochMilli(0)}.
     */
    private Instant calculateLowerBound(
            final Range<Instant> windowStartBounds,
            final Range<Instant> windowEndBounds) {
        final Instant startLower = windowStartBounds.hasLowerBound()
                ? windowStartBounds.lowerEndpoint()
                : Instant.ofEpochMilli(0L);
        final Instant endLower = windowEndBounds.hasLowerBound()
                ? windowEndBounds.lowerEndpoint().minus(windowSize)
                : Instant.ofEpochMilli(0L);
        return startLower.compareTo(endLower) < 0 ? endLower : startLower;
    }

    /**
     * Maps a failed query result to the exception to throw: {@link NotUpToBoundException} for
     * {@link FailureReason#NOT_UP_TO_BOUND}, otherwise {@link MaterializationException}.
     */
    private RuntimeException failedQueryException(final QueryResult<?> queryResult) {
        final String message = FAILURE_MESSAGE + ": "
                + queryResult.getFailureReason() + ": "
                + queryResult.getFailureMessage();

        if (queryResult.getFailureReason() == FailureReason.NOT_UP_TO_BOUND) {
            return new NotUpToBoundException(message);
        }
        return new MaterializationException(message);
    }
}
