/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.oss.driver.internal.core.channel;

import com.datastax.oss.driver.Assertions;
import com.datastax.oss.driver.api.core.CqlIdentifier;
import com.datastax.oss.driver.api.core.DefaultProtocolVersion;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.connection.BusyConnectionException;
import com.datastax.oss.driver.api.core.connection.ClosedConnectionException;
import com.datastax.oss.driver.internal.core.channel.ChannelHandlerTestBase;
import com.datastax.oss.driver.internal.core.channel.DriverChannel;
import com.datastax.oss.driver.internal.core.channel.EventCallback;
import com.datastax.oss.driver.internal.core.channel.InFlightHandler;
import com.datastax.oss.driver.internal.core.channel.MockResponseCallback;
import com.datastax.oss.driver.internal.core.channel.ResponseCallback;
import com.datastax.oss.driver.internal.core.channel.StreamIdGenerator;
import com.datastax.oss.driver.internal.core.protocol.FrameDecodingException;
import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableList;
import com.datastax.oss.protocol.internal.Frame;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.request.Query;
import com.datastax.oss.protocol.internal.response.Error;
import com.datastax.oss.protocol.internal.response.event.StatusChangeEvent;
import com.datastax.oss.protocol.internal.response.result.SetKeyspace;
import com.datastax.oss.protocol.internal.response.result.Void;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.Promise;
import java.net.InetSocketAddress;
import java.util.Collections;
import java.util.concurrent.TimeUnit;
import org.assertj.core.api.AbstractThrowableAssert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;
import org.mockito.verification.VerificationMode;

