package org.apache.flink.state.api.input.operator;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.state.AggregatingStateDescriptor;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ReducingStateDescriptor;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.state.api.functions.WindowReaderFunction;
import org.apache.flink.state.api.input.operator.window.WindowContents;
import org.apache.flink.state.api.runtime.SavepointRuntimeContext;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;

@Internal
/* loaded from: input_file:org/apache/flink/state/api/input/operator/WindowReaderOperator.class */
public class WindowReaderOperator<S extends State, KEY, IN, W extends Window, OUT> extends StateReaderOperator<WindowReaderFunction<IN, OUT, KEY, W>, KEY, W, OUT> {
    private static final String WINDOW_STATE_NAME = "window-contents";
    private static final String WINDOW_TIMER_NAME = "window-timers";
    private final WindowContents<S, IN> contents;
    private final StateDescriptor<S, ?> descriptor;
    private transient WindowReaderOperator<S, KEY, IN, W, OUT>.Context ctx;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/flink/state/api/input/operator/WindowReaderOperator$Context.class */
    public class Context implements WindowReaderFunction.Context<W> {
        private static final String EVENT_TIMER_STATE = "event-time-timers";
        private static final String PROC_TIMER_STATE = "proc-time-timers";
        W window;
        final WindowReaderOperator<S, KEY, IN, W, OUT>.PerWindowKeyedStateStore perWindowKeyedStateStore;
        final DefaultKeyedStateStore keyedStateStore;
        ListState<Long> eventTimers;
        ListState<Long> procTimers;

