package org.apache.druid.segment.realtime.firehose;

import com.fasterxml.jackson.annotation.JacksonInject;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.io.CountingInputStream;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import javax.annotation.Nullable;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.Consumes;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.Response;
import org.apache.druid.concurrent.Threads;
import org.apache.druid.data.input.Firehose;
import org.apache.druid.data.input.FirehoseFactory;
import org.apache.druid.data.input.InputRow;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.Smile;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.emitter.EmittingLogger;
import org.apache.druid.server.metrics.EventReceiverFirehoseMetric;
import org.apache.druid.server.metrics.EventReceiverFirehoseRegister;
import org.apache.druid.server.security.Action;
import org.apache.druid.server.security.AuthorizationUtils;
import org.apache.druid.server.security.AuthorizerMapper;
import org.apache.druid.server.security.Resource;
import org.apache.druid.server.security.ResourceAction;
import org.apache.kafka.common.security.JaasUtils;
import org.joda.time.DateTime;

@Deprecated
/* loaded from: input_file:org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory.class */
public class EventReceiverFirehoseFactory implements FirehoseFactory<InputRowParser<Map<String, Object>>> {
    public static final int MAX_FIREHOSE_PRODUCERS = 10000;
    private static final int DEFAULT_BUFFER_SIZE = 100000;
    private final String serviceName;
    private final int bufferSize;
    private final long maxIdleTimeMillis;
    private final ChatHandlerProvider chatHandlerProvider;
    private final ObjectMapper jsonMapper;
    private final ObjectMapper smileMapper;
    private final EventReceiverFirehoseRegister eventReceiverFirehoseRegister;
    private final AuthorizerMapper authorizerMapper;
    private static final EmittingLogger log = new EmittingLogger(EventReceiverFirehoseFactory.class);
    private static final Object FIREHOSE_CLOSED = new Object();

    @VisibleForTesting
    /* loaded from: input_file:org/apache/druid/segment/realtime/firehose/EventReceiverFirehoseFactory$EventReceiverFirehose.class */
    public class EventReceiverFirehose implements ChatHandler, Firehose, EventReceiverFirehoseMetric {

        @Nullable
        @GuardedBy("this")
        private Thread delayedCloseExecutor;
        private final BlockingQueue<Object> buffer;
        private final InputRowParser<Map<String, Object>> parser;

        @Nullable
        private volatile Long idleCloseTimeNs;
        private volatile boolean closed = false;

        @Nullable
        private InputRow nextRow = null;
        private boolean rowsRunOut = false;
        private final AtomicLong bytesReceived = new AtomicLong(0);
        private final AtomicLong lastBufferAddFailLoggingTimeNs = new AtomicLong(System.nanoTime());
        private final ConcurrentHashMap<String, Long> producerSequences = new ConcurrentHashMap<>();

        @Nullable
        private volatile Long requestedShutdownTimeNs = null;

