/*
 * Decompiled with CFR 0.152.
 */
package com.datastax.dse.driver.internal.core.graph.reactive;

import com.datastax.dse.driver.DseTestDataProviders;
import com.datastax.dse.driver.api.core.DseProtocolVersion;
import com.datastax.dse.driver.api.core.graph.GraphStatement;
import com.datastax.dse.driver.api.core.graph.ScriptGraphStatement;
import com.datastax.dse.driver.api.core.graph.reactive.ReactiveGraphNode;
import com.datastax.dse.driver.api.core.graph.reactive.ReactiveGraphResultSet;
import com.datastax.dse.driver.internal.core.cql.continuous.ContinuousCqlRequestHandlerTestBase;
import com.datastax.dse.driver.internal.core.graph.GraphProtocol;
import com.datastax.dse.driver.internal.core.graph.GraphRequestAsyncProcessor;
import com.datastax.dse.driver.internal.core.graph.GraphRequestHandlerTestHarness;
import com.datastax.dse.driver.internal.core.graph.GraphSupportChecker;
import com.datastax.dse.driver.internal.core.graph.GraphTestUtils;
import com.datastax.dse.driver.internal.core.graph.binary.GraphBinaryModule;
import com.datastax.dse.driver.internal.core.graph.reactive.DefaultReactiveGraphResultSet;
import com.datastax.dse.driver.internal.core.graph.reactive.ReactiveGraphRequestProcessor;
import com.datastax.oss.driver.api.core.ProtocolVersion;
import com.datastax.oss.driver.api.core.config.DriverExecutionProfile;
import com.datastax.oss.driver.api.core.cql.ExecutionInfo;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.session.Request;
import com.datastax.oss.driver.internal.core.context.DefaultDriverContext;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.cql.PoolBehavior;
import com.datastax.oss.driver.internal.core.session.DefaultSession;
import com.datastax.oss.protocol.internal.Message;
import com.tngtech.java.junit.dataprovider.UseDataProvider;
import io.reactivex.Flowable;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.assertj.core.api.Assertions;
import org.assertj.core.api.ListAssert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.reactivestreams.Publisher;

