package org.apache.druid.segment.realtime.firehose;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import org.apache.commons.io.IOUtils;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.jackson.DefaultObjectMapper;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.java.util.common.concurrent.Execs;
import org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.druid.server.security.AllowAllAuthenticator;
import org.apache.druid.server.security.AuthConfig;
import org.apache.druid.server.security.AuthTestUtils;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseTest.class */
public class EventReceiverFirehoseTest {
    private static final int CAPACITY = 300;
    private static final int NUM_EVENTS = 100;
    private static final long MAX_IDLE_TIME_MILLIS = TimeUnit.SECONDS.toMillis(20);
    private static final String SERVICE_NAME = "test_firehose";
    private EventReceiverFirehoseFactory eventReceiverFirehoseFactory;
    private EventReceiverFirehoseFactory.EventReceiverFirehose firehose;
    private HttpServletRequest req;
    private final String inputRow = "[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]";
    private EventReceiverFirehoseRegister register = new EventReceiverFirehoseRegister();

    @Before
    public void setUp() {
        this.req = (HttpServletRequest) EasyMock.createMock(HttpServletRequest.class);
        this.eventReceiverFirehoseFactory = new EventReceiverFirehoseFactory(SERVICE_NAME, 300, Long.valueOf(MAX_IDLE_TIME_MILLIS), null, new DefaultObjectMapper(), new DefaultObjectMapper(), this.register, AuthTestUtils.TEST_AUTHORIZER_MAPPER);
        this.firehose = (EventReceiverFirehoseFactory.EventReceiverFirehose) this.eventReceiverFirehoseFactory.connect(new MapInputRowParser(new JSONParseSpec(new TimestampSpec(TimestampSpec.DEFAULT_COLUMN, "auto", null), new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("d1"))), null, null, null)), null);
    }

    @Test(timeout = 60000)
    public void testSingleThread() throws IOException, InterruptedException {
        for (int i = 0; i < 100; i++) {
            setUpRequestExpectations(null, null);
            InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
            this.firehose.addAll(inputStream, this.req);
            Assert.assertEquals(i + 1, this.firehose.getCurrentBufferSize());
            inputStream.close();
        }
        EasyMock.verify(this.req);
        Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = this.register.getMetrics();
        Assert.assertEquals(1L, Iterables.size(metrics));
        Assert.assertEquals(SERVICE_NAME, ((Map.Entry) Iterables.getLast(metrics)).getKey());
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(100L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(100L, this.firehose.getCurrentBufferSize());
        for (int i2 = 99; i2 >= 0; i2--) {
            Assert.assertTrue(this.firehose.hasMore());
            Assert.assertNotNull(this.firehose.nextRow());
            Assert.assertEquals(i2, this.firehose.getCurrentBufferSize());
        }
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(0L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(0L, this.firehose.getCurrentBufferSize());
        this.firehose.close();
        Assert.assertFalse(this.firehose.hasMore());
        Assert.assertEquals(0L, Iterables.size(this.register.getMetrics()));
        awaitDelayedExecutorThreadTerminated();
    }

    @Test(timeout = 60000)
    public void testMultipleThreads() throws InterruptedException, IOException, TimeoutException, ExecutionException {
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        this.req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.req.getContentType()).andReturn("application/json").times(200);
        EasyMock.expect(this.req.getHeader("X-Firehose-Producer-Id")).andReturn(null).times(200);
        EasyMock.replay(this.req);
        ExecutorService singleThreaded = Execs.singleThreaded("single_thread");
        Future submit = singleThreaded.submit(new Callable<Boolean>() { // from class: org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseTest.1
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                for (int i = 0; i < 100; i++) {
                    InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
                    EventReceiverFirehoseTest.this.firehose.addAll(inputStream, EventReceiverFirehoseTest.this.req);
                    inputStream.close();
                }
                return true;
            }
        });
        for (int i = 0; i < 100; i++) {
            InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
            this.firehose.addAll(inputStream, this.req);
            inputStream.close();
        }
        submit.get(10L, TimeUnit.SECONDS);
        EasyMock.verify(this.req);
        Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = this.register.getMetrics();
        Assert.assertEquals(1L, Iterables.size(metrics));
        Assert.assertEquals(SERVICE_NAME, ((Map.Entry) Iterables.getLast(metrics)).getKey());
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(200L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(200L, this.firehose.getCurrentBufferSize());
        for (int i2 = 199; i2 >= 0; i2--) {
            Assert.assertTrue(this.firehose.hasMore());
            Assert.assertNotNull(this.firehose.nextRow());
            Assert.assertEquals(i2, this.firehose.getCurrentBufferSize());
        }
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(0L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(0L, this.firehose.getCurrentBufferSize());
        this.firehose.close();
        Assert.assertFalse(this.firehose.hasMore());
        Assert.assertEquals(0L, Iterables.size(this.register.getMetrics()));
        awaitDelayedExecutorThreadTerminated();
        singleThreaded.shutdownNow();
    }

    @Test(expected = ISE.class)
    public void testDuplicateRegistering() {
    }

    @Test(timeout = 60000)
    public void testShutdownWithPrevTime() throws Exception {
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        this.req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(this.req);
        this.firehose.shutdown(DateTimes.nowUtc().minusMinutes(2).toString(), this.req);
        awaitFirehoseClosed();
        awaitDelayedExecutorThreadTerminated();
    }

    private void awaitFirehoseClosed() throws InterruptedException {
        while (!this.firehose.isClosed()) {
            Thread.sleep(50L);
        }
    }

    private void awaitDelayedExecutorThreadTerminated() throws InterruptedException {
        this.firehose.getDelayedCloseExecutor().join();
    }

    @Test(timeout = 60000)
    public void testShutdown() throws Exception {
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        this.req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.replay(this.req);
        this.firehose.shutdown(DateTimes.nowUtc().plusMillis(100).toString(), this.req);
        awaitFirehoseClosed();
        awaitDelayedExecutorThreadTerminated();
    }

    @Test
    public void testProducerSequence() throws IOException {
        for (int i = 0; i < 100; i++) {
            setUpRequestExpectations("producer", String.valueOf(i));
            InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
            this.firehose.addAll(inputStream, this.req);
            Assert.assertEquals(i + 1, this.firehose.getCurrentBufferSize());
            inputStream.close();
        }
        EasyMock.verify(this.req);
        Iterable<Map.Entry<String, EventReceiverFirehoseMetric>> metrics = this.register.getMetrics();
        Assert.assertEquals(1L, Iterables.size(metrics));
        Assert.assertEquals(SERVICE_NAME, ((Map.Entry) Iterables.getLast(metrics)).getKey());
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(100L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(100L, this.firehose.getCurrentBufferSize());
        for (int i2 = 99; i2 >= 0; i2--) {
            Assert.assertTrue(this.firehose.hasMore());
            Assert.assertNotNull(this.firehose.nextRow());
            Assert.assertEquals(i2, this.firehose.getCurrentBufferSize());
        }
        Assert.assertEquals(300L, ((EventReceiverFirehoseMetric) r0.getValue()).getCapacity());
        Assert.assertEquals(300L, this.firehose.getCapacity());
        Assert.assertEquals(0L, ((EventReceiverFirehoseMetric) r0.getValue()).getCurrentBufferSize());
        Assert.assertEquals(0L, this.firehose.getCurrentBufferSize());
        this.firehose.close();
        Assert.assertFalse(this.firehose.hasMore());
        Assert.assertEquals(0L, Iterables.size(this.register.getMetrics()));
    }

    @Test
    public void testLowProducerSequence() throws IOException {
        for (int i = 0; i < 100; i++) {
            setUpRequestExpectations("producer", "1");
            InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
            Assert.assertEquals(Response.Status.OK.getStatusCode(), this.firehose.addAll(inputStream, this.req).getStatus());
            Assert.assertEquals(1L, this.firehose.getCurrentBufferSize());
            inputStream.close();
        }
        EasyMock.verify(this.req);
        this.firehose.close();
    }

    @Test
    public void testMissingProducerSequence() throws IOException {
        setUpRequestExpectations("producer", null);
        InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
        Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), this.firehose.addAll(inputStream, this.req).getStatus());
        inputStream.close();
        EasyMock.verify(this.req);
        this.firehose.close();
    }

    @Test
    public void testTooManyProducerIds() throws IOException {
        for (int i = 0; i < 9999; i++) {
            setUpRequestExpectations("producer-" + i, "0");
            InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
            Assert.assertEquals(Response.Status.OK.getStatusCode(), this.firehose.addAll(inputStream, this.req).getStatus());
            inputStream.close();
            Assert.assertTrue(this.firehose.hasMore());
            Assert.assertNotNull(this.firehose.nextRow());
        }
        setUpRequestExpectations("toomany", "0");
        InputStream inputStream2 = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
        Assert.assertEquals(Response.Status.FORBIDDEN.getStatusCode(), this.firehose.addAll(inputStream2, this.req).getStatus());
        inputStream2.close();
        EasyMock.verify(this.req);
        this.firehose.close();
    }

    @Test
    public void testNaNProducerSequence() throws IOException {
        setUpRequestExpectations("producer", "foo");
        InputStream inputStream = IOUtils.toInputStream("[{\n  \"timestamp\":123,\n  \"d1\":\"v1\"\n}]", StandardCharsets.UTF_8);
        Assert.assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), this.firehose.addAll(inputStream, this.req).getStatus());
        inputStream.close();
        EasyMock.verify(this.req);
        this.firehose.close();
    }

    private void setUpRequestExpectations(String str, String str2) {
        EasyMock.reset(this.req);
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_ALLOW_UNSECURED_PATH)).andReturn(null).anyTimes();
        EasyMock.expect(this.req.getAttribute(AuthConfig.DRUID_AUTHENTICATION_RESULT)).andReturn(AllowAllAuthenticator.ALLOW_ALL_RESULT).anyTimes();
        this.req.setAttribute(AuthConfig.DRUID_AUTHORIZATION_CHECKED, true);
        EasyMock.expectLastCall().anyTimes();
        EasyMock.expect(this.req.getContentType()).andReturn("application/json");
        EasyMock.expect(this.req.getHeader("X-Firehose-Producer-Id")).andReturn(str);
        if (str != null) {
            EasyMock.expect(this.req.getHeader("X-Firehose-Producer-Seq")).andReturn(str2);
        }
        EasyMock.replay(this.req);
    }
}
