/*
 * Decompiled with CFR 0.152.
 */
package org.apache.pulsar.broker.admin.v2;

import com.google.common.collect.Lists;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import io.swagger.annotations.ApiResponse;
import io.swagger.annotations.ApiResponses;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import javax.ws.rs.DefaultValue;
import javax.ws.rs.Encoded;
import javax.ws.rs.GET;
import javax.ws.rs.PUT;
import javax.ws.rs.Path;
import javax.ws.rs.PathParam;
import javax.ws.rs.Produces;
import javax.ws.rs.QueryParam;
import javax.ws.rs.WebApplicationException;
import javax.ws.rs.container.AsyncResponse;
import javax.ws.rs.container.Suspended;
import javax.ws.rs.core.Response;
import org.apache.pulsar.broker.PulsarServerException;
import org.apache.pulsar.broker.admin.v2.PersistentTopics;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.nonpersistent.NonPersistentTopic;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.common.naming.NamespaceBundle;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.NonPersistentTopicStats;
import org.apache.pulsar.common.policies.data.PersistentTopicInternalStats;
import org.apache.pulsar.common.policies.data.Policies;
import org.apache.pulsar.common.util.FutureUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Path(value="/non-persistent")
@Produces(value={"application/json"})
@Api(value="/non-persistent", description="Non-Persistent topic admin apis", tags={"non-persistent topic"})
public class NonPersistentTopics
extends PersistentTopics {
    private static final Logger log = LoggerFactory.getLogger(NonPersistentTopics.class);

    @Override
    @GET
    @Path(value="/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value="Get partitioned topic metadata.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate cluster configuration")})
    public PartitionedTopicMetadata getPartitionedMetadata(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Is authentication required to perform this operation") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(value="Is check configuration required to automatically create topic") @QueryParam(value="checkAllowAutoCreation") @DefaultValue(value="false") boolean checkAllowAutoCreation) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        return this.getPartitionedTopicMetadata(this.topicName, authoritative, checkAllowAutoCreation);
    }

    @GET
    @Path(value="{tenant}/{namespace}/{topic}/stats")
    @ApiOperation(value="Get the stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error")})
    public NonPersistentTopicStats getStats(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Is authentication required to perform this operation") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative, @ApiParam(value="Is return precise backlog or imprecise backlog") @QueryParam(value="getPreciseBacklog") @DefaultValue(value="false") boolean getPreciseBacklog) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateAdminOperationOnTopic(this.topicName, authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        return ((NonPersistentTopic)topic).getStats(getPreciseBacklog);
    }

    @Override
    @GET
    @Path(value="{tenant}/{namespace}/{topic}/internalStats")
    @ApiOperation(value="Get the internal stats for the topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error")})
    public PersistentTopicInternalStats getInternalStats(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Is authentication required to perform this operation") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        this.validateTopicName(tenant, namespace, encodedTopic);
        this.validateAdminOperationOnTopic(this.topicName, authoritative);
        Topic topic = this.getTopicReference(this.topicName);
        return topic.getInternalStats();
    }

    @Override
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/partitions")
    @ApiOperation(value="Create a partitioned topic.", notes="It needs to be called before creating a producer on a partitioned topic.")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace does not exist"), @ApiResponse(code=406, message="The number of partitions should be more than 0 and less than or equal to maxNumPartitionsPerPartitionedTopic"), @ApiResponse(code=409, message="Partitioned topic already exists"), @ApiResponse(code=412, message="Failed Reason : Name is invalid or Namespace does not have any clusters configured"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void createPartitionedTopic(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="The number of partitions for the topic", required=true, type="int", defaultValue="0") int numPartitions) {
        try {
            this.validateGlobalNamespaceOwnership(tenant, namespace);
            this.validateTopicName(tenant, namespace, encodedTopic);
            this.validateAdminAccessForTenant(this.topicName.getTenant());
            this.internalCreatePartitionedTopic(asyncResponse, numPartitions);
        }
        catch (Exception e) {
            log.error("[{}] Failed to create partitioned topic {}", new Object[]{this.clientAppId(), this.topicName, e});
            this.resumeAsyncResponseExceptionally(asyncResponse, e);
        }
    }

    @Override
    @PUT
    @Path(value="/{tenant}/{namespace}/{topic}/unload")
    @ApiOperation(value="Unload a topic")
    @ApiResponses(value={@ApiResponse(code=307, message="Current broker doesn't serve the namespace of this topic"), @ApiResponse(code=401, message="This operation requires super-user access"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace/topic does not exist"), @ApiResponse(code=412, message="Topic name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void unloadTopic(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Specify topic name", required=true) @PathParam(value="topic") @Encoded String encodedTopic, @ApiParam(value="Is authentication required to perform this operation") @QueryParam(value="authoritative") @DefaultValue(value="false") boolean authoritative) {
        try {
            this.validateTopicName(tenant, namespace, encodedTopic);
            this.internalUnloadTopic(asyncResponse, authoritative);
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume((Throwable)wae);
        }
        catch (Exception e) {
            asyncResponse.resume((Throwable)((Object)new RestException(e)));
        }
    }

    @Override
    @GET
    @Path(value="/{tenant}/{namespace}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="The tenant/namespace does not exist"), @ApiResponse(code=412, message="Namespace name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public void getList(@Suspended AsyncResponse asyncResponse, @ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace) {
        Policies policies = null;
        try {
            this.validateNamespaceName(tenant, namespace);
            if (log.isDebugEnabled()) {
                log.debug("[{}] list of topics on namespace {}", (Object)this.clientAppId(), (Object)this.namespaceName);
            }
            this.validateAdminAccessForTenant(tenant);
            policies = this.getNamespacePolicies(this.namespaceName);
            this.validateGlobalNamespaceOwnership(this.namespaceName);
        }
        catch (WebApplicationException wae) {
            asyncResponse.resume((Throwable)wae);
            return;
        }
        catch (Exception e) {
            asyncResponse.resume((Throwable)((Object)new RestException(e)));
            return;
        }
        ArrayList futures = Lists.newArrayList();
        List boundaries = policies.bundles.getBoundaries();
        for (int i = 0; i < boundaries.size() - 1; ++i) {
            String bundle = String.format("%s_%s", boundaries.get(i), boundaries.get(i + 1));
            try {
                futures.add(this.pulsar().getAdminClient().topics().getListInBundleAsync(this.namespaceName.toString(), bundle));
                continue;
            }
            catch (PulsarServerException e) {
                log.error("[{}] Failed to get list of topics under namespace {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundle, e});
                asyncResponse.resume((Throwable)((Object)new RestException(e)));
                return;
            }
        }
        ArrayList topics = Lists.newArrayList();
        FutureUtil.waitForAll((List)futures).handle((result, exception) -> {
            for (int i = 0; i < futures.size(); ++i) {
                try {
                    if (!((CompletableFuture)futures.get(i)).isDone() || ((CompletableFuture)futures.get(i)).get() == null) continue;
                    topics.addAll((Collection)((CompletableFuture)futures.get(i)).get());
                    continue;
                }
                catch (InterruptedException | ExecutionException e) {
                    log.error("[{}] Failed to get list of topics under namespace {}", new Object[]{this.clientAppId(), this.namespaceName, e});
                    asyncResponse.resume((Throwable)((Object)new RestException(e instanceof ExecutionException ? e.getCause() : e)));
                    return null;
                }
            }
            List nonPersistentTopics = topics.stream().filter(name -> !TopicName.get((String)name).isPersistent()).collect(Collectors.toList());
            asyncResponse.resume(nonPersistentTopics);
            return null;
        });
    }

    @GET
    @Path(value="/{tenant}/{namespace}/{bundle}")
    @ApiOperation(value="Get the list of non-persistent topics under a namespace bundle.", response=String.class, responseContainer="List")
    @ApiResponses(value={@ApiResponse(code=401, message="Don't have permission to manage resources on this tenant"), @ApiResponse(code=403, message="Don't have admin permission"), @ApiResponse(code=404, message="Namespace doesn't exist"), @ApiResponse(code=412, message="Namespace name is not valid"), @ApiResponse(code=500, message="Internal server error"), @ApiResponse(code=503, message="Failed to validate global cluster configuration")})
    public List<String> getListFromBundle(@ApiParam(value="Specify the tenant", required=true) @PathParam(value="tenant") String tenant, @ApiParam(value="Specify the namespace", required=true) @PathParam(value="namespace") String namespace, @ApiParam(value="Bundle range of a topic", required=true) @PathParam(value="bundle") String bundleRange) {
        this.validateNamespaceName(tenant, namespace);
        if (log.isDebugEnabled()) {
            log.debug("[{}] list of topics on namespace bundle {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange});
        }
        this.validateAdminAccessForTenant(tenant);
        Policies policies = this.getNamespacePolicies(this.namespaceName);
        this.validateGlobalNamespaceOwnership(this.namespaceName);
        if (!this.isBundleOwnedByAnyBroker(this.namespaceName, policies.bundles, bundleRange)) {
            log.info("[{}] Namespace bundle is not owned by any broker {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange});
            return null;
        }
        NamespaceBundle nsBundle = this.validateNamespaceBundleOwnership(this.namespaceName, policies.bundles, bundleRange, true, true);
        try {
            ArrayList topicList = Lists.newArrayList();
            this.pulsar().getBrokerService().forEachTopic(topic -> {
                TopicName topicName = TopicName.get((String)topic.getName());
                if (nsBundle.includes(topicName)) {
                    topicList.add(topic.getName());
                }
            });
            return topicList;
        }
        catch (Exception e) {
            log.error("[{}] Failed to unload namespace bundle {}/{}", new Object[]{this.clientAppId(), this.namespaceName, bundleRange, e});
            throw new RestException(e);
        }
    }

    protected void validateAdminOperationOnTopic(TopicName topicName, boolean authoritative) {
        this.validateAdminAccessForTenant(topicName.getTenant());
        this.validateTopicOwnership(topicName, authoritative);
    }

    private Topic getTopicReference(TopicName topicName) {
        return this.pulsar().getBrokerService().getTopicIfExists(topicName.toString()).join().orElseThrow(() -> new RestException(Response.Status.NOT_FOUND, "Topic not found"));
    }
}

