package org.apache.beam.repackaged.direct_java.runners.core;

import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.reflect.DoFnSignature;
import org.apache.beam.sdk.transforms.reflect.DoFnSignatures;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
import org.apache.beam.sdk.transforms.windowing.WindowFn;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.sdk.util.WindowedValue;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunner.class */
public class StatefulDoFnRunner<InputT, OutputT, W extends BoundedWindow> implements DoFnRunner<InputT, OutputT> {
    /** Name of the metrics counter that is incremented for each element dropped in {@code processElement}. */
    public static final String DROPPED_DUE_TO_LATENESS_COUNTER = "StatefulParDoDropped";
    /** Wrapped runner that receives non-dropped elements, forwarded timers and bundle lifecycle calls. */
    private final DoFnRunner<InputT, OutputT> doFnRunner;
    /** Windowing strategy passed to {@code LateDataUtils.garbageCollectionTime} when checking lateness. */
    private final WindowingStrategy<?, ?> windowingStrategy;
    /** Counter registered under this class with the name {@link #DROPPED_DUE_TO_LATENESS_COUNTER}. */
    private final Counter droppedDueToLateness = Metrics.counter(StatefulDoFnRunner.class, DROPPED_DUE_TO_LATENESS_COUNTER);
    /** Sets and recognizes the per-window cleanup timer and supplies the current input watermark time. */
    private final CleanupTimer<InputT> cleanupTimer;
    // NOTE(review): declared with the raw type although the constructor accepts StateCleaner<W>;
    // onTimer therefore calls clearForWindow unchecked. Tightening this requires a cast in onTimer.
    /** Invoked from {@code onTimer} to clear state for a window once its cleanup timer fires. */
    private final StateCleaner stateCleaner;

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunner$CleanupTimer.class */
    /**
     * Abstraction over the timer that {@code StatefulDoFnRunner} uses to trigger state cleanup
     * for a window, plus access to the current input watermark used for lateness checks.
     *
     * @param <InputT> type of the element values passed to {@link #setForWindow}
     */
    public interface CleanupTimer<InputT> {
        /**
         * Returns the current input watermark time. The enclosing runner compares it against a
         * window's garbage collection time to decide whether the window is late.
         */
        Instant currentInputWatermarkTime();

        /**
         * Sets the cleanup timer for the given window. The enclosing runner calls this for every
         * non-late element before handing the element to the wrapped runner.
         *
         * @param inputt value of the element being processed
         * @param boundedWindow window the element belongs to
         */
        void setForWindow(InputT inputt, BoundedWindow boundedWindow);

        /**
         * Tells whether a firing timer is the cleanup timer of the given window.
         *
         * @param str id of the firing timer
         * @param boundedWindow window the timer fired for
         * @param instant timestamp of the firing timer
         * @param timeDomain time domain of the firing timer
         * @return {@code true} if the timer is the cleanup timer for {@code boundedWindow}
         */
        boolean isForWindow(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain);
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunner$StateCleaner.class */
    /**
     * Callback that clears state belonging to a window. The enclosing runner invokes it from
     * {@code onTimer} when the firing timer is recognized as the window's cleanup timer.
     *
     * @param <W> type of window whose state is cleared
     */
    public interface StateCleaner<W extends BoundedWindow> {
        /**
         * Clears the state associated with the given window.
         *
         * @param w window whose state should be cleared
         */
        void clearForWindow(W w);
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunner$StateInternalsStateCleaner.class */
    /**
     * A {@link StateCleaner} that clears, for a given window, every state cell declared in the
     * signature of a {@link DoFn}, using a {@code StateInternals} instance.
     *
     * @param <W> type of window whose state is cleared
     */
    public static class StateInternalsStateCleaner<W extends BoundedWindow> implements StateCleaner<W> {
        /** The DoFn instance whose state-declaration fields are read reflectively. */
        private final DoFn<?, ?> fn;
        /** Signature of {@link #fn}'s class; source of the state declarations to clear. */
        private final DoFnSignature signature;
        /** State backend on which the cells are looked up and cleared. */
        private final StateInternals stateInternals;
        /** Coder used to build the window state namespace. */
        private final Coder<W> windowCoder;

        /**
         * @param doFn the DoFn whose declared state should be cleared
         * @param stateInternals state backend that holds the DoFn's state
         * @param coder coder for windows of type {@code W}
         */
        public StateInternalsStateCleaner(DoFn<?, ?> doFn, StateInternals stateInternals, Coder<W> coder) {
            this.fn = doFn;
            this.signature = DoFnSignatures.getSignature(doFn.getClass());
            this.stateInternals = stateInternals;
            this.windowCoder = coder;
        }

