package org.apache.seatunnel.connectors.seatunnel.kafka.catalog;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartitionInfo;
import org.apache.seatunnel.api.table.catalog.Catalog;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.exception.CatalogException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.DatabaseNotExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableAlreadyExistException;
import org.apache.seatunnel.api.table.catalog.exception.TableNotExistException;
import org.apache.seatunnel.connectors.seatunnel.kafka.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/kafka/catalog/KafkaCatalog.class */
/**
 * {@link Catalog} implementation backed by a Kafka {@link AdminClient}.
 *
 * <p>Kafka topics are exposed as databases: {@link #listDatabases()} returns the topic names and
 * {@link #listTables(String)} returns a single table named after the database. Table level
 * operations ({@link #getTable}, {@link #createTable}, {@link #dropTable}) act on the topic
 * named by {@code TablePath#getTableName()}.
 *
 * <p>{@link #open()} must be called before any other operation; it creates the admin client that
 * {@link #close()} releases.
 */
public class KafkaCatalog implements Catalog {
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaCatalog.class);
    private final String catalogName;
    private final String bootstrapServers;
    private final String defaultTopic;
    private AdminClient adminClient;

    /**
     * Creates a catalog that is not yet connected.
     *
     * @param catalogName name reported in exceptions, log messages and table identifiers
     * @param defaultTopic topic returned by {@link #getDefaultDatabase()} and verified by
     *     {@link #open()}
     * @param bootstrapServers value used for the admin client's {@code bootstrap.servers}
     * @throws NullPointerException if any argument is {@code null}
     */
    public KafkaCatalog(String catalogName, String defaultTopic, String bootstrapServers) {
        this.catalogName = Preconditions.checkNotNull(catalogName, "catalogName cannot be null");
        this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrapServers cannot be null");
        this.defaultTopic = Preconditions.checkNotNull(defaultTopic, "defaultTopic cannot be null");
    }

    /**
     * Creates the admin client and verifies that the default topic exists.
     *
     * <p>If verification fails the admin client is closed again so a failed {@code open()} does
     * not leak it.
     *
     * @throws DatabaseNotExistException if the default topic is not present on the cluster
     * @throws CatalogException if the cluster cannot be queried
     */
    public void open() throws CatalogException {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", this.bootstrapServers);
        this.adminClient = AdminClient.create(properties);
        boolean opened = false;
        try {
            if (!topicExists(this.defaultTopic)) {
                throw new DatabaseNotExistException(this.catalogName, this.defaultTopic);
            }
            LOGGER.info("Catalog {} is established connection to {}, the default database is {}", this.catalogName, this.bootstrapServers, this.defaultTopic);
            opened = true;
        } catch (DatabaseNotExistException e) {
            throw e;
        } catch (Exception e) {
            throw toCatalogException(String.format("Catalog : %s establish connection to %s error", this.catalogName, this.bootstrapServers), e);
        } finally {
            if (!opened) {
                close();
            }
        }
    }

    /**
     * Closes the admin client, if one is open. Safe to call when {@link #open()} was never
     * called or has already failed, and safe to call more than once.
     */
    public void close() throws CatalogException {
        AdminClient client = this.adminClient;
        this.adminClient = null;
        if (client != null) {
            client.close();
        }
    }

    /** Returns the default topic name supplied to the constructor. */
    public String getDefaultDatabase() throws CatalogException {
        return this.defaultTopic;
    }

    /**
     * Tells whether a topic with the given name is listed by the cluster.
     *
     * @param databaseName topic name to look for
     * @return {@code true} if the name is among the topics returned by the admin client's topic
     *     listing, {@code false} otherwise
     * @throws NullPointerException if {@code databaseName} is {@code null}
     * @throws CatalogException if the lookup itself fails
     */
    public boolean databaseExists(String databaseName) throws CatalogException {
        Preconditions.checkNotNull(databaseName, "databaseName cannot be null");
        try {
            return topicExists(databaseName);
        } catch (Exception e) {
            throw toCatalogException(String.format("Catalog : %s check database : %s exists error", this.catalogName, databaseName), e);
        }
    }

    /**
     * Lists the topic names reported by the admin client.
     *
     * @return a new mutable list of topic names
     * @throws CatalogException if the listing fails
     */
    public List<String> listDatabases() throws CatalogException {
        try {
            return Lists.newArrayList(this.adminClient.listTopics().names().get());
        } catch (InterruptedException | ExecutionException e) {
            throw toCatalogException(String.format("Listing database in catalog %s error", this.catalogName), e);
        }
    }

    /**
     * Lists the tables of a database; for this catalog that is a single table carrying the
     * database's own name.
     *
     * @throws DatabaseNotExistException if {@link #databaseExists(String)} reports {@code false}
     */
    public List<String> listTables(String databaseName) throws CatalogException, DatabaseNotExistException {
        if (databaseExists(databaseName)) {
            return Lists.newArrayList(databaseName);
        }
        throw new DatabaseNotExistException(this.catalogName, databaseName);
    }

    /**
     * Tells whether the path's database (topic) exists.
     *
     * <p>NOTE(review): existence is decided from {@code getDatabaseName()}, while
     * {@link #getTable}, {@link #createTable} and {@link #dropTable} operate on
     * {@code getTableName()}. Presumably both are the same topic name for this catalog (see
     * {@link #listTables(String)}) - confirm with callers.
     *
     * @throws NullPointerException if {@code tablePath} is {@code null}
     */
    public boolean tableExists(TablePath tablePath) throws CatalogException {
        Preconditions.checkNotNull(tablePath, "tablePath cannot be null");
        return databaseExists(tablePath.getDatabaseName());
    }

    /**
     * Describes the topic named by the path's table name as a {@link CatalogTable} with an empty
     * schema and connector options derived from the topic description.
     *
     * @throws NullPointerException if {@code tablePath} is {@code null}
     * @throws TableNotExistException if the topic is not present on the cluster
     * @throws CatalogException if the cluster cannot be queried
     */
    public CatalogTable getTable(TablePath tablePath) throws CatalogException, TableNotExistException {
        Preconditions.checkNotNull(tablePath, "tablePath cannot be null");
        try {
            TopicDescription topicDescription = getTopicDescription(tablePath.getTableName());
            if (topicDescription == null) {
                throw new TableNotExistException(this.catalogName, tablePath);
            }
            return CatalogTable.of(TableIdentifier.of(this.catalogName, tablePath.getDatabaseName(), tablePath.getTableName()), TableSchema.builder().build(), buildConnectorOptions(topicDescription), Collections.emptyList(), "");
        } catch (InterruptedException | ExecutionException e) {
            throw toCatalogException(String.format("Catalog : %s get table : %s error", this.catalogName, tablePath), e);
        }
    }

    /**
     * Creates a topic named after the path's table name, using the partition count and
     * replication factor found in the table's options.
     *
     * @param tablePath path whose table name becomes the topic name
     * @param catalogTable table whose options carry the partition and replication settings
     * @param ignoreIfExists when {@code true}, an already existing table is left untouched
     *     instead of raising {@link TableAlreadyExistException}
     * @throws NullPointerException if {@code tablePath} or {@code catalogTable} is {@code null}
     * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is
     *     {@code false}
     * @throws CatalogException if the options are missing or not numeric, or the topic cannot be
     *     created
     */
    public void createTable(TablePath tablePath, CatalogTable catalogTable, boolean ignoreIfExists) throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        Preconditions.checkNotNull(tablePath, "tablePath cannot be null");
        if (tableExists(tablePath)) {
            if (ignoreIfExists) {
                return;
            }
            throw new TableAlreadyExistException(this.catalogName, tablePath);
        }
        Preconditions.checkNotNull(catalogTable, "catalogTable cannot be null");
        Map<String, String> options = catalogTable.getOptions();
        int partitions;
        short replicationFactor;
        try {
            // parseInt/parseShort also raise NumberFormatException for a missing (null) option.
            partitions = Integer.parseInt(options.get(Config.PARTITION.key()));
            replicationFactor = Short.parseShort(options.get(Config.REPLICATION_FACTOR));
        } catch (NumberFormatException e) {
            throw new CatalogException(String.format("Catalog : %s create table : %s error, options '%s' and '%s' must be numeric", this.catalogName, tablePath.getFullName(), Config.PARTITION.key(), Config.REPLICATION_FACTOR), e);
        }
        try {
            NewTopic newTopic = new NewTopic(tablePath.getTableName(), partitions, replicationFactor);
            this.adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw toCatalogException(String.format("Catalog : %s create table : %s error", this.catalogName, tablePath.getFullName()), e);
        }
    }

    /**
     * Deletes the topic named by the path's table name.
     *
     * @param tablePath path whose table name is the topic to delete
     * @param ignoreIfNotExists when {@code true}, a missing table is silently accepted instead of
     *     raising {@link TableNotExistException}
     * @throws TableNotExistException if the table is missing and {@code ignoreIfNotExists} is
     *     {@code false}
     * @throws CatalogException if the topic cannot be deleted
     */
    public void dropTable(TablePath tablePath, boolean ignoreIfNotExists) throws TableNotExistException, CatalogException {
        if (!tableExists(tablePath)) {
            if (ignoreIfNotExists) {
                return;
            }
            throw new TableNotExistException(this.catalogName, tablePath);
        }
        try {
            this.adminClient.deleteTopics(Collections.singletonList(tablePath.getTableName())).all().get();
        } catch (InterruptedException | ExecutionException e) {
            throw toCatalogException(String.format("Catalog : %s drop table : %s error", this.catalogName, tablePath.getFullName()), e);
        }
    }

    /**
     * Not supported by this catalog.
     *
     * @throws UnsupportedOperationException always
     */
    public void createDatabase(TablePath tablePath, boolean ignoreIfExists) throws DatabaseAlreadyExistException, CatalogException {
        throw new UnsupportedOperationException("Kafka catalog does not support create database");
    }

    /** Delegates to {@link #dropTable(TablePath, boolean)} with the same arguments. */
    public void dropDatabase(TablePath tablePath, boolean ignoreIfNotExists) throws DatabaseNotExistException, CatalogException {
        dropTable(tablePath, ignoreIfNotExists);
    }

    /**
     * Checks the admin client's topic listing for the given name. This is the same listing
     * {@link #listDatabases()} returns, so both views agree on which topics exist.
     */
    private boolean topicExists(String topic) throws ExecutionException, InterruptedException {
        return this.adminClient.listTopics().names().get().contains(topic);
    }

    /**
     * Describes a topic, or returns {@code null} when the topic is not listed by the cluster.
     *
     * <p>The listing is consulted first because describing an unknown topic completes the
     * returned future exceptionally rather than yielding {@code null}.
     */
    private TopicDescription getTopicDescription(String topic) throws ExecutionException, InterruptedException {
        if (!topicExists(topic)) {
            return null;
        }
        return this.adminClient.describeTopics(Collections.singletonList(topic)).topicNameValues().get(topic).get();
    }

    /**
     * Builds the connector options for a topic: bootstrap servers, topic name, partition count
     * and the replica count of the first partition (0 if the description has no partitions).
     */
    private Map<String, String> buildConnectorOptions(TopicDescription topicDescription) {
        List<TopicPartitionInfo> partitions = topicDescription.partitions();
        List<Node> replicas = partitions.isEmpty() ? Collections.<Node>emptyList() : partitions.get(0).replicas();
        Map<String, String> options = new HashMap<>();
        options.put(Config.BOOTSTRAP_SERVERS.key(), this.bootstrapServers);
        options.put(Config.TOPIC.key(), topicDescription.name());
        options.put(Config.PARTITION.key(), String.valueOf(partitions.size()));
        options.put(Config.REPLICATION_FACTOR, String.valueOf(replicas.size()));
        return options;
    }

    /**
     * Wraps a failure in a {@link CatalogException}, restoring the thread's interrupt flag first
     * when the cause is an {@link InterruptedException} so callers can still observe it.
     */
    private static CatalogException toCatalogException(String message, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        return new CatalogException(message, cause);
    }
}
