/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafkarest.resources.v2;

import io.confluent.kafkarest.ConsumerReadCallback;
import io.confluent.kafkarest.Errors;
import io.confluent.kafkarest.KafkaRestContext;
import io.confluent.kafkarest.UriUtils;
import io.confluent.kafkarest.entities.ConsumerRecord;
import io.confluent.kafkarest.entities.TopicPartitionOffset;
import io.confluent.kafkarest.entities.v2.BinaryConsumerRecord;
import io.confluent.kafkarest.entities.v2.CommitOffsetsResponse;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentRequest;
import io.confluent.kafkarest.entities.v2.ConsumerAssignmentResponse;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedRequest;
import io.confluent.kafkarest.entities.v2.ConsumerCommittedResponse;
import io.confluent.kafkarest.entities.v2.ConsumerOffsetCommitRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSeekToRequest;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionRecord;
import io.confluent.kafkarest.entities.v2.ConsumerSubscriptionResponse;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceRequest;
import io.confluent.kafkarest.entities.v2.CreateConsumerInstanceResponse;
import io.confluent.kafkarest.entities.v2.JsonConsumerRecord;
import io.confluent.kafkarest.entities.v2.SchemaConsumerRecord;
import io.confluent.kafkarest.extension.ResourceAccesslistFeature;
import io.confluent.kafkarest.v2.BinaryKafkaConsumerState;
import io.confluent.kafkarest.v2.JsonKafkaConsumerState;
import io.confluent.kafkarest.v2.KafkaConsumerManager;
import io.confluent.kafkarest.v2.KafkaConsumerState;
import io.confluent.kafkarest.v2.SchemaKafkaConsumerState;
import io.confluent.rest.annotations.PerformanceMetric;
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;
import javax.inject.Inject;
import javax.inject.Provider;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import javax.ws.rs.Consumes;
import javax.ws.rs.DELETE;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.GET;
import javax.ws.rs.POST;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Context;
import javax.ws.rs.core.UriInfo;

/**
 * JAX-RS resource for the v2 consumer endpoints rooted at {@code /consumers}.
 *
 * <p>Each handler delegates to the {@link KafkaConsumerManager} obtained from the injected
 * {@link KafkaRestContext} provider.
 */
@Path("/consumers")
@Produces({
    "application/vnd.kafka.binary.v2+json; qs=0.1",
    "application/vnd.kafka.avro.v2+json; qs=0.1",
    "application/vnd.kafka.json.v2+json; qs=0.1",
    "application/vnd.kafka.jsonschema.v2+json; qs=0.1",
    "application/vnd.kafka.protobuf.v2+json; qs=0.1",
    "application/vnd.kafka.v2+json; qs=0.9"
})
@Consumes({
    "application/vnd.kafka.binary.v2+json",
    "application/vnd.kafka.avro.v2+json",
    "application/vnd.kafka.json.v2+json",
    "application/vnd.kafka.jsonschema.v2+json",
    "application/vnd.kafka.protobuf.v2+json",
    "application/vnd.kafka.v2+json"
})
@ResourceAccesslistFeature.ResourceName("api.v2.consumers.*")
public final class ConsumersResource {
    /** Provider queried on every call to obtain the current REST context. */
    private final Provider<KafkaRestContext> context;

    /**
     * Creates the resource.
     *
     * @param context provider of the {@link KafkaRestContext} used by every handler
     */
    @Inject
    public ConsumersResource(Provider<KafkaRestContext> context) {
        this.context = context;
    }

