/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.dse.driver.internal.core.cql.reactive;

import com.datastax.dse.driver.DseTestDataProviders;
import com.datastax.dse.driver.DseTestFixtures;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveResultSet;
import com.datastax.dse.driver.api.core.cql.reactive.ReactiveRow;
import com.datastax.dse.driver.internal.core.cql.reactive.CqlRequestReactiveProcessor;
import com.datastax.dse.driver.internal.core.cql.reactive.DefaultReactiveResultSet;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.cql.AsyncResultSet;
import com.datastax.oss.driver.api.core.cql.ColumnDefinitions;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.cql.Conversions;
import com.datastax.oss.driver.internal.core.cql.CqlRequestAsyncProcessor;
import com.datastax.oss.driver.internal.core.cql.CqlRequestHandlerTestBase;
import com.datastax.oss.driver.internal.core.cql.PoolBehavior;
import com.datastax.oss.driver.internal.core.cql.RequestHandlerTestHarness;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.protocol.internal.Message;
import com.datastax.oss.protocol.internal.response.Result;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import io.reactivex.Flowable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;

/**
 * Tests for {@link CqlRequestReactiveProcessor}: request-type matching, creation of the reactive
 * result set, and publication of rows and query metadata for single-page and multi-page results.
 */
public class CqlRequestReactiveProcessorTest
extends CqlRequestHandlerTestBase {
    /** The processor reports that it can handle a statement targeting the reactive result type. */
    @Test
    public void should_be_able_to_process_reactive_result_set() {
        CqlRequestReactiveProcessor reactiveProcessor = newProcessor();
        boolean accepted = reactiveProcessor.canProcess(UNDEFINED_IDEMPOTENCE_STATEMENT, CqlRequestReactiveProcessor.REACTIVE_RESULT_SET);
        Assertions.assertThat(accepted).isTrue();
    }

    /** Processing a statement yields a {@link DefaultReactiveResultSet}. */
    @Test
    public void should_create_request_handler() {
        RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder().withProtocolVersion(DseProtocolVersion.DSE_V1);
        try (RequestHandlerTestHarness harness = harnessBuilder.build()) {
            ReactiveResultSet result = newProcessor().process(UNDEFINED_IDEMPOTENCE_STATEMENT, harness.getSession(), harness.getContext(), "test");
            Assertions.assertThat(result).isInstanceOf(DefaultReactiveResultSet.class);
        }
    }

    /**
     * A single-row response is published as exactly one row, and the execution info, column
     * definitions and "was applied" publishers each emit the value carried by that row.
     *
     * @param version the protocol version the harness is configured with
     */
    @Test
    @UseDataProvider(value="allDseAndOssProtocolVersions", location={DseTestDataProviders.class})
    public void should_complete_single_page_result(ProtocolVersion version) {
        try (RequestHandlerTestHarness harness = RequestHandlerTestHarness.builder()
                .withProtocolVersion(version)
                .withResponse(this.node1, defaultFrameOf(DseTestFixtures.singleDseRow()))
                .build()) {
            DefaultSession session = harness.getSession();
            InternalDriverContext context = harness.getContext();
            ReactiveResultSet resultSet = newProcessor().process(UNDEFINED_IDEMPOTENCE_STATEMENT, session, context, "test");

            List<ReactiveRow> emittedRows = collect(resultSet);
            Assertions.assertThat(emittedRows).hasSize(1);
            ReactiveRow onlyRow = emittedRows.get(0);
            Assertions.assertThat(onlyRow.getString("message")).isEqualTo("hello, world");

            ExecutionInfo info = onlyRow.getExecutionInfo();
            Assertions.assertThat(info.getCoordinator()).isEqualTo(this.node1);
            Assertions.assertThat(info.getErrors()).isEmpty();
            Assertions.assertThat(info.getIncomingPayload()).isEmpty();
            // Only one page was served, so no paging state is expected.
            Assertions.assertThat(info.getPagingState()).isNull();
            Assertions.assertThat(info.getSpeculativeExecutionCount()).isEqualTo(0);
            Assertions.assertThat(info.getSuccessfulExecutionIndex()).isEqualTo(0);
            Assertions.assertThat(info.getWarnings()).isEmpty();

            List<ExecutionInfo> emittedInfos = collect(resultSet.getExecutionInfos());
            Assertions.assertThat(emittedInfos).containsExactly(info);

            List<ColumnDefinitions> emittedColumnDefinitions = collect(resultSet.getColumnDefinitions());
            Assertions.assertThat(emittedColumnDefinitions).containsExactly(onlyRow.getColumnDefinitions());

            List<Boolean> emittedWasApplied = collect(resultSet.wasApplied());
            Assertions.assertThat(emittedWasApplied).containsExactly(onlyRow.wasApplied());
        }
    }

    /**
     * Two pages of ten rows are published as twenty rows. Rows of the first page carry the
     * execution info produced by the handler; rows of the second page carry the mocked execution
     * info supplied with that page.
     *
     * @param version the protocol version the harness is configured with
     */
    @Test
    @UseDataProvider(value="allDseAndOssProtocolVersions", location={DseTestDataProviders.class})
    public void should_complete_multi_page_result(ProtocolVersion version) {
        RequestHandlerTestHarness.Builder harnessBuilder = RequestHandlerTestHarness.builder().withProtocolVersion(version);
        PoolBehavior firstNodeBehavior = harnessBuilder.customBehavior(this.node1);
        try (RequestHandlerTestHarness harness = harnessBuilder.build()) {
            DefaultSession session = harness.getSession();
            InternalDriverContext context = harness.getContext();

            // Any statement executed through the session resolves to this future; it is
            // completed further down with the content of the second page.
            CompletableFuture<AsyncResultSet> secondPageFuture = new CompletableFuture<>();
            Mockito.when(session.executeAsync(ArgumentMatchers.any(Statement.class))).thenAnswer(invocation -> secondPageFuture);
            ExecutionInfo secondPageInfo = Mockito.mock(ExecutionInfo.class);

            ReactiveResultSet resultSet = newProcessor().process(UNDEFINED_IDEMPOTENCE_STATEMENT, session, context, "test");

            // Cache the rows so the subscription opened here and the later toList() call
            // observe the same emissions.
            Flowable<ReactiveRow> cachedRows = Flowable.fromPublisher(resultSet).cache();
            cachedRows.subscribe();

            // Deliver page 1 (not the last page) through the node's pool behavior.
            firstNodeBehavior.setResponseSuccess(defaultFrameOf(DseTestFixtures.tenDseRows(1, false)));
            // Deliver page 2 (the last page) by completing the stubbed future.
            secondPageFuture.complete(Conversions.toResultSet(DseTestFixtures.tenDseRows(2, true), secondPageInfo, harness.getSession(), harness.getContext()));

            List<ReactiveRow> emittedRows = cachedRows.toList().blockingGet();
            Assertions.assertThat(emittedRows).hasSize(20);

            ReactiveRow firstRow = emittedRows.get(0);
            ExecutionInfo firstPageInfo = firstRow.getExecutionInfo();
            Assertions.assertThat(firstPageInfo.getCoordinator()).isEqualTo(this.node1);
            Assertions.assertThat(firstPageInfo.getErrors()).isEmpty();
            Assertions.assertThat(firstPageInfo.getIncomingPayload()).isEmpty();
            // Page 1 is not the last page, so it must expose a paging state.
            Assertions.assertThat(firstPageInfo.getPagingState()).isNotNull();
            Assertions.assertThat(firstPageInfo.getSpeculativeExecutionCount()).isEqualTo(0);
            Assertions.assertThat(firstPageInfo.getSuccessfulExecutionIndex()).isEqualTo(0);
            Assertions.assertThat(firstPageInfo.getWarnings()).isEmpty();

            // Index 10 is the first row of the second page.
            ReactiveRow eleventhRow = emittedRows.get(10);
            ExecutionInfo observedSecondPageInfo = eleventhRow.getExecutionInfo();
            Assertions.assertThat(observedSecondPageInfo).isSameAs(secondPageInfo);

            List<ExecutionInfo> emittedInfos = collect(resultSet.getExecutionInfos());
            Assertions.assertThat(emittedInfos).containsExactly(firstPageInfo, observedSecondPageInfo);

            List<ColumnDefinitions> emittedColumnDefinitions = collect(resultSet.getColumnDefinitions());
            Assertions.assertThat(emittedColumnDefinitions).containsExactly(firstRow.getColumnDefinitions());

            List<Boolean> emittedWasApplied = collect(resultSet.wasApplied());
            Assertions.assertThat(emittedWasApplied).containsExactly(firstRow.wasApplied());
        }
    }

    /** Creates the processor under test, wrapping a fresh {@link CqlRequestAsyncProcessor}. */
    private static CqlRequestReactiveProcessor newProcessor() {
        return new CqlRequestReactiveProcessor(new CqlRequestAsyncProcessor());
    }

    /**
     * Subscribes to the given publisher and blocks until it completes.
     *
     * @param source the publisher to drain
     * @return every item the publisher emitted, in emission order
     */
    private static <T> List<T> collect(Publisher<? extends T> source) {
        return Flowable.<T>fromPublisher(source).toList().blockingGet();
    }
}

