package org.apache.druid.frame.file;

import com.google.common.math.LongMath;
import com.google.common.primitives.Ints;
import com.google.common.util.concurrent.ListenableFuture;
import it.unimi.dsi.fastutil.bytes.ByteArrays;
import java.io.File;
import java.io.IOException;
import java.math.RoundingMode;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.util.ArrayList;
import org.apache.druid.frame.FrameType;
import org.apache.druid.frame.channel.ReadableByteChunksFrameChannel;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.frame.testutil.FrameSequenceBuilder;
import org.apache.druid.frame.testutil.FrameTestUtil;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.http.client.response.ClientResponse;
import org.apache.druid.segment.StorageAdapter;
import org.apache.druid.segment.TestIndex;
import org.apache.druid.segment.incremental.IncrementalIndexStorageAdapter;
import org.apache.druid.testing.InitializedNullHandlingTest;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.hamcrest.MatcherAssert;
import org.jboss.netty.buffer.ByteBufferBackedChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.handler.codec.http.DefaultHttpChunk;
import org.jboss.netty.handler.codec.http.DefaultHttpResponse;
import org.jboss.netty.handler.codec.http.HttpChunk;
import org.jboss.netty.handler.codec.http.HttpResponse;
import org.jboss.netty.handler.codec.http.HttpResponseStatus;
import org.jboss.netty.handler.codec.http.HttpVersion;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.internal.matchers.ThrowableMessageMatcher;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
/* loaded from: input_file:org/apache/druid/frame/file/FrameFileHttpResponseHandlerTest.class */
public class FrameFileHttpResponseHandlerTest extends InitializedNullHandlingTest {

    @Rule
    public TemporaryFolder temporaryFolder = new TemporaryFolder();
    private final int maxRowsPerFrame;
    private StorageAdapter adapter;
    private File file;
    private ReadableByteChunksFrameChannel channel;
    private FrameFileHttpResponseHandler handler;

    public FrameFileHttpResponseHandlerTest(int i) {
        this.maxRowsPerFrame = i;
    }

    @Parameterized.Parameters(name = "maxRowsPerFrame = {0}")
    public static Iterable<Object[]> constructorFeeder() {
        ArrayList arrayList = new ArrayList();
        for (int i : new int[]{1, 50, Integer.MAX_VALUE}) {
            arrayList.add(new Object[]{Integer.valueOf(i)});
        }
        return arrayList;
    }

    @Before
    public void setUp() throws IOException {
        this.adapter = new IncrementalIndexStorageAdapter(TestIndex.getIncrementalTestIndex());
        this.file = FrameTestUtil.writeFrameFile(FrameSequenceBuilder.fromAdapter(this.adapter).maxRowsPerFrame(this.maxRowsPerFrame).frameType(FrameType.ROW_BASED).frames(), this.temporaryFolder.newFile());
        this.channel = ReadableByteChunksFrameChannel.create("test", false);
        this.handler = new FrameFileHttpResponseHandler(this.channel);
    }