    /**
     * Creates a consumer instance in {@code group} and reports its id and base URI.
     *
     * @param uriInfo request URI information used to build the instance's absolute URI
     * @param group   consumer group name taken from the path
     * @param config  creation request; when {@code null},
     *                {@link CreateConsumerInstanceRequest#PROTOTYPE} is used instead
     * @return the new instance id together with the URI {@code consumers/<group>/instances/<id>}
     */
    @POST
    @Valid
    @Path("/{group}")
    @PerformanceMetric("consumer.create+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.create")
    public CreateConsumerInstanceResponse createGroup(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @Valid CreateConsumerInstanceRequest config) {
        CreateConsumerInstanceRequest effectiveConfig =
                config != null ? config : CreateConsumerInstanceRequest.PROTOTYPE;
        String instanceId =
                consumerManager().createConsumer(group, effectiveConfig.toConsumerInstanceConfig());
        // The provider is queried a second time here, after the consumer has been created.
        String instanceBaseUri = UriUtils.absoluteUri(
                context.get().getConfig(), uriInfo, "consumers", group, "instances", instanceId);
        return new CreateConsumerInstanceResponse(instanceId, instanceBaseUri);
    }

    /**
     * Deletes the consumer instance {@code instance} of {@code group}.
     *
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     */
    @DELETE
    @Path("/{group}/instances/{instance}")
    @PerformanceMetric("consumer.delete+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.delete")
    public void deleteGroup(
            @PathParam("group") String group,
            @PathParam("instance") String instance) {
        consumerManager().deleteConsumer(group, instance);
    }

    /**
     * Forwards a subscription request for the given consumer instance to the consumer manager.
     *
     * <p>An {@link IllegalStateException} raised while doing so is rethrown as the exception
     * produced by {@code Errors.illegalStateException}.
     *
     * @param uriInfo      request URI information (not used by this handler)
     * @param group        consumer group name taken from the path
     * @param instance     consumer instance id taken from the path
     * @param subscription subscription request body; must not be {@code null}
     */
    @POST
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.subscribe+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.subscribe")
    public void subscribe(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid @NotNull ConsumerSubscriptionRecord subscription) {
        translatingIllegalState(() -> consumerManager().subscribe(group, instance, subscription));
    }

    /**
     * Returns the subscription reported by the consumer manager for the given instance.
     *
     * @param uriInfo  request URI information (not used by this handler)
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     * @return the manager's subscription response
     */
    @GET
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.subscription+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-subscription")
    public ConsumerSubscriptionResponse subscription(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance) {
        return consumerManager().subscription(group, instance);
    }

    /**
     * Asks the consumer manager to unsubscribe the given consumer instance.
     *
     * @param uriInfo  request URI information (not used by this handler)
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     */
    @DELETE
    @Path("/{group}/instances/{instance}/subscription")
    @PerformanceMetric("consumer.unsubscribe+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.unsubscribe")
    public void unsubscribe(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance) {
        consumerManager().unsubscribe(group, instance);
    }