        /**
         * Clears every state cell declared by the DoFn in the namespace of the given window.
         *
         * @param w window whose state should be cleared
         * @throws RuntimeException wrapping the {@link IllegalAccessException} if a state
         *     declaration field cannot be read from the DoFn instance
         */
        @Override
        public void clearForWindow(W w) {
            for (Map.Entry<String, DoFnSignature.StateDeclaration> declaration : this.signature.stateDeclarations().entrySet()) {
                final String stateId = declaration.getKey();
                try {
                    // The declaration's field holds the StateSpec object on the DoFn instance.
                    final StateSpec<?> spec = (StateSpec<?>) declaration.getValue().field().get(this.fn);
                    this.stateInternals.state(StateNamespaces.window(this.windowCoder, w), StateTags.tagForSpec(stateId, spec)).clear();
                } catch (IllegalAccessException e) {
                    throw new RuntimeException("Unable to read state declaration '" + stateId + "' from " + this.fn.getClass().getName(), e);
                }
            }
        }
    }

    /* loaded from: input_file:org/apache/beam/repackaged/direct_java/runners/core/StatefulDoFnRunner$TimeInternalsCleanupTimer.class */
    /**
     * A {@link CleanupTimer} backed by {@code TimerInternals}: it sets an event-time timer with
     * id {@link #GC_TIMER_ID} at the window's garbage collection time plus {@link #GC_DELAY_MS}.
     *
     * @param <InputT> type of the element values passed to {@link #setForWindow}
     */
    public static class TimeInternalsCleanupTimer<InputT> implements CleanupTimer<InputT> {
        /** Timer id under which the cleanup timer is registered. */
        public static final String GC_TIMER_ID = "__StatefulParDoGcTimerId";
        /**
         * Milliseconds added to a window's garbage collection time to obtain the cleanup time.
         * NOTE(review): presumably so cleanup fires after timers set for the end of the window — confirm.
         */
        public static final long GC_DELAY_MS = 1;
        private final TimerInternals timerInternals;
        private final WindowingStrategy<?, ?> windowingStrategy;
        private final Coder<BoundedWindow> windowCoder;

        /**
         * @param timerInternals timer backend used to set timers and read the input watermark
         * @param windowingStrategy strategy that supplies the window coder and is used to compute
         *     garbage collection times
         */
        public TimeInternalsCleanupTimer(TimerInternals timerInternals, WindowingStrategy<?, ?> windowingStrategy) {
            this.windowingStrategy = windowingStrategy;
            // The strategy is wildcard-typed, so its coder is Coder<capture>; it is only used to
            // build window namespaces, hence the explicit unchecked narrowing.
            @SuppressWarnings("unchecked")
            final Coder<BoundedWindow> coder = (Coder<BoundedWindow>) windowingStrategy.getWindowFn().windowCoder();
            this.windowCoder = coder;
            this.timerInternals = timerInternals;
        }

        /** Returns the current input watermark time reported by the timer backend. */
        @Override
        public Instant currentInputWatermarkTime() {
            return this.timerInternals.currentInputWatermarkTime();
        }

        /**
         * Sets the event-time cleanup timer for the window in the window's state namespace.
         *
         * @param inputt value of the element being processed; not used by this implementation
         * @param boundedWindow window to schedule cleanup for
         */
        @Override
        public void setForWindow(InputT inputt, BoundedWindow boundedWindow) {
            this.timerInternals.setTimer(StateNamespaces.window(this.windowCoder, boundedWindow), GC_TIMER_ID, cleanupTime(boundedWindow), TimeDomain.EVENT_TIME);
        }

        /**
         * Tells whether the firing timer is this window's cleanup timer: event-time domain,
         * id {@link #GC_TIMER_ID}, and timestamp equal to the window's cleanup time.
         */
        @Override
        public boolean isForWindow(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
            return timeDomain.equals(TimeDomain.EVENT_TIME) && GC_TIMER_ID.equals(str) && cleanupTime(boundedWindow).equals(instant);
        }

        /** Computes the cleanup time: the window's garbage collection time plus {@link #GC_DELAY_MS}. */
        private Instant cleanupTime(BoundedWindow boundedWindow) {
            return LateDataUtils.garbageCollectionTime(boundedWindow, this.windowingStrategy).plus(GC_DELAY_MS);
        }
    }