public class ReactiveGraphRequestProcessorTest
extends ContinuousCqlRequestHandlerTestBase {
    private GraphRequestAsyncProcessor asyncProcessor;
    private GraphSupportChecker graphSupportChecker;

    @Before
    public void setUp() {
        DefaultDriverContext context = (DefaultDriverContext)Mockito.mock(DefaultDriverContext.class);
        this.graphSupportChecker = (GraphSupportChecker)Mockito.mock(GraphSupportChecker.class);
        this.asyncProcessor = (GraphRequestAsyncProcessor)Mockito.spy((Object)new GraphRequestAsyncProcessor(context, this.graphSupportChecker));
    }

    @Test
    public void should_be_able_to_process_graph_reactive_result_set() {
        ReactiveGraphRequestProcessor processor = new ReactiveGraphRequestProcessor(this.asyncProcessor);
        Assertions.assertThat((boolean)processor.canProcess((Request)ScriptGraphStatement.newInstance((String)"g.V()"), ReactiveGraphRequestProcessor.REACTIVE_GRAPH_RESULT_SET)).isTrue();
    }

    @Test
    public void should_create_reactive_result_set() {
        GraphRequestHandlerTestHarness.Builder builder = GraphRequestHandlerTestHarness.builder().withProtocolVersion((ProtocolVersion)DseProtocolVersion.DSE_V1);
        try (GraphRequestHandlerTestHarness harness = builder.build();){
            ReactiveGraphRequestProcessor processor = new ReactiveGraphRequestProcessor(this.asyncProcessor);
            ScriptGraphStatement graphStatement = ScriptGraphStatement.newInstance((String)"g.V()");
            Assertions.assertThat((Object)processor.process((GraphStatement)graphStatement, harness.getSession(), (InternalDriverContext)harness.getContext(), "test")).isInstanceOf(DefaultReactiveGraphResultSet.class);
        }
    }

    @Test
    @UseDataProvider(value="allDseProtocolVersionsAndSupportedGraphProtocols", location={DseTestDataProviders.class})
    public void should_complete_single_page_result(DseProtocolVersion version, GraphProtocol graphProtocol) throws IOException {
        Mockito.when((Object)this.graphSupportChecker.isPagingEnabled((GraphStatement)ArgumentMatchers.any(), (InternalDriverContext)ArgumentMatchers.any())).thenReturn((Object)false);
        Mockito.when((Object)this.graphSupportChecker.inferGraphProtocol((GraphStatement)ArgumentMatchers.any(), (DriverExecutionProfile)ArgumentMatchers.any(), (InternalDriverContext)ArgumentMatchers.any())).thenReturn((Object)graphProtocol);
        GraphRequestHandlerTestHarness.Builder builder = GraphRequestHandlerTestHarness.builder().withProtocolVersion((ProtocolVersion)version);
        PoolBehavior node1Behavior = builder.customBehavior((Node)this.node1);
        try (GraphRequestHandlerTestHarness harness = builder.build();){
            DefaultSession session = harness.getSession();
            DefaultDriverContext context = harness.getContext();
            ScriptGraphStatement graphStatement = ScriptGraphStatement.newInstance((String)"g.V()");
            GraphBinaryModule graphBinaryModule = GraphTestUtils.createGraphBinaryModule(context);
            Mockito.when((Object)this.asyncProcessor.getGraphBinaryModule()).thenReturn((Object)graphBinaryModule);
            ReactiveGraphResultSet publisher = new ReactiveGraphRequestProcessor(this.asyncProcessor).process((GraphStatement)graphStatement, session, (InternalDriverContext)context, "test");
            Flowable rowsPublisher = Flowable.fromPublisher((Publisher)publisher).cache();
            rowsPublisher.subscribe();
            node1Behavior.setResponseSuccess(GraphTestUtils.defaultDseFrameOf((Message)GraphTestUtils.tenGraphRows(graphProtocol, graphBinaryModule, 1, true)));
            List rows = (List)rowsPublisher.toList().blockingGet();
            Assertions.assertThat((List)rows).hasSize(10);
            this.checkResultSet(rows);
            Flowable execInfosFlowable = Flowable.fromPublisher((Publisher)publisher.getExecutionInfos());
            ((ListAssert)Assertions.assertThat((List)((List)execInfosFlowable.toList().blockingGet())).hasSize(1)).containsExactly((Object[])new ExecutionInfo[]{((ReactiveGraphNode)rows.get(0)).getExecutionInfo()});
        }
    }

    @Test
    @UseDataProvider(value="allDseProtocolVersionsAndSupportedGraphProtocols", location={DseTestDataProviders.class})
    public void should_complete_multi_page_result(DseProtocolVersion version, GraphProtocol graphProtocol) throws IOException {
        Mockito.when((Object)this.graphSupportChecker.isPagingEnabled((GraphStatement)ArgumentMatchers.any(), (InternalDriverContext)ArgumentMatchers.any())).thenReturn((Object)true);
        Mockito.when((Object)this.graphSupportChecker.inferGraphProtocol((GraphStatement)ArgumentMatchers.any(), (DriverExecutionProfile)ArgumentMatchers.any(), (InternalDriverContext)ArgumentMatchers.any())).thenReturn((Object)graphProtocol);
        GraphRequestHandlerTestHarness.Builder builder = GraphRequestHandlerTestHarness.builder().withProtocolVersion((ProtocolVersion)version);
        PoolBehavior node1Behavior = builder.customBehavior((Node)this.node1);
        try (GraphRequestHandlerTestHarness harness = builder.build();){
            DefaultSession session = harness.getSession();
            DefaultDriverContext context = harness.getContext();
            ScriptGraphStatement graphStatement = ScriptGraphStatement.newInstance((String)"g.V()");
            GraphBinaryModule graphBinaryModule = GraphTestUtils.createGraphBinaryModule(context);
            Mockito.when((Object)this.asyncProcessor.getGraphBinaryModule()).thenReturn((Object)graphBinaryModule);
            ReactiveGraphResultSet publisher = new ReactiveGraphRequestProcessor(this.asyncProcessor).process((GraphStatement)graphStatement, session, (InternalDriverContext)context, "test");
            Flowable rowsPublisher = Flowable.fromPublisher((Publisher)publisher).cache();
            rowsPublisher.subscribe();
            node1Behavior.setResponseSuccess(GraphTestUtils.defaultDseFrameOf((Message)GraphTestUtils.tenGraphRows(graphProtocol, graphBinaryModule, 1, false)));
            node1Behavior.setResponseSuccess(GraphTestUtils.defaultDseFrameOf((Message)GraphTestUtils.tenGraphRows(graphProtocol, graphBinaryModule, 2, true)));
            List rows = (List)rowsPublisher.toList().blockingGet();
            Assertions.assertThat((List)rows).hasSize(20);
            this.checkResultSet(rows);
            Flowable execInfosFlowable = Flowable.fromPublisher((Publisher)publisher.getExecutionInfos());
            ((ListAssert)Assertions.assertThat((List)((List)execInfosFlowable.toList().blockingGet())).hasSize(2)).containsExactly((Object[])new ExecutionInfo[]{((ReactiveGraphNode)rows.get(0)).getExecutionInfo(), ((ReactiveGraphNode)rows.get(10)).getExecutionInfo()});
        }
    }

    private void checkResultSet(List<ReactiveGraphNode> rows) {
        for (ReactiveGraphNode row : rows) {
            Assertions.assertThat((boolean)row.isVertex()).isTrue();
            ExecutionInfo executionInfo = row.getExecutionInfo();
            Assertions.assertThat((Object)executionInfo.getCoordinator()).isEqualTo((Object)this.node1);
            Assertions.assertThat((List)executionInfo.getErrors()).isEmpty();
            Assertions.assertThat((Map)executionInfo.getIncomingPayload()).isEmpty();
            Assertions.assertThat((int)executionInfo.getSpeculativeExecutionCount()).isEqualTo(0);
            Assertions.assertThat((int)executionInfo.getSuccessfulExecutionIndex()).isEqualTo(0);
            Assertions.assertThat((List)executionInfo.getWarnings()).isEmpty();
        }
    }
}