    /**
     * Reads records through a {@link BinaryKafkaConsumerState} and answers asynchronously with
     * {@link BinaryConsumerRecord} wrappers.
     *
     * @param asyncResponse suspended response resumed with the records or the failure
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param timeoutMs     {@code timeout} query parameter in milliseconds (default {@code -1})
     * @param maxBytes      {@code max_bytes} query parameter (default {@code -1}); values
     *                      {@code <= 0} are replaced by {@link Long#MAX_VALUE}
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-binary+v2")
    @Produces({"application/vnd.kafka.binary.v2+json", "application/vnd.kafka.v2+json; qs=0.9"})
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-binary")
    public void readRecordBinary(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("timeout") @DefaultValue("-1") long timeoutMs,
            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) {
        Duration timeout = Duration.ofMillis(timeoutMs);
        readRecords(
                asyncResponse, group, instance, timeout, maxBytes,
                BinaryKafkaConsumerState.class, BinaryConsumerRecord::fromConsumerRecord);
    }

    /**
     * Reads records through a {@link JsonKafkaConsumerState} and answers asynchronously with
     * {@link JsonConsumerRecord} wrappers.
     *
     * @param asyncResponse suspended response resumed with the records or the failure
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param timeoutMs     {@code timeout} query parameter in milliseconds (default {@code -1})
     * @param maxBytes      {@code max_bytes} query parameter (default {@code -1}); values
     *                      {@code <= 0} are replaced by {@link Long#MAX_VALUE}
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-json+v2")
    @Produces({"application/vnd.kafka.json.v2+json; qs=0.1"})
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-json")
    public void readRecordJson(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("timeout") @DefaultValue("-1") long timeoutMs,
            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) {
        Duration timeout = Duration.ofMillis(timeoutMs);
        readRecords(
                asyncResponse, group, instance, timeout, maxBytes,
                JsonKafkaConsumerState.class, JsonConsumerRecord::fromConsumerRecord);
    }

    /**
     * Reads records for the Avro media type through a {@link SchemaKafkaConsumerState} and
     * answers asynchronously with {@link SchemaConsumerRecord} wrappers.
     *
     * @param asyncResponse suspended response resumed with the records or the failure
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param timeoutMs     {@code timeout} query parameter in milliseconds (default {@code -1})
     * @param maxBytes      {@code max_bytes} query parameter (default {@code -1}); values
     *                      {@code <= 0} are replaced by {@link Long#MAX_VALUE}
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-avro+v2")
    @Produces({"application/vnd.kafka.avro.v2+json; qs=0.1"})
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-avro")
    public void readRecordAvro(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("timeout") @DefaultValue("-1") long timeoutMs,
            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) {
        Duration timeout = Duration.ofMillis(timeoutMs);
        readRecords(
                asyncResponse, group, instance, timeout, maxBytes,
                SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    /**
     * Reads records for the JSON Schema media type through a {@link SchemaKafkaConsumerState}
     * and answers asynchronously with {@link SchemaConsumerRecord} wrappers.
     *
     * @param asyncResponse suspended response resumed with the records or the failure
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param timeoutMs     {@code timeout} query parameter in milliseconds (default {@code -1})
     * @param maxBytes      {@code max_bytes} query parameter (default {@code -1}); values
     *                      {@code <= 0} are replaced by {@link Long#MAX_VALUE}
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-jsonschema+v2")
    @Produces({"application/vnd.kafka.jsonschema.v2+json; qs=0.1"})
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-json-schema")
    public void readRecordJsonSchema(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("timeout") @DefaultValue("-1") long timeoutMs,
            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) {
        Duration timeout = Duration.ofMillis(timeoutMs);
        readRecords(
                asyncResponse, group, instance, timeout, maxBytes,
                SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    /**
     * Reads records for the Protobuf media type through a {@link SchemaKafkaConsumerState} and
     * answers asynchronously with {@link SchemaConsumerRecord} wrappers.
     *
     * @param asyncResponse suspended response resumed with the records or the failure
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param timeoutMs     {@code timeout} query parameter in milliseconds (default {@code -1})
     * @param maxBytes      {@code max_bytes} query parameter (default {@code -1}); values
     *                      {@code <= 0} are replaced by {@link Long#MAX_VALUE}
     */
    @GET
    @Path("/{group}/instances/{instance}/records")
    @PerformanceMetric("consumer.records.read-protobuf+v2")
    @Produces({"application/vnd.kafka.protobuf.v2+json; qs=0.1"})
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.consume-protobuf")
    public void readRecordProtobuf(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("timeout") @DefaultValue("-1") long timeoutMs,
            @QueryParam("max_bytes") @DefaultValue("-1") long maxBytes) {
        Duration timeout = Duration.ofMillis(timeoutMs);
        readRecords(
                asyncResponse, group, instance, timeout, maxBytes,
                SchemaKafkaConsumerState.class, SchemaConsumerRecord::fromConsumerRecord);
    }

