package org.apache.spark.sql.execution.streaming;

import java.util.concurrent.ConcurrentHashMap;
import org.apache.spark.SparkFunSuite;
import org.apache.spark.sql.streaming.util.StreamManualClock;
import org.apache.spark.sql.streaming.util.StreamManualClock$;
import org.scalactic.Bool;
import org.scalactic.Bool$;
import org.scalactic.Equality$;
import org.scalactic.Prettifier$;
import org.scalactic.TripleEqualsSupport;
import org.scalactic.source.Position;
import org.scalatest.Assertions$;
import org.scalatest.concurrent.Eventually$;
import org.scalatest.concurrent.PatienceConfiguration;
import org.scalatest.concurrent.Signaler;
import org.scalatest.concurrent.ThreadSignaler$;
import org.scalatest.enablers.Retrying$;
import org.scalatest.enablers.Timed$;
import org.scalatest.time.Span;
import org.scalatest.time.SpanSugar$;
import scala.Function0;
import scala.Predef$;
import scala.collection.immutable.Nil$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.IntRef;
import scala.runtime.LongRef;
import scala.runtime.RichInt$;
import scala.runtime.VolatileBooleanRef;
import scala.runtime.VolatileLongRef;

/* compiled from: ProcessingTimeExecutorSuite.scala */
@ScalaSignature(bytes = "\u0006\u0005]3A!\u0003\u0006\u0001/!)A\u0005\u0001C\u0001K!9\u0001\u0006\u0001b\u0001\n\u0007I\u0003BB\u0017\u0001A\u0003%!\u0006C\u0004/\u0001\t\u0007I\u0011A\u0018\t\rY\u0002\u0001\u0015!\u00031\u0011\u00159\u0004\u0001\"\u00039\u0011\u0015!\u0005\u0001\"\u0003F\u0011\u0015Y\u0005\u0001\"\u0003M\u0005m\u0001&o\\2fgNLgn\u001a+j[\u0016,\u00050Z2vi>\u00148+^5uK*\u00111\u0002D\u0001\ngR\u0014X-Y7j]\u001eT!!\u0004\b\u0002\u0013\u0015DXmY;uS>t'BA\b\u0011\u0003\r\u0019\u0018\u000f\u001c\u0006\u0003#I\tQa\u001d9be.T!a\u0005\u000b\u0002\r\u0005\u0004\u0018m\u00195f\u0015\u0005)\u0012aA8sO\u000e\u00011c\u0001\u0001\u00199A\u0011\u0011DG\u0007\u0002!%\u00111\u0004\u0005\u0002\u000e'B\f'o\u001b$v]N+\u0018\u000e^3\u0011\u0005u\u0011S\"\u0001\u0010\u000b\u0005}\u0001\u0013AC2p]\u000e,(O]3oi*\u0011\u0011\u0005F\u0001\ng\u000e\fG.\u0019;fgRL!a\t\u0010\u0003\u0015QKW.\u001a'j[&$8/\u0001\u0004=S:LGO\u0010\u000b\u0002MA\u0011q\u0005A\u0007\u0002\u0015\u0005yA-\u001a4bk2$8+[4oC2,'/F\u0001+!\ti2&\u0003\u0002-=\tA1+[4oC2,'/\u0001\teK\u001a\fW\u000f\u001c;TS\u001et\u0017\r\\3sA\u00059A/[7f_V$X#\u0001\u0019\u0011\u0005E\"T\"\u0001\u001a\u000b\u0005M\u0002\u0013\u0001\u0002;j[\u0016L!!\u000e\u001a\u0003\tM\u0003\u0018M\\\u0001\ti&lWm\\;uA\u0005!B/Z:u\u0005\u0006$8\r\u001b+fe6Lg.\u0019;j_:$\"!O \u0011\u0005ijT\"A\u001e\u000b\u0003q\nQa]2bY\u0006L!AP\u001e\u0003\tUs\u0017\u000e\u001e\u0005\u0006\u0001\u001a\u0001\r!Q\u0001\u000bS:$XM\u001d<bY6\u001b\bC\u0001\u001eC\u0013\t\u00195H\u0001\u0003M_:<\u0017AC3wK:$X/\u00197msR\u0011\u0011H\u0012\u0005\u0007\u000f\u001e!\t\u0019\u0001%\u0002\t\t|G-\u001f\t\u0004u%K\u0014B\u0001&<\u0005!a$-\u001f8b[\u0016t\u0014!E<bSR4uN\u001d+ie\u0016\fGMS8j]R\u0011\u0011(\u0014\u0005\u0006\u001d\"\u0001\raT\u0001\u0007i\"\u0014X-\u00193\u0011\u0005A+V\"A)\u000b\u0005I\u001b\u0016\u0001\u00027b]\u001eT\u0011\u0001V\u0001\u0005U\u00064\u0018-\u0003\u0002W#\n1A\u000b\u001b:fC\u0012\u0004")
/* loaded from: input_file:org/apache/spark/sql/execution/streaming/ProcessingTimeExecutorSuite.class */
public class ProcessingTimeExecutorSuite extends SparkFunSuite {
    // Signaler handed to failAfter(...) in waitForThreadJoin; initialised to the ThreadSignaler singleton.
    private final Signaler defaultSignaler = ThreadSignaler$.MODULE$;
    // Ten-second Span used as the time limit by both eventually(...) and waitForThreadJoin(...).
    private final Span timeout = SpanSugar$.MODULE$.convertIntToGrainOfTime(10).seconds();

