/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.response;

import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.RuntimeJsonMappingException;
import com.fasterxml.jackson.jakarta.rs.base.JsonMappingExceptionMapper;
import com.fasterxml.jackson.jakarta.rs.base.JsonParseExceptionMapper;
import com.google.auto.value.AutoValue;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.kafkarest.exceptions.BadRequestException;
import io.confluent.kafkarest.exceptions.ProduceRequestTooLargeException;
import io.confluent.kafkarest.exceptions.RestConstraintViolationExceptionMapper;
import io.confluent.kafkarest.exceptions.StatusCodeException;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.exceptions.v3.V3ExceptionMapper;
import io.confluent.kafkarest.requestlog.CustomLog;
import io.confluent.kafkarest.response.AutoValue_StreamingResponse_ErrorHolder;
import io.confluent.kafkarest.response.AutoValue_StreamingResponse_ResultHolder;
import io.confluent.kafkarest.response.ChunkedOutputFactory;
import io.confluent.kafkarest.response.JsonStream;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.KafkaExceptionMapper;
import io.confluent.rest.exceptions.RestConstraintViolationException;
import io.confluent.rest.exceptions.WebApplicationExceptionMapper;
import jakarta.annotation.Nullable;
import jakarta.ws.rs.WebApplicationException;
import jakarta.ws.rs.container.AsyncResponse;
import jakarta.ws.rs.core.Response;
import jakarta.ws.rs.ext.ExceptionMapper;
import java.io.IOException;
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamingResponse<T> {
    private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
    private static final int ONE_SECOND_MS = 1000;

    /**
     * Translates throwables into {@link ErrorResponse}s. Mappers are consulted in the order they
     * are registered below; the default mapper handles anything no registered mapper claims.
     * For each mapper, the two functions extract the error code and the message from the
     * {@link Response} that the wrapped JAX-RS mapper produced.
     */
    private static final CompositeErrorMapper EXCEPTION_MAPPER = new CompositeErrorMapper.Builder()
            .putMapper(
                    JsonMappingException.class,
                    new JsonMappingExceptionMapper(),
                    resp -> Response.Status.BAD_REQUEST.getStatusCode(),
                    resp -> (String) resp.getEntity())
            .putMapper(
                    JsonParseException.class,
                    new JsonParseExceptionMapper(),
                    resp -> Response.Status.BAD_REQUEST.getStatusCode(),
                    resp -> (String) resp.getEntity())
            .putMapper(
                    StatusCodeException.class,
                    new V3ExceptionMapper(),
                    resp -> ((ErrorResponse) resp.getEntity()).getErrorCode(),
                    resp -> ((ErrorResponse) resp.getEntity()).getMessage())
            .putMapper(
                    RestConstraintViolationException.class,
                    new RestConstraintViolationExceptionMapper(),
                    resp -> ((ErrorResponse) resp.getEntity()).getErrorCode(),
                    resp -> ((ErrorResponse) resp.getEntity()).getMessage())
            .putMapper(
                    WebApplicationException.class,
                    new WebApplicationExceptionMapper(null),
                    resp -> ((ErrorMessage) resp.getEntity()).getErrorCode(),
                    resp -> ((ErrorMessage) resp.getEntity()).getMessage())
            .setDefaultMapper(
                    (ExceptionMapper<Throwable>) new KafkaExceptionMapper(null),
                    resp -> ((ErrorMessage) resp.getEntity()).getErrorCode(),
                    resp -> ((ErrorMessage) resp.getEntity()).getMessage())
            .build();

    // Factory for the chunked sink each resume() call writes into.
    private final ChunkedOutputFactory chunkedOutputFactory;
    // Once the stream has been open longer than this, resume() answers elements with a timeout error.
    private final Duration maxDuration;
    // Delay between first exceeding maxDuration and the forced close scheduled by resume().
    private final Duration gracePeriod;
    // Instant read from the clock when this object was constructed.
    private final Instant streamStartTime;
    private final Clock clock;
    // Set by closeAll(); read by the resume() loop and by ComposingStreamingResponse.hasNext().
    volatile boolean closingStarted = false;

    /**
     * Stores the collaborators and records the stream start time as the clock's current instant.
     *
     * @param chunkedOutputFactory factory for the response sink; must not be {@code null}
     * @param maxDuration how long the stream may stay open before elements are answered with a timeout error
     * @param gracePeriod delay before the stream is force-closed once {@code maxDuration} is exceeded
     * @param clock time source used for the start time and for the duration checks in {@code resume}
     */
    StreamingResponse(ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, Duration gracePeriod, Clock clock) {
        this.maxDuration = maxDuration;
        this.gracePeriod = gracePeriod;
        this.clock = clock;
        this.streamStartTime = this.clock.instant();
        this.chunkedOutputFactory = Objects.requireNonNull(chunkedOutputFactory);
    }

    /**
     * Creates a streaming response that reads its elements from {@code inputStream} and measures
     * elapsed time with the system UTC clock.
     *
     * @param inputStream source of the elements to stream
     * @param chunkedOutputFactory factory for the chunked response sink
     * @param maxDuration maximum time the stream may stay open before timeout errors are emitted
     * @param gracePeriod delay before the forced close once {@code maxDuration} is exceeded
     * @return a new streaming response over {@code inputStream}
     */
    public static <T> StreamingResponse<T> from(JsonStream<T> inputStream, ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, Duration gracePeriod) {
        return fromWithClock(inputStream, chunkedOutputFactory, maxDuration, gracePeriod, Clock.systemUTC());
    }

    /**
     * Same as {@code from}, but with a caller-supplied {@link Clock} so tests can control time.
     *
     * @param clock time source for the start time and the max-duration checks
     * @return a new streaming response over {@code inputStream}
     */
    @VisibleForTesting
    static <T> StreamingResponse<T> fromWithClock(JsonStream<T> inputStream, ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, Duration gracePeriod, Clock clock) {
        StreamingResponse<T> streamingResponse = new InputStreamingResponse<>(inputStream, chunkedOutputFactory, maxDuration, gracePeriod, clock);
        return streamingResponse;
    }

    /**
     * Returns a new streaming response whose elements are obtained by applying the asynchronous
     * {@code transform} to each element of this one. The result shares this response's chunked
     * output factory, max duration, grace period and clock.
     *
     * @param transform function applied to every element; the future it returns supplies the
     *     corresponding element of the composed stream
     * @param <O> element type of the composed stream
     * @return the composed streaming response
     */
    public final <O> StreamingResponse<O> compose(Function<? super T, ? extends CompletableFuture<O>> transform) {
        // Diamond instead of the raw type, so the compiler checks the I/O type arguments.
        return new ComposingStreamingResponse<>(this, transform, this.chunkedOutputFactory, this.maxDuration, this.gracePeriod);
    }

    /**
     * Resumes {@code asyncResponse} with a chunked body and streams one result or error per
     * element of this response into it.
     *
     * <p>While the stream has been open for no longer than {@code maxDuration}, every element is
     * passed through {@code handleNext} and queued for writing. Once {@code maxDuration} is
     * exceeded, a one-shot task is scheduled to force-close the stream after {@code gracePeriod};
     * until it fires, each further element is consumed, discarded and answered with a
     * {@code REQUEST_TIMEOUT} error. Any exception thrown by the loop is mapped to an error
     * response and queued. The stream and the response queue are always closed on exit.
     *
     * @param asyncResponse the suspended JAX-RS response to resume
     * @param produceRecordErrorCounter receives the error code of every element that failed
     */
    public final void resume(AsyncResponse asyncResponse, CustomLog.ProduceRecordErrorCounter produceRecordErrorCounter) {
        log.debug("Resuming StreamingResponse");
        AsyncResponseQueue responseQueue = new AsyncResponseQueue(this.chunkedOutputFactory);
        responseQueue.asyncResume(asyncResponse);
        // Created lazily: only needed once the stream has outlived maxDuration.
        // Must be a ScheduledExecutorService, since plain ExecutorService has no schedule().
        ScheduledExecutorService executorService = null;
        try {
            while (!this.closingStarted && this.hasNext()) {
                if (!this.closingStarted && Duration.between(this.streamStartTime, this.clock.instant()).compareTo(this.maxDuration) > 0) {
                    if (executorService == null) {
                        // First time over the limit: arm the forced close exactly once.
                        executorService = Executors.newSingleThreadScheduledExecutor();
                        executorService.schedule(() -> this.closeAll(responseQueue), this.gracePeriod.toMillis(), TimeUnit.MILLISECONDS);
                    }
                    // Consume the element but drop its outcome; the caller gets a timeout error instead.
                    this.next();
                    responseQueue.push(CompletableFuture.completedFuture(ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(new StatusCodeException(Response.Status.REQUEST_TIMEOUT, "Streaming connection open for longer than allowed", "Connection will be closed.")))));
                    continue;
                }
                if (!this.closingStarted) {
                    responseQueue.push(this.next().handle((result, exception) -> this.handleNext(result, exception, produceRecordErrorCounter)));
                    continue;
                }
                // closingStarted flipped between the loop condition and here: stop reading.
                break;
            }
        }
        catch (Exception e) {
            log.debug("Exception thrown when processing stream ", e);
            responseQueue.push(CompletableFuture.completedFuture(ResultOrError.error(EXCEPTION_MAPPER.toErrorResponse(e))));
        }
        finally {
            this.close();
            responseQueue.close();
            if (executorService != null) {
                executorService.shutdown();
                try {
                    if (!executorService.awaitTermination(ONE_SECOND_MS, TimeUnit.MILLISECONDS)) {
                        executorService.shutdownNow();
                    }
                }
                catch (InterruptedException e) {
                    log.debug("Exception thrown when attempting to shutdown executorService", e);
                    // Restore the interrupt flag so callers further up the stack can observe it.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    /**
     * Forced shutdown path: raises the {@code closingStarted} flag first so the {@code resume}
     * loop stops reading, then closes this stream and the given response queue.
     *
     * @param queue the response queue whose sink should be closed
     */
    private void closeAll(AsyncResponseQueue queue) {
        // The flag must be set before closing so readers see it when the close takes effect.
        this.closingStarted = true;
        this.close();
        queue.close();
    }

    /**
     * Converts the outcome of one streamed operation into the value written to the response.
     *
     * <p>On success the result is wrapped as-is. On failure the <em>cause</em> of {@code error}
     * is mapped to an {@link ErrorResponse}, its error code is reported to
     * {@code produceRecordErrorCounter}, and the error response is wrapped.
     * NOTE(review): only the cause is used, presumably because failures arrive wrapped by the
     * future machinery; confirm against callers.
     *
     * @param result the operation's result; only meaningful when {@code error} is {@code null}
     * @param error the failure, or {@code null} if the operation succeeded
     * @param produceRecordErrorCounter counter notified with the mapped error code
     * @return the wrapped result or the wrapped error response
     * @throws IllegalArgumentException if {@code error} is non-null but has no cause
     */
    private ResultOrError handleNext(T result, @Nullable Throwable error, CustomLog.ProduceRecordErrorCounter produceRecordErrorCounter) {
        if (error == null) {
            return ResultOrError.result(result);
        }
        log.debug("Error processing streaming operation.", error);
        Throwable cause = error.getCause();
        if (cause == null) {
            throw new IllegalArgumentException("Error cause is null", error);
        }
        // Map once and reuse: the same response supplies the counted code and the emitted error.
        ErrorResponse errorResponse = EXCEPTION_MAPPER.toErrorResponse(cause);
        produceRecordErrorCounter.incrementErrorCount(errorResponse.getErrorCode());
        return ResultOrError.error(errorResponse);
    }

    /**
     * Maps {@code t} to an {@link ErrorResponse} using the shared {@code EXCEPTION_MAPPER}.
     *
     * @param t the throwable to translate
     * @return the error response produced by the first mapper that handles {@code t}, or by the
     *     default mapper if none does
     */
    public static ErrorResponse toErrorResponse(Throwable t) {
        return EXCEPTION_MAPPER.toErrorResponse(t);
    }

    /**
     * Reports whether another element is available. {@code resume} keeps looping while this
     * returns {@code true}.
     */
    abstract boolean hasNext();

    /**
     * Releases the underlying source. Called from {@code resume}'s {@code finally} block and from
     * the forced-close path ({@code closeAll}), so it may be invoked more than once.
     */
    abstract void close();

    /**
     * Consumes the next element and returns a future for it. {@code resume} calls this once per
     * loop iteration after {@code hasNext()} has returned {@code true}.
     */
    abstract CompletableFuture<T> next();

    private static class InputStreamingResponse<T>
    extends StreamingResponse<T> {
        private final JsonStream<T> inputStream;

        private InputStreamingResponse(JsonStream<T> inputStream, ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, Duration gracePeriod, Clock clock) {
            super(chunkedOutputFactory, maxDuration, gracePeriod, clock);
            this.inputStream = Objects.requireNonNull(inputStream);
        }

        @Override
        public void close() {
            try {
                this.inputStream.close();
            }
            catch (IOException e) {
                log.error("Error when closing the request stream", (Throwable)e);
            }
            catch (BadRequestException e) {
                log.error("Error when closing the request stream", e.getCause() != null ? e.getCause() : e);
            }
            catch (Throwable e) {
                log.error("Unknown error when closing the request stream.", e);
            }
        }

        @Override
        public boolean hasNext() {
            try {
                return this.inputStream.hasNext();
            }
            catch (RuntimeJsonMappingException jme) {
                throw new BadRequestException(String.format("Error processing JSON: %s", jme.getMessage()), jme);
            }
            catch (RuntimeException re) {
                throw new BadRequestException(String.format("Error processing message: %s", re.getMessage()), re);
            }
        }

        @Override
        public CompletableFuture<T> next() {
            try {
                return CompletableFuture.completedFuture(this.inputStream.nextValue());
            }
            catch (JsonMappingException e) {
                if (e.getCause() instanceof ProduceRequestTooLargeException) {
                    throw new BadRequestException(String.format("Error processing message: %s", e.getCause().getMessage()), e.getCause());
                }
                return CompletableFutures.failedFuture(e);
            }
            catch (Throwable e) {
                return CompletableFutures.failedFuture(e);
            }
        }
    }

    /**
     * Streaming response that wraps another one and applies an asynchronous transform to each of
     * its elements. Availability checks and closing are delegated to the wrapped response.
     */
    private static final class ComposingStreamingResponse<I, O>
    extends StreamingResponse<O> {
        private final StreamingResponse<I> streamingResponseInput;
        private final Function<? super I, ? extends CompletableFuture<O>> transform;

        /**
         * Reuses the wrapped response's clock; the start time is read from that clock again at
         * construction by the superclass constructor.
         */
        private ComposingStreamingResponse(StreamingResponse<I> streamingResponseInput, Function<? super I, ? extends CompletableFuture<O>> transform, ChunkedOutputFactory chunkedOutputFactory, Duration maxDuration, Duration gracePeriod) {
            super(chunkedOutputFactory, maxDuration, gracePeriod, streamingResponseInput.clock);
            this.streamingResponseInput = Objects.requireNonNull(streamingResponseInput);
            this.transform = Objects.requireNonNull(transform);
        }

        /**
         * Delegates to the wrapped response. A {@link BadRequestException} caused by an
         * {@link ArrayIndexOutOfBoundsException} is treated as end-of-stream when closing has
         * already started; any other failure is rethrown.
         * NOTE(review): presumably the input fails this way when it is closed mid-read; confirm.
         */
        @Override
        public boolean hasNext() {
            try {
                return this.streamingResponseInput.hasNext();
            }
            catch (BadRequestException e) {
                // instanceof is false for a null cause, so no separate null check is needed.
                boolean endedByClose = this.closingStarted && e.getCause() instanceof ArrayIndexOutOfBoundsException;
                if (!endedByClose) {
                    throw e;
                }
                return false;
            }
        }

        /** Takes the next input element and chains the transform onto its future. */
        @Override
        public CompletableFuture<O> next() {
            CompletableFuture<I> upstream = this.streamingResponseInput.next();
            return upstream.thenCompose(this.transform);
        }

        /** Closes the wrapped response. */
        @Override
        public void close() {
            this.streamingResponseInput.close();
        }
    }

    private static final class AsyncResponseQueue {
        private final ChunkedOutput<ResultOrError> sink;
        private CompletableFuture<Void> tail;
        private volatile boolean sinkClosed = false;

        private AsyncResponseQueue(ChunkedOutputFactory chunkedOutputFactory) {
            this.sink = chunkedOutputFactory.getChunkedOutput();
            this.tail = CompletableFuture.completedFuture(null);
        }

        private void asyncResume(AsyncResponse asyncResponse) {
            asyncResponse.resume((Object)Response.ok(this.sink).build());
        }

        private boolean isClosed() {
            return this.sinkClosed;
        }

        private void push(CompletableFuture<ResultOrError> result) {
            log.debug("Pushing to response queue");
            this.tail = CompletableFuture.allOf(this.tail, result).thenApply(unused -> {
                try {
                    if (this.sinkClosed || this.sink.isClosed()) {
                        this.sinkClosed = true;
                        return null;
                    }
                    ResultOrError res = (ResultOrError)result.join();
                    log.debug("Writing to sink");
                    this.sink.write((Object)res);
                }
                catch (IOException e) {
                    log.error("Error when writing streaming result to response channel.", (Throwable)e);
                }
                return null;
            });
        }

        private void close() {
            this.tail.whenComplete((unused, throwable) -> {
                try {
                    this.sinkClosed = true;
                    this.sink.close();
                }
                catch (IOException e) {
                    log.error("Error when closing response channel.", (Throwable)e);
                }
            });
        }
    }

    /**
     * Ordered collection of {@code ErrorMapper}s plus a fallback. The first mapper that handles a
     * throwable translates it; the default mapper is used when none does.
     */
    private static final class CompositeErrorMapper {
        private final List<ErrorMapper<?>> mappers;
        private final ErrorMapper<Throwable> defaultMapper;

        private CompositeErrorMapper(List<ErrorMapper<?>> mappers, ErrorMapper<Throwable> defaultMapper) {
            this.mappers = Objects.requireNonNull(mappers);
            this.defaultMapper = Objects.requireNonNull(defaultMapper);
        }

        /**
         * Translates {@code exception} with the first registered mapper that handles it, falling
         * back to the default mapper.
         *
         * @param exception the throwable to translate
         * @return the resulting error response
         */
        public ErrorResponse toErrorResponse(Throwable exception) {
            ErrorMapper<?> chosen = this.mappers.stream()
                    .filter(candidate -> candidate.handles(exception))
                    .findFirst()
                    .orElse(this.defaultMapper);
            return chosen.toErrorResponse(exception);
        }

        /** Collects mappers in registration order, plus the default mapper required by {@code build()}. */
        private static final class Builder {
            private final ImmutableList.Builder<ErrorMapper<?>> mappers = ImmutableList.builder();
            private ErrorMapper<Throwable> defaultMapper;

            private Builder() {
            }

            /** Registers a mapper for {@code mappedType}; earlier registrations take precedence. */
            private <T extends Throwable> Builder putMapper(Class<T> mappedType, ExceptionMapper<T> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
                ErrorMapper<T> entry = new ErrorMapper<>(mappedType, mapper, errorCode, message);
                this.mappers.add(entry);
                return this;
            }

            /** Sets the fallback mapper, registered for {@link Throwable} itself. */
            private Builder setDefaultMapper(ExceptionMapper<Throwable> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
                this.defaultMapper = new ErrorMapper<>(Throwable.class, mapper, errorCode, message);
                return this;
            }

            /**
             * Builds the composite mapper.
             *
             * @throws NullPointerException if no default mapper was set
             */
            public CompositeErrorMapper build() {
                List<ErrorMapper<?>> registered = this.mappers.build();
                return new CompositeErrorMapper(registered, this.defaultMapper);
            }
        }
    }

    public static abstract class ResultOrError {
        public static <T> ResultHolder<T> result(T result) {
            return new AutoValue_StreamingResponse_ResultHolder<T>(result);
        }

        public static ErrorHolder error(ErrorResponse error) {
            return new AutoValue_StreamingResponse_ErrorHolder(error);
        }
    }

    /**
     * {@link ResultOrError} variant carrying an {@link ErrorResponse}. Instances are created via
     * {@code ResultOrError.error}, which instantiates the AutoValue-generated subclass.
     */
    @AutoValue
    static abstract class ErrorHolder
    extends ResultOrError {
        ErrorHolder() {
        }

        // @JsonValue: Jackson serializes this holder as the wrapped error response itself.
        @JsonValue
        abstract ErrorResponse getError();
    }

    /**
     * {@link ResultOrError} variant carrying a successful result of type {@code T}. Instances are
     * created via {@code ResultOrError.result}, which instantiates the AutoValue-generated subclass.
     */
    @AutoValue
    static abstract class ResultHolder<T>
    extends ResultOrError {
        ResultHolder() {
        }

        // @JsonValue: Jackson serializes this holder as the wrapped result itself.
        @JsonValue
        abstract T getResult();
    }

    private static final class ErrorMapper<T extends Throwable> {
        private final Class<T> errorClass;
        private final ExceptionMapper<T> mapper;
        private final Function<Response, Integer> errorCode;
        private final Function<Response, String> message;

        private ErrorMapper(Class<T> errorClass, ExceptionMapper<T> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
            this.errorClass = Objects.requireNonNull(errorClass);
            this.mapper = Objects.requireNonNull(mapper);
            this.errorCode = Objects.requireNonNull(errorCode);
            this.message = Objects.requireNonNull(message);
        }

        private boolean handles(Throwable error) {
            return this.errorClass.isInstance(error);
        }

        private ErrorResponse toErrorResponse(Throwable error) {
            Response response = this.mapper.toResponse(error);
            String originalMessage = this.message.apply(response);
            String messageWithoutSource = originalMessage == null ? "" : originalMessage.split("\\n")[0].trim();
            return ErrorResponse.create(this.errorCode.apply(response), messageWithoutSource);
        }
    }
}