    /**
     * Hands an offset-commit request to the consumer manager and resumes the response from its
     * callback: with the failure if one is reported, otherwise with a
     * {@link CommitOffsetsResponse} built from the reported offsets.
     *
     * @param asyncResponse       suspended response resumed from the commit callback
     * @param group               consumer group name taken from the path
     * @param instance            consumer instance id taken from the path
     * @param async               {@code async} query parameter (default {@code "false"}), passed
     *                            to the manager as-is
     * @param offsetCommitRequest request body, passed to the manager as-is
     */
    @POST
    @Path("/{group}/instances/{instance}/offsets")
    @PerformanceMetric("consumer.commit-offsets+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.commit-offsets")
    public void commitOffsets(
            @Suspended AsyncResponse asyncResponse,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @QueryParam("async") @DefaultValue("false") String async,
            @Valid ConsumerOffsetCommitRequest offsetCommitRequest) {
        KafkaConsumerManager.CommitCallback onCommitted = new KafkaConsumerManager.CommitCallback() {
            @Override
            public void onCompletion(List<TopicPartitionOffset> offsets, Exception e) {
                if (e != null) {
                    asyncResponse.resume(e);
                    return;
                }
                asyncResponse.resume(CommitOffsetsResponse.fromOffsets(offsets));
            }
        };
        consumerManager().commitOffsets(group, instance, async, offsetCommitRequest, onCommitted);
    }

    /**
     * Returns the committed offsets reported by the consumer manager for the given request.
     *
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     * @param request  request body; a {@code null} body is rejected with
     *                 {@code Errors.partitionNotFoundException()}
     * @return the manager's committed-offsets response
     */
    @GET
    @Path("/{group}/instances/{instance}/offsets")
    @PerformanceMetric("consumer.committed-offsets+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-committed-offsets")
    public ConsumerCommittedResponse committedOffsets(
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid ConsumerCommittedRequest request) {
        if (request != null) {
            return consumerManager().committed(group, instance, request);
        }
        throw Errors.partitionNotFoundException();
    }

    /**
     * Forwards a seek-to-beginning request for the given consumer instance to the manager.
     *
     * <p>An {@link IllegalStateException} raised while doing so is rethrown as the exception
     * produced by {@code Errors.illegalStateException}.
     *
     * @param uriInfo       request URI information (not used by this handler)
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param seekToRequest request body; must not be {@code null}
     */
    @POST
    @Path("/{group}/instances/{instance}/positions/beginning")
    @PerformanceMetric("consumer.seek-to-beginning+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-beginning")
    public void seekToBeginning(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid @NotNull ConsumerSeekToRequest seekToRequest) {
        translatingIllegalState(
                () -> consumerManager().seekToBeginning(group, instance, seekToRequest));
    }

    /**
     * Forwards a seek-to-end request for the given consumer instance to the manager.
     *
     * <p>An {@link IllegalStateException} raised while doing so is rethrown as the exception
     * produced by {@code Errors.illegalStateException}.
     *
     * @param uriInfo       request URI information (not used by this handler)
     * @param group         consumer group name taken from the path
     * @param instance      consumer instance id taken from the path
     * @param seekToRequest request body; must not be {@code null}
     */
    @POST
    @Path("/{group}/instances/{instance}/positions/end")
    @PerformanceMetric("consumer.seek-to-end+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-end")
    public void seekToEnd(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid @NotNull ConsumerSeekToRequest seekToRequest) {
        translatingIllegalState(
                () -> consumerManager().seekToEnd(group, instance, seekToRequest));
    }

    /**
     * Forwards a seek request for the given consumer instance to the manager.
     *
     * <p>An {@link IllegalStateException} raised while doing so is rethrown as the exception
     * produced by {@code Errors.illegalStateException}.
     *
     * @param uriInfo  request URI information (not used by this handler)
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     * @param request  request body; must not be {@code null}
     */
    @POST
    @Path("/{group}/instances/{instance}/positions")
    @PerformanceMetric("consumer.seek-to-offset+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.seek-to-offset")
    public void seekToOffset(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid @NotNull ConsumerSeekRequest request) {
        translatingIllegalState(() -> consumerManager().seek(group, instance, request));
    }