    /**
     * Accessor for the suite's signaler field.
     *
     * @return the {@code Signaler} stored in {@code defaultSignaler}
     */
    public Signaler defaultSignaler() {
        return defaultSignaler;
    }

    /**
     * Accessor for the suite's time-limit field.
     *
     * @return the {@code Span} stored in {@code timeout}
     */
    public Span timeout() {
        return timeout;
    }

    private void testBatchTermination(long j) {
        IntRef create = IntRef.create(0);
        new ProcessingTimeExecutor(new ProcessingTimeTrigger(j), ProcessingTimeExecutor$.MODULE$.apply$default$2()).execute(() -> {
            create.elem++;
            return create.elem < 3;
        });
        TripleEqualsSupport.Equalizer convertToEqualizer = convertToEqualizer(BoxesRunTime.boxToInteger(create.elem));
        Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(3), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(3), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 119));
    }

    /**
     * Delegates to ScalaTest's {@code Eventually.eventually}, supplying {@link #timeout()}
     * wrapped in a {@code PatienceConfiguration.Timeout} together with the default patience
     * configuration and retrying instance.
     *
     * @param function0 the body handed to {@code Eventually.eventually}
     */
    private void eventually(Function0<BoxedUnit> function0) {
        final Eventually$ retrier = Eventually$.MODULE$;
        final PatienceConfiguration.Timeout limit = new PatienceConfiguration.Timeout(timeout());
        final Position callSite = new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 152);
        retrier.eventually(limit, function0, retrier.patienceConfig(), Retrying$.MODULE$.retryingNatureOfT(), callSite);
    }

    /**
     * Joins {@code thread} inside a {@code failAfter} call that is limited by
     * {@link #timeout()} and uses {@link #defaultSignaler()} as its signaler.
     *
     * @param thread the thread to join
     */
    private void waitForThreadJoin(Thread thread) {
        final Span limit = timeout();
        final Position callSite = new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 156);
        failAfter(limit, () -> {
            thread.join();
        }, defaultSignaler(), Prettifier$.MODULE$.default(), callSite, Timed$.MODULE$.timed());
    }

    /**
     * Registers the suite's five tests via {@code test(name, tags, body, position)}:
     * "nextBatchTime", "trigger timing", "calling nextBatchTime with the result of a previous
     * call should return the next interval", "batch termination" and "notifyBatchFallingBehind".
     *
     * <p>Each long {@code macroAssert(...)} expression is the expanded form of a single
     * assertion; the trailing {@code Position} argument carries the original Scala file name
     * and line number.
     */
    public ProcessingTimeExecutorSuite() {
        // Test 1: with a trigger built from 100L, nextBatchTime(0), (1) and (99) must equal 100,
        // while nextBatchTime(100), (101) and (150) must equal 200.
        test("nextBatchTime", Nil$.MODULE$, () -> {
            ProcessingTimeExecutor processingTimeExecutor = new ProcessingTimeExecutor(new ProcessingTimeTrigger(100L), ProcessingTimeExecutor$.MODULE$.apply$default$2());
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(0L)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(100), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(100), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 38));
            TripleEqualsSupport.Equalizer convertToEqualizer2 = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(1L)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer2, "===", BoxesRunTime.boxToInteger(100), convertToEqualizer2.$eq$eq$eq(BoxesRunTime.boxToInteger(100), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 39));
            TripleEqualsSupport.Equalizer convertToEqualizer3 = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(99L)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer3, "===", BoxesRunTime.boxToInteger(100), convertToEqualizer3.$eq$eq$eq(BoxesRunTime.boxToInteger(100), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 40));
            TripleEqualsSupport.Equalizer convertToEqualizer4 = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(100L)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer4, "===", BoxesRunTime.boxToInteger(200), convertToEqualizer4.$eq$eq$eq(BoxesRunTime.boxToInteger(200), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 41));
            TripleEqualsSupport.Equalizer convertToEqualizer5 = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(101L)));
            Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer5, "===", BoxesRunTime.boxToInteger(200), convertToEqualizer5.$eq$eq$eq(BoxesRunTime.boxToInteger(200), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 42));
            TripleEqualsSupport.Equalizer convertToEqualizer6 = this.convertToEqualizer(BoxesRunTime.boxToLong(processingTimeExecutor.nextBatchTime(150L)));
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer6, "===", BoxesRunTime.boxToInteger(200), convertToEqualizer6.$eq$eq$eq(BoxesRunTime.boxToInteger(200), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 43));
        }, new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 36));
        // Test 2: drives an executor with a "1000 milliseconds" trigger on a StreamManualClock from a
        // second thread, and checks which clock times the batch callback was invoked at.
        test("trigger timing", Nil$.MODULE$, () -> {
            // newKeySet = recorded trigger times; create = "keep executing" flag;
            // create2 = amount the callback advances the clock by on its next invocation.
            final ConcurrentHashMap.KeySetView newKeySet = ConcurrentHashMap.newKeySet();
            final StreamManualClock streamManualClock = new StreamManualClock(StreamManualClock$.MODULE$.$lessinit$greater$default$1());
            final VolatileBooleanRef create = VolatileBooleanRef.create(true);
            final VolatileLongRef create2 = VolatileLongRef.create(0L);
            final ProcessingTimeExecutor processingTimeExecutor = new ProcessingTimeExecutor(ProcessingTimeTrigger$.MODULE$.apply("1000 milliseconds"), streamManualClock);
            // NOTE(review): the null "outer instance" and the extra Thread constructor arguments below look
            // like decompiler artifacts of a Scala anonymous class capturing locals — confirm against the Scala source.
            final ProcessingTimeExecutorSuite processingTimeExecutorSuite = null;
            Thread thread = new Thread(processingTimeExecutorSuite, processingTimeExecutor, newKeySet, streamManualClock, create2, create) { // from class: org.apache.spark.sql.execution.streaming.ProcessingTimeExecutorSuite$$anon$1
                private final ProcessingTimeExecutor executor$1;
                private final ConcurrentHashMap.KeySetView triggerTimes$1;
                private final StreamManualClock clock$1;
                private final VolatileLongRef clockIncrementInTrigger$1;
                private final VolatileBooleanRef continueExecuting$1;

                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    // Batch callback: record the current clock time (as an int), advance the clock by the
                    // pending increment, reset that increment to 0, and return the continue flag.
                    this.executor$1.execute(() -> {
                        this.triggerTimes$1.add(BoxesRunTime.boxToInteger((int) this.clock$1.getTimeMillis()));
                        this.clock$1.advance(this.clockIncrementInTrigger$1.elem);
                        this.clockIncrementInTrigger$1.elem = 0L;
                        return this.continueExecuting$1.elem;
                    });
                }

                {
                    this.executor$1 = processingTimeExecutor;
                    this.triggerTimes$1 = newKeySet;
                    this.clock$1 = streamManualClock;
                    this.clockIncrementInTrigger$1 = create2;
                    this.continueExecuting$1 = create;
                }
            };
            thread.start();
            // Phase 1: a trigger was recorded at time 0 and the clock reports
            // isStreamWaitingAt(0) and isStreamWaitingFor(1000).
            this.eventually(() -> {
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(newKeySet, "contains", BoxesRunTime.boxToInteger(0), newKeySet.contains(BoxesRunTime.boxToInteger(0)), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 66));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingAt(0L), "clock.isStreamWaitingAt(0L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 67));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingFor(1000L), "clock.isStreamWaitingFor(1000L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 68));
            });
            // Phase 2: the next callback will advance the clock by 500; set the clock to 1000 and expect a
            // trigger recorded at 1000, with the clock waiting at 1500 for 2000.
            create2.elem = 500L;
            streamManualClock.setTime(1000L);
            this.eventually(() -> {
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(newKeySet, "contains", BoxesRunTime.boxToInteger(1000), newKeySet.contains(BoxesRunTime.boxToInteger(1000)), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 76));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingAt(1500L), "clock.isStreamWaitingAt(1500L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 77));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingFor(2000L), "clock.isStreamWaitingFor(2000L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 78));
            });
            // Phase 3: the next callback will advance the clock by 1500; set the clock to 2000 and expect
            // triggers recorded at both 2000 and 3500, with the clock waiting at 3500 for 4000.
            create2.elem = 1500L;
            streamManualClock.setTime(2000L);
            this.eventually(() -> {
                // Expanded form of `contains(2000) && contains(3500)`; the right operand is only
                // evaluated when the left one holds.
                Bool binaryMacroBool = Bool$.MODULE$.binaryMacroBool(newKeySet, "contains", BoxesRunTime.boxToInteger(2000), newKeySet.contains(BoxesRunTime.boxToInteger(2000)), Prettifier$.MODULE$.default());
                Bool binaryMacroBool2 = binaryMacroBool.value() ? Bool$.MODULE$.binaryMacroBool(newKeySet, "contains", BoxesRunTime.boxToInteger(3500), newKeySet.contains(BoxesRunTime.boxToInteger(3500)), Prettifier$.MODULE$.default()) : Bool$.MODULE$.simpleMacroBool(false, "", Prettifier$.MODULE$.default());
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(binaryMacroBool, "&&", binaryMacroBool2, binaryMacroBool.$amp$amp(() -> {
                    return binaryMacroBool2;
                }), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 88));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingAt(3500L), "clock.isStreamWaitingAt(3500L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 89));
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingFor(4000L), "clock.isStreamWaitingFor(4000L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 90));
            });
            // Shutdown: clear the continue flag, advance the clock by 1000 and wait for the
            // worker thread to finish (bounded by timeout()).
            create.elem = false;
            streamManualClock.advance(1000L);
            this.waitForThreadJoin(thread);
        }, new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 46));
        // Test 3: starting from 0, feed nextBatchTime's result back into itself ten times with a
        // trigger built from 100; the final value must equal 100 * 10.
        test("calling nextBatchTime with the result of a previous call should return the next interval", Nil$.MODULE$, () -> {
            ProcessingTimeExecutor processingTimeExecutor = new ProcessingTimeExecutor(new ProcessingTimeTrigger(100), ProcessingTimeExecutor$.MODULE$.apply$default$2());
            LongRef create = LongRef.create(0L);
            RichInt$.MODULE$.to$extension(Predef$.MODULE$.intWrapper(1), 10).foreach$mVc$sp(i -> {
                create.elem = processingTimeExecutor.nextBatchTime(create.elem);
            });
            TripleEqualsSupport.Equalizer convertToEqualizer = this.convertToEqualizer(BoxesRunTime.boxToLong(create.elem));
            int i2 = 100 * 10;
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.binaryMacroBool(convertToEqualizer, "===", BoxesRunTime.boxToInteger(i2), convertToEqualizer.$eq$eq$eq(BoxesRunTime.boxToInteger(i2), Equality$.MODULE$.default()), Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 108));
        }, new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 97));
        // Test 4: runs the testBatchTermination helper with trigger arguments 0 and 10.
        test("batch termination", Nil$.MODULE$, () -> {
            this.testBatchTermination(0L);
            this.testBatchTermination(10L);
        }, new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 122));
        // Test 5: starts the helper thread ProcessingTimeExecutorSuite$$anon$2 (defined outside this file
        // view; presumably it runs an executor whose notifyBatchFallingBehind sets the flag — TODO confirm),
        // waits until the clock reports isStreamWaitingFor(200), advances the clock by 200, joins the
        // thread and asserts that the flag was set.
        test("notifyBatchFallingBehind", Nil$.MODULE$, () -> {
            StreamManualClock streamManualClock = new StreamManualClock(StreamManualClock$.MODULE$.$lessinit$greater$default$1());
            VolatileBooleanRef create = VolatileBooleanRef.create(false);
            ProcessingTimeExecutorSuite$$anon$2 processingTimeExecutorSuite$$anon$2 = new ProcessingTimeExecutorSuite$$anon$2(null, streamManualClock, create);
            processingTimeExecutorSuite$$anon$2.start();
            this.eventually(() -> {
                Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(streamManualClock.isStreamWaitingFor(200L), "clock.isStreamWaitingFor(200L)", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 145));
            });
            streamManualClock.advance(200L);
            this.waitForThreadJoin(processingTimeExecutorSuite$$anon$2);
            return Assertions$.MODULE$.assertionsHelper().macroAssert(Bool$.MODULE$.simpleMacroBool(create.elem, "batchFallingBehindCalled", Prettifier$.MODULE$.default()), "", Prettifier$.MODULE$.default(), new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 148));
        }, new Position("ProcessingTimeExecutorSuite.scala", "Please set the environment variable SCALACTIC_FILL_FILE_PATHNAMES to yes at compile time to enable this feature.", 127));
    }
}
