package org.apache.kafka.server.purgatory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.kafka.common.utils.Time;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

/* loaded from: input_file:org/apache/kafka/server/purgatory/DelayedOperationTest.class */
public class DelayedOperationTest {
    private final MockKey test1 = new MockKey("test1");
    private final MockKey test2 = new MockKey("test2");
    private final MockKey test3 = new MockKey("test3");
    private final Random random = new Random();
    private DelayedOperationPurgatory<DelayedOperation> purgatory;
    private ScheduledExecutorService executorService;

    /* loaded from: input_file:org/apache/kafka/server/purgatory/DelayedOperationTest$MockDelayedOperation.class */
    private static class MockDelayedOperation extends DelayedOperation {
        private final Optional<Lock> responseLockOpt;
        boolean completable;

        MockDelayedOperation(long j) {
            this(j, Optional.empty());
        }

        MockDelayedOperation(long j, Optional<Lock> optional) {
            super(j);
            this.completable = false;
            this.responseLockOpt = optional;
        }

        public boolean tryComplete() {
            if (this.completable) {
                return forceComplete();
            }
            return false;
        }

        public void onExpiration() {
        }

        public void onComplete() {
            this.responseLockOpt.ifPresent(lock -> {
                if (!lock.tryLock()) {
                    throw new IllegalStateException("Response callback lock could not be acquired in callback");
                }
            });
            synchronized (this) {
                notify();
            }
        }