        private Context(KeyedStateBackend<KEY> keyedStateBackend, InternalTimerService<W> internalTimerService) throws Exception {
            this.keyedStateStore = new DefaultKeyedStateStore(keyedStateBackend, WindowReaderOperator.this.getSerializerFactory());
            this.perWindowKeyedStateStore = new PerWindowKeyedStateStore(keyedStateBackend);
            this.eventTimers = keyedStateBackend.getPartitionedState(WindowReaderOperator.WINDOW_TIMER_NAME, StringSerializer.INSTANCE, new ListStateDescriptor(EVENT_TIMER_STATE, Types.LONG));
            internalTimerService.forEachEventTimeTimer((window, l) -> {
                this.eventTimers.add(l);
            });
            this.procTimers = keyedStateBackend.getPartitionedState(WindowReaderOperator.WINDOW_TIMER_NAME, StringSerializer.INSTANCE, new ListStateDescriptor(PROC_TIMER_STATE, Types.LONG));
            internalTimerService.forEachProcessingTimeTimer((window2, l2) -> {
                this.procTimers.add(l2);
            });
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public W window() {
            return this.window;
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public <TS extends State> TS triggerState(StateDescriptor<TS, ?> stateDescriptor) {
            try {
                return (TS) WindowReaderOperator.this.getKeyedStateBackend().getPartitionedState(this.window, WindowReaderOperator.this.namespaceSerializer, stateDescriptor);
            } catch (Exception e) {
                throw new RuntimeException("Could not retrieve trigger state", e);
            }
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public KeyedStateStore windowState() {
            this.perWindowKeyedStateStore.window = this.window;
            return this.perWindowKeyedStateStore;
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public KeyedStateStore globalState() {
            return this.keyedStateStore;
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public Set<Long> registeredEventTimeTimers() throws Exception {
            Iterable iterable = (Iterable) this.eventTimers.get();
            return iterable == null ? Collections.emptySet() : (Set) StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toSet());
        }

        @Override // org.apache.flink.state.api.functions.WindowReaderFunction.Context
        public Set<Long> registeredProcessingTimeTimers() throws Exception {
            Iterable iterable = (Iterable) this.procTimers.get();
            return iterable == null ? Collections.emptySet() : (Set) StreamSupport.stream(iterable.spliterator(), false).collect(Collectors.toSet());
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/operator/WindowReaderOperator$IteratorWithRemove.class */
    private static class IteratorWithRemove<T> implements CloseableIterator<T> {
        private final Iterator<T> iterator;
        private final AutoCloseable resource;

        private IteratorWithRemove(Stream<T> stream) {
            this.iterator = stream.iterator();
            this.resource = stream;
        }

        public boolean hasNext() {
            return this.iterator.hasNext();
        }

        public T next() {
            return this.iterator.next();
        }

        public void remove() {
        }

        public void close() throws Exception {
            this.resource.close();
        }
    }

    /* loaded from: input_file:org/apache/flink/state/api/input/operator/WindowReaderOperator$PerWindowKeyedStateStore.class */
    /**
     * Keyed state store whose lookups are made under a window namespace instead of whatever
     * namespace the parent store would use. The window is a mutable field that the owner sets
     * before handing the store out.
     */
    private class PerWindowKeyedStateStore extends DefaultKeyedStateStore {
        /** Namespace used for every lookup made through this store. */
        W window;

        PerWindowKeyedStateStore(KeyedStateBackend<?> keyedStateBackend) {
            super(keyedStateBackend, WindowReaderOperator.this.getSerializerFactory());
        }

        /** Fetches the state for {@code stateDescriptor} from the backend under the current window. */
        protected <SS extends State> SS getPartitionedState(StateDescriptor<SS, ?> stateDescriptor) throws Exception {
            W namespace = this.window;
            return (SS) this.keyedStateBackend.getPartitionedState(
                    namespace, WindowReaderOperator.this.namespaceSerializer, stateDescriptor);
        }
    }

    /**
     * Creates a reader for windows whose contents are kept in reducing state.
     *
     * @param reduceFunction reduce function used to build the state descriptor
     * @param windowReaderFunction user function invoked for every key and window
     * @param typeInformation type information of the key
     * @param typeSerializer serializer of the window (state namespace)
     * @param typeInformation2 type information of the reduced element
     * @return an operator reading the {@code "window-contents"} reducing state
     */
    public static <KEY, T, W extends Window, OUT> WindowReaderOperator<?, KEY, T, W, OUT> reduce(ReduceFunction<T> reduceFunction, WindowReaderFunction<T, OUT, KEY, W> windowReaderFunction, TypeInformation<KEY> typeInformation, TypeSerializer<W> typeSerializer, TypeInformation<T> typeInformation2) {
        // A typed descriptor keeps the constructor call below fully checked by the compiler.
        ReducingStateDescriptor<T> descriptor =
                new ReducingStateDescriptor<>(WINDOW_STATE_NAME, reduceFunction, typeInformation2);
        return new WindowReaderOperator<>(windowReaderFunction, typeInformation, typeSerializer, WindowContents.reducingState(), descriptor);
    }

    /**
     * Creates a reader for windows whose contents are kept in aggregating state.
     *
     * @param aggregateFunction aggregate function used to build the state descriptor
     * @param windowReaderFunction user function invoked for every key and window
     * @param typeInformation type information of the key
     * @param typeSerializer serializer of the window (state namespace)
     * @param typeInformation2 type information of the accumulator
     * @return an operator reading the {@code "window-contents"} aggregating state
     */
    public static <KEY, T, ACC, R, OUT, W extends Window> WindowReaderOperator<?, KEY, R, W, OUT> aggregate(AggregateFunction<T, ACC, R> aggregateFunction, WindowReaderFunction<R, OUT, KEY, W> windowReaderFunction, TypeInformation<KEY> typeInformation, TypeSerializer<W> typeSerializer, TypeInformation<ACC> typeInformation2) {
        // A typed descriptor keeps the constructor call below fully checked by the compiler.
        AggregatingStateDescriptor<T, ACC, R> descriptor =
                new AggregatingStateDescriptor<>(WINDOW_STATE_NAME, aggregateFunction, typeInformation2);
        return new WindowReaderOperator<>(windowReaderFunction, typeInformation, typeSerializer, WindowContents.aggregatingState(), descriptor);
    }

    /**
     * Creates a reader for windows whose contents are kept in list state.
     *
     * @param windowReaderFunction user function invoked for every key and window
     * @param typeInformation type information of the key
     * @param typeSerializer serializer of the window (state namespace)
     * @param typeInformation2 type information of the buffered elements
     * @return an operator reading the {@code "window-contents"} list state
     */
    public static <KEY, T, W extends Window, OUT> WindowReaderOperator<?, KEY, T, W, OUT> process(WindowReaderFunction<T, OUT, KEY, W> windowReaderFunction, TypeInformation<KEY> typeInformation, TypeSerializer<W> typeSerializer, TypeInformation<T> typeInformation2) {
        // A typed descriptor keeps the constructor call below fully checked by the compiler.
        ListStateDescriptor<T> descriptor = new ListStateDescriptor<>(WINDOW_STATE_NAME, typeInformation2);
        return new WindowReaderOperator<>(windowReaderFunction, typeInformation, typeSerializer, WindowContents.listState(), descriptor);
    }

    /**
     * Creates a reader for windows whose list state holds {@link StreamRecord}s serialized with a
     * {@link StreamElementSerializer}.
     *
     * @param windowReaderFunction user function invoked for every key and window
     * @param typeInformation type information of the key
     * @param typeSerializer serializer of the window (state namespace)
     * @param typeInformation2 type information of the record payload
     * @param executionConfig configuration supplying the serializer config for the payload serializer
     * @return an operator reading the {@code "window-contents"} list state
     */
    public static <KEY, T, W extends Window, OUT> WindowReaderOperator<?, KEY, StreamRecord<T>, W, OUT> evictingWindow(WindowReaderFunction<StreamRecord<T>, OUT, KEY, W> windowReaderFunction, TypeInformation<KEY> typeInformation, TypeSerializer<W> typeSerializer, TypeInformation<T> typeInformation2, ExecutionConfig executionConfig) {
        // The element serializer is not declared as a serializer of StreamRecord<T>, so one
        // unchecked cast is needed; it is confined to this local instead of leaking raw types
        // into the descriptor and the operator construction.
        @SuppressWarnings({"unchecked", "rawtypes"})
        TypeSerializer<StreamRecord<T>> recordSerializer =
                (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(typeInformation2.createSerializer(executionConfig.getSerializerConfig()));
        ListStateDescriptor<StreamRecord<T>> descriptor = new ListStateDescriptor<>(WINDOW_STATE_NAME, recordSerializer);
        return new WindowReaderOperator<>(windowReaderFunction, typeInformation, typeSerializer, WindowContents.listState(), descriptor);
    }

    /**
     * Stores the window-contents accessor and the descriptor of the state to read; both are
     * rejected when {@code null}. Instances are created through the static factory methods.
     */
    private WindowReaderOperator(WindowReaderFunction<IN, OUT, KEY, W> windowReaderFunction, TypeInformation<KEY> typeInformation, TypeSerializer<W> typeSerializer, WindowContents<S, IN> windowContents, StateDescriptor<S, ?> stateDescriptor) {
        super(windowReaderFunction, typeInformation, typeSerializer);
        // checkNotNull hands back its argument, so validation and assignment share one statement.
        this.contents = Preconditions.checkNotNull(windowContents, "WindowContents must not be null");
        this.descriptor = Preconditions.checkNotNull(stateDescriptor, "The state descriptor must not be null");
    }

    /**
     * Opens the parent operator, then creates the reader context from the keyed state backend and
     * the {@code "window-timers"} timer service.
     */
    @Override // org.apache.flink.state.api.input.operator.StateReaderOperator
    public void open() throws Exception {
        super.open();
        KeyedStateBackend<KEY> backend = getKeyedStateBackend();
        InternalTimerService<W> timerService = getInternalTimerService(WINDOW_TIMER_NAME);
        this.ctx = new Context(backend, timerService);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Reads one window of one key: points the context at the window, fetches the window's state
     * from the keyed state backend and hands its contents to the user's reader function.
     *
     * @param key key being read
     * @param w window being read, used as the state namespace
     * @param collector receives whatever the reader function emits
     */
    public void processElement(KEY key, W w, Collector<OUT> collector) throws Exception {
        this.ctx.window = w;
        S windowState = getKeyedStateBackend().getPartitionedState(w, this.namespaceSerializer, this.descriptor);
        this.function.readWindow(key, this.ctx, this.contents.contents(windowState), collector);
    }

    /**
     * Returns every (key, window) pair the keyed state backend reports for the state named by this
     * operator's descriptor. Closing the returned iterator closes the underlying stream.
     *
     * @param savepointRuntimeContext not used by this implementation
     */
    @Override // org.apache.flink.state.api.input.operator.StateReaderOperator
    public CloseableIterator<Tuple2<KEY, W>> getKeysAndNamespaces(SavepointRuntimeContext savepointRuntimeContext) throws Exception {
        // The typed local fixes the namespace type to W so the wrapper is not built as a raw type.
        Stream<Tuple2<KEY, W>> keysAndWindows = getKeyedStateBackend().getKeysAndNamespaces(this.descriptor.getName());
        return new IteratorWithRemove<>(keysAndWindows);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Erased-signature entry point that forwards to {@link #processElement(Object, Window, Collector)}.
     *
     * <p>The decompiled body cast the key argument to {@code WindowReaderOperator}. That cast fails
     * with a {@link ClassCastException} for any real key, and the call resolved back to this same
     * method. The arguments are now cast to the key, window and collector types so the typed
     * overload is selected.
     *
     * <p>NOTE(review): a Java compiler normally synthesizes this bridge itself; when rebuilding from
     * source this explicit copy may clash with the generated one and need removing - confirm.
     */
    @Override // org.apache.flink.state.api.input.operator.StateReaderOperator
    @SuppressWarnings("unchecked")
    public /* bridge */ /* synthetic */ void processElement(Object obj, Object obj2, Collector collector) throws Exception {
        processElement((KEY) obj, (W) obj2, (Collector<OUT>) collector);
    }
}
