/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.stream.Stream;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkThread;
import org.apache.kafka.clients.consumer.internals.CoordinatorRequestManager;
import org.apache.kafka.clients.consumer.internals.HeartbeatRequestManager;
import org.apache.kafka.clients.consumer.internals.NetworkClientDelegate;
import org.apache.kafka.clients.consumer.internals.OffsetsRequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManager;
import org.apache.kafka.clients.consumer.internals.RequestManagers;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEvent;
import org.apache.kafka.clients.consumer.internals.events.ApplicationEventProcessor;
import org.apache.kafka.clients.consumer.internals.events.AssignmentChangeEvent;
import org.apache.kafka.clients.consumer.internals.events.AsyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEvent;
import org.apache.kafka.clients.consumer.internals.events.CompletableEventReaper;
import org.apache.kafka.clients.consumer.internals.events.ListOffsetsEvent;
import org.apache.kafka.clients.consumer.internals.events.NewTopicsMetadataUpdateRequestEvent;
import org.apache.kafka.clients.consumer.internals.events.PollEvent;
import org.apache.kafka.clients.consumer.internals.events.ResetPositionsEvent;
import org.apache.kafka.clients.consumer.internals.events.SyncCommitEvent;
import org.apache.kafka.clients.consumer.internals.events.TopicMetadataEvent;
import org.apache.kafka.clients.consumer.internals.events.ValidatePositionsEvent;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.utils.LogContext;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestCondition;
import org.apache.kafka.test.TestUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;

public class ConsumerNetworkThreadTest {
    private final Time time;
    private final BlockingQueue<ApplicationEvent> applicationEventsQueue;
    private final ApplicationEventProcessor applicationEventProcessor;
    private final OffsetsRequestManager offsetsRequestManager;
    private final HeartbeatRequestManager heartbeatRequestManager;
    private final CoordinatorRequestManager coordinatorRequestManager;
    private final ConsumerNetworkThread consumerNetworkThread;
    private final NetworkClientDelegate networkClientDelegate = (NetworkClientDelegate)Mockito.mock(NetworkClientDelegate.class);
    private final RequestManagers requestManagers = (RequestManagers)Mockito.mock(RequestManagers.class);
    private final CompletableEventReaper applicationEventReaper;

    ConsumerNetworkThreadTest() {
        this.offsetsRequestManager = (OffsetsRequestManager)Mockito.mock(OffsetsRequestManager.class);
        this.heartbeatRequestManager = (HeartbeatRequestManager)Mockito.mock(HeartbeatRequestManager.class);
        this.coordinatorRequestManager = (CoordinatorRequestManager)Mockito.mock(CoordinatorRequestManager.class);
        this.applicationEventProcessor = (ApplicationEventProcessor)Mockito.mock(ApplicationEventProcessor.class);
        this.applicationEventReaper = (CompletableEventReaper)Mockito.mock(CompletableEventReaper.class);
        this.time = new MockTime();
        this.applicationEventsQueue = new LinkedBlockingQueue<ApplicationEvent>();
        LogContext logContext = new LogContext();
        this.consumerNetworkThread = new ConsumerNetworkThread(logContext, this.time, this.applicationEventsQueue, this.applicationEventReaper, () -> this.applicationEventProcessor, () -> this.networkClientDelegate, () -> this.requestManagers);
    }

    @BeforeEach
    public void setup() {
        this.consumerNetworkThread.initializeResources();
    }

    @AfterEach
    public void tearDown() {
        if (this.consumerNetworkThread != null) {
            this.consumerNetworkThread.close();
        }
    }

    @Test
    public void testEnsureCloseStopsRunningThread() {
        Assertions.assertTrue((boolean)this.consumerNetworkThread.isRunning(), (String)"ConsumerNetworkThread should start running when created");
        this.consumerNetworkThread.close();
        Assertions.assertFalse((boolean)this.consumerNetworkThread.isRunning(), (String)"close() should make consumerNetworkThread.running false by calling closeInternal(Duration timeout)");
    }