        EventReceiverFirehose(InputRowParser<Map<String, Object>> inputRowParser) {
            this.idleCloseTimeNs = null;
            this.buffer = new ArrayBlockingQueue(EventReceiverFirehoseFactory.this.bufferSize);
            this.parser = inputRowParser;
            if (EventReceiverFirehoseFactory.this.maxIdleTimeMillis != Long.MAX_VALUE) {
                this.idleCloseTimeNs = Long.valueOf(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(EventReceiverFirehoseFactory.this.maxIdleTimeMillis));
                synchronized (this) {
                    createDelayedCloseExecutor();
                }
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        @Nullable
        public synchronized Thread getDelayedCloseExecutor() {
            return this.delayedCloseExecutor;
        }

        @GuardedBy("this")
        private Thread createDelayedCloseExecutor() {
            Thread thread = new Thread(() -> {
                while (!this.closed) {
                    if (this.idleCloseTimeNs == null && this.requestedShutdownTimeNs == null) {
                        EventReceiverFirehoseFactory.log.error("Either idleCloseTimeNs or requestedShutdownTimeNs must be non-null. Please file a bug at https://github.com/apache/druid/issues", new Object[0]);
                    }
                    if (this.idleCloseTimeNs != null && this.idleCloseTimeNs.longValue() - System.nanoTime() <= 0) {
                        EventReceiverFirehoseFactory.log.info("Firehose has been idle for %d ms, closing.", Long.valueOf(EventReceiverFirehoseFactory.this.maxIdleTimeMillis));
                        close();
                    } else if (this.requestedShutdownTimeNs != null && this.requestedShutdownTimeNs.longValue() - System.nanoTime() <= 0) {
                        EventReceiverFirehoseFactory.log.info("Closing Firehose after a shutdown request", new Object[0]);
                        close();
                    }
                    try {
                        Threads.sleepFor(1L, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                    }
                }
            }, "event-receiver-firehose-closer");
            thread.setDaemon(true);
            this.delayedCloseExecutor = thread;
            thread.start();
            return thread;
        }

        @Path("/push-events")
        @Consumes({"application/json", SmileMediaTypes.APPLICATION_JACKSON_SMILE})
        @POST
        @Produces({"application/json", SmileMediaTypes.APPLICATION_JACKSON_SMILE})
        public Response addAll(InputStream inputStream, @Context HttpServletRequest httpServletRequest) throws JsonProcessingException {
            this.idleCloseTimeNs = Long.valueOf(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(EventReceiverFirehoseFactory.this.maxIdleTimeMillis));
            if (!AuthorizationUtils.authorizeResourceAction(httpServletRequest, new ResourceAction(Resource.STATE_RESOURCE, Action.WRITE), EventReceiverFirehoseFactory.this.authorizerMapper).isAllowed()) {
                return Response.status(403).build();
            }
            String contentType = httpServletRequest.getContentType();
            boolean equals = SmileMediaTypes.APPLICATION_JACKSON_SMILE.equals(contentType);
            String str = equals ? SmileMediaTypes.APPLICATION_JACKSON_SMILE : "application/json";
            ObjectMapper objectMapper = equals ? EventReceiverFirehoseFactory.this.smileMapper : EventReceiverFirehoseFactory.this.jsonMapper;
            Response checkProducerSequence = checkProducerSequence(httpServletRequest, contentType, objectMapper);
            if (checkProducerSequence != null) {
                return checkProducerSequence;
            }
            CountingInputStream countingInputStream = new CountingInputStream(inputStream);
            try {
                try {
                    Collection collection = (Collection) objectMapper.readValue(countingInputStream, new TypeReference<Collection<Map<String, Object>>>() { // from class: org.apache.druid.segment.realtime.firehose.EventReceiverFirehoseFactory.EventReceiverFirehose.1
                    });
                    this.bytesReceived.addAndGet(countingInputStream.getCount());
                    EventReceiverFirehoseFactory.log.debug("Adding %,d events to firehose: %s", Integer.valueOf(collection.size()), EventReceiverFirehoseFactory.this.serviceName);
                    ArrayList arrayList = new ArrayList();
                    Iterator it2 = collection.iterator();
                    while (it2.hasNext()) {
                        arrayList.addAll(this.parser.parseBatch((Map) it2.next()));
                    }
                    try {
                        addRows(arrayList);
                        return Response.ok(objectMapper.writeValueAsString(ImmutableMap.of("eventCount", Integer.valueOf(collection.size()))), str).build();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                } catch (IOException e2) {
                    Response build = Response.serverError().entity(ImmutableMap.of("error", e2.getMessage())).build();
                    this.bytesReceived.addAndGet(countingInputStream.getCount());
                    return build;
                }
            } catch (Throwable th) {
                this.bytesReceived.addAndGet(countingInputStream.getCount());
                throw th;
            }
        }

        @Override // org.apache.druid.data.input.Firehose
        public boolean hasMore() {
            if (this.rowsRunOut) {
                return false;
            }
            if (this.nextRow != null) {
                return true;
            }
            try {
                Object take = this.buffer.take();
                if (take == EventReceiverFirehoseFactory.FIREHOSE_CLOSED) {
                    this.rowsRunOut = true;
                    return false;
                }
                this.nextRow = (InputRow) take;
                return true;
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        @Override // org.apache.druid.data.input.Firehose
        @Nullable
        public InputRow nextRow() {
            InputRow inputRow = this.nextRow;
            if (inputRow == null) {
                throw new NoSuchElementException();
            }
            this.nextRow = null;
            return inputRow;
        }

        @Override // org.apache.druid.server.metrics.EventReceiverFirehoseMetric
        public int getCurrentBufferSize() {
            return this.buffer.size();
        }

        @Override // org.apache.druid.server.metrics.EventReceiverFirehoseMetric
        public int getCapacity() {
            return EventReceiverFirehoseFactory.this.bufferSize;
        }

        @Override // org.apache.druid.server.metrics.EventReceiverFirehoseMetric
        public long getBytesReceived() {
            return this.bytesReceived.get();
        }

        @Override // org.apache.druid.data.input.Firehose, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            EventReceiverFirehoseFactory.log.info("Firehose closing.", new Object[0]);
            Uninterruptibles.putUninterruptibly(this.buffer, EventReceiverFirehoseFactory.FIREHOSE_CLOSED);
            EventReceiverFirehoseFactory.this.eventReceiverFirehoseRegister.unregister(EventReceiverFirehoseFactory.this.serviceName);
            if (EventReceiverFirehoseFactory.this.chatHandlerProvider != null) {
                EventReceiverFirehoseFactory.this.chatHandlerProvider.unregister(EventReceiverFirehoseFactory.this.serviceName);
            }
            if (this.delayedCloseExecutor == null || this.delayedCloseExecutor.equals(Thread.currentThread())) {
                return;
            }
            this.delayedCloseExecutor.interrupt();
        }

        @VisibleForTesting
        void addRows(Iterable<InputRow> iterable) throws InterruptedException {
            for (InputRow inputRow : iterable) {
                boolean z = false;
                while (!this.closed && !z) {
                    z = this.buffer.offer(inputRow, 500L, TimeUnit.MILLISECONDS);
                    if (!z) {
                        long nanoTime = System.nanoTime();
                        long j = this.lastBufferAddFailLoggingTimeNs.get();
                        if (nanoTime - j > TimeUnit.SECONDS.toNanos(10L) && this.lastBufferAddFailLoggingTimeNs.compareAndSet(j, nanoTime)) {
                            EventReceiverFirehoseFactory.log.warn("Failed to add event to buffer with current size [%s] . Retrying...", Integer.valueOf(this.buffer.size()));
                        }
                    }
                }
                if (!z) {
                    throw new IllegalStateException("Cannot add events to closed firehose!");
                }
            }
        }

        @Path("/shutdown")
        @Consumes({"application/json", SmileMediaTypes.APPLICATION_JACKSON_SMILE})
        @POST
        @Produces({"application/json", SmileMediaTypes.APPLICATION_JACKSON_SMILE})
        public Response shutdown(@QueryParam("shutoffTime") String str, @Context HttpServletRequest httpServletRequest) {
            Thread thread;
            if (!AuthorizationUtils.authorizeResourceAction(httpServletRequest, new ResourceAction(Resource.STATE_RESOURCE, Action.WRITE), EventReceiverFirehoseFactory.this.authorizerMapper).isAllowed()) {
                return Response.status(403).build();
            }
            try {
                DateTime nowUtc = str == null ? DateTimes.nowUtc() : DateTimes.of(str);
                EventReceiverFirehoseFactory.log.info("Setting Firehose shutoffTime to %s", str);
                this.requestedShutdownTimeNs = Long.valueOf(System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(Math.max(nowUtc.getMillis() - System.currentTimeMillis(), 0L)));
                boolean z = true;
                synchronized (this) {
                    thread = this.delayedCloseExecutor;
                    if (thread == null) {
                        thread = createDelayedCloseExecutor();
                        z = false;
                    }
                }
                if (z) {
                    thread.interrupt();
                }
                return Response.ok().build();
            } catch (IllegalArgumentException e) {
                return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", e.getMessage())).build();
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        @VisibleForTesting
        public boolean isClosed() {
            return this.closed;
        }

        @Nullable
        private Response checkProducerSequence(HttpServletRequest httpServletRequest, String str, ObjectMapper objectMapper) {
            String header = httpServletRequest.getHeader("X-Firehose-Producer-Id");
            if (header == null) {
                return null;
            }
            String header2 = httpServletRequest.getHeader("X-Firehose-Producer-Seq");
            if (header2 == null) {
                return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "Producer sequence value is missing")).build();
            }
            Long computeIfAbsent = this.producerSequences.computeIfAbsent(header, str2 -> {
                return Long.MIN_VALUE;
            });
            if (this.producerSequences.size() >= 10000) {
                return Response.status(Response.Status.FORBIDDEN).entity(ImmutableMap.of("error", "Too many individual producer IDs for this firehose.  Max is 10000")).build();
            }
            try {
                Long valueOf = Long.valueOf(Long.parseLong(header2));
                while (valueOf.longValue() > computeIfAbsent.longValue()) {
                    if (this.producerSequences.replace(header, computeIfAbsent, valueOf)) {
                        return null;
                    }
                    computeIfAbsent = this.producerSequences.get(header);
                }
                return Response.ok(objectMapper.writeValueAsString(ImmutableMap.of("eventCount", false, "skipped", true)), str).build();
            } catch (JsonProcessingException e) {
                throw new RuntimeException(e);
            } catch (NumberFormatException e2) {
                return Response.status(Response.Status.BAD_REQUEST).entity(ImmutableMap.of("error", "Producer sequence must be a number")).build();
            }
        }
    }

    @JsonCreator
    public EventReceiverFirehoseFactory(@JsonProperty("serviceName") String str, @JsonProperty("bufferSize") Integer num, @JsonProperty("maxIdleTime") @Nullable Long l, @JacksonInject ChatHandlerProvider chatHandlerProvider, @Json @JacksonInject ObjectMapper objectMapper, @Smile @JacksonInject ObjectMapper objectMapper2, @JacksonInject EventReceiverFirehoseRegister eventReceiverFirehoseRegister, @JacksonInject AuthorizerMapper authorizerMapper) {
        Preconditions.checkNotNull(str, JaasUtils.SERVICE_NAME);
        this.serviceName = str;
        this.bufferSize = (num == null || num.intValue() <= 0) ? 100000 : num.intValue();
        this.maxIdleTimeMillis = (l == null || l.longValue() <= 0) ? Long.MAX_VALUE : l.longValue();
        this.chatHandlerProvider = chatHandlerProvider;
        this.jsonMapper = objectMapper;
        this.smileMapper = objectMapper2;
        this.eventReceiverFirehoseRegister = eventReceiverFirehoseRegister;
        this.authorizerMapper = authorizerMapper;
    }

    @Override // org.apache.druid.data.input.FirehoseFactory
    public Firehose connect(InputRowParser<Map<String, Object>> inputRowParser, File file) {
        log.info("Connecting firehose: %s", this.serviceName);
        EventReceiverFirehose eventReceiverFirehose = new EventReceiverFirehose(inputRowParser);
        if (this.chatHandlerProvider != null) {
            log.info("Found chathandler of class[%s]", this.chatHandlerProvider.getClass().getName());
            this.chatHandlerProvider.register(this.serviceName, eventReceiverFirehose);
            int lastIndexOf = this.serviceName.lastIndexOf(58);
            if (lastIndexOf > 0) {
                this.chatHandlerProvider.register(this.serviceName.substring(lastIndexOf + 1), eventReceiverFirehose);
            }
        } else {
            log.warn("No chathandler detected", new Object[0]);
        }
        this.eventReceiverFirehoseRegister.register(this.serviceName, eventReceiverFirehose);
        return eventReceiverFirehose;
    }

    @JsonProperty
    public String getServiceName() {
        return this.serviceName;
    }

    @JsonProperty
    public int getBufferSize() {
        return this.bufferSize;
    }

    @JsonProperty("maxIdleTime")
    public long getMaxIdleTimeMillis() {
        return this.maxIdleTimeMillis;
    }
}
