/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.response;

import com.fasterxml.jackson.annotation.JsonValue;
import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.MappingIterator;
import com.fasterxml.jackson.jaxrs.base.JsonMappingExceptionMapper;
import com.fasterxml.jackson.jaxrs.base.JsonParseExceptionMapper;
import com.google.auto.value.AutoValue;
import com.google.common.collect.ImmutableList;
import io.confluent.kafkarest.common.CompletableFutures;
import io.confluent.kafkarest.exceptions.StatusCodeException;
import io.confluent.kafkarest.exceptions.v3.ErrorResponse;
import io.confluent.kafkarest.exceptions.v3.V3ExceptionMapper;
import io.confluent.kafkarest.response.AutoValue_StreamingResponse_ErrorHolder;
import io.confluent.kafkarest.response.AutoValue_StreamingResponse_ResultHolder;
import io.confluent.rest.entities.ErrorMessage;
import io.confluent.rest.exceptions.KafkaExceptionMapper;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import javax.annotation.Nullable;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.core.Response;
import javax.ws.rs.ext.ExceptionMapper;
import org.glassfish.jersey.server.ChunkedOutput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public abstract class StreamingResponse<T> {
    private static final Logger log = LoggerFactory.getLogger(StreamingResponse.class);
    private static final CompositeErrorMapper EXCEPTION_MAPPER = CompositeErrorMapper.Builder.access$200(CompositeErrorMapper.Builder.access$100(CompositeErrorMapper.Builder.access$100(CompositeErrorMapper.Builder.access$100(new CompositeErrorMapper.Builder(), JsonMappingException.class, (ExceptionMapper)new JsonMappingExceptionMapper(), response -> Response.Status.BAD_REQUEST.getStatusCode(), response -> (String)response.getEntity()), JsonParseException.class, (ExceptionMapper)new JsonParseExceptionMapper(), response -> Response.Status.BAD_REQUEST.getStatusCode(), response -> (String)response.getEntity()), StatusCodeException.class, new V3ExceptionMapper(), response -> ((ErrorResponse)response.getEntity()).getErrorCode(), response -> ((ErrorResponse)response.getEntity()).getMessage()), (ExceptionMapper)new KafkaExceptionMapper(null), response -> ((ErrorMessage)response.getEntity()).getErrorCode(), response -> ((ErrorMessage)response.getEntity()).getMessage()).build();

    StreamingResponse() {
    }

    public static <T> StreamingResponse<T> from(MappingIterator<T> input) {
        return new InputStreamingResponse(input);
    }

    public final <O> StreamingResponse<O> compose(Function<? super T, ? extends CompletableFuture<O>> transform) {
        return new ComposingStreamingResponse(this, transform);
    }

    public final void resume(AsyncResponse asyncResponse) {
        AsyncResponseQueue responseQueue = new AsyncResponseQueue();
        responseQueue.asyncResume(asyncResponse);
        while (this.hasNext()) {
            responseQueue.push((CompletableFuture)this.next().handle(this::handleNext));
        }
        responseQueue.close();
    }

    private ResultOrError handleNext(T result, @Nullable Throwable error) {
        if (error == null) {
            return ResultOrError.result(result);
        }
        log.debug("Error processing streaming operation.", error);
        return ResultOrError.error(StreamingResponse.EXCEPTION_MAPPER.toErrorResponse(error.getCause()));
    }

    abstract boolean hasNext();

    abstract CompletableFuture<T> next();

    private static final class CompositeErrorMapper {
        private final List<ErrorMapper<?>> mappers;
        private final ErrorMapper<Throwable> defaultMapper;

        private CompositeErrorMapper(List<ErrorMapper<?>> mappers, ErrorMapper<Throwable> defaultMapper) {
            this.mappers = Objects.requireNonNull(mappers);
            this.defaultMapper = Objects.requireNonNull(defaultMapper);
        }

        public ErrorResponse toErrorResponse(Throwable exception) {
            for (ErrorMapper<?> mapper : this.mappers) {
                if (!((ErrorMapper)mapper).handles(exception)) continue;
                return ((ErrorMapper)mapper).toErrorResponse(exception);
            }
            return ((ErrorMapper)this.defaultMapper).toErrorResponse(exception);
        }

        private static final class Builder {
            private final ImmutableList.Builder<ErrorMapper<?>> mappers = ImmutableList.builder();
            private ErrorMapper<Throwable> defaultMapper;

            private Builder() {
            }

            private <T extends Throwable> Builder putMapper(Class<T> mappedType, ExceptionMapper<T> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
                this.mappers.add(new ErrorMapper(mappedType, mapper, errorCode, message));
                return this;
            }

            private Builder setDefaultMapper(ExceptionMapper<Throwable> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
                this.defaultMapper = new ErrorMapper(Throwable.class, mapper, errorCode, message);
                return this;
            }

            public CompositeErrorMapper build() {
                return new CompositeErrorMapper((List)this.mappers.build(), this.defaultMapper);
            }

            static /* synthetic */ Builder access$100(Builder x0, Class x1, ExceptionMapper x2, Function x3, Function x4) {
                return x0.putMapper(x1, x2, x3, x4);
            }

            static /* synthetic */ Builder access$200(Builder x0, ExceptionMapper x1, Function x2, Function x3) {
                return x0.setDefaultMapper((ExceptionMapper<Throwable>)x1, x2, x3);
            }
        }
    }

    private static final class ErrorMapper<T extends Throwable> {
        private final Class<T> errorClass;
        private final ExceptionMapper<T> mapper;
        private final Function<Response, Integer> errorCode;
        private final Function<Response, String> message;

        private ErrorMapper(Class<T> errorClass, ExceptionMapper<T> mapper, Function<Response, Integer> errorCode, Function<Response, String> message) {
            this.errorClass = Objects.requireNonNull(errorClass);
            this.mapper = Objects.requireNonNull(mapper);
            this.errorCode = Objects.requireNonNull(errorCode);
            this.message = Objects.requireNonNull(message);
        }

        private boolean handles(Throwable error) {
            return this.errorClass.isInstance(error);
        }

        private ErrorResponse toErrorResponse(Throwable error) {
            Response response = this.mapper.toResponse(error);
            return ErrorResponse.create(this.errorCode.apply(response), this.message.apply(response));
        }
    }

    @AutoValue
    static abstract class ErrorHolder
    extends ResultOrError {
        ErrorHolder() {
        }

        @JsonValue
        abstract ErrorResponse getError();
    }

    @AutoValue
    static abstract class ResultHolder<T>
    extends ResultOrError {
        ResultHolder() {
        }

        @JsonValue
        abstract T getResult();
    }

    private static abstract class ResultOrError {
        private ResultOrError() {
        }

        private static <T> ResultHolder<T> result(T result) {
            return new AutoValue_StreamingResponse_ResultHolder<T>(result);
        }

        private static ErrorHolder error(ErrorResponse error) {
            return new AutoValue_StreamingResponse_ErrorHolder(error);
        }
    }

    private static final class AsyncResponseQueue {
        private static final String CHUNK_SEPARATOR = "\r\n";
        private final ChunkedOutput<ResultOrError> sink = new ChunkedOutput(ResultOrError.class, "\r\n");
        private CompletableFuture<Void> tail = CompletableFuture.completedFuture(null);

        private AsyncResponseQueue() {
        }

        private void asyncResume(AsyncResponse asyncResponse) {
            asyncResponse.resume((Object)Response.ok(this.sink).build());
        }

        private void push(CompletableFuture<ResultOrError> result) {
            this.tail = CompletableFuture.allOf(this.tail, result).thenApply(unused -> {
                try {
                    this.sink.write(result.join());
                }
                catch (IOException e) {
                    log.error("Error when writing streaming result to response channel.", (Throwable)e);
                }
                return null;
            });
        }

        private void close() {
            this.tail.whenComplete((unused, throwable) -> {
                try {
                    this.sink.close();
                }
                catch (IOException e) {
                    log.error("Error when closing response channel.", (Throwable)e);
                }
            });
        }
    }

    private static final class ComposingStreamingResponse<I, O>
    extends StreamingResponse<O> {
        private final StreamingResponse<I> input;
        private final Function<? super I, ? extends CompletableFuture<O>> transform;

        private ComposingStreamingResponse(StreamingResponse<I> input, Function<? super I, ? extends CompletableFuture<O>> transform) {
            this.input = Objects.requireNonNull(input);
            this.transform = Objects.requireNonNull(transform);
        }

        @Override
        public boolean hasNext() {
            return this.input.hasNext();
        }

        @Override
        public CompletableFuture<O> next() {
            return this.input.next().thenCompose(this.transform);
        }
    }

    private static final class InputStreamingResponse<T>
    extends StreamingResponse<T> {
        private final MappingIterator<T> input;

        private InputStreamingResponse(MappingIterator<T> input) {
            this.input = Objects.requireNonNull(input);
        }

        @Override
        public boolean hasNext() {
            return this.input.hasNext();
        }

        @Override
        public CompletableFuture<T> next() {
            try {
                return CompletableFuture.completedFuture(this.input.nextValue());
            }
            catch (Throwable e) {
                return CompletableFutures.failedFuture(e);
            }
        }
    }
}