    /**
     * Forwards an assignment request for the given consumer instance to the manager.
     *
     * <p>An {@link IllegalStateException} raised while doing so is rethrown as the exception
     * produced by {@code Errors.illegalStateException}.
     *
     * @param uriInfo           request URI information (not used by this handler)
     * @param group             consumer group name taken from the path
     * @param instance          consumer instance id taken from the path
     * @param assignmentRequest request body; must not be {@code null}
     */
    @POST
    @Path("/{group}/instances/{instance}/assignments")
    @PerformanceMetric("consumer.assign+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.assign")
    public void assign(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance,
            @Valid @NotNull ConsumerAssignmentRequest assignmentRequest) {
        translatingIllegalState(
                () -> consumerManager().assign(group, instance, assignmentRequest));
    }

    /**
     * Returns the assignment reported by the consumer manager for the given instance.
     *
     * @param uriInfo  request URI information (not used by this handler)
     * @param group    consumer group name taken from the path
     * @param instance consumer instance id taken from the path
     * @return the manager's assignment response
     */
    @GET
    @Path("/{group}/instances/{instance}/assignments")
    @PerformanceMetric("consumer.assignment+v2")
    @ResourceAccesslistFeature.ResourceName("api.v2.consumers.get-assignments")
    public ConsumerAssignmentResponse assignment(
            @Context UriInfo uriInfo,
            @PathParam("group") String group,
            @PathParam("instance") String instance) {
        return consumerManager().assignment(group, instance);
    }

    /**
     * Shared implementation behind the {@code readRecord*} handlers.
     *
     * <p>Requests a read from the consumer manager and resumes {@code asyncResponse} from the
     * read callback: with the failure if one is reported, otherwise with the list obtained by
     * applying {@code toJsonWrapper} to every record.
     *
     * @param asyncResponse     suspended response to resume
     * @param group             consumer group name
     * @param instance          consumer instance id
     * @param timeout           read timeout, passed to the manager as-is
     * @param maxBytes          byte limit; values {@code <= 0} are replaced by
     *                          {@link Long#MAX_VALUE}
     * @param consumerStateType consumer state class handed to the manager for this read
     * @param toJsonWrapper     converts each record into the object placed in the response list
     */
    private <KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT> void readRecords(
            @Suspended AsyncResponse asyncResponse,
            String group,
            String instance,
            Duration timeout,
            long maxBytes,
            Class<? extends KafkaConsumerState<KafkaKeyT, KafkaValueT, ClientKeyT, ClientValueT>> consumerStateType,
            Function<ConsumerRecord<ClientKeyT, ClientValueT>, ?> toJsonWrapper) {
        long byteLimit = maxBytes > 0L ? maxBytes : Long.MAX_VALUE;
        ConsumerReadCallback<ClientKeyT, ClientValueT> onRead =
                new ConsumerReadCallback<ClientKeyT, ClientValueT>() {
                    @Override
                    public void onCompletion(
                            List<ConsumerRecord<ClientKeyT, ClientValueT>> records, Exception e) {
                        if (e != null) {
                            asyncResponse.resume(e);
                            return;
                        }
                        asyncResponse.resume(
                                records.stream().map(toJsonWrapper).collect(Collectors.toList()));
                    }
                };
        consumerManager().readRecords(group, instance, consumerStateType, timeout, byteLimit, onRead);
    }

    /** Looks up the consumer manager from a freshly obtained context. */
    private KafkaConsumerManager consumerManager() {
        return context.get().getKafkaConsumerManager();
    }

    /**
     * Runs {@code action} and rethrows any {@link IllegalStateException} it raises as the
     * exception produced by {@code Errors.illegalStateException}.
     */
    private void translatingIllegalState(Runnable action) {
        try {
            action.run();
        } catch (IllegalStateException e) {
            throw Errors.illegalStateException(e);
        }
    }
}