    @ParameterizedTest
    @ValueSource(longs={4999L, 5000L, 5001L})
    public void testConsumerNetworkThreadPollTimeComputations(long exampleTime) {
        ArrayList<Optional<Object>> list = new ArrayList<Optional<Object>>();
        list.add(Optional.of(this.coordinatorRequestManager));
        list.add(Optional.of(this.heartbeatRequestManager));
        Mockito.when((Object)this.requestManagers.entries()).thenReturn(list);
        NetworkClientDelegate.PollResult pollResult = new NetworkClientDelegate.PollResult(exampleTime);
        NetworkClientDelegate.PollResult pollResult1 = new NetworkClientDelegate.PollResult(exampleTime + 100L);
        long t = this.time.milliseconds();
        Mockito.when((Object)this.coordinatorRequestManager.poll(t)).thenReturn((Object)pollResult);
        Mockito.when((Object)this.coordinatorRequestManager.maximumTimeToWait(t)).thenReturn((Object)exampleTime);
        Mockito.when((Object)this.heartbeatRequestManager.poll(t)).thenReturn((Object)pollResult1);
        Mockito.when((Object)this.heartbeatRequestManager.maximumTimeToWait(t)).thenReturn((Object)(exampleTime + 100L));
        Mockito.when((Object)this.networkClientDelegate.addAll(pollResult)).thenReturn((Object)pollResult.timeUntilNextPollMs);
        Mockito.when((Object)this.networkClientDelegate.addAll(pollResult1)).thenReturn((Object)pollResult1.timeUntilNextPollMs);
        this.consumerNetworkThread.runOnce();
        ((NetworkClientDelegate)Mockito.verify((Object)this.networkClientDelegate)).poll(Math.min(exampleTime, 5000L), this.time.milliseconds());
        Assertions.assertEquals((long)this.consumerNetworkThread.maximumTimeToWait(), (long)exampleTime);
    }

    @Test
    public void testStartupAndTearDown() throws InterruptedException {
        this.consumerNetworkThread.start();
        TestCondition isStarted = () -> ((ConsumerNetworkThread)this.consumerNetworkThread).isRunning();
        TestCondition isClosed = () -> !this.consumerNetworkThread.isRunning() && !this.consumerNetworkThread.isAlive();
        TestUtils.waitForCondition(isStarted, "The consumer network thread did not start within 15000 ms");
        this.consumerNetworkThread.close(Duration.ofMillis(15000L));
        TestUtils.waitForCondition(isClosed, "The consumer network thread did not stop within 15000 ms");
    }