        void awaitExpiration() throws InterruptedException {
            synchronized (this) {
                wait();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/purgatory/DelayedOperationTest$MockKey.class */
    public static class MockKey implements DelayedOperationKey {
        final String key;

        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return Objects.equals(this.key, ((MockKey) obj).key);
        }

        public int hashCode() {
            if (this.key != null) {
                return this.key.hashCode();
            }
            return 0;
        }

        MockKey(String str) {
            this.key = str;
        }

        public String keyLabel() {
            return this.key;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/kafka/server/purgatory/DelayedOperationTest$TestDelayOperation.class */
    public class TestDelayOperation extends MockDelayedOperation {
        private final MockKey key;
        private final AtomicInteger completionAttemptsRemaining;
        private final int maxDelayMs;

        TestDelayOperation(int i, int i2, int i3) {
            super(10000L, Optional.empty());
            this.key = new MockKey("key" + i);
            this.completionAttemptsRemaining = new AtomicInteger(i2);
            this.maxDelayMs = i3;
        }

        @Override // org.apache.kafka.server.purgatory.DelayedOperationTest.MockDelayedOperation
        public boolean tryComplete() {
            boolean z = this.completable;
            try {
                Thread.sleep(DelayedOperationTest.this.random.nextInt(this.maxDelayMs));
                if (z) {
                    return forceComplete();
                }
                return false;
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            }
        }
    }

    @BeforeEach
    public void setUp() {
        this.purgatory = new DelayedOperationPurgatory<>("mock", 0);
    }

    @AfterEach
    public void tearDown() throws Exception {
        this.purgatory.shutdown();
        if (this.executorService != null) {
            this.executorService.shutdown();
        }
    }

    @Test
    public void testLockInTryCompleteElseWatch() {
        this.purgatory.tryCompleteElseWatch(new DelayedOperation(100000L) { // from class: org.apache.kafka.server.purgatory.DelayedOperationTest.1
            public void onExpiration() {
            }

            public void onComplete() {
            }

            public boolean tryComplete() {
                Assertions.assertTrue(((ReentrantLock) this.lock).isHeldByCurrentThread());
                return false;
            }

            public boolean safeTryComplete() {
                Assertions.fail("tryCompleteElseWatch should not use safeTryComplete");
                return super.safeTryComplete();
            }
        }, Collections.singletonList(new MockKey("key")));
    }

    private DelayedOperation op(final boolean z) {
        return new DelayedOperation(100000L) { // from class: org.apache.kafka.server.purgatory.DelayedOperationTest.2
            public void onExpiration() {
            }

            public void onComplete() {
            }

            public boolean tryComplete() {
                Assertions.assertTrue(((ReentrantLock) this.lock).isHeldByCurrentThread());
                return z;
            }
        };
    }

    @Test
    public void testSafeTryCompleteOrElse() {
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        Assertions.assertFalse(op(false).safeTryCompleteOrElse(() -> {
            atomicBoolean.set(true);
        }));
        Assertions.assertTrue(atomicBoolean.get());
        Assertions.assertTrue(op(true).safeTryCompleteOrElse(() -> {
            Assertions.fail("this method should NOT be executed");
        }));
    }

    @Test
    public void testRequestSatisfaction() {
        MockDelayedOperation mockDelayedOperation = new MockDelayedOperation(100000L);
        MockDelayedOperation mockDelayedOperation2 = new MockDelayedOperation(100000L);
        Assertions.assertEquals(0, this.purgatory.checkAndComplete(this.test1), "With no waiting requests, nothing should be satisfied");
        Assertions.assertFalse(this.purgatory.tryCompleteElseWatch(mockDelayedOperation, Collections.singletonList(new MockKey("test1"))), "r1 not satisfied and hence watched");
        Assertions.assertEquals(0, this.purgatory.checkAndComplete(this.test1), "Still nothing satisfied");
        Assertions.assertFalse(this.purgatory.tryCompleteElseWatch(mockDelayedOperation2, Collections.singletonList(new MockKey("test2"))), "r2 not satisfied and hence watched");
        Assertions.assertEquals(0, this.purgatory.checkAndComplete(this.test2), "Still nothing satisfied");
        mockDelayedOperation.completable = true;
        Assertions.assertEquals(1, this.purgatory.checkAndComplete(this.test1), "r1 satisfied");
        Assertions.assertEquals(0, this.purgatory.checkAndComplete(this.test1), "Nothing satisfied");
        mockDelayedOperation2.completable = true;
        Assertions.assertEquals(1, this.purgatory.checkAndComplete(this.test2), "r2 satisfied");
        Assertions.assertEquals(0, this.purgatory.checkAndComplete(this.test2), "Nothing satisfied");
    }

    @Test
    public void testRequestExpiry() throws Exception {
        long hiResClockMs = Time.SYSTEM.hiResClockMs();
        MockDelayedOperation mockDelayedOperation = new MockDelayedOperation(20L);
        MockDelayedOperation mockDelayedOperation2 = new MockDelayedOperation(200000L);
        Assertions.assertFalse(this.purgatory.tryCompleteElseWatch(mockDelayedOperation, Collections.singletonList(this.test1)), "r1 not satisfied and hence watched");
        Assertions.assertFalse(this.purgatory.tryCompleteElseWatch(mockDelayedOperation2, Collections.singletonList(this.test2)), "r2 not satisfied and hence watched");
        mockDelayedOperation.awaitExpiration();
        long hiResClockMs2 = Time.SYSTEM.hiResClockMs() - hiResClockMs;
        Assertions.assertTrue(mockDelayedOperation.isCompleted(), "r1 completed due to expiration");
        Assertions.assertFalse(mockDelayedOperation2.isCompleted(), "r2 hasn't completed");
        boolean z = hiResClockMs2 >= 20;
        Assertions.assertTrue(z, "Time for expiration " + hiResClockMs2 + " should at least " + z);
    }

    @Test
    public void testRequestPurge() {
        MockDelayedOperation mockDelayedOperation = new MockDelayedOperation(100000L);
        MockDelayedOperation mockDelayedOperation2 = new MockDelayedOperation(100000L);
        MockDelayedOperation mockDelayedOperation3 = new MockDelayedOperation(100000L);
        this.purgatory.tryCompleteElseWatch(mockDelayedOperation, Collections.singletonList(this.test1));
        this.purgatory.tryCompleteElseWatch(mockDelayedOperation2, Arrays.asList(this.test1, this.test2));
        this.purgatory.tryCompleteElseWatch(mockDelayedOperation3, Arrays.asList(this.test1, this.test2, this.test3));
        Assertions.assertEquals(3, this.purgatory.numDelayed(), "Purgatory should have 3 total delayed operations");
        Assertions.assertEquals(6, this.purgatory.watched(), "Purgatory should have 6 watched elements");
        mockDelayedOperation2.completable = true;
        mockDelayedOperation2.tryComplete();
        Assertions.assertEquals(2, this.purgatory.numDelayed(), "Purgatory should have 2 total delayed operations instead of " + this.purgatory.numDelayed());
        mockDelayedOperation3.completable = true;
        mockDelayedOperation3.tryComplete();
        Assertions.assertEquals(1, this.purgatory.numDelayed(), "Purgatory should have 1 total delayed operations instead of " + this.purgatory.numDelayed());
        this.purgatory.checkAndComplete(this.test1);
        Assertions.assertEquals(4, this.purgatory.watched(), "Purgatory should have 4 watched elements instead of " + this.purgatory.watched());
        this.purgatory.checkAndComplete(this.test2);
        Assertions.assertEquals(2, this.purgatory.watched(), "Purgatory should have 2 watched elements instead of " + this.purgatory.watched());
        this.purgatory.checkAndComplete(this.test3);
        Assertions.assertEquals(1, this.purgatory.watched(), "Purgatory should have 1 watched elements instead of " + this.purgatory.watched());
    }

    @Test
    public void shouldCancelForKeyReturningCancelledOperations() {
        this.purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(this.test1));
        this.purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(this.test1));
        this.purgatory.tryCompleteElseWatch(new MockDelayedOperation(10000L), Collections.singletonList(this.test2));
        Assertions.assertEquals(2, this.purgatory.cancelForKey(this.test1).size());
        Assertions.assertEquals(1, this.purgatory.numDelayed());
        Assertions.assertEquals(1, this.purgatory.watched());
    }

    @Test
    public void shouldReturnNilOperationsOnCancelForKeyWhenKeyDoesntExist() {
        Assertions.assertTrue(this.purgatory.cancelForKey(this.test1).isEmpty());
    }

    @Test
    public void testTryCompleteWithMultipleThreads() throws ExecutionException, InterruptedException {
        this.executorService = Executors.newScheduledThreadPool(20);
        ArrayList arrayList = new ArrayList();
        for (int i = 0; i < 100; i++) {
            TestDelayOperation testDelayOperation = new TestDelayOperation(i, 20, 10);
            this.purgatory.tryCompleteElseWatch(testDelayOperation, Collections.singletonList(testDelayOperation.key));
            arrayList.add(testDelayOperation);
        }
        ArrayList arrayList2 = new ArrayList();
        for (int i2 = 1; i2 <= 20; i2++) {
            Iterator it = arrayList.iterator();
            while (it.hasNext()) {
                arrayList2.add(scheduleTryComplete(this.executorService, (TestDelayOperation) it.next(), this.random.nextInt(10)));
            }
        }
        Iterator it2 = arrayList2.iterator();
        while (it2.hasNext()) {
            ((Future) it2.next()).get();
        }
        arrayList.forEach(testDelayOperation2 -> {
            Assertions.assertTrue(testDelayOperation2.isCompleted(), "Operation " + testDelayOperation2.key.keyLabel() + " should have completed");
        });
    }

    private Future<?> scheduleTryComplete(ScheduledExecutorService scheduledExecutorService, TestDelayOperation testDelayOperation, long j) {
        return scheduledExecutorService.schedule(() -> {
            if (testDelayOperation.completionAttemptsRemaining.decrementAndGet() == 0) {
                testDelayOperation.completable = true;
            }
            this.purgatory.checkAndComplete(testDelayOperation.key);
        }, j, TimeUnit.MILLISECONDS);
    }
}
