/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.KafkaSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKeyType;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.StoreUpdateHandler;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StoreUpdateHandler} that applies schema registry records to the local
 * {@link LookupCache} and keeps the {@link IdGenerator} informed of registered
 * schemas.
 *
 * <p>Three key types are acted upon: {@code SCHEMA}, {@code DELETE_SUBJECT} and
 * {@code CLEAR_SUBJECT}. Any other key type is ignored by {@link #handleUpdate}.
 *
 * <p>Tombstones for stale schema keys are written asynchronously on a dedicated
 * single-thread executor created in the constructor.
 * NOTE(review): this class never shuts that executor down; confirm whether the
 * owner of this handler is expected to manage its lifetime.
 */
public class KafkaStoreMessageHandler
implements StoreUpdateHandler<SchemaRegistryKey, SchemaRegistryValue> {
    private static final Logger log = LoggerFactory.getLogger(KafkaStoreMessageHandler.class);
    private final KafkaSchemaRegistry schemaRegistry;
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    private final ExecutorService tombstoneExecutor;
    private final IdGenerator idGenerator;

    /**
     * Creates a handler bound to the given registry, cache and id generator.
     *
     * @param schemaRegistry registry whose Kafka store is used to write tombstones
     * @param lookupCache    local cache that is updated as records are handled
     * @param idGenerator    generator notified of every registered (non-deleted) schema
     */
    public KafkaStoreMessageHandler(KafkaSchemaRegistry schemaRegistry, LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache, IdGenerator idGenerator) {
        this.schemaRegistry = schemaRegistry;
        this.lookupCache = lookupCache;
        this.idGenerator = idGenerator;
        this.tombstoneExecutor = Executors.newSingleThreadExecutor();
    }

    /**
     * Checks whether a record may be applied.
     *
     * <p>Only non-null {@code SCHEMA} records are inspected. Such a record is rejected
     * when the cache already maps its id to a schema whose text differs, or when the
     * cached schema cannot be read.
     *
     * @param key   key of the incoming record
     * @param value value of the incoming record; may be {@code null}
     * @return {@code false} if the record must be rejected, {@code true} otherwise
     */
    @Override
    public boolean validateUpdate(SchemaRegistryKey key, SchemaRegistryValue value) {
        if (key.getKeyType() != SchemaRegistryKeyType.SCHEMA) {
            return true;
        }
        SchemaValue schemaObj = (SchemaValue) value;
        if (schemaObj == null) {
            return true;
        }
        SchemaKey oldKey = this.lookupCache.schemaKeyById(schemaObj.getId());
        if (oldKey == null) {
            // No schema is cached under this id yet, so there is nothing to conflict with.
            return true;
        }
        SchemaValue oldSchema;
        try {
            oldSchema = (SchemaValue) this.lookupCache.get(oldKey);
        } catch (StoreException e) {
            log.error("Error while retrieving schema", e);
            return false;
        }
        if (oldSchema != null && !oldSchema.getSchema().equals(schemaObj.getSchema())) {
            log.error("Found a schema with duplicate ID {}.  This schema will not be registered since a schema already exists with this ID.", schemaObj.getId());
            return false;
        }
        return true;
    }

    /**
     * Dispatches a record to the handler for its key type.
     *
     * <p>{@code SCHEMA} records are always dispatched, including {@code null} values.
     * {@code DELETE_SUBJECT} and {@code CLEAR_SUBJECT} records are dispatched only when
     * the value is non-null. All other records are ignored.
     *
     * @param key   key of the record
     * @param value value of the record; may be {@code null}
     */
    @Override
    public void handleUpdate(SchemaRegistryKey key, SchemaRegistryValue value) {
        if (key.getKeyType() == SchemaRegistryKeyType.SCHEMA) {
            this.handleSchemaUpdate((SchemaKey) key, (SchemaValue) value);
            return;
        }
        if (value == null) {
            return;
        }
        if (key.getKeyType() == SchemaRegistryKeyType.DELETE_SUBJECT) {
            this.handleDeleteSubject((DeleteSubjectValue) value);
        } else if (key.getKeyType() == SchemaRegistryKeyType.CLEAR_SUBJECT) {
            this.handleClearSubject((ClearSubjectValue) value);
        }
    }

    /**
     * Marks every cached version of the subject, from 1 up to and including the
     * version carried by the value, as deleted.
     *
     * <p>Versions missing from the cache are skipped. A failure on one version is
     * logged and does not stop processing of the remaining versions.
     */
    private void handleDeleteSubject(DeleteSubjectValue deleteSubjectValue) {
        String subject = deleteSubjectValue.getSubject();
        // Unbox once instead of on every loop comparison.
        int deleteTillVersion = deleteSubjectValue.getVersion();
        for (int version = 1; version <= deleteTillVersion; ++version) {
            try {
                SchemaKey schemaKey = new SchemaKey(subject, version);
                SchemaValue schemaValue = (SchemaValue) this.lookupCache.get(schemaKey);
                if (schemaValue != null) {
                    schemaValue.setDeleted(true);
                    this.lookupCache.put(schemaKey, schemaValue);
                    this.lookupCache.schemaDeleted(schemaKey, schemaValue);
                }
            } catch (StoreException e) {
                // Pass the exception so the cause and stack trace are not lost.
                log.error("Failed to delete subject {} in the local cache", subject, e);
            }
        }
    }

    /**
     * Asks the cache to clear the subject named by the value; failures are logged.
     */
    private void handleClearSubject(ClearSubjectValue clearSubjectValue) {
        String subject = clearSubjectValue.getSubject();
        try {
            this.lookupCache.clearSubjects(subject);
        } catch (StoreException e) {
            // Pass the exception so the cause and stack trace are not lost.
            log.error("Failed to clear subject {} in the local cache", subject, e);
        }
    }

    /**
     * Applies a {@code SCHEMA} record to the cache.
     *
     * <ul>
     *   <li>{@code null} value: the key is reported to the cache as tombstoned.</li>
     *   <li>Value flagged deleted: the cache is notified of the deletion.</li>
     *   <li>Otherwise: the id generator and cache are notified of the registration,
     *       then every key returned by {@code deletedSchemaKeys} that has the same
     *       subject but a different version is scheduled for tombstoning.</li>
     * </ul>
     */
    private void handleSchemaUpdate(SchemaKey schemaKey, SchemaValue schemaObj) {
        if (schemaObj == null) {
            this.lookupCache.schemaTombstoned(schemaKey);
            return;
        }
        if (schemaObj.isDeleted()) {
            this.lookupCache.schemaDeleted(schemaKey, schemaObj);
            return;
        }
        this.idGenerator.schemaRegistered(schemaKey, schemaObj);
        this.lookupCache.schemaRegistered(schemaKey, schemaObj);
        List<SchemaKey> schemaKeys = this.lookupCache.deletedSchemaKeys(schemaObj);
        schemaKeys.stream()
                .filter(v -> v.getSubject().equals(schemaObj.getSubject()) && v.getVersion() != schemaObj.getVersion().intValue())
                .forEach(this::tombstoneSchemaKey);
    }

    /**
     * Schedules an asynchronous delete of the given key in the Kafka store.
     *
     * <p>The task first waits for the store to finish initializing. Failures are
     * logged and never propagated to the caller.
     */
    private void tombstoneSchemaKey(SchemaKey schemaKey) {
        this.tombstoneExecutor.execute(() -> {
            try {
                this.schemaRegistry.getKafkaStore().waitForInit();
                this.schemaRegistry.getKafkaStore().delete(schemaKey);
                log.debug("Tombstoned {}", schemaKey);
            } catch (InterruptedException e) {
                // Restore the interrupt status so the interruption is not silently lost.
                Thread.currentThread().interrupt();
                log.error("Interrupted while waiting for the tombstone thread to be initialized ", e);
            } catch (StoreException e) {
                log.error("Failed to tombstone {}", schemaKey, e);
            }
        });
    }
}

