package org.apache.beam.runners.direct;

import com.google.auto.value.AutoValue;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nonnull;
import javax.annotation.concurrent.GuardedBy;
import org.apache.beam.repackaged.direct_java.runners.core.StateNamespace;
import org.apache.beam.repackaged.direct_java.runners.core.TimerInternals;
import org.apache.beam.repackaged.direct_java.runners.local.Bundle;
import org.apache.beam.repackaged.direct_java.runners.local.StructuralKey;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.util.WindowTracing;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ComparisonChain;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.HashBasedTable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Ordering;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.SortedMultiset;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Table;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.TreeMultiset;
import org.joda.time.Instant;

/* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager.class */
public class WatermarkManager<ExecutableT, CollectionT> {
    // Numeric limit of 10; the code that applies it is outside this part of the file — TODO confirm usage.
    private static final int MAX_INCREMENTAL_UPDATES = 10;

    /**
     * Constant watermark pinned at {@link BoundedWindow#TIMESTAMP_MAX_VALUE}. Refreshing it always
     * reports {@code WatermarkUpdate.NO_CHANGE}.
     */
    private static final Watermark THE_END_OF_TIME = new Watermark() { // from class: org.apache.beam.runners.direct.WatermarkManager.1
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public String getName() {
            return "THE_END_OF_TIME";
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public WatermarkUpdate refresh() {
            // The value is fixed, so there is never anything to recompute.
            return WatermarkUpdate.NO_CHANGE;
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public Instant get() {
            return BoundedWindow.TIMESTAMP_MAX_VALUE;
        }
    };

    /** Natural (chronological) ordering used for every min/max computation over instants. */
    private static final Ordering<Instant> INSTANT_ORDERING = Ordering.natural();
    private final Clock clock;
    private final ExecutableGraph<ExecutableT, CollectionT> graph;
    // Maps an executable to its display name; presumably used for watermark names/tracing — TODO confirm.
    private final Function<ExecutableT, String> getName;
    // Concurrent map from executable to a set of strings; judging by the name, it records timers
    // that have already been extracted — NOTE(review): confirm against the code that populates it.
    private final Map<ExecutableT, Set<String>> transformsWithAlreadyExtractedTimers = new ConcurrentHashMap();
    // Lock-free queue of updates that have been submitted but not yet applied.
    private final ConcurrentLinkedQueue<PendingWatermarkUpdate<ExecutableT, CollectionT>> pendingUpdates = new ConcurrentLinkedQueue<>();
    // Guards pendingRefreshes (see the @GuardedBy annotation below).
    private final Lock refreshLock = new ReentrantLock();

    @GuardedBy("refreshLock")
    private final Set<ExecutableT> pendingRefreshes = new HashSet();
    // Per-executable watermark holders. A plain HashMap: NOTE(review) — presumably populated once
    // at construction and only read afterwards; confirm before mutating it concurrently.
    private final Map<ExecutableT, WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks> transformToWatermarks = new HashMap();

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: org.apache.beam.runners.direct.WatermarkManager$2, reason: invalid class name */
    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$2.class */
    /**
     * Compiler-generated lookup table for {@code switch} statements over {@link TimeDomain}.
     *
     * <p>The array is indexed by {@code TimeDomain.ordinal()} and holds the case label assigned to
     * each constant: {@code 1} for {@code PROCESSING_TIME} and {@code 2} for
     * {@code SYNCHRONIZED_PROCESSING_TIME}. Every other slot keeps the array default of {@code 0},
     * which matches no case label.
     */
    public static /* synthetic */ class AnonymousClass2 {
        // One slot per TimeDomain constant known at class-initialization time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$beam$sdk$state$TimeDomain = new int[TimeDomain.values().length];

        static {
            // Each assignment is isolated in its own try block so that a constant missing from the
            // TimeDomain class loaded at runtime (NoSuchFieldError) only leaves its own slot unset.
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.PROCESSING_TIME.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Deliberately ignored: the slot stays 0.
            }
            try {
                $SwitchMap$org$apache$beam$sdk$state$TimeDomain[TimeDomain.SYNCHRONIZED_PROCESSING_TIME.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Deliberately ignored: the slot stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$AppliedPTransformInputWatermark.class */
    public static class AppliedPTransformInputWatermark implements Watermark {
        private final String name;
        private final Collection<? extends Watermark> inputWatermarks;
        private final Consumer<TimerInternals.TimerData> timerUpdateNotification;
        private final SortedMultiset<Bundle<?, ?>> pendingElements = TreeMultiset.create(new BundleByElementTimestampComparator().compound(Ordering.arbitrary()));
        private final SortedMultiset<TimerInternals.TimerData> pendingTimers = TreeMultiset.create();
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> objectTimers = new HashMap();
        private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerInternals.TimerData>> existingTimers = new HashMap();
        private final AtomicReference<Instant> currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        public AppliedPTransformInputWatermark(String str, Collection<? extends Watermark> collection, Consumer<TimerInternals.TimerData> consumer) {
            this.name = str;
            this.inputWatermarks = collection;
            this.timerUpdateNotification = consumer;
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public String getName() {
            return this.name;
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public Instant get() {
            return this.currentWatermark.get();
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public synchronized WatermarkUpdate refresh() {
            Instant instant = this.currentWatermark.get();
            Instant instant2 = BoundedWindow.TIMESTAMP_MAX_VALUE;
            Iterator<? extends Watermark> it = this.inputWatermarks.iterator();
            while (it.hasNext()) {
                instant2 = (Instant) WatermarkManager.INSTANT_ORDERING.min(instant2, it.next().get());
            }
            if (!this.pendingElements.isEmpty()) {
                instant2 = (Instant) WatermarkManager.INSTANT_ORDERING.min(instant2, ((Bundle) this.pendingElements.firstEntry().getElement()).getMinimumTimestamp());
            }
            Instant instant3 = (Instant) WatermarkManager.INSTANT_ORDERING.max(instant, instant2);
            this.currentWatermark.set(instant3);
            return WatermarkManager.updateAndTrace(getName(), instant, instant3);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void addPending(Bundle<?, ?> bundle) {
            this.pendingElements.add(bundle);
        }

        /* JADX INFO: Access modifiers changed from: private */
        public synchronized void removePending(Bundle<?, ?> bundle) {
            this.pendingElements.remove(bundle);
        }

        @VisibleForTesting
        synchronized Instant getEarliestTimerTimestamp() {
            return this.pendingTimers.isEmpty() ? BoundedWindow.TIMESTAMP_MAX_VALUE : getMinimumOutputTimestamp(this.pendingTimers);
        }

        private Instant getMinimumOutputTimestamp(SortedMultiset<TimerInternals.TimerData> sortedMultiset) {
            Instant outputTimestamp = ((TimerInternals.TimerData) sortedMultiset.firstEntry().getElement()).getOutputTimestamp();
            Iterator it = sortedMultiset.iterator();
            while (it.hasNext()) {
                outputTimestamp = (Instant) WatermarkManager.INSTANT_ORDERING.min(((TimerInternals.TimerData) it.next()).getOutputTimestamp(), outputTimestamp);
            }
            return outputTimestamp;
        }

        @VisibleForTesting
        synchronized void updateTimers(TimerUpdate timerUpdate) {
            TimerInternals.TimerData timerData;
            NavigableSet<TimerInternals.TimerData> computeIfAbsent = this.objectTimers.computeIfAbsent(timerUpdate.key, structuralKey -> {
                return new TreeSet();
            });
            Table<StateNamespace, String, TimerInternals.TimerData> computeIfAbsent2 = this.existingTimers.computeIfAbsent(timerUpdate.key, structuralKey2 -> {
                return HashBasedTable.create();
            });
            for (TimerInternals.TimerData timerData2 : timerUpdate.getSetTimers()) {
                if (TimeDomain.EVENT_TIME.equals(timerData2.getDomain())) {
                    TimerInternals.TimerData timerData3 = (TimerInternals.TimerData) computeIfAbsent2.get(timerData2.getNamespace(), timerData2.getTimerId() + '+' + timerData2.getTimerFamilyId());
                    if (timerData3 == null) {
                        this.pendingTimers.add(timerData2);
                        computeIfAbsent.add(timerData2);
                    } else {
                        this.pendingTimers.remove(timerData3);
                        computeIfAbsent.remove(timerData3);
                        this.pendingTimers.add(timerData2);
                        computeIfAbsent.add(timerData2);
                    }
                    computeIfAbsent2.put(timerData2.getNamespace(), timerData2.getTimerId() + '+' + timerData2.getTimerFamilyId(), timerData2);
                }
            }
            for (TimerInternals.TimerData timerData4 : timerUpdate.getDeletedTimers()) {
                if (TimeDomain.EVENT_TIME.equals(timerData4.getDomain()) && (timerData = (TimerInternals.TimerData) computeIfAbsent2.get(timerData4.getNamespace(), timerData4.getTimerId() + '+' + timerData4.getTimerFamilyId())) != null) {
                    this.pendingTimers.remove(timerData);
                    computeIfAbsent.remove(timerData);
                    computeIfAbsent2.remove(timerData.getNamespace(), timerData.getTimerId() + '+' + timerData.getTimerFamilyId());
                }
            }
            for (TimerInternals.TimerData timerData5 : timerUpdate.getCompletedTimers()) {
                if (TimeDomain.EVENT_TIME.equals(timerData5.getDomain())) {
                    computeIfAbsent.remove(timerData5);
                    this.pendingTimers.remove(timerData5);
                }
            }
            if (timerUpdate.isEmpty()) {
                return;
            }
            Iterables.concat(timerUpdate.getCompletedTimers(), timerUpdate.getDeletedTimers(), timerUpdate.getSetTimers()).forEach(this.timerUpdateNotification);
        }

        @VisibleForTesting
        synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredEventTimeTimers() {
            return WatermarkManager.extractFiredTimers(this.currentWatermark.get(), this.objectTimers);
        }

        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformInputWatermark.class).add("pendingElements", this.pendingElements).add("currentWatermark", this.currentWatermark).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Event-time output watermark of a single transform.
     *
     * <p>On {@link #refresh()} the value becomes the minimum of the transform's input watermark, the
     * earliest per-key hold and the earliest timer timestamp reported by the input watermark, but
     * never less than the previous value.
     */
    public static class AppliedPTransformOutputWatermark implements Watermark {
        private final String name;
        private final AppliedPTransformInputWatermark inputWatermark;
        private final PerKeyHolds holds = new PerKeyHolds();
        private AtomicReference<Instant> currentWatermark = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        /**
         * @param str name reported by {@link #getName()}
         * @param appliedPTransformInputWatermark input watermark of the same transform
         */
        public AppliedPTransformOutputWatermark(String str, AppliedPTransformInputWatermark appliedPTransformInputWatermark) {
            this.name = str;
            this.inputWatermark = appliedPTransformInputWatermark;
        }

        /**
         * Sets the hold for a key, or clears it when {@code instant} is {@code null}.
         *
         * @param obj key the hold belongs to
         * @param instant new hold timestamp, or {@code null} to remove the key's hold
         */
        public synchronized void updateHold(Object obj, Instant instant) {
            if (instant != null) {
                this.holds.updateHold(obj, instant);
                return;
            }
            this.holds.removeHold(obj);
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public String getName() {
            return this.name;
        }

        /** Returns the watermark computed by the most recent {@link #refresh()}. */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public Instant get() {
            return this.currentWatermark.get();
        }

        /**
         * Recomputes the output watermark.
         *
         * @return the result of {@code WatermarkManager.updateAndTrace} for the old and new values
         */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public synchronized WatermarkUpdate refresh() {
            Instant previous = this.currentWatermark.get();
            Instant bound = WatermarkManager.INSTANT_ORDERING.min(
                    this.inputWatermark.get(),
                    this.holds.getMinHold(),
                    this.inputWatermark.getEarliestTimerTimestamp());
            // Keep the later of the old value and the new bound so the watermark never regresses.
            Instant advanced = WatermarkManager.INSTANT_ORDERING.max(previous, bound);
            this.currentWatermark.set(advanced);
            return WatermarkManager.updateAndTrace(getName(), previous, advanced);
        }

        public synchronized String toString() {
            return MoreObjects.toStringHelper(AppliedPTransformOutputWatermark.class)
                    .add("holds", this.holds)
                    .add("currentWatermark", this.currentWatermark)
                    .toString();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$BundleByElementTimestampComparator.class */
    private static class BundleByElementTimestampComparator extends Ordering<Bundle<?, ?>> implements Serializable {
        private BundleByElementTimestampComparator() {
        }

        @SuppressFBWarnings(value = {"NP_METHOD_PARAMETER_TIGHTENS_ANNOTATION"}, justification = "https://github.com/google/guava/issues/920")
        public int compare(@Nonnull Bundle<?, ?> bundle, @Nonnull Bundle<?, ?> bundle2) {
            return ComparisonChain.start().compare(bundle.getMinimumTimestamp(), bundle2.getMinimumTimestamp()).result();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$FiredTimers.class */
    public static class FiredTimers<ExecutableT> {
        private final ExecutableT executable;
        private final StructuralKey<?> key;
        private final Collection<TimerInternals.TimerData> timers;

        private FiredTimers(ExecutableT executablet, StructuralKey<?> structuralKey, Collection<TimerInternals.TimerData> collection) {
            this.executable = executablet;
            this.key = structuralKey;
            this.timers = collection;
        }

        public ExecutableT getExecutable() {
            return this.executable;
        }

        public StructuralKey<?> getKey() {
            return this.key;
        }

        public Collection<TimerInternals.TimerData> getTimers() {
            return this.timers;
        }

        public String toString() {
            return MoreObjects.toStringHelper(FiredTimers.class).add("key", this.key).add("timers", this.timers).toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * A watermark hold attached to a key.
     *
     * <p>Holds sort by timestamp first; ties are broken by an arbitrary but consistent ordering of
     * the keys, with {@code null} keys last.
     */
    public static final class KeyedHold implements Comparable<KeyedHold> {
        private static final Ordering<Object> KEY_ORDERING = Ordering.arbitrary().nullsLast();
        private final Object key;
        private final Instant timestamp;

        /**
         * Creates a hold for a key.
         *
         * @param obj key the hold belongs to
         * @param instant hold timestamp; {@code null} is replaced by the end-of-time watermark value
         */
        public static KeyedHold of(Object obj, Instant instant) {
            Instant effective = MoreObjects.firstNonNull(instant, WatermarkManager.THE_END_OF_TIME.get());
            return new KeyedHold(obj, effective);
        }

        private KeyedHold(Object holdKey, Instant holdTimestamp) {
            this.key = holdKey;
            this.timestamp = holdTimestamp;
        }

        @Override // java.lang.Comparable
        public int compareTo(KeyedHold keyedHold) {
            return ComparisonChain.start()
                    .compare(this.timestamp, keyedHold.timestamp)
                    .compare(this.key, keyedHold.key, KEY_ORDERING)
                    .result();
        }

        public int hashCode() {
            return Objects.hash(this.timestamp, this.key);
        }

        /** Two holds are equal when both their timestamps and their keys are equal. */
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(obj instanceof KeyedHold)) {
                return false;
            }
            KeyedHold other = (KeyedHold) obj;
            if (!Objects.equals(this.timestamp, other.timestamp)) {
                return false;
            }
            return Objects.equals(this.key, other.key);
        }

        public Instant getTimestamp() {
            return this.timestamp;
        }

        public String toString() {
            return MoreObjects.toStringHelper(KeyedHold.class)
                    .add("key", this.key)
                    .add("hold", this.timestamp)
                    .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * Value object describing one update that is waiting to be applied: the executable it belongs
     * to, its input bundle, timer changes, unprocessed inputs, produced outputs and earliest hold.
     *
     * <p>The concrete implementation is generated by AutoValue. The declaration order of the
     * abstract accessors below must match the argument order used in {@link #create}.
     */
    @AutoValue
    public static abstract class PendingWatermarkUpdate<ExecutableT, CollectionT> {
        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the executable this update belongs to. */
        public abstract ExecutableT getExecutable();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the input bundle supplied to {@link #create}. */
        public abstract Bundle<?, ? extends CollectionT> getInputBundle();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the timer changes supplied to {@link #create}. */
        public abstract TimerUpdate getTimerUpdate();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the bundle of unprocessed inputs supplied to {@link #create}. */
        public abstract Bundle<?, ? extends CollectionT> getUnprocessedInputs();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the output bundles supplied to {@link #create}. */
        public abstract Iterable<? extends Bundle<?, ? extends CollectionT>> getOutputs();

        /* JADX INFO: Access modifiers changed from: package-private */
        /** Returns the earliest hold supplied to {@link #create}. */
        public abstract Instant getEarliestHold();

        /** Builds an update from its six components, in accessor declaration order. */
        public static <ExecutableT, CollectionT> PendingWatermarkUpdate<ExecutableT, CollectionT> create(
                ExecutableT executablet,
                Bundle<?, ? extends CollectionT> bundle,
                TimerUpdate timerUpdate,
                Bundle<?, ? extends CollectionT> bundle2,
                Iterable<? extends Bundle<?, ? extends CollectionT>> iterable,
                Instant instant) {
            return new AutoValue_WatermarkManager_PendingWatermarkUpdate<>(
                    executablet, bundle, timerUpdate, bundle2, iterable, instant);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Tracks at most one hold per key and exposes the earliest hold overall.
     *
     * <p>This class does no locking of its own; the visible callers invoke it from
     * {@code synchronized} methods.
     */
    public static class PerKeyHolds {
        // Current hold of each key.
        private final Map<Object, KeyedHold> keyedHolds = new HashMap<>();
        // The same holds, kept sorted so the earliest one is always first.
        private final NavigableSet<KeyedHold> allHolds = new TreeSet<>();

        private PerKeyHolds() {
        }

        /** Returns the earliest hold timestamp, or the end-of-time watermark value when no hold exists. */
        public Instant getMinHold() {
            if (this.allHolds.isEmpty()) {
                return WatermarkManager.THE_END_OF_TIME.get();
            }
            return this.allHolds.first().getTimestamp();
        }

        /**
         * Replaces any existing hold of the key with a new one.
         *
         * @param obj key the hold belongs to
         * @param instant new hold timestamp
         */
        public void updateHold(Object obj, Instant instant) {
            // Drop the old hold first so the sorted set never contains two holds for one key.
            removeHold(obj);
            KeyedHold replacement = KeyedHold.of(obj, instant);
            this.keyedHolds.put(obj, replacement);
            this.allHolds.add(replacement);
        }

        /** Removes the hold of the key, if there is one. */
        public void removeHold(Object obj) {
            KeyedHold previous = this.keyedHolds.remove(obj);
            if (previous == null) {
                return;
            }
            this.allHolds.remove(previous);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synchronized-processing-time input watermark of a single transform.
     *
     * <p>The value is the minimum over the upstream watermarks and the synchronized processing
     * output watermark of every pending bundle. Unlike the event-time watermarks in this file, the
     * value is simply overwritten on each {@link #refresh()}.
     *
     * <p>The class also tracks, per key, the timers of the {@code PROCESSING_TIME} and
     * {@code SYNCHRONIZED_PROCESSING_TIME} domains. Timers handed out by
     * {@link #extractFiredDomainTimers} are remembered in {@code pendingTimers} until a later
     * {@link #updateTimers} reports them as completed or deleted.
     *
     * <p>State-changing methods are {@code synchronized} on this instance; {@link #get()} reads an
     * {@link AtomicReference} without taking the monitor.
     */
    public static class SynchronizedProcessingTimeInputWatermark implements Watermark {
        private final String name;
        private final Collection<? extends Watermark> inputWms;
        private final Collection<Bundle<?, ?>> pendingBundles = new HashSet<>();
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> processingTimers = new HashMap<>();
        private final Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> synchronizedProcessingTimers = new HashMap<>();
        // Per key: (namespace, "timerId+timerFamilyId") -> the timer most recently set under that identity.
        private final Map<StructuralKey<?>, Table<StateNamespace, String, TimerInternals.TimerData>> existingTimers = new HashMap<>();
        // Timers that were extracted as fired but not yet reported as completed or deleted.
        private final NavigableSet<TimerInternals.TimerData> pendingTimers = new TreeSet<>();
        private AtomicReference<Instant> earliestHold;
        private final Consumer<TimerInternals.TimerData> timerUpdateNotification;

        /**
         * @param str name reported by {@link #getName()}
         * @param collection upstream watermarks; their current minimum becomes the initial value
         * @param consumer callback invoked for every timer carried by a timer update
         */
        public SynchronizedProcessingTimeInputWatermark(String str, Collection<? extends Watermark> collection, Consumer<TimerInternals.TimerData> consumer) {
            this.name = str;
            this.inputWms = collection;
            Instant initialHold = BoundedWindow.TIMESTAMP_MAX_VALUE;
            for (Watermark upstream : collection) {
                initialHold = WatermarkManager.INSTANT_ORDERING.min(initialHold, upstream.get());
            }
            this.earliestHold = new AtomicReference<>(initialHold);
            this.timerUpdateNotification = consumer;
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public String getName() {
            return this.name;
        }

        /** Returns the value computed by the constructor or the most recent {@link #refresh()}. */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public Instant get() {
            return this.earliestHold.get();
        }

        /**
         * Recomputes the watermark as the minimum over the upstream watermarks and the pending
         * bundles.
         *
         * @return the result of {@code WatermarkManager.updateAndTrace} for the old and new values
         */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public synchronized WatermarkUpdate refresh() {
            Instant previous = this.earliestHold.get();
            Instant earliest = WatermarkManager.THE_END_OF_TIME.get();
            for (Watermark upstream : this.inputWms) {
                earliest = WatermarkManager.INSTANT_ORDERING.min(earliest, upstream.get());
            }
            for (Bundle<?, ?> pending : this.pendingBundles) {
                earliest = WatermarkManager.INSTANT_ORDERING.min(earliest, pending.getSynchronizedProcessingOutputWatermark());
            }
            this.earliestHold.set(earliest);
            return WatermarkManager.updateAndTrace(getName(), previous, earliest);
        }

        /** Registers a bundle that has not been processed yet. */
        public synchronized void addPending(Bundle<?, ?> bundle) {
            this.pendingBundles.add(bundle);
        }

        /** Removes a previously registered bundle. */
        public synchronized void removePending(Bundle<?, ?> bundle) {
            this.pendingBundles.remove(bundle);
        }

        /**
         * Returns the smallest output timestamp over every tracked timer (both domains, all keys,
         * plus fired-but-uncompleted timers), or the end-of-time watermark value when there is none.
         */
        public synchronized Instant getEarliestTimerTimestamp() {
            Instant earliest = WatermarkManager.THE_END_OF_TIME.get();
            for (NavigableSet<TimerInternals.TimerData> timers : this.processingTimers.values()) {
                if (!timers.isEmpty()) {
                    earliest = WatermarkManager.INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
                }
            }
            for (NavigableSet<TimerInternals.TimerData> timers : this.synchronizedProcessingTimers.values()) {
                if (!timers.isEmpty()) {
                    earliest = WatermarkManager.INSTANT_ORDERING.min(getMinimumOutputTimestamp(timers), earliest);
                }
            }
            if (!this.pendingTimers.isEmpty()) {
                earliest = WatermarkManager.INSTANT_ORDERING.min(getMinimumOutputTimestamp(this.pendingTimers), earliest);
            }
            return earliest;
        }

        /** Scans a non-empty set of timers for the smallest output timestamp. */
        private Instant getMinimumOutputTimestamp(NavigableSet<TimerInternals.TimerData> navigableSet) {
            Instant smallest = navigableSet.first().getOutputTimestamp();
            for (TimerInternals.TimerData timer : navigableSet) {
                smallest = WatermarkManager.INSTANT_ORDERING.min(timer.getOutputTimestamp(), smallest);
            }
            return smallest;
        }

        /** Builds the column key under which a timer is stored in the per-key timer table. */
        private static String timerKey(TimerInternals.TimerData timer) {
            return timer.getTimerId() + '+' + timer.getTimerFamilyId();
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Applies the set, deleted and completed timers of an update, then notifies the callback
         * about every timer in the update. Timers whose domain is neither {@code PROCESSING_TIME}
         * nor {@code SYNCHRONIZED_PROCESSING_TIME} are not tracked here, but they are still passed
         * to the callback.
         */
        public synchronized void updateTimers(TimerUpdate timerUpdate) {
            Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> queuesByDomain = timerMap(timerUpdate.key);
            Table<StateNamespace, String, TimerInternals.TimerData> knownTimers =
                    this.existingTimers.computeIfAbsent(timerUpdate.key, unused -> HashBasedTable.create());

            for (TimerInternals.TimerData added : timerUpdate.setTimers) {
                NavigableSet<TimerInternals.TimerData> queue = queuesByDomain.get(added.getDomain());
                if (queue == null) {
                    continue;
                }
                String column = timerKey(added);
                TimerInternals.TimerData replaced = knownTimers.get(added.getNamespace(), column);
                if (replaced == null) {
                    queue.add(added);
                } else if (!replaced.equals(added)) {
                    queue.remove(replaced);
                    queue.add(added);
                }
                // Otherwise the identical timer is already queued and nothing needs to move.
                knownTimers.put(added.getNamespace(), column, added);
            }

            for (TimerInternals.TimerData deleted : timerUpdate.deletedTimers) {
                NavigableSet<TimerInternals.TimerData> queue = queuesByDomain.get(deleted.getDomain());
                if (queue == null) {
                    continue;
                }
                TimerInternals.TimerData stored = knownTimers.get(deleted.getNamespace(), timerKey(deleted));
                if (stored == null) {
                    continue;
                }
                // Remove the stored instance, not the delete request. The sorted sets locate
                // entries through TimerData's ordering, so a request that differs from the stored
                // timer (presumably in its timestamps) would otherwise leave the stale timer queued.
                // This matches AppliedPTransformInputWatermark.updateTimers.
                this.pendingTimers.remove(stored);
                queue.remove(stored);
                knownTimers.remove(stored.getNamespace(), timerKey(stored));
            }

            for (TimerInternals.TimerData completed : timerUpdate.completedTimers) {
                this.pendingTimers.remove(completed);
            }

            Iterables.concat(timerUpdate.getCompletedTimers(), timerUpdate.getDeletedTimers(), timerUpdate.getSetTimers())
                    .forEach(this.timerUpdateNotification);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Extracts the fired timers of one processing-time domain and records them as pending.
         *
         * <p>For {@code SYNCHRONIZED_PROCESSING_TIME} the firing time is capped at the current value
         * of this watermark.
         *
         * @param timeDomain {@code PROCESSING_TIME} or {@code SYNCHRONIZED_PROCESSING_TIME}
         * @param instant time up to which timers are considered fired
         * @return the fired timers grouped by key, as returned by
         *     {@code WatermarkManager.extractFiredTimers}
         * @throws IllegalArgumentException if {@code timeDomain} is any other domain
         */
        public synchronized Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredDomainTimers(TimeDomain timeDomain, Instant instant) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> fired;
            // Switch on the enum itself instead of going through an ordinal-indexed lookup table.
            switch (timeDomain) {
                case PROCESSING_TIME:
                    fired = WatermarkManager.extractFiredTimers(instant, this.processingTimers);
                    break;
                case SYNCHRONIZED_PROCESSING_TIME:
                    fired = WatermarkManager.extractFiredTimers(
                            WatermarkManager.INSTANT_ORDERING.min(instant, this.earliestHold.get()),
                            this.synchronizedProcessingTimers);
                    break;
                default:
                    throw new IllegalArgumentException("Called getFiredTimers on a Synchronized Processing Time watermark and gave a non-processing time domain " + timeDomain);
            }
            for (List<TimerInternals.TimerData> firedForKey : fired.values()) {
                this.pendingTimers.addAll(firedForKey);
            }
            return fired;
        }

        /**
         * Returns the timer queues of a key, indexed by time domain, creating empty queues on first
         * use. Only the two processing-time domains are present in the result.
         */
        private Map<TimeDomain, NavigableSet<TimerInternals.TimerData>> timerMap(StructuralKey<?> structuralKey) {
            NavigableSet<TimerInternals.TimerData> processingQueue =
                    this.processingTimers.computeIfAbsent(structuralKey, unused -> new TreeSet<>());
            NavigableSet<TimerInternals.TimerData> synchronizedQueue =
                    this.synchronizedProcessingTimers.computeIfAbsent(structuralKey, unused -> new TreeSet<>());
            EnumMap<TimeDomain, NavigableSet<TimerInternals.TimerData>> queuesByDomain = new EnumMap<>(TimeDomain.class);
            queuesByDomain.put(TimeDomain.PROCESSING_TIME, processingQueue);
            queuesByDomain.put(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, synchronizedQueue);
            return queuesByDomain;
        }

        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeInputWatermark.class)
                    .add("earliestHold", this.earliestHold)
                    .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Synchronized-processing-time output watermark of a single transform.
     *
     * <p>On {@link #refresh()} the value becomes the minimum of the transform's input watermark, the
     * earliest per-key hold and the earliest timer timestamp reported by the input watermark. The
     * previous value is overwritten without being compared against the new one.
     */
    public static class SynchronizedProcessingTimeOutputWatermark implements Watermark {
        private final String name;
        private final SynchronizedProcessingTimeInputWatermark inputWm;
        private final PerKeyHolds holds = new PerKeyHolds();
        private AtomicReference<Instant> latestRefresh = new AtomicReference<>(BoundedWindow.TIMESTAMP_MIN_VALUE);

        /**
         * @param str name reported by {@link #getName()}
         * @param synchronizedProcessingTimeInputWatermark input watermark of the same transform
         */
        public SynchronizedProcessingTimeOutputWatermark(String str, SynchronizedProcessingTimeInputWatermark synchronizedProcessingTimeInputWatermark) {
            this.name = str;
            this.inputWm = synchronizedProcessingTimeInputWatermark;
        }

        /**
         * Sets the hold for a key, or clears it when {@code instant} is {@code null}.
         *
         * @param obj key the hold belongs to
         * @param instant new hold timestamp, or {@code null} to remove the key's hold
         */
        public synchronized void updateHold(Object obj, Instant instant) {
            if (instant != null) {
                this.holds.updateHold(obj, instant);
                return;
            }
            this.holds.removeHold(obj);
        }

        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public String getName() {
            return this.name;
        }

        /** Returns the value computed by the most recent {@link #refresh()}. */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public Instant get() {
            return this.latestRefresh.get();
        }

        /**
         * Recomputes the output watermark.
         *
         * @return the result of {@code WatermarkManager.updateAndTrace} for the old and new values
         */
        @Override // org.apache.beam.runners.direct.WatermarkManager.Watermark
        public synchronized WatermarkUpdate refresh() {
            Instant previous = this.latestRefresh.get();
            Instant recomputed = WatermarkManager.INSTANT_ORDERING.min(
                    this.inputWm.get(),
                    this.holds.getMinHold(),
                    this.inputWm.getEarliestTimerTimestamp());
            this.latestRefresh.set(recomputed);
            return WatermarkManager.updateAndTrace(getName(), previous, recomputed);
        }

        public synchronized String toString() {
            return MoreObjects.toStringHelper(SynchronizedProcessingTimeOutputWatermark.class)
                    .add("holds", this.holds)
                    .add("latestRefresh", this.latestRefresh)
                    .toString();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$TimerUpdate.class */
    /**
     * A set of timer changes associated with one key: timers that completed, timers that were
     * set, timers that were deleted, and timers that were pushed back.
     *
     * <p>Instances are created through {@link #empty()}, {@link #builder(StructuralKey)}, or by
     * deriving a new instance with {@link #withCompletedTimers(Iterable)} /
     * {@link #withPushedBackTimers(Iterable)}. Note that {@link #equals(Object)} and
     * {@link #hashCode()} do not take the pushed-back timers into account.
     */
    public static class TimerUpdate {
        private final StructuralKey<?> key;
        private final Iterable<? extends TimerInternals.TimerData> completedTimers;
        private final Iterable<? extends TimerInternals.TimerData> setTimers;
        private final Iterable<? extends TimerInternals.TimerData> deletedTimers;
        private final Iterable<? extends TimerInternals.TimerData> pushedBackTimers;

        /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$TimerUpdate$TimerUpdateBuilder.class */
        /**
         * Collects completed, set and deleted timers for one key. Each group is kept in an
         * insertion-ordered set, so adding the same timer twice has no additional effect.
         */
        public static final class TimerUpdateBuilder {
            private final StructuralKey<?> key;
            private final Collection<TimerInternals.TimerData> completedTimers;
            private final Collection<TimerInternals.TimerData> setTimers;
            private final Collection<TimerInternals.TimerData> deletedTimers;

            private TimerUpdateBuilder(StructuralKey<?> structuralKey) {
                this.key = structuralKey;
                this.completedTimers = new LinkedHashSet<>();
                this.setTimers = new LinkedHashSet<>();
                this.deletedTimers = new LinkedHashSet<>();
            }

            /** Adds every timer in {@code iterable} to the completed timers. */
            public TimerUpdateBuilder withCompletedTimers(Iterable<TimerInternals.TimerData> iterable) {
                for (TimerInternals.TimerData completed : iterable) {
                    this.completedTimers.add(completed);
                }
                return this;
            }

            /**
             * Records {@code timerData} as set and cancels any earlier deletion of the same timer.
             *
             * @throws IllegalArgumentException if the timer's timestamp is not strictly before
             *     {@code BoundedWindow.TIMESTAMP_MAX_VALUE}
             */
            public TimerUpdateBuilder setTimer(TimerInternals.TimerData timerData) {
                Preconditions.checkArgument(
                        timerData.getTimestamp().isBefore(BoundedWindow.TIMESTAMP_MAX_VALUE),
                        "Got a timer for after the end of time (%s), got %s",
                        BoundedWindow.TIMESTAMP_MAX_VALUE,
                        timerData.getTimestamp());
                this.deletedTimers.remove(timerData);
                this.setTimers.add(timerData);
                return this;
            }

            /** Records {@code timerData} as deleted and cancels any earlier set of the same timer. */
            public TimerUpdateBuilder deletedTimer(TimerInternals.TimerData timerData) {
                this.deletedTimers.add(timerData);
                this.setTimers.remove(timerData);
                return this;
            }

            /** Snapshots the collected timers into a {@link TimerUpdate} without pushed-back timers. */
            public TimerUpdate build() {
                return new TimerUpdate(
                        this.key,
                        ImmutableList.copyOf(this.completedTimers),
                        ImmutableList.copyOf(this.setTimers),
                        ImmutableList.copyOf(this.deletedTimers),
                        Collections.emptyList());
            }
        }

        /** Returns an update with a {@code null} key and no timers in any group. */
        public static TimerUpdate empty() {
            return new TimerUpdate(null, Collections.emptyList(), Collections.emptyList(), Collections.emptyList(), Collections.emptyList());
        }

        /** Returns a builder for an update belonging to {@code structuralKey}. */
        public static TimerUpdateBuilder builder(StructuralKey<?> structuralKey) {
            return new TimerUpdateBuilder(structuralKey);
        }

        /**
         * Indexes timers by {@link #getTimerIdAndTimerFamilyIdWithNamespace}; when several timers
         * share an id, the one encountered last wins. The returned map is mutable.
         */
        private static Map<String, TimerInternals.TimerData> indexTimerData(Iterable<? extends TimerInternals.TimerData> iterable) {
            Map<String, TimerInternals.TimerData> timersById = new HashMap<>();
            for (TimerInternals.TimerData timer : iterable) {
                timersById.put(getTimerIdAndTimerFamilyIdWithNamespace(timer), timer);
            }
            return timersById;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Builds the identifier used to match timers: the namespace, timer id and timer family id
         * concatenated in that order, with no separator.
         */
        public static String getTimerIdAndTimerFamilyIdWithNamespace(TimerInternals.TimerData timerData) {
            return timerData.getNamespace() + timerData.getTimerId() + timerData.getTimerFamilyId();
        }

        private TimerUpdate(StructuralKey<?> structuralKey, Iterable<? extends TimerInternals.TimerData> iterable, Iterable<? extends TimerInternals.TimerData> iterable2, Iterable<? extends TimerInternals.TimerData> iterable3, Iterable<? extends TimerInternals.TimerData> iterable4) {
            this.key = structuralKey;
            this.completedTimers = iterable;
            this.setTimers = iterable2;
            this.deletedTimers = iterable3;
            this.pushedBackTimers = iterable4;
        }

        @VisibleForTesting
        StructuralKey<?> getKey() {
            return this.key;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getCompletedTimers() {
            return this.completedTimers;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getSetTimers() {
            return this.setTimers;
        }

        @VisibleForTesting
        public Iterable<? extends TimerInternals.TimerData> getDeletedTimers() {
            return this.deletedTimers;
        }

        Iterable<? extends TimerInternals.TimerData> getPushedBackTimers() {
            return this.pushedBackTimers;
        }

        /** Returns {@code true} only when all four timer groups are empty. */
        boolean isEmpty() {
            return Iterables.isEmpty(this.completedTimers)
                    && Iterables.isEmpty(this.setTimers)
                    && Iterables.isEmpty(this.deletedTimers)
                    && Iterables.isEmpty(this.pushedBackTimers);
        }

        /**
         * Derives an update whose completed timers are taken from {@code iterable}.
         *
         * <p>A timer from {@code iterable} that is not among this update's pushed-back timers
         * becomes a completed timer of the result. A timer that was pushed back is instead added to
         * the result's set timers, unless a set timer with the same id already exists. The result
         * carries this update's deleted timers and has no pushed-back timers.
         */
        public TimerUpdate withCompletedTimers(Iterable<TimerInternals.TimerData> iterable) {
            List<TimerInternals.TimerData> completed = new ArrayList<>();
            HashSet<TimerInternals.TimerData> pushedBack = Sets.newHashSet(this.pushedBackTimers);
            Map<String, TimerInternals.TimerData> setTimersById = indexTimerData(this.setTimers);
            for (TimerInternals.TimerData candidate : iterable) {
                String timerKey = getTimerIdAndTimerFamilyIdWithNamespace(candidate);
                if (pushedBack.contains(candidate)) {
                    // An explicitly set timer with the same id takes precedence over the pushed-back one.
                    if (!setTimersById.containsKey(timerKey)) {
                        setTimersById.put(timerKey, candidate);
                    }
                    continue;
                }
                completed.add(candidate);
            }
            return new TimerUpdate(this.key, completed, setTimersById.values(), this.deletedTimers, Collections.emptyList());
        }

        /** Derives an update that differs from this one only in its pushed-back timers (copied from {@code iterable}). */
        public TimerUpdate withPushedBackTimers(Iterable<TimerInternals.TimerData> iterable) {
            return new TimerUpdate(this.key, this.completedTimers, this.setTimers, this.deletedTimers, Lists.newArrayList(iterable));
        }

        @Override
        public int hashCode() {
            return Objects.hash(this.key, this.completedTimers, this.setTimers, this.deletedTimers);
        }

        @Override
        public boolean equals(Object obj) {
            // instanceof is false for null, so no separate null check is needed.
            if (!(obj instanceof TimerUpdate)) {
                return false;
            }
            TimerUpdate other = (TimerUpdate) obj;
            return Objects.equals(this.key, other.key)
                    && Objects.equals(this.completedTimers, other.completedTimers)
                    && Objects.equals(this.setTimers, other.setTimers)
                    && Objects.equals(this.deletedTimers, other.deletedTimers);
        }

        @Override
        public String toString() {
            return MoreObjects.toStringHelper(this)
                    .add("key", this.key)
                    .add("setTimers", this.setTimers)
                    .add("completedTimers", this.completedTimers)
                    .add("deletedTimers", this.deletedTimers)
                    .add("pushedBackTimers", this.pushedBackTimers)
                    .toString();
        }
    }

    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$TransformWatermarks.class */
    /**
     * Groups the four watermarks kept for one executable: the event-time input and output
     * watermarks and the synchronized-processing-time input and output watermarks.
     *
     * <p>The synchronized processing times handed out by this class never move backwards: each
     * getter remembers the largest value it has returned so far.
     */
    public class TransformWatermarks {
        private final ExecutableT executable;
        private final AppliedPTransformInputWatermark inputWatermark;
        private final AppliedPTransformOutputWatermark outputWatermark;
        private final SynchronizedProcessingTimeInputWatermark synchronizedProcessingInputWatermark;
        private final SynchronizedProcessingTimeOutputWatermark synchronizedProcessingOutputWatermark;
        private Instant latestSynchronizedInputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private Instant latestSynchronizedOutputWm = BoundedWindow.TIMESTAMP_MIN_VALUE;
        private final ReadWriteLock transformWatermarkLock = new ReentrantReadWriteLock();

        private TransformWatermarks(ExecutableT executablet, AppliedPTransformInputWatermark appliedPTransformInputWatermark, AppliedPTransformOutputWatermark appliedPTransformOutputWatermark, SynchronizedProcessingTimeInputWatermark synchronizedProcessingTimeInputWatermark, SynchronizedProcessingTimeOutputWatermark synchronizedProcessingTimeOutputWatermark) {
            this.executable = executablet;
            this.inputWatermark = appliedPTransformInputWatermark;
            this.outputWatermark = appliedPTransformOutputWatermark;
            this.synchronizedProcessingInputWatermark = synchronizedProcessingTimeInputWatermark;
            this.synchronizedProcessingOutputWatermark = synchronizedProcessingTimeOutputWatermark;
        }

        /**
         * Returns the current event-time input watermark.
         *
         * @throws NullPointerException if the input watermark currently has no value
         */
        public Instant getInputWatermark() {
            Instant current = this.inputWatermark.get();
            return (Instant) Preconditions.checkNotNull(current);
        }

        /** Returns the current event-time output watermark. */
        public Instant getOutputWatermark() {
            return this.outputWatermark.get();
        }

        /**
         * Returns the synchronized processing input time: the smaller of the clock's current time
         * and the synchronized-processing input watermark, but never less than a value returned
         * earlier.
         */
        public synchronized Instant getSynchronizedProcessingInputTime() {
            Instant candidate = (Instant) WatermarkManager.INSTANT_ORDERING.min(WatermarkManager.this.clock.now(), this.synchronizedProcessingInputWatermark.get());
            this.latestSynchronizedInputWm = (Instant) WatermarkManager.INSTANT_ORDERING.max(this.latestSynchronizedInputWm, candidate);
            return this.latestSynchronizedInputWm;
        }

        /**
         * Returns the synchronized processing output time: the smaller of the clock's current time
         * and the synchronized-processing output watermark, but never less than a value returned
         * earlier.
         */
        public synchronized Instant getSynchronizedProcessingOutputTime() {
            Instant candidate = (Instant) WatermarkManager.INSTANT_ORDERING.min(WatermarkManager.this.clock.now(), this.synchronizedProcessingOutputWatermark.get());
            this.latestSynchronizedOutputWm = (Instant) WatermarkManager.INSTANT_ORDERING.max(this.latestSynchronizedOutputWm, candidate);
            return this.latestSynchronizedOutputWm;
        }

        private ReadWriteLock getWatermarkLock() {
            return this.transformWatermarkLock;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Refreshes all four watermarks, inputs first.
         *
         * @return the union of the two output watermark updates; input updates are not reported
         */
        public WatermarkUpdate refresh() {
            this.inputWatermark.refresh();
            this.synchronizedProcessingInputWatermark.refresh();
            WatermarkUpdate eventTimeOutput = this.outputWatermark.refresh();
            WatermarkUpdate processingTimeOutput = this.synchronizedProcessingOutputWatermark.refresh();
            return eventTimeOutput.union(processingTimeOutput);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Forwards the hold for {@code obj} to the event-time output watermark. */
        public void setEventTimeHold(Object obj, Instant instant) {
            this.outputWatermark.updateHold(obj, instant);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Forwards the hold for {@code obj} to the synchronized-processing-time output watermark. */
        public void setSynchronizedProcessingTimeHold(Object obj, Instant instant) {
            this.synchronizedProcessingOutputWatermark.updateHold(obj, instant);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Removes {@code bundle} from the pending bundles of both input watermarks. */
        public void removePending(Bundle<?, ?> bundle) {
            this.inputWatermark.removePending(bundle);
            this.synchronizedProcessingInputWatermark.removePending(bundle);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Adds {@code bundle} to the pending bundles of both input watermarks. */
        public void addPending(Bundle<?, ?> bundle) {
            this.inputWatermark.addPending(bundle);
            this.synchronizedProcessingInputWatermark.addPending(bundle);
        }

        /* JADX INFO: Access modifiers changed from: private */
        /**
         * Extracts the fired event-time, processing-time and synchronized-processing-time timers
         * and returns them as one {@code FiredTimers} per key.
         */
        public Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
            // Extraction order matters only in that it is kept: event time, processing time, synchronized.
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> eventTimeTimers = this.inputWatermark.extractFiredEventTimeTimers();
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> processingTimeTimers = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.PROCESSING_TIME, WatermarkManager.this.clock.now());
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> synchronizedTimers = this.synchronizedProcessingInputWatermark.extractFiredDomainTimers(TimeDomain.SYNCHRONIZED_PROCESSING_TIME, getSynchronizedProcessingInputTime());
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> timersByKey = groupFiredTimers(eventTimeTimers, processingTimeTimers, synchronizedTimers);

            Collection<FiredTimers<ExecutableT>> fired = new ArrayList<>(timersByKey.size());
            for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> keyTimers : timersByKey.entrySet()) {
                fired.add(new FiredTimers<>(this.executable, keyTimers.getKey(), keyTimers.getValue()));
            }
            return fired;
        }

        /** Merges the given per-key timer maps into one map, concatenating the lists of keys that occur more than once. */
        @SafeVarargs
        private final Map<StructuralKey<?>, List<TimerInternals.TimerData>> groupFiredTimers(Map<StructuralKey<?>, List<TimerInternals.TimerData>>... mapArr) {
            Map<StructuralKey<?>, List<TimerInternals.TimerData>> merged = new HashMap<>();
            for (Map<StructuralKey<?>, List<TimerInternals.TimerData>> source : mapArr) {
                for (Map.Entry<StructuralKey<?>, List<TimerInternals.TimerData>> keyTimers : source.entrySet()) {
                    List<TimerInternals.TimerData> target = merged.computeIfAbsent(keyTimers.getKey(), unusedKey -> new ArrayList<>());
                    target.addAll(keyTimers.getValue());
                }
            }
            return merged;
        }

        /* JADX INFO: Access modifiers changed from: private */
        /** Applies {@code timerUpdate} to both input watermarks. */
        public void updateTimers(TimerUpdate timerUpdate) {
            this.inputWatermark.updateTimers(timerUpdate);
            this.synchronizedProcessingInputWatermark.updateTimers(timerUpdate);
        }

        public String toString() {
            return MoreObjects.toStringHelper(TransformWatermarks.class)
                    .add("inputWatermark", this.inputWatermark)
                    .add("outputWatermark", this.outputWatermark)
                    .add("inputProcessingTime", this.synchronizedProcessingInputWatermark)
                    .add("outputProcessingTime", this.synchronizedProcessingOutputWatermark)
                    .toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /**
     * A named point in time tracked by the enclosing manager.
     *
     * <p>In the implementation visible in this file
     * ({@code SynchronizedProcessingTimeOutputWatermark}), {@link #get()} returns the value stored
     * by the latest {@link #refresh()} and does no computation of its own.
     */
    @VisibleForTesting
    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$Watermark.class */
    public interface Watermark {
        /** Returns the name of this watermark. */
        String getName();

        /** Returns the current value of this watermark. */
        Instant get();

        /** Recomputes the value of this watermark and returns the resulting {@code WatermarkUpdate}. */
        WatermarkUpdate refresh();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/runners/direct/WatermarkManager$WatermarkUpdate.class */
    /** The outcome of refreshing a watermark: either it moved forward or it did not change. */
    public enum WatermarkUpdate {
        ADVANCED(true),
        NO_CHANGE(false);

        private final boolean advanced;

        WatermarkUpdate(boolean z) {
            this.advanced = z;
        }

        /** Returns {@code true} for {@link #ADVANCED} and {@code false} for {@link #NO_CHANGE}. */
        public boolean isAdvanced() {
            return this.advanced;
        }

        /**
         * Combines two updates: returns this update when it is advanced, otherwise returns
         * {@code watermarkUpdate} unchanged.
         */
        public WatermarkUpdate union(WatermarkUpdate watermarkUpdate) {
            if (this.advanced) {
                return this;
            }
            return watermarkUpdate;
        }

        /**
         * Classifies the move from {@code instant} to {@code instant2}.
         *
         * @return {@link #ADVANCED} if {@code instant2} is strictly after {@code instant},
         *     otherwise {@link #NO_CHANGE}
         */
        public static WatermarkUpdate fromTimestamps(Instant instant, Instant instant2) {
            if (instant2.isAfter(instant)) {
                return ADVANCED;
            }
            return NO_CHANGE;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Classifies the change from {@code instant} to {@code instant2} and, when the watermark
     * advanced, emits a debug trace naming the watermark and both timestamps.
     *
     * @param str the watermark name used in the trace message
     * @param instant the previous watermark value
     * @param instant2 the new watermark value
     * @return the classification produced by {@code WatermarkUpdate.fromTimestamps}
     */
    public static WatermarkUpdate updateAndTrace(String str, Instant instant, Instant instant2) {
        WatermarkUpdate outcome = WatermarkUpdate.fromTimestamps(instant, instant2);
        if (!outcome.isAdvanced()) {
            return outcome;
        }
        WindowTracing.debug("Watermark {} advanced from {} to {}", str, instant, instant2);
        return outcome;
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Removes from {@code map} every timer whose timestamp is strictly before {@code instant} and
     * returns the removed timers grouped by key, in the order of each key's sorted set.
     *
     * <p>{@code map} is modified in place: fired timers are removed from their sets, and keys
     * whose set is empty afterwards (including keys whose set was already empty) are removed from
     * the map. Keys without any fired timer do not appear in the result.
     */
    public static Map<StructuralKey<?>, List<TimerInternals.TimerData>> extractFiredTimers(Instant instant, Map<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> map) {
        Map<StructuralKey<?>, List<TimerInternals.TimerData>> firedByKey = new HashMap<>();
        Set<StructuralKey<?>> drainedKeys = new HashSet<>();
        for (Map.Entry<StructuralKey<?>, NavigableSet<TimerInternals.TimerData>> keyTimers : map.entrySet()) {
            NavigableSet<TimerInternals.TimerData> pending = keyTimers.getValue();
            List<TimerInternals.TimerData> fired = new ArrayList<>();
            // The set is sorted, so firing stops at the first timer that is not yet due.
            while (!pending.isEmpty() && pending.first().getTimestamp().isBefore(instant)) {
                fired.add(pending.pollFirst());
            }
            if (!fired.isEmpty()) {
                firedByKey.put(keyTimers.getKey(), fired);
            }
            if (pending.isEmpty()) {
                // Removal is deferred so the map is not modified while it is being iterated.
                drainedKeys.add(keyTimers.getKey());
            }
        }
        map.keySet().removeAll(drainedKeys);
        return firedByKey;
    }

    /**
     * Creates a {@code WatermarkManager} for the given graph.
     *
     * @param clock the clock whose {@code now()} is consulted for processing-time decisions
     * @param executableGraph the graph whose root transforms and executables get watermarks
     * @param function maps an executable to the name used as the prefix of its watermark names
     * @return the new manager
     */
    public static <ExecutableT, CollectionT> WatermarkManager<ExecutableT, ? super CollectionT> create(Clock clock, ExecutableGraph<ExecutableT, ? super CollectionT> executableGraph, Function<ExecutableT, String> function) {
        return new WatermarkManager<>(clock, executableGraph, function);
    }

    /**
     * Stores the collaborators and eagerly creates the watermarks of every root transform and
     * then of every executable in the graph.
     */
    private WatermarkManager(Clock clock, ExecutableGraph<ExecutableT, CollectionT> executableGraph, Function<ExecutableT, String> function) {
        this.clock = clock;
        this.graph = executableGraph;
        this.getName = function;
        for (ExecutableT root : executableGraph.getRootTransforms2()) {
            getTransformWatermark(root);
        }
        for (ExecutableT executable : executableGraph.getExecutables()) {
            getTransformWatermark(executable);
        }
    }

    /** Returns the watermarks of the executable that produces {@code collectiont}. */
    private WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks getValueWatermark(CollectionT collectiont) {
        ExecutableT producer = this.graph.getProducer(collectiont);
        return getTransformWatermark(producer);
    }

    /**
     * Returns the watermarks registered for {@code executablet}, creating and registering them
     * on first use.
     *
     * <p>Creation resolves the watermarks of the executable's inputs first (through
     * {@code getInputWatermarks} and {@code getInputProcessingWatermarks}), which may in turn
     * create the watermarks of upstream executables. The four watermarks are named after the
     * executable with the suffixes {@code .in}, {@code .out}, {@code .inProcessing} and
     * {@code .outProcessing}.
     */
    private WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks getTransformWatermark(ExecutableT executablet) {
        String baseName = this.getName.apply(executablet);
        WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks existing = this.transformToWatermarks.get(executablet);
        if (existing != null) {
            return existing;
        }

        AppliedPTransformInputWatermark eventTimeInput = new AppliedPTransformInputWatermark(baseName + ".in", getInputWatermarks(executablet), timerUpdateConsumer(this.transformsWithAlreadyExtractedTimers, executablet));
        AppliedPTransformOutputWatermark eventTimeOutput = new AppliedPTransformOutputWatermark(baseName + ".out", eventTimeInput);
        SynchronizedProcessingTimeInputWatermark processingTimeInput = new SynchronizedProcessingTimeInputWatermark(baseName + ".inProcessing", getInputProcessingWatermarks(executablet), timerUpdateConsumer(this.transformsWithAlreadyExtractedTimers, executablet));
        SynchronizedProcessingTimeOutputWatermark processingTimeOutput = new SynchronizedProcessingTimeOutputWatermark(baseName + ".outProcessing", processingTimeInput);

        WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks created = new TransformWatermarks(executablet, eventTimeInput, eventTimeOutput, processingTimeInput, processingTimeOutput);
        this.transformToWatermarks.put(executablet, created);
        return created;
    }

    /**
     * Returns a consumer that, for each timer it receives, removes that timer's id from the id
     * set stored in {@code map} under {@code executablet}. When the set becomes empty the whole
     * mapping is removed; when there is no mapping the consumer does nothing.
     */
    private static <ExecutableT> Consumer<TimerInternals.TimerData> timerUpdateConsumer(Map<ExecutableT, Set<String>> map, ExecutableT executablet) {
        return timer -> {
            String timerKey = TimerUpdate.getTimerIdAndTimerFamilyIdWithNamespace(timer);
            map.compute(executablet, (unusedKey, outstanding) -> {
                if (outstanding == null) {
                    return null;
                }
                outstanding.remove(timerKey);
                // Returning null from compute removes the mapping.
                return outstanding.isEmpty() ? null : outstanding;
            });
        };
    }

    /**
     * Returns the synchronized-processing-time output watermarks of the producers of every
     * per-element input of {@code executablet}. An executable without per-element inputs gets the
     * single watermark {@code THE_END_OF_TIME}.
     */
    private Collection<Watermark> getInputProcessingWatermarks(ExecutableT executablet) {
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        Collection<CollectionT> inputs = this.graph.getPerElementInputs(executablet);
        if (inputs.isEmpty()) {
            upstream.add(THE_END_OF_TIME);
        }
        for (CollectionT input : inputs) {
            upstream.add(getValueWatermark(input).synchronizedProcessingOutputWatermark);
        }
        return upstream.build();
    }

    /**
     * Returns the event-time output watermarks of the producers of every per-element input of
     * {@code executablet}. An executable without per-element inputs gets the single watermark
     * {@code THE_END_OF_TIME}.
     */
    private List<Watermark> getInputWatermarks(ExecutableT executablet) {
        ImmutableList.Builder<Watermark> upstream = ImmutableList.builder();
        Collection<CollectionT> inputs = this.graph.getPerElementInputs(executablet);
        if (inputs.isEmpty()) {
            upstream.add(THE_END_OF_TIME);
        }
        for (CollectionT input : inputs) {
            upstream.add(getValueWatermark(input).outputWatermark);
        }
        return upstream.build();
    }

    /**
     * Looks up the watermarks registered for {@code executablet}.
     *
     * @return the registered watermarks, or whatever the backing map yields for an unregistered
     *     executable (presumably {@code null}; confirm against the map's declaration)
     */
    public WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks getWatermarks(ExecutableT executablet) {
        return this.transformToWatermarks.get(executablet);
    }

    /**
     * Registers the initial input bundles of the given executables as pending and schedules each
     * of those executables for a watermark refresh. Runs while holding {@code refreshLock}.
     *
     * @param map for each executable, the bundles to add to its pending inputs
     */
    public void initialize(Map<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> map) {
        this.refreshLock.lock();
        try {
            for (Map.Entry<ExecutableT, ? extends Iterable<Bundle<?, CollectionT>>> initialInputs : map.entrySet()) {
                ExecutableT executable = initialInputs.getKey();
                WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks watermarks = this.transformToWatermarks.get(executable);
                for (Bundle<?, CollectionT> bundle : initialInputs.getValue()) {
                    watermarks.addPending(bundle);
                }
                this.pendingRefreshes.add(executable);
            }
        } finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Queues a watermark update for {@code executablet} and then tries, without blocking, to
     * apply queued updates.
     *
     * <p>The arguments are packed into a {@code PendingWatermarkUpdate}. Judging by how
     * {@code applyPendingUpdate} reads it back, {@code bundle} is presumably the input bundle,
     * {@code bundle2} the unprocessed remainder of the input, {@code iterable} the produced
     * outputs and {@code instant} the earliest hold; confirm against
     * {@code PendingWatermarkUpdate.create}.
     */
    public void updateWatermarks(Bundle<?, ? extends CollectionT> bundle, TimerUpdate timerUpdate, ExecutableT executablet, Bundle<?, ? extends CollectionT> bundle2, Iterable<? extends Bundle<?, ? extends CollectionT>> iterable, Instant instant) {
        this.pendingUpdates.offer(PendingWatermarkUpdate.create(executablet, bundle, timerUpdate, bundle2, iterable, instant));
        tryApplyPendingUpdates();
    }

    /**
     * Applies up to ten queued updates if {@code refreshLock} can be taken immediately; returns
     * without doing anything when the lock is held elsewhere.
     */
    private void tryApplyPendingUpdates() {
        if (!this.refreshLock.tryLock()) {
            return;
        }
        try {
            applyNUpdates(10);
        } finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Applies every queued update, waiting for {@code refreshLock} if necessary.
     */
    private void applyAllPendingUpdates() {
        this.refreshLock.lock();
        try {
            // A non-positive count tells applyNUpdates to drain the whole queue.
            applyNUpdates(-1);
        } finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Takes queued updates off {@code pendingUpdates}, applies each one and marks its executable
     * for refresh.
     *
     * @param i the maximum number of updates to apply; zero or a negative value means no limit
     */
    @GuardedBy("refreshLock")
    private void applyNUpdates(int i) {
        boolean unlimited = i <= 0;
        for (int applied = 0; !this.pendingUpdates.isEmpty() && (unlimited || applied < i); applied++) {
            PendingWatermarkUpdate<ExecutableT, CollectionT> next = this.pendingUpdates.poll();
            applyPendingUpdate(next);
            this.pendingRefreshes.add(next.getExecutable());
        }
    }

    /**
     * Applies one queued update: first the pending-bundle and timer bookkeeping, then the
     * event-time and synchronized-processing-time holds of the executable.
     *
     * <p>Both holds are keyed by the input bundle's key ({@code null} when there is no input
     * bundle) and both receive the update's earliest hold.
     */
    private void applyPendingUpdate(PendingWatermarkUpdate<ExecutableT, CollectionT> pendingWatermarkUpdate) {
        ExecutableT executable = pendingWatermarkUpdate.getExecutable();
        Bundle<?, ? extends CollectionT> input = pendingWatermarkUpdate.getInputBundle();
        updatePending(input, pendingWatermarkUpdate.getTimerUpdate(), executable, pendingWatermarkUpdate.getUnprocessedInputs(), pendingWatermarkUpdate.getOutputs());

        Object holdKey = input == null ? null : input.getKey();
        WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks watermarks = this.transformToWatermarks.get(executable);
        watermarks.setEventTimeHold(holdKey, pendingWatermarkUpdate.getEarliestHold());
        watermarks.setSynchronizedProcessingTimeHold(holdKey, pendingWatermarkUpdate.getEarliestHold());
    }

    /**
     * Updates pending bundles and timers after an executable has processed (part of) a bundle.
     *
     * @param bundle the bundle to remove from the executable's pending inputs, or {@code null}
     * @param timerUpdate the timer changes to apply to the executable
     * @param executablet the executable whose pending inputs and timers are updated
     * @param bundle2 a bundle to add to the executable's own pending inputs, or {@code null}
     * @param iterable bundles to add as pending to every per-element consumer of their collection
     */
    private void updatePending(Bundle<?, ? extends CollectionT> bundle, TimerUpdate timerUpdate, ExecutableT executablet, Bundle<?, ? extends CollectionT> bundle2, Iterable<? extends Bundle<?, ? extends CollectionT>> iterable) {
        for (Bundle<?, ? extends CollectionT> output : iterable) {
            for (ExecutableT consumer : this.graph.getPerElementConsumers(output.getPCollection())) {
                this.transformToWatermarks.get(consumer).addPending(output);
            }
        }

        WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks watermarks = this.transformToWatermarks.get(executablet);
        // Order is kept as-is: additions and timer changes are recorded before the input is removed.
        if (bundle2 != null) {
            watermarks.addPending(bundle2);
        }
        watermarks.updateTimers(timerUpdate);
        if (bundle != null) {
            watermarks.removePending(bundle);
        }
    }

    /**
     * Applies all queued updates and then refreshes watermarks until no further executable
     * needs refreshing.
     *
     * <p>Starting from the executables marked in {@code pendingRefreshes}, each round refreshes
     * the current set and continues with the set returned by {@code refreshAllOf}. When a round
     * returns an empty set, {@code pendingRefreshes} is cleared. The whole operation holds
     * {@code refreshLock}.
     */
    public synchronized void refreshAll() {
        this.refreshLock.lock();
        try {
            applyAllPendingUpdates();
            Set<ExecutableT> frontier = this.pendingRefreshes;
            while (!frontier.isEmpty()) {
                frontier = refreshAllOf(frontier);
            }
            this.pendingRefreshes.clear();
        } finally {
            this.refreshLock.unlock();
        }
    }

    /**
     * Refreshes every executable in {@code set} and returns the union of the executables that
     * those refreshes report as needing a refresh next.
     */
    private Set<ExecutableT> refreshAllOf(Set<ExecutableT> set) {
        Set<ExecutableT> next = new HashSet<>();
        for (ExecutableT executable : set) {
            next.addAll(refreshWatermarks(executable));
        }
        return next;
    }

    /**
     * Refreshes the watermarks of {@code executablet}.
     *
     * @return the per-element consumers of everything {@code executablet} produces when the
     *     refresh advanced an output watermark; an empty set otherwise
     */
    private Set<ExecutableT> refreshWatermarks(ExecutableT executablet) {
        boolean advanced = this.transformToWatermarks.get(executablet).refresh().isAdvanced();
        if (advanced) {
            Set<ExecutableT> consumers = new HashSet<>();
            for (CollectionT produced : this.graph.getProduced(executablet)) {
                consumers.addAll(this.graph.getPerElementConsumers(produced));
            }
            return consumers;
        }
        return Collections.emptySet();
    }

    /**
     * Extracts fired timers from every executable, ignoring none.
     *
     * @return the result of {@code extractFiredTimers(Collection)} called with an empty collection
     */
    @VisibleForTesting
    Collection<FiredTimers<ExecutableT>> extractFiredTimers() {
        return extractFiredTimers(Collections.emptyList());
    }

    /**
     * Extracts the fired timers of every registered executable, except those listed in
     * {@code collection} and those that still have previously extracted timers outstanding.
     *
     * <p>For each executable that yields fired timers, the ids of those timers are recorded in
     * {@code transformsWithAlreadyExtractedTimers}. That executable is then skipped by later calls
     * until its recorded ids have been removed again (see {@code timerUpdateConsumer}). The whole
     * extraction runs while holding {@code refreshLock}.
     *
     * @param collection executables whose timers must not be extracted
     * @return the fired timers of all executables that were not skipped
     */
    public Collection<FiredTimers<ExecutableT>> extractFiredTimers(Collection<ExecutableT> collection) {
        Collection<FiredTimers<ExecutableT>> allFired = new ArrayList<>();
        this.refreshLock.lock();
        try {
            for (Map.Entry<ExecutableT, WatermarkManager<ExecutableT, CollectionT>.TransformWatermarks> entry : this.transformToWatermarks.entrySet()) {
                ExecutableT executable = entry.getKey();
                if (collection.contains(executable) || this.transformsWithAlreadyExtractedTimers.containsKey(executable)) {
                    continue;
                }
                Collection<FiredTimers<ExecutableT>> fired = entry.getValue().extractFiredTimers();
                if (fired.isEmpty()) {
                    continue;
                }
                List<String> firedTimerIds = fired.stream()
                        .flatMap(firedForKey -> firedForKey.getTimers().stream())
                        .map(TimerUpdate::getTimerIdAndTimerFamilyIdWithNamespace)
                        .collect(Collectors.toList());
                // The ids are added inside compute so that creating the set and filling it is a
                // single map operation (presumably the map is concurrent; confirm at its declaration).
                this.transformsWithAlreadyExtractedTimers.compute(executable, (unusedKey, outstanding) -> {
                    Set<String> ids = outstanding == null ? new HashSet<>() : outstanding;
                    ids.addAll(firedTimerIds);
                    return ids;
                });
                allFired.addAll(fired);
            }
            return allFired;
        } finally {
            this.refreshLock.unlock();
        }
    }
}