public class InFlightHandlerTest
extends ChannelHandlerTestBase {
    private static final Query QUERY = new Query("select * from foo");
    private static final int SET_KEYSPACE_TIMEOUT_MILLIS = 100;
    private static final int MAX_ORPHAN_IDS = 10;
    @Mock
    private StreamIdGenerator streamIds;

    @Override
    @Before
    public void setup() {
        super.setup();
        MockitoAnnotations.initMocks((Object)this);
        Mockito.when((Object)this.streamIds.preAcquire()).thenReturn((Object)true);
    }

    @Test
    public void should_fail_if_connection_busy() throws Throwable {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)-1);
        ChannelFuture writeFuture = this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)new MockResponseCallback()));
        Assertions.assertThat(writeFuture).isFailed(e -> {
            AbstractThrowableAssert cfr_ignored_0 = (AbstractThrowableAssert)Assertions.assertThat((Throwable)e).isInstanceOf(BusyConnectionException.class);
        });
    }

    @Test
    public void should_assign_streamid_and_send_frame() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        ChannelFuture writeFuture = this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback));
        Assertions.assertThat(writeFuture).isSuccess();
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).acquire();
        Frame frame = this.readOutboundFrame();
        Assertions.assertThat((int)frame.streamId).isEqualTo(42);
        Assertions.assertThat((Object)frame.message).isEqualTo((Object)QUERY);
    }

    @Test
    public void should_notify_callback_of_response() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback));
        Frame requestFrame = this.readOutboundFrame();
        Frame responseFrame = this.buildInboundFrame(requestFrame, (Message)Void.INSTANCE);
        this.writeInboundFrame(responseFrame);
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isSameAs((Object)responseFrame);
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).release(42);
    }

    @Test
    public void should_notify_response_promise_when_decoding_fails() throws Throwable {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        RuntimeException mockCause = new RuntimeException("test");
        this.channel.pipeline().fireExceptionCaught((Throwable)new FrameDecodingException(42, (Throwable)mockCause));
        Assertions.assertThat((Throwable)responseCallback.getFailure()).isSameAs((Object)mockCause);
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).release(42);
    }

    @Test
    public void should_release_stream_id_when_orphaned_callback_receives_response() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback));
        Frame requestFrame = this.readOutboundFrame();
        this.channel.writeAndFlush((Object)responseCallback);
        Frame responseFrame = this.buildInboundFrame(requestFrame, (Message)Void.INSTANCE);
        this.writeInboundFrame(responseFrame);
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).release(42);
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isNull();
    }

    @Test
    public void should_delay_graceful_close_and_complete_when_last_pending_completes() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        this.channel.write(DriverChannel.GRACEFUL_CLOSE_MESSAGE);
        Assertions.assertThat(this.channel.closeFuture()).isNotDone();
        Frame requestFrame = this.readOutboundFrame();
        this.writeInboundFrame(requestFrame, (Message)Void.INSTANCE);
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
    }

    @Test
    public void should_delay_graceful_close_and_complete_when_last_pending_cancelled() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        this.channel.write(DriverChannel.GRACEFUL_CLOSE_MESSAGE);
        Assertions.assertThat(this.channel.closeFuture()).isNotDone();
        this.channel.write((Object)responseCallback);
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
    }

    @Test
    public void should_graceful_close_immediately_if_no_pending() {
        this.addToPipeline();
        this.channel.write(DriverChannel.GRACEFUL_CLOSE_MESSAGE);
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
    }

    @Test
    public void should_refuse_new_writes_during_graceful_close() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        this.channel.write(DriverChannel.GRACEFUL_CLOSE_MESSAGE);
        Assertions.assertThat(this.channel.closeFuture()).isNotDone();
        ChannelFuture otherWriteFuture = this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback));
        Assertions.assertThat(otherWriteFuture).isFailed(e -> ((AbstractThrowableAssert)Assertions.assertThat((Throwable)e).isInstanceOf(IllegalStateException.class)).hasMessage("Channel is closing"));
    }

    @Test
    public void should_close_gracefully_if_orphan_ids_above_max_and_pending_requests() {
        MockResponseCallback responseCallback;
        this.addToPipeline();
        for (int i = 0; i < 10; ++i) {
            Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)i);
            responseCallback = new MockResponseCallback();
            this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
            this.channel.writeAndFlush((Object)responseCallback).awaitUninterruptibly();
        }
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)10);
        MockResponseCallback pendingResponseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)pendingResponseCallback)).awaitUninterruptibly();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)11);
        responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        this.channel.writeAndFlush((Object)responseCallback).awaitUninterruptibly();
        Assertions.assertThat(this.channel.closeFuture()).isNotDone();
        ChannelFuture otherWriteFuture = this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback));
        Assertions.assertThat(otherWriteFuture).isFailed(e -> ((AbstractThrowableAssert)Assertions.assertThat((Throwable)e).isInstanceOf(IllegalStateException.class)).hasMessage("Channel is closing"));
        this.channel.writeAndFlush((Object)pendingResponseCallback).awaitUninterruptibly();
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
    }

    @Test
    public void should_close_immediately_if_orphan_ids_above_max_and_no_pending_requests() {
        this.addToPipeline();
        for (int i = 0; i < 10; ++i) {
            Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)i);
            MockResponseCallback responseCallback = new MockResponseCallback();
            this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
            this.channel.writeAndFlush((Object)responseCallback).awaitUninterruptibly();
        }
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)10);
        MockResponseCallback responseCallback = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        this.channel.writeAndFlush((Object)responseCallback).awaitUninterruptibly();
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
    }

    @Test
    public void should_fail_all_pending_when_force_closed() throws Throwable {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42, (Object[])new Integer[]{43});
        MockResponseCallback responseCallback1 = new MockResponseCallback();
        MockResponseCallback responseCallback2 = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback1)).awaitUninterruptibly();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback2)).awaitUninterruptibly();
        this.channel.write(DriverChannel.FORCEFUL_CLOSE_MESSAGE);
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
        for (MockResponseCallback callback : ImmutableList.of((Object)responseCallback1, (Object)responseCallback2)) {
            ((AbstractThrowableAssert)Assertions.assertThat((Throwable)callback.getFailure()).isInstanceOf(ClosedConnectionException.class)).hasMessageContaining("Channel was force-closed");
        }
    }

    @Test
    public void should_fail_all_pending_and_close_on_unexpected_inbound_exception() throws Throwable {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42, (Object[])new Integer[]{43});
        MockResponseCallback responseCallback1 = new MockResponseCallback();
        MockResponseCallback responseCallback2 = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback1)).awaitUninterruptibly();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback2)).awaitUninterruptibly();
        RuntimeException mockException = new RuntimeException("test");
        this.channel.pipeline().fireExceptionCaught((Throwable)mockException);
        Assertions.assertThat(this.channel.closeFuture()).isSuccess();
        for (MockResponseCallback callback : ImmutableList.of((Object)responseCallback1, (Object)responseCallback2)) {
            Throwable failure = callback.getFailure();
            Assertions.assertThat((Throwable)failure).isInstanceOf(ClosedConnectionException.class);
            Assertions.assertThat((Throwable)failure.getCause()).isSameAs((Object)mockException);
        }
    }

    @Test
    public void should_fail_all_pending_if_connection_lost() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42, (Object[])new Integer[]{43});
        MockResponseCallback responseCallback1 = new MockResponseCallback();
        MockResponseCallback responseCallback2 = new MockResponseCallback();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback1)).awaitUninterruptibly();
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback2)).awaitUninterruptibly();
        this.channel.pipeline().fireChannelInactive();
        for (MockResponseCallback callback : ImmutableList.of((Object)responseCallback1, (Object)responseCallback2)) {
            ((AbstractThrowableAssert)Assertions.assertThat((Throwable)callback.getFailure()).isInstanceOf(ClosedConnectionException.class)).hasMessageContaining("Lost connection to remote peer");
        }
    }

    @Test
    public void should_hold_stream_id_for_multi_response_callback() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback(frame -> frame.message instanceof Error);
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        Assertions.assertThat((int)responseCallback.streamId).isEqualTo(42);
        Frame requestFrame = this.readOutboundFrame();
        for (int i = 0; i < 5; ++i) {
            Frame responseFrame = this.buildInboundFrame(requestFrame, (Message)Void.INSTANCE);
            this.writeInboundFrame(responseFrame);
            Assertions.assertThat((Object)responseCallback.getLastResponse()).isSameAs((Object)responseFrame);
            ((StreamIdGenerator)Mockito.verify((Object)this.streamIds, (VerificationMode)Mockito.never())).release(42);
        }
        Frame responseFrame = this.buildInboundFrame(requestFrame, (Message)new Error(0, "test"));
        this.writeInboundFrame(responseFrame);
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).release(42);
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isSameAs((Object)responseFrame);
        this.writeInboundFrame(requestFrame, (Message)Void.INSTANCE);
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isNull();
    }

    @Test
    public void should_release_stream_id_when_orphaned_multi_response_callback_receives_last_response() {
        this.addToPipeline();
        Mockito.when((Object)this.streamIds.acquire()).thenReturn((Object)42);
        MockResponseCallback responseCallback = new MockResponseCallback(frame -> frame.message instanceof Error);
        this.channel.writeAndFlush((Object)new DriverChannel.RequestMessage((Message)QUERY, false, Frame.NO_PAYLOAD, (ResponseCallback)responseCallback)).awaitUninterruptibly();
        Frame requestFrame = this.readOutboundFrame();
        for (int i = 0; i < 5; ++i) {
            Frame responseFrame = this.buildInboundFrame(requestFrame, (Message)Void.INSTANCE);
            this.writeInboundFrame(responseFrame);
            Assertions.assertThat((Object)responseCallback.getLastResponse()).isSameAs((Object)responseFrame);
            ((StreamIdGenerator)Mockito.verify((Object)this.streamIds, (VerificationMode)Mockito.never())).release(42);
        }
        this.channel.writeAndFlush((Object)responseCallback);
        this.writeInboundFrame(requestFrame, (Message)Void.INSTANCE);
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isNull();
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds, (VerificationMode)Mockito.never())).release(42);
        this.writeInboundFrame(requestFrame, (Message)new Error(0, "test"));
        Assertions.assertThat((Object)responseCallback.getLastResponse()).isNull();
        ((StreamIdGenerator)Mockito.verify((Object)this.streamIds)).release(42);
    }

    @Test
    public void should_set_keyspace() {
        this.addToPipeline();
        ChannelPromise setKeyspacePromise = this.channel.newPromise();
        DriverChannel.SetKeyspaceEvent setKeyspaceEvent = new DriverChannel.SetKeyspaceEvent(CqlIdentifier.fromCql((String)"ks"), (Promise)setKeyspacePromise);
        this.channel.pipeline().fireUserEventTriggered((Object)setKeyspaceEvent);
        Frame requestFrame = this.readOutboundFrame();
        Assertions.assertThat((Object)requestFrame.message).isInstanceOf(Query.class);
        this.writeInboundFrame(requestFrame, (Message)new SetKeyspace("ks"));
        Assertions.assertThat(setKeyspacePromise).isSuccess();
    }

    @Test
    public void should_fail_to_set_keyspace_if_query_times_out() throws InterruptedException {
        this.addToPipeline();
        ChannelPromise setKeyspacePromise = this.channel.newPromise();
        DriverChannel.SetKeyspaceEvent setKeyspaceEvent = new DriverChannel.SetKeyspaceEvent(CqlIdentifier.fromCql((String)"ks"), (Promise)setKeyspacePromise);
        this.channel.pipeline().fireUserEventTriggered((Object)setKeyspaceEvent);
        TimeUnit.MILLISECONDS.sleep(200L);
        this.channel.runPendingTasks();
        Assertions.assertThat(setKeyspacePromise).isFailed();
    }

    @Test
    public void should_notify_callback_of_events() {
        EventCallback eventCallback = (EventCallback)Mockito.mock(EventCallback.class);
        this.addToPipelineWithEventCallback(eventCallback);
        StatusChangeEvent event = new StatusChangeEvent("UP", new InetSocketAddress("127.0.0.1", 9042));
        Frame eventFrame = Frame.forResponse((int)DefaultProtocolVersion.V3.getCode(), (int)-1, null, Collections.emptyMap(), Collections.emptyList(), (Message)event);
        this.writeInboundFrame(eventFrame);
        ArgumentCaptor captor = ArgumentCaptor.forClass(StatusChangeEvent.class);
        ((EventCallback)Mockito.verify((Object)eventCallback)).onEvent((Message)captor.capture());
        Assertions.assertThat((Object)((StatusChangeEvent)captor.getValue())).isSameAs((Object)event);
    }

    private void addToPipeline() {
        this.addToPipelineWithEventCallback(null);
    }

    private void addToPipelineWithEventCallback(EventCallback eventCallback) {
        this.channel.pipeline().addLast(new ChannelHandler[]{new InFlightHandler((ProtocolVersion)DefaultProtocolVersion.V3, this.streamIds, 10, 100L, this.channel.newPromise(), eventCallback, "test")});
    }
}

