package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:io/confluent/kafka/schemaregistry/storage/KafkaStoreMessageHandler.class */
public class KafkaStoreMessageHandler implements StoreUpdateHandler<SchemaRegistryKey, SchemaRegistryValue> {
    private static final Logger log = LoggerFactory.getLogger(KafkaStoreMessageHandler.class);
    private final KafkaSchemaRegistry schemaRegistry;
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    private final ExecutorService tombstoneExecutor = Executors.newSingleThreadExecutor();
    private IdGenerator idGenerator;

    public KafkaStoreMessageHandler(KafkaSchemaRegistry kafkaSchemaRegistry, LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache, IdGenerator idGenerator) {
        this.schemaRegistry = kafkaSchemaRegistry;
        this.lookupCache = lookupCache;
        this.idGenerator = idGenerator;
    }

    @Override // io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler
    public boolean validateUpdate(SchemaRegistryKey schemaRegistryKey, SchemaRegistryValue schemaRegistryValue) {
        SchemaValue schemaValue;
        SchemaKey schemaKeyById;
        if (schemaRegistryKey.getKeyType() != SchemaRegistryKeyType.SCHEMA || (schemaValue = (SchemaValue) schemaRegistryValue) == null || (schemaKeyById = this.lookupCache.schemaKeyById(schemaValue.getId())) == null) {
            return true;
        }
        try {
            SchemaValue schemaValue2 = (SchemaValue) this.lookupCache.get(schemaKeyById);
            if (schemaValue2 == null || schemaValue2.getSchema().equals(schemaValue.getSchema())) {
                return true;
            }
            log.error("Found a schema with duplicate ID {}.  This schema will not be registered since a schema already exists with this ID.", schemaValue.getId());
            return false;
        } catch (StoreException e) {
            log.error("Error while retrieving schema", e);
            return false;
        }
    }

    @Override // io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler
    public void handleUpdate(SchemaRegistryKey schemaRegistryKey, SchemaRegistryValue schemaRegistryValue) {
        if (schemaRegistryKey.getKeyType() == SchemaRegistryKeyType.SCHEMA) {
            handleSchemaUpdate((SchemaKey) schemaRegistryKey, (SchemaValue) schemaRegistryValue);
            return;
        }
        if (schemaRegistryValue == null) {
            return;
        }
        if (schemaRegistryKey.getKeyType() == SchemaRegistryKeyType.DELETE_SUBJECT) {
            handleDeleteSubject((DeleteSubjectValue) schemaRegistryValue);
        } else if (schemaRegistryKey.getKeyType() == SchemaRegistryKeyType.CLEAR_SUBJECT) {
            handleClearSubject((ClearSubjectValue) schemaRegistryValue);
        }
    }

    private void handleDeleteSubject(DeleteSubjectValue deleteSubjectValue) {
        String subject = deleteSubjectValue.getSubject();
        Integer version = deleteSubjectValue.getVersion();
        for (int i = 1; i <= version.intValue(); i++) {
            try {
                SchemaKey schemaKey = new SchemaKey(subject, i);
                SchemaValue schemaValue = (SchemaValue) this.lookupCache.get(schemaKey);
                if (schemaValue != null) {
                    schemaValue.setDeleted(true);
                    this.lookupCache.put(schemaKey, schemaValue);
                    this.lookupCache.schemaDeleted(schemaKey, schemaValue);
                }
            } catch (StoreException e) {
                log.error("Failed to delete subject {} in the local cache", subject);
            }
        }
    }

    private void handleClearSubject(ClearSubjectValue clearSubjectValue) {
        String subject = clearSubjectValue.getSubject();
        try {
            this.lookupCache.clearSubjects(subject);
        } catch (StoreException e) {
            log.error("Failed to clear subject {} in the local cache", subject);
        }
    }

    private void handleSchemaUpdate(SchemaKey schemaKey, SchemaValue schemaValue) {
        if (schemaValue == null) {
            this.lookupCache.schemaTombstoned(schemaKey);
            return;
        }
        this.idGenerator.schemaRegistered(schemaKey, schemaValue);
        if (schemaValue.isDeleted()) {
            this.lookupCache.schemaDeleted(schemaKey, schemaValue);
        } else {
            this.lookupCache.schemaRegistered(schemaKey, schemaValue);
            this.lookupCache.deletedSchemaKeys(schemaValue).stream().filter(schemaKey2 -> {
                return schemaKey2.getSubject().equals(schemaValue.getSubject()) && schemaKey2.getVersion() != schemaValue.getVersion().intValue();
            }).forEach(this::tombstoneSchemaKey);
        }
    }

    private void tombstoneSchemaKey(SchemaKey schemaKey) {
        this.tombstoneExecutor.execute(() -> {
            try {
                this.schemaRegistry.getKafkaStore().waitForInit();
                this.schemaRegistry.getKafkaStore().delete(schemaKey);
                log.debug("Tombstoned {}", schemaKey);
            } catch (StoreException e) {
                log.error("Failed to tombstone {}", schemaKey, e);
            } catch (InterruptedException e2) {
                log.error("Interrupted while waiting for the tombstone thread to be initialized ", e2);
            }
        });
    }
}