    public StatefulDoFnRunner(DoFnRunner<InputT, OutputT> doFnRunner, WindowingStrategy<?, ?> windowingStrategy, CleanupTimer<InputT> cleanupTimer, StateCleaner<W> stateCleaner) {
        this.doFnRunner = doFnRunner;
        this.windowingStrategy = windowingStrategy;
        this.cleanupTimer = cleanupTimer;
        this.stateCleaner = stateCleaner;
        rejectMergingWindowFn(windowingStrategy.getWindowFn());
    }

    /**
     * Verifies that the given WindowFn is a {@link NonMergingWindowFn}.
     *
     * @param windowFn the WindowFn to check
     * @throws UnsupportedOperationException if {@code windowFn} is not a {@link NonMergingWindowFn}
     */
    private void rejectMergingWindowFn(WindowFn<?, ?> windowFn) {
        if (windowFn instanceof NonMergingWindowFn) {
            return;
        }
        final String message = "MergingWindowFn is not supported for stateful DoFns, WindowFn is: " + windowFn;
        throw new UnsupportedOperationException(message);
    }

    /** Returns the {@link DoFn} of the wrapped runner. */
    @Override
    public DoFn<InputT, OutputT> getFn() {
        final DoFn<InputT, OutputT> wrappedFn = doFnRunner.getFn();
        return wrappedFn;
    }

    /** Forwards the start-of-bundle call to the wrapped runner. */
    @Override
    public void startBundle() {
        doFnRunner.startBundle();
    }

    /* JADX WARN: Multi-variable type inference failed */
    /**
     * Processes the element once per window it belongs to. For each window, a late element is
     * counted and traced but not processed; otherwise the cleanup timer is set for the window
     * and the single-window value is handed to the wrapped runner.
     *
     * @param windowedValue the element, possibly assigned to several windows
     */
    @Override
    public void processElement(WindowedValue<InputT> windowedValue) {
        for (WindowedValue<InputT> perWindowValue : windowedValue.explodeWindows()) {
            // After exploding, each value carries exactly the one window taken here.
            final BoundedWindow window = (BoundedWindow) perWindowValue.getWindows().iterator().next();

            if (!isLate(window)) {
                cleanupTimer.setForWindow(perWindowValue.getValue(), window);
                doFnRunner.processElement(perWindowValue);
                continue;
            }

            droppedDueToLateness.inc();
            final Object[] traceArgs = {windowedValue.getTimestamp(), window, cleanupTimer.currentInputWatermarkTime()};
            WindowTracing.debug("StatefulDoFnRunner.processElement: Dropping element at {}; window:{} since too far behind inputWatermark:{}", traceArgs);
        }
    }

    /**
     * Tells whether the window's garbage collection time lies strictly before the current
     * input watermark time.
     *
     * @param boundedWindow window to check
     * @return {@code true} if the window is late
     */
    private boolean isLate(BoundedWindow boundedWindow) {
        final Instant gcTime = LateDataUtils.garbageCollectionTime(boundedWindow, windowingStrategy);
        final Instant inputWatermark = cleanupTimer.currentInputWatermarkTime();
        return gcTime.isBefore(inputWatermark);
    }

    /**
     * Handles a firing timer. A cleanup timer clears the window's state and is not forwarded.
     * Any other timer is forwarded to the wrapped runner, except a non-event-time timer for a
     * late window, which is only traced.
     *
     * @param str id of the firing timer
     * @param boundedWindow window the timer fired for
     * @param instant timestamp of the firing timer
     * @param timeDomain time domain of the firing timer
     */
    @Override
    public void onTimer(String str, BoundedWindow boundedWindow, Instant instant, TimeDomain timeDomain) {
        if (cleanupTimer.isForWindow(str, boundedWindow, instant, timeDomain)) {
            stateCleaner.clearForWindow(boundedWindow);
            return;
        }

        // Lateness is only evaluated for timers outside the event-time domain.
        final boolean ignoredAsLate = !timeDomain.equals(TimeDomain.EVENT_TIME) && isLate(boundedWindow);
        if (ignoredAsLate) {
            final Object[] traceArgs = {instant, boundedWindow, cleanupTimer.currentInputWatermarkTime()};
            WindowTracing.debug("StatefulDoFnRunner.onTimer: Ignoring processing-time timer at {}; window:{} since window is too far behind inputWatermark:{}", traceArgs);
        } else {
            doFnRunner.onTimer(str, boundedWindow, instant, timeDomain);
        }
    }

    /** Forwards the end-of-bundle call to the wrapped runner. */
    @Override
    public void finishBundle() {
        doFnRunner.finishBundle();
    }
}