    @Test
    public void testNonChunkedResponse() throws Exception {
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, Files.readAllBytes(this.file.toPath())), null);
        Assert.assertFalse(handleResponse.isFinished());
        Assert.assertTrue(handleResponse.isContinueReading());
        Assert.assertFalse(handleResponse.getObj().isExceptionCaught());
        Assert.assertFalse(handleResponse.getObj().isLastFetch());
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(handleResponse);
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertFalse(done.getObj().isExceptionCaught());
        Assert.assertFalse(done.getObj().isLastFetch());
        ListenableFuture<?> backpressureFuture = done.getObj().backpressureFuture();
        Assert.assertFalse(backpressureFuture.isDone());
        this.channel.doneWriting();
        FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromAdapter(this.adapter, null, false), FrameTestUtil.readRowsFromFrameChannel(this.channel, FrameReader.create(this.adapter.getRowSignature())));
        Assert.assertTrue(backpressureFuture.isDone());
    }

    @Test
    public void testEmptyResponseWithoutLastFetchHeader() {
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY), null);
        Assert.assertFalse(handleResponse.isFinished());
        Assert.assertTrue(handleResponse.isContinueReading());
        Assert.assertFalse(handleResponse.getObj().isExceptionCaught());
        Assert.assertFalse(handleResponse.getObj().isLastFetch());
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(handleResponse);
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertFalse(done.getObj().isExceptionCaught());
        Assert.assertFalse(done.getObj().isLastFetch());
        Assert.assertTrue(done.getObj().backpressureFuture().isDone());
    }

    @Test
    public void testEmptyResponseWithLastFetchHeader() {
        HttpResponse makeResponse = makeResponse(HttpResponseStatus.OK, ByteArrays.EMPTY_ARRAY);
        makeResponse.headers().set(FrameFileHttpResponseHandler.HEADER_LAST_FETCH_NAME, "yes");
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse, null);
        Assert.assertFalse(handleResponse.isFinished());
        Assert.assertTrue(handleResponse.isContinueReading());
        Assert.assertFalse(handleResponse.getObj().isExceptionCaught());
        Assert.assertTrue(handleResponse.getObj().isLastFetch());
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(handleResponse);
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertFalse(done.getObj().isExceptionCaught());
        Assert.assertTrue(done.getObj().isLastFetch());
        Assert.assertTrue(done.getObj().backpressureFuture().isDone());
    }

    @Test
    public void testChunkedResponse() throws Exception {
        byte[] readAllBytes = Files.readAllBytes(this.file.toPath());
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, byteSlice(readAllBytes, 0, 99)), null);
        Assert.assertFalse(handleResponse.isFinished());
        for (int i = 99; i < readAllBytes.length; i += 99) {
            handleResponse = this.handler.handleChunk(handleResponse, makeChunk(byteSlice(readAllBytes, i, 99)), i / 99);
            Assert.assertFalse(handleResponse.isFinished());
            Assert.assertFalse(handleResponse.getObj().isExceptionCaught());
            Assert.assertFalse(handleResponse.getObj().isLastFetch());
        }
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(handleResponse);
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertFalse(handleResponse.getObj().isExceptionCaught());
        Assert.assertFalse(handleResponse.getObj().isLastFetch());
        ListenableFuture<?> backpressureFuture = handleResponse.getObj().backpressureFuture();
        Assert.assertFalse(backpressureFuture.isDone());
        this.channel.doneWriting();
        FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromAdapter(this.adapter, null, false), FrameTestUtil.readRowsFromFrameChannel(this.channel, FrameReader.create(this.adapter.getRowSignature())));
        Assert.assertTrue(backpressureFuture.isDone());
    }

    @Test
    public void testServerErrorResponse() {
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(this.handler.handleResponse(makeResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, StringUtils.toUtf8("Oh no!")), null));
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertTrue(done.getObj().isExceptionCaught());
        Throwable exceptionCaught = done.getObj().getExceptionCaught();
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Server for [test] returned [500 Internal Server Error]")));
        Assert.assertFalse(this.channel.isErrorOrFinished());
    }

    @Test
    public void testChunkedServerErrorResponse() {
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(this.handler.handleChunk(this.handler.handleResponse(makeResponse(HttpResponseStatus.INTERNAL_SERVER_ERROR, StringUtils.toUtf8("Oh ")), null), makeChunk(StringUtils.toUtf8("no!")), 1L));
        Assert.assertTrue(done.isFinished());
        Assert.assertTrue(done.isContinueReading());
        Assert.assertTrue(done.getObj().isExceptionCaught());
        Throwable exceptionCaught = done.getObj().getExceptionCaught();
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Server for [test] returned [500 Internal Server Error]")));
        Assert.assertFalse(this.channel.isErrorOrFinished());
    }

    @Test
    public void testCaughtExceptionDuringChunkedResponse() throws Exception {
        int checkedCast = Ints.checkedCast(LongMath.divide(this.file.length(), 4L, RoundingMode.CEILING));
        byte[] readAllBytes = Files.readAllBytes(this.file.toPath());
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, byteSlice(readAllBytes, 0, checkedCast)), null);
        Assert.assertFalse(handleResponse.isFinished());
        ClientResponse<FrameFilePartialFetch> handleChunk = this.handler.handleChunk(handleResponse, makeChunk(byteSlice(readAllBytes, checkedCast, checkedCast)), 1L);
        this.handler.exceptionCaught(handleChunk, new ISE("Oh no!", new Object[0]));
        this.handler.handleChunk(handleChunk, makeChunk(byteSlice(readAllBytes, checkedCast * 2, checkedCast)), 2L);
        Assert.assertTrue(handleChunk.getObj().isExceptionCaught());
        Throwable exceptionCaught = handleChunk.getObj().getExceptionCaught();
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Oh no!")));
        Assert.assertFalse(this.channel.isErrorOrFinished());
        this.channel.addChunk(byteSlice(readAllBytes, checkedCast * 2, checkedCast));
        this.channel.addChunk(byteSlice(readAllBytes, checkedCast * 3, checkedCast));
        Assert.assertEquals(readAllBytes.length, this.channel.getBytesAdded());
        this.channel.doneWriting();
        FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromAdapter(this.adapter, null, false), FrameTestUtil.readRowsFromFrameChannel(this.channel, FrameReader.create(this.adapter.getRowSignature())));
    }

    @Test
    public void testCaughtExceptionDuringChunkedResponseRetryWithSameHandler() throws Exception {
        int checkedCast = Ints.checkedCast(LongMath.divide(this.file.length() - 100, 12L, RoundingMode.CEILING));
        byte[] readAllBytes = Files.readAllBytes(this.file.toPath());
        ClientResponse<FrameFilePartialFetch> done = this.handler.done(this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, byteSlice(readAllBytes, 0, 100)), null));
        Assert.assertEquals(100L, this.channel.getBytesAdded());
        Assert.assertTrue(done.isFinished());
        this.handler = new FrameFileHttpResponseHandler(this.channel);
        ClientResponse<FrameFilePartialFetch> handleResponse = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, byteSlice(readAllBytes, 100, checkedCast * 3)), null);
        this.handler.exceptionCaught(handleResponse, new ISE("Oh no!", new Object[0]));
        ClientResponse<FrameFilePartialFetch> handleChunk = this.handler.handleChunk(handleResponse, makeChunk(byteSlice(readAllBytes, 100 + (checkedCast * 3), checkedCast * 3)), 2L);
        Assert.assertTrue(handleChunk.getObj().isExceptionCaught());
        Throwable exceptionCaught = handleChunk.getObj().getExceptionCaught();
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) CoreMatchers.instanceOf(IllegalStateException.class));
        MatcherAssert.assertThat(exceptionCaught, (Matcher<? super Throwable>) ThrowableMessageMatcher.hasMessage(CoreMatchers.equalTo("Oh no!")));
        ClientResponse<FrameFilePartialFetch> handleResponse2 = this.handler.handleResponse(makeResponse(HttpResponseStatus.OK, byteSlice(readAllBytes, 100, checkedCast * 4)), null);
        Assert.assertEquals(100 + (checkedCast * 4), this.channel.getBytesAdded());
        Assert.assertFalse(handleResponse2.isFinished());
        ClientResponse<FrameFilePartialFetch> handleChunk2 = this.handler.handleChunk(handleResponse2, makeChunk(byteSlice(readAllBytes, 100 + (checkedCast * 4), checkedCast * 4)), 1L);
        Assert.assertEquals(100 + (checkedCast * 8), this.channel.getBytesAdded());
        ClientResponse<FrameFilePartialFetch> done2 = this.handler.done(this.handler.handleChunk(handleChunk2, makeChunk(byteSlice(readAllBytes, 100 + (checkedCast * 8), checkedCast * 4)), 2L));
        Assert.assertTrue(done2.isFinished());
        Assert.assertFalse(done2.getObj().isExceptionCaught());
        Assert.assertEquals(readAllBytes.length, this.channel.getBytesAdded());
        this.channel.doneWriting();
        FrameTestUtil.assertRowsEqual(FrameTestUtil.readRowsFromAdapter(this.adapter, null, false), FrameTestUtil.readRowsFromFrameChannel(this.channel, FrameReader.create(this.adapter.getRowSignature())));
    }

    private static HttpResponse makeResponse(HttpResponseStatus httpResponseStatus, byte[] bArr) {
        final ByteBufferBackedChannelBuffer byteBufferBackedChannelBuffer = new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(bArr));
        return new DefaultHttpResponse(HttpVersion.HTTP_1_1, httpResponseStatus) { // from class: org.apache.druid.frame.file.FrameFileHttpResponseHandlerTest.1
            @Override // org.jboss.netty.handler.codec.http.DefaultHttpMessage, org.jboss.netty.handler.codec.http.HttpMessage
            public ChannelBuffer getContent() {
                return byteBufferBackedChannelBuffer;
            }
        };
    }

    private static HttpChunk makeChunk(byte[] bArr) {
        return new DefaultHttpChunk(new ByteBufferBackedChannelBuffer(ByteBuffer.wrap(bArr)));
    }

    private static byte[] byteSlice(byte[] bArr, int i, int i2) {
        int min = Math.min(bArr.length - i, i2);
        byte[] bArr2 = new byte[min];
        System.arraycopy(bArr, i, bArr2, 0, min);
        return bArr2;
    }
}