    @Test
    public void testRequestsTransferFromManagersToClientOnThreadRun() {
        ArrayList<Optional<Object>> list = new ArrayList<Optional<Object>>();
        list.add(Optional.of(this.coordinatorRequestManager));
        list.add(Optional.of(this.heartbeatRequestManager));
        list.add(Optional.of(this.offsetsRequestManager));
        Mockito.when((Object)this.requestManagers.entries()).thenReturn(list);
        Mockito.when((Object)this.coordinatorRequestManager.poll(ArgumentMatchers.anyLong())).thenReturn(Mockito.mock(NetworkClientDelegate.PollResult.class));
        this.consumerNetworkThread.runOnce();
        this.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> ((RequestManager)Mockito.verify((Object)rm)).poll(ArgumentMatchers.anyLong())));
        this.requestManagers.entries().forEach(rmo -> rmo.ifPresent(rm -> ((RequestManager)Mockito.verify((Object)rm)).maximumTimeToWait(ArgumentMatchers.anyLong())));
        ((NetworkClientDelegate)Mockito.verify((Object)this.networkClientDelegate)).addAll((NetworkClientDelegate.PollResult)ArgumentMatchers.any(NetworkClientDelegate.PollResult.class));
        ((NetworkClientDelegate)Mockito.verify((Object)this.networkClientDelegate)).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    @ParameterizedTest
    @MethodSource(value={"applicationEvents"})
    public void testApplicationEventIsProcessed(ApplicationEvent e) {
        this.applicationEventsQueue.add(e);
        this.consumerNetworkThread.runOnce();
        if (e instanceof CompletableEvent) {
            ((CompletableEventReaper)Mockito.verify((Object)this.applicationEventReaper)).add((CompletableEvent)e);
        }
        ((ApplicationEventProcessor)Mockito.verify((Object)this.applicationEventProcessor)).process((ApplicationEvent)ArgumentMatchers.any(e.getClass()));
        Assertions.assertTrue((boolean)this.applicationEventsQueue.isEmpty());
    }

    @ParameterizedTest
    @ValueSource(booleans={true, false})
    public void testListOffsetsEventIsProcessed(boolean requireTimestamp) {
        Map<TopicPartition, Long> timestamps = Collections.singletonMap(new TopicPartition("topic1", 1), 5L);
        ListOffsetsEvent e = new ListOffsetsEvent(timestamps, CompletableEvent.calculateDeadlineMs((Time)this.time, (long)100L), requireTimestamp);
        this.applicationEventsQueue.add((ApplicationEvent)e);
        this.consumerNetworkThread.runOnce();
        ((ApplicationEventProcessor)Mockito.verify((Object)this.applicationEventProcessor)).process((ApplicationEvent)ArgumentMatchers.any(ListOffsetsEvent.class));
        Assertions.assertTrue((boolean)this.applicationEventsQueue.isEmpty());
    }

    @Test
    public void testResetPositionsProcessFailureIsIgnored() {
        ((OffsetsRequestManager)Mockito.doThrow((Throwable[])new Throwable[]{new NullPointerException()}).when((Object)this.offsetsRequestManager)).resetPositionsIfNeeded();
        ResetPositionsEvent event = new ResetPositionsEvent(CompletableEvent.calculateDeadlineMs((Time)this.time, (long)100L));
        this.applicationEventsQueue.add((ApplicationEvent)event);
        Assertions.assertDoesNotThrow(() -> this.consumerNetworkThread.runOnce());
        ((ApplicationEventProcessor)Mockito.verify((Object)this.applicationEventProcessor)).process((ApplicationEvent)ArgumentMatchers.any(ResetPositionsEvent.class));
    }

    @Test
    public void testMaximumTimeToWait() {
        int defaultHeartbeatIntervalMs = 1000;
        Assertions.assertEquals((long)5000L, (long)this.consumerNetworkThread.maximumTimeToWait());
        Mockito.when((Object)this.requestManagers.entries()).thenReturn(Collections.singletonList(Optional.of(this.heartbeatRequestManager)));
        Mockito.when((Object)this.heartbeatRequestManager.maximumTimeToWait(this.time.milliseconds())).thenReturn((Object)1000L);
        this.consumerNetworkThread.runOnce();
        Assertions.assertEquals((long)1000L, (long)this.consumerNetworkThread.maximumTimeToWait());
    }

    @Test
    public void testCleanupInvokesReaper() {
        LinkedList queue = new LinkedList();
        Mockito.when((Object)this.networkClientDelegate.unsentRequests()).thenReturn(queue);
        this.consumerNetworkThread.cleanup();
        ((CompletableEventReaper)Mockito.verify((Object)this.applicationEventReaper)).reap(this.applicationEventsQueue);
    }

    @Test
    public void testRunOnceInvokesReaper() {
        this.consumerNetworkThread.runOnce();
        ((CompletableEventReaper)Mockito.verify((Object)this.applicationEventReaper)).reap(((Long)ArgumentMatchers.any(Long.class)).longValue());
    }

    @Test
    public void testSendUnsentRequests() {
        Mockito.when((Object)this.networkClientDelegate.hasAnyPendingRequests()).thenReturn((Object)true).thenReturn((Object)true).thenReturn((Object)false);
        this.consumerNetworkThread.cleanup();
        ((NetworkClientDelegate)Mockito.verify((Object)this.networkClientDelegate, (VerificationMode)Mockito.times((int)2))).poll(ArgumentMatchers.anyLong(), ArgumentMatchers.anyLong());
    }

    private static Stream<Arguments> applicationEvents() {
        HashMap offset = new HashMap();
        long currentTimeMs = 12345L;
        return Stream.of(Arguments.of((Object[])new Object[]{new PollEvent(100L)}), Arguments.of((Object[])new Object[]{new NewTopicsMetadataUpdateRequestEvent()}), Arguments.of((Object[])new Object[]{new AsyncCommitEvent(new HashMap())}), Arguments.of((Object[])new Object[]{new SyncCommitEvent(new HashMap(), 500L)}), Arguments.of((Object[])new Object[]{new ResetPositionsEvent(500L)}), Arguments.of((Object[])new Object[]{new ValidatePositionsEvent(500L)}), Arguments.of((Object[])new Object[]{new TopicMetadataEvent("topic", Long.MAX_VALUE)}), Arguments.of((Object[])new Object[]{new AssignmentChangeEvent(offset, 12345L)}));
    }
}

