/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.kafka.schemaregistry.storage;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import io.confluent.common.metrics.JmxReporter;
import io.confluent.common.metrics.MeasurableStat;
import io.confluent.common.metrics.MetricConfig;
import io.confluent.common.metrics.MetricName;
import io.confluent.common.metrics.Metrics;
import io.confluent.common.metrics.MetricsReporter;
import io.confluent.common.metrics.Sensor;
import io.confluent.common.metrics.stats.Gauge;
import io.confluent.common.utils.SystemTime;
import io.confluent.common.utils.Time;
import io.confluent.kafka.schemaregistry.CompatibilityLevel;
import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.SchemaProvider;
import io.confluent.kafka.schemaregistry.avro.AvroSchemaProvider;
import io.confluent.kafka.schemaregistry.client.rest.RestService;
import io.confluent.kafka.schemaregistry.client.rest.entities.Schema;
import io.confluent.kafka.schemaregistry.client.rest.entities.SchemaString;
import io.confluent.kafka.schemaregistry.client.rest.entities.SubjectVersion;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ConfigUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.ModeUpdateRequest;
import io.confluent.kafka.schemaregistry.client.rest.entities.requests.RegisterSchemaRequest;
import io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException;
import io.confluent.kafka.schemaregistry.client.rest.utils.UrlList;
import io.confluent.kafka.schemaregistry.client.security.SslFactory;
import io.confluent.kafka.schemaregistry.exceptions.IdDoesNotMatchException;
import io.confluent.kafka.schemaregistry.exceptions.IdGenerationException;
import io.confluent.kafka.schemaregistry.exceptions.IncompatibleSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.InvalidSchemaException;
import io.confluent.kafka.schemaregistry.exceptions.OperationNotPermittedException;
import io.confluent.kafka.schemaregistry.exceptions.ReferenceExistsException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryInitializationException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryRequestForwardingException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryStoreException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaRegistryTimeoutException;
import io.confluent.kafka.schemaregistry.exceptions.SchemaVersionNotSoftDeletedException;
import io.confluent.kafka.schemaregistry.exceptions.SubjectNotSoftDeletedException;
import io.confluent.kafka.schemaregistry.exceptions.UnknownMasterException;
import io.confluent.kafka.schemaregistry.id.IdGenerator;
import io.confluent.kafka.schemaregistry.id.IncrementalIdGenerator;
import io.confluent.kafka.schemaregistry.id.ZookeeperIdGenerator;
import io.confluent.kafka.schemaregistry.json.JsonSchemaProvider;
import io.confluent.kafka.schemaregistry.masterelector.kafka.KafkaGroupMasterElector;
import io.confluent.kafka.schemaregistry.masterelector.zookeeper.ZookeeperMasterElector;
import io.confluent.kafka.schemaregistry.protobuf.ProtobufSchemaProvider;
import io.confluent.kafka.schemaregistry.rest.SchemaRegistryConfig;
import io.confluent.kafka.schemaregistry.rest.VersionId;
import io.confluent.kafka.schemaregistry.rest.exceptions.Errors;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectKey;
import io.confluent.kafka.schemaregistry.storage.ClearSubjectValue;
import io.confluent.kafka.schemaregistry.storage.ConfigKey;
import io.confluent.kafka.schemaregistry.storage.ConfigValue;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectKey;
import io.confluent.kafka.schemaregistry.storage.DeleteSubjectValue;
import io.confluent.kafka.schemaregistry.storage.InMemoryCache;
import io.confluent.kafka.schemaregistry.storage.KafkaStore;
import io.confluent.kafka.schemaregistry.storage.KafkaStoreMessageHandler;
import io.confluent.kafka.schemaregistry.storage.LookupCache;
import io.confluent.kafka.schemaregistry.storage.MasterAwareSchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.MasterElector;
import io.confluent.kafka.schemaregistry.storage.Mode;
import io.confluent.kafka.schemaregistry.storage.ModeKey;
import io.confluent.kafka.schemaregistry.storage.ModeValue;
import io.confluent.kafka.schemaregistry.storage.NoopKey;
import io.confluent.kafka.schemaregistry.storage.SchemaIdAndSubjects;
import io.confluent.kafka.schemaregistry.storage.SchemaKey;
import io.confluent.kafka.schemaregistry.storage.SchemaReference;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistry;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryIdentity;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryKey;
import io.confluent.kafka.schemaregistry.storage.SchemaRegistryValue;
import io.confluent.kafka.schemaregistry.storage.SchemaValue;
import io.confluent.kafka.schemaregistry.storage.SubjectKey;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreInitializationException;
import io.confluent.kafka.schemaregistry.storage.exceptions.StoreTimeoutException;
import io.confluent.kafka.schemaregistry.storage.serialization.Serializer;
import io.confluent.rest.Application;
import io.confluent.rest.exceptions.RestException;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Collectors;
import javax.net.ssl.HostnameVerifier;
import org.apache.avro.reflect.Nullable;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.config.ConfigDef;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class KafkaSchemaRegistry
implements SchemaRegistry,
MasterAwareSchemaRegistry {
    /** Lower bound constant for version numbers (value 1). */
    public static final int MIN_VERSION = 1;
    /** Upper bound constant for version numbers ({@link Integer#MAX_VALUE}). */
    public static final int MAX_VERSION = Integer.MAX_VALUE;
    private static final Logger log = LoggerFactory.getLogger(KafkaSchemaRegistry.class);
    /** Configuration passed to the constructor; never null after construction. */
    private final SchemaRegistryConfig config;
    /** Guava cache from schema entity to parsed schema; entries are loaded through {@code loadSchema}. */
    private final LoadingCache<Schema, ParsedSchema> schemaCache;
    /** Cache created by {@link #lookupCache()} and also handed to the Kafka store. */
    private final LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache;
    /** Backing store created by {@link #kafkaStore(SchemaRegistryConfig)}; package-private. */
    final KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore;
    /** Serializer passed to the constructor and forwarded to the Kafka store. */
    private final Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer;
    /** Identity of this node, built from "host.name", the selected listener and "master.eligibility". */
    private final SchemaRegistryIdentity myIdentity;
    /** Value of {@code config.compatibilityType()} at construction time. */
    private final CompatibilityLevel defaultCompatibilityLevel;
    /** Always {@code Mode.READWRITE}. */
    private final Mode defaultMode;
    /** Value of "kafkastore.timeout.ms"; used when waiting for the store reader before writes. */
    private final int kafkaStoreTimeoutMs;
    /** Value of "kafkastore.init.timeout.ms"; used when this node becomes master. */
    private final int initTimeout;
    /** Value of "kafkastore.write.max.retries"; bounds the ID generation attempts in {@code register}. */
    private final int kafkaStoreMaxRetries;
    /** Value of "master.eligibility". */
    private final boolean isEligibleForMasterElector;
    /** Value of "mode.mutability". */
    private final boolean allowModeChanges;
    /** Current master, or null when unknown; written in {@code setMaster} while holding the store's master lock. */
    private SchemaRegistryIdentity masterIdentity;
    /** REST client pointing at the current master; reset to null when the master is cleared. */
    private RestService masterRestService;
    /** SSL factory built from the configuration values; its context is applied to the master REST client. */
    private SslFactory sslFactory;
    /** ID generator obtained from {@link #identityGenerator(SchemaRegistryConfig)} in the constructor. */
    private IdGenerator idGenerator = null;
    /** Master elector; stays null until {@code init()} creates it. */
    private MasterElector masterElector = null;
    private Metrics metrics;
    /** "master-slave-role" sensor; records 1.0 when this node is master and 0.0 otherwise. */
    private Sensor masterNodeSensor;
    /** Schema providers keyed by {@code SchemaProvider.schemaType()}. */
    private final Map<String, SchemaProvider> providers;

    /**
     * Creates a registry from the given configuration and serializer.
     *
     * <p>Initialisation order matters: the lookup cache and the ID generator are
     * created before the Kafka store because {@link #kafkaStore(SchemaRegistryConfig)}
     * passes both of them to the store's message handler.
     *
     * @param config     registry configuration; must not be null
     * @param serializer serializer for the keys and values written to the Kafka store
     * @throws SchemaRegistryException if {@code config} is null or no listener matches
     *                                 the inter-instance protocol
     */
    public KafkaSchemaRegistry(SchemaRegistryConfig config, Serializer<SchemaRegistryKey, SchemaRegistryValue> serializer) throws SchemaRegistryException {
        if (config == null) {
            throw new SchemaRegistryException("Schema registry configuration is null");
        }
        this.config = config;

        // Identity of this node as advertised to the other instances.
        String hostName = config.getString("host.name");
        SchemeAndPort identityEndpoint = getSchemeAndPortForIdentity(
                config.getInt("port"), config.getList("listeners"), config.interInstanceProtocol());
        this.isEligibleForMasterElector = config.getBoolean("master.eligibility");
        this.allowModeChanges = config.getBoolean("mode.mutability");
        this.myIdentity = new SchemaRegistryIdentity(
                hostName, identityEndpoint.port, this.isEligibleForMasterElector, identityEndpoint.scheme);
        this.sslFactory = new SslFactory(ConfigDef.convertToStringMapWithPasswordValues(config.values()));

        // Store tuning and defaults.
        this.kafkaStoreTimeoutMs = config.getInt("kafkastore.timeout.ms");
        this.initTimeout = config.getInt("kafkastore.init.timeout.ms");
        this.kafkaStoreMaxRetries = config.getInt("kafkastore.write.max.retries");
        this.serializer = serializer;
        this.defaultCompatibilityLevel = config.compatibilityType();
        this.defaultMode = Mode.READWRITE;

        // Parsed-schema cache, bounded in size and expiring on idle time.
        long cacheSize = config.getInt("schema.cache.size");
        long cacheExpirySecs = config.getInt("schema.cache.expiry.secs");
        this.schemaCache = CacheBuilder.newBuilder()
                .maximumSize(cacheSize)
                .expireAfterAccess(cacheExpirySecs, TimeUnit.SECONDS)
                .build(new CacheLoader<Schema, ParsedSchema>() {
                    @Override
                    public ParsedSchema load(Schema s) throws Exception {
                        return KafkaSchemaRegistry.this.loadSchema(s.getSchemaType(), s.getSchema(), s.getReferences());
                    }
                });

        // Order is significant: kafkaStore(...) reads lookupCache and idGenerator.
        this.lookupCache = this.lookupCache();
        this.idGenerator = this.identityGenerator(config);
        this.kafkaStore = this.kafkaStore(config);

        // Metrics: configured reporters plus a JMX reporter.
        int numSamples = config.getInt("metrics.num.samples");
        long sampleWindowMs = config.getLong("metrics.sample.window.ms");
        MetricConfig metricConfig = new MetricConfig()
                .samples(numSamples)
                .timeWindow(sampleWindowMs, TimeUnit.MILLISECONDS);
        List<MetricsReporter> reporters = config.getConfiguredInstances("metric.reporters", MetricsReporter.class);
        reporters.add(new JmxReporter("kafka.schema.registry"));
        this.metrics = new Metrics(metricConfig, reporters, new SystemTime());
        this.masterNodeSensor = this.metrics.sensor("master-slave-role");

        this.providers = this.initProviders(config);

        Map<String, String> configuredTags = Application.parseListToMap(config.getList("metrics.tag.map"));
        MetricName roleMetric = new MetricName("master-slave-role", "master-slave-role", "1.0 indicates the node is the active master in the cluster and is the node where all register schema and config update requests are served.", configuredTags);
        this.masterNodeSensor.add(roleMetric, new Gauge());
    }

    /**
     * Builds the map of schema providers keyed by schema type.
     *
     * <p>The built-in Avro, JSON and Protobuf providers are configured and registered
     * first; providers listed under "schema.providers" are registered afterwards, so a
     * custom provider replaces a built-in one that reports the same schema type.
     *
     * @param config registry configuration used to instantiate the custom providers
     * @return mutable map from schema type to provider
     */
    private Map<String, SchemaProvider> initProviders(SchemaRegistryConfig config) {
        // Typed as Map<String, Object> so it can be passed both to SchemaProvider.configure
        // and as configuration overrides to getConfiguredInstances.
        Map<String, Object> schemaProviderConfigs = new HashMap<String, Object>();
        schemaProviderConfigs.put("schemaVersionFetcher", this);

        List<SchemaProvider> defaultSchemaProviders = Arrays.<SchemaProvider>asList(
                new AvroSchemaProvider(), new JsonSchemaProvider(), new ProtobufSchemaProvider());
        for (SchemaProvider provider : defaultSchemaProviders) {
            provider.configure(schemaProviderConfigs);
        }

        Map<String, SchemaProvider> providerMap = new HashMap<String, SchemaProvider>();
        this.registerProviders(providerMap, defaultSchemaProviders);

        List<SchemaProvider> customSchemaProviders =
                config.getConfiguredInstances("schema.providers", SchemaProvider.class, schemaProviderConfigs);
        this.registerProviders(providerMap, customSchemaProviders);
        return providerMap;
    }

    /**
     * Adds every provider to {@code providerMap} under its schema type, logging each
     * registration. A later provider with the same type overwrites an earlier one.
     *
     * @param providerMap     target map, modified in place
     * @param schemaProviders providers to register, in order
     */
    private void registerProviders(Map<String, SchemaProvider> providerMap, List<SchemaProvider> schemaProviders) {
        for (SchemaProvider candidate : schemaProviders) {
            String implementation = candidate.getClass().getName();
            log.info("Registering schema provider for {}: {}", candidate.schemaType(), implementation);
            providerMap.put(candidate.schemaType(), candidate);
        }
    }

    /**
     * Factory for the Kafka-backed store. Called from the constructor after the lookup
     * cache and the ID generator have been assigned, because both are handed to the
     * store's message handler here.
     *
     * @param config registry configuration
     * @return a new store wired to this registry's lookup cache and serializer
     * @throws SchemaRegistryException declared for overriding implementations and the store constructor
     */
    protected KafkaStore<SchemaRegistryKey, SchemaRegistryValue> kafkaStore(SchemaRegistryConfig config) throws SchemaRegistryException {
        KafkaStoreMessageHandler messageHandler =
                new KafkaStoreMessageHandler(this, this.lookupCache, this.idGenerator);
        return new KafkaStore<SchemaRegistryKey, SchemaRegistryValue>(
                config, messageHandler, this.serializer, this.lookupCache, new NoopKey());
    }

    /**
     * Factory for the lookup cache; the default is a fresh in-memory cache.
     *
     * @return a new, empty lookup cache
     */
    protected LookupCache<SchemaRegistryKey, SchemaRegistryValue> lookupCache() {
        LookupCache<SchemaRegistryKey, SchemaRegistryValue> cache =
                new InMemoryCache<SchemaRegistryKey, SchemaRegistryValue>();
        return cache;
    }

    /**
     * @return the lookup cache instance created during construction
     */
    protected LookupCache<SchemaRegistryKey, SchemaRegistryValue> getLookupCache() {
        return lookupCache;
    }

    /**
     * @return the serializer supplied to the constructor
     */
    protected Serializer<SchemaRegistryKey, SchemaRegistryValue> getSerializer() {
        return serializer;
    }

    /**
     * Creates and configures the ID generator: an {@link IncrementalIdGenerator} when
     * Kafka coordination is enabled, otherwise a {@link ZookeeperIdGenerator}.
     *
     * @param config registry configuration, also passed to {@code IdGenerator.configure}
     * @return the configured generator
     */
    protected IdGenerator identityGenerator(SchemaRegistryConfig config) {
        IdGenerator generator;
        if (config.useKafkaCoordination()) {
            generator = new IncrementalIdGenerator();
        } else {
            generator = new ZookeeperIdGenerator();
        }
        generator.configure(config);
        return generator;
    }

    /**
     * @return the ID generator created during construction
     */
    protected IdGenerator getIdentityGenerator() {
        return idGenerator;
    }

    /**
     * Picks the listener whose scheme matches the requested inter-instance scheme and
     * returns its scheme and port.
     *
     * @param port                port passed to {@code Application.parseListeners} alongside the listener list
     * @param configuredListeners configured listener strings
     * @param requestedScheme     scheme to match, case-insensitively; null or empty means "http"
     * @return scheme and port of the first matching listener
     * @throws SchemaRegistryException if no listener uses the requested scheme
     */
    static SchemeAndPort getSchemeAndPortForIdentity(int port, List<String> configuredListeners, String requestedScheme) throws SchemaRegistryException {
        List<URI> listeners = Application.parseListeners(configuredListeners, port, Arrays.asList("http", "https"), "http");
        // Treat a missing scheme like an empty one instead of failing with a NullPointerException.
        String scheme = (requestedScheme == null || requestedScheme.isEmpty()) ? "http" : requestedScheme;
        for (URI listener : listeners) {
            if (scheme.equalsIgnoreCase(listener.getScheme())) {
                return new SchemeAndPort(listener.getScheme(), listener.getPort());
            }
        }
        throw new SchemaRegistryException(" No listener configured with requested scheme " + scheme);
    }

    /**
     * Initialises the Kafka store and then creates and starts the master elector
     * (Kafka-group based or ZooKeeper based, depending on the configuration).
     *
     * @throws SchemaRegistryException wrapped as {@link SchemaRegistryInitializationException}
     *                                 when the store or the elector fails to initialise
     */
    @Override
    public void init() throws SchemaRegistryException {
        try {
            kafkaStore.init();
        } catch (StoreInitializationException e) {
            throw new SchemaRegistryInitializationException("Error initializing kafka store while initializing schema registry", e);
        }

        try {
            boolean kafkaCoordination = config.useKafkaCoordination();
            log.info(kafkaCoordination
                    ? "Joining schema registry with Kafka-based coordination"
                    : "Joining schema registry with Zookeeper-based coordination");
            if (kafkaCoordination) {
                masterElector = new KafkaGroupMasterElector(config, myIdentity, this);
            } else {
                masterElector = new ZookeeperMasterElector(config, myIdentity, this);
            }
            masterElector.init();
        } catch (SchemaRegistryStoreException e) {
            throw new SchemaRegistryInitializationException("Error electing master while initializing schema registry", e);
        } catch (SchemaRegistryTimeoutException e) {
            throw new SchemaRegistryInitializationException(e);
        }
    }

    /**
     * @return whether the underlying Kafka store reports itself as initialised
     */
    public boolean initialized() {
        boolean storeReady = kafkaStore.initialized();
        return storeReady;
    }

    /**
     * Reports whether this node is the current master. The comparison is done while
     * holding the store's master lock.
     *
     * @return true when a master is known and it equals this node's identity
     */
    public boolean isMaster() {
        kafkaStore.masterLock().lock();
        try {
            return masterIdentity != null && masterIdentity.equals(myIdentity);
        } finally {
            kafkaStore.masterLock().unlock();
        }
    }

    /**
     * Records a new master (or clears it when {@code newMaster} is null) and rebuilds
     * the REST client used for forwarding requests.
     *
     * <p>When this node has just become master, the last written offset is invalidated,
     * the store reader is awaited up to the init timeout, and the ID generator is
     * re-initialised. Finally the master role sensor is updated.
     *
     * <p>The whole update runs under the store's master lock; {@link #isMaster()} takes
     * the same lock again from inside, so the lock is presumably reentrant.
     *
     * @param newMaster the elected master, or null when the master is unknown
     * @throws IllegalStateException         if {@code newMaster} is not eligible to be master
     * @throws SchemaRegistryStoreException  if waiting for the store reader fails
     * @throws SchemaRegistryTimeoutException declared by the interface
     * @throws IdGenerationException          declared; presumably raised by the ID generator
     */
    @Override
    public void setMaster(@Nullable SchemaRegistryIdentity newMaster) throws SchemaRegistryTimeoutException, SchemaRegistryStoreException, IdGenerationException {
        log.debug("Setting the master to " + newMaster);
        boolean ineligible = newMaster != null && !newMaster.getMasterEligibility();
        if (ineligible) {
            throw new IllegalStateException("Tried to set an ineligible node to master: " + newMaster);
        }

        kafkaStore.masterLock().lock();
        try {
            SchemaRegistryIdentity previousMaster = masterIdentity;
            masterIdentity = newMaster;

            if (newMaster != null) {
                masterRestService = new RestService(newMaster.getUrl());
                if (sslFactory != null && sslFactory.sslContext() != null) {
                    masterRestService.setSslSocketFactory(sslFactory.sslContext().getSocketFactory());
                    masterRestService.setHostnameVerifier(getHostnameVerifier());
                }
            } else {
                masterRestService = null;
            }

            boolean masterChanged = newMaster != null && !newMaster.equals(previousMaster);
            if (masterChanged && isMaster()) {
                // This node was just promoted: catch up with the log before serving writes.
                kafkaStore.markLastWrittenOffsetInvalid();
                try {
                    kafkaStore.waitUntilKafkaReaderReachesLastOffset(initTimeout);
                } catch (StoreException e) {
                    throw new SchemaRegistryStoreException("Exception getting latest offset ", e);
                }
                idGenerator.init();
            }

            double role = isMaster() ? 1.0 : 0.0;
            masterNodeSensor.record(role);
        } finally {
            kafkaStore.masterLock().unlock();
        }
    }

    /**
     * @return the identity of this node, fixed at construction time
     */
    public SchemaRegistryIdentity myIdentity() {
        return myIdentity;
    }

    /**
     * Reads the current master identity while holding the store's master lock.
     *
     * @return the current master, or null when no master is known
     */
    public SchemaRegistryIdentity masterIdentity() {
        kafkaStore.masterLock().lock();
        try {
            return masterIdentity;
        } finally {
            kafkaStore.masterLock().unlock();
        }
    }

    /**
     * Lists the schema types for which a provider is registered.
     *
     * @return a read-only view of the registered schema type names; callers cannot
     *         remove providers through it
     */
    @Override
    public Set<String> schemaTypes() {
        // keySet() is a live view: removing from it would unregister a provider.
        return Collections.unmodifiableSet(this.providers.keySet());
    }

    /**
     * Registers {@code schema} under {@code subject} and returns its ID.
     *
     * <p>Steps, in order: check the subject's mode, wait for the store reader to catch
     * up, canonicalize the schema, reuse an existing ID when the same schema is already
     * known, check compatibility against the subject's non-deleted versions, then write
     * the new version. Soft-deleted versions of the subject that carry the same ID are
     * removed from the store after the write.
     *
     * <p>The passed {@code schema} is modified: its schema string and references are
     * replaced by the canonical form, and version and ID are filled in.
     *
     * @param subject subject to register under
     * @param schema  schema to register; updated in place
     * @return the ID of the registered (or already existing) schema
     * @throws IdDoesNotMatchException        if the schema exists with a different ID than requested
     * @throws IncompatibleSchemaException    if the schema is incompatible with earlier versions
     * @throws SchemaRegistryTimeoutException if the store times out
     * @throws SchemaRegistryStoreException   on other store errors, or when no unused ID
     *                                        could be generated within the retry limit
     * @throws SchemaRegistryException        for mode violations and invalid schemas
     */
    @Override
    public int register(String subject, Schema schema) throws SchemaRegistryException {
        try {
            this.checkRegisterMode(subject, schema);
            // Make sure the cache reflects everything written so far before deciding anything.
            this.kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, this.kafkaStoreTimeoutMs);
            ParsedSchema parsedSchema = this.canonicalizeSchema(schema);
            int schemaId = schema.getId();

            SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(schema);
            if (schemaIdAndSubjects != null) {
                if (schemaId >= 0 && schemaId != schemaIdAndSubjects.getSchemaId()) {
                    throw new IdDoesNotMatchException(schemaIdAndSubjects.getSchemaId(), schema.getId());
                }
                if (schemaIdAndSubjects.hasSubject(subject) && !this.isSubjectVersionDeleted(subject, schemaIdAndSubjects.getVersion(subject))) {
                    // Already registered under this subject and not deleted.
                    return schemaIdAndSubjects.getSchemaId();
                }
                // Known schema, new (or deleted) subject entry: reuse the ID.
                schemaId = schemaIdAndSubjects.getSchemaId();
            }

            List<SchemaValue> allVersions = this.getAllSchemaValues(subject);
            Collections.reverse(allVersions);
            List<SchemaValue> deletedVersions = new ArrayList<SchemaValue>();
            List<ParsedSchema> undeletedVersions = new ArrayList<ParsedSchema>();
            int newVersion = 1;
            for (SchemaValue schemaValue : allVersions) {
                newVersion = Math.max(newVersion, schemaValue.getVersion() + 1);
                if (schemaValue.isDeleted()) {
                    deletedVersions.add(schemaValue);
                    continue;
                }
                ParsedSchema undeletedSchema = this.parseSchema(this.getSchemaEntityFromSchemaValue(schemaValue));
                if (parsedSchema.references().isEmpty() && !undeletedSchema.references().isEmpty() && parsedSchema.deepEquals(undeletedSchema)) {
                    // Same schema submitted without references while the stored one has them.
                    return schemaValue.getId();
                }
                undeletedVersions.add(undeletedSchema);
            }
            // Undo the earlier reversal before the compatibility check.
            Collections.reverse(undeletedVersions);

            boolean isCompatible = this.isCompatibleWithPrevious(subject, parsedSchema, undeletedVersions);
            schema.setSchema(parsedSchema.canonicalString());
            schema.setReferences(parsedSchema.references());
            if (!isCompatible) {
                throw new IncompatibleSchemaException("New schema is incompatible with an earlier schema.");
            }

            if (schema.getVersion() <= 0) {
                schema.setVersion(newVersion);
            }
            SchemaKey schemaKey = new SchemaKey(subject, schema.getVersion());
            if (schemaId >= 0) {
                this.checkIfSchemaWithIdExist(schemaId, schema);
                schema.setId(schemaId);
                this.kafkaStore.put(schemaKey, new SchemaValue(schema));
            } else {
                // Track success explicitly: a write that succeeds on the last permitted
                // attempt must not be reported as "ID already in use".
                boolean registered = false;
                for (int attempt = 1; attempt <= this.kafkaStoreMaxRetries; attempt++) {
                    int newId = this.idGenerator.id(schema);
                    if (this.lookupCache.schemaKeyById(newId) != null) {
                        continue;
                    }
                    schema.setId(newId);
                    if (attempt > 1) {
                        log.warn(String.format("Retrying to register the schema with ID %s", newId));
                    }
                    this.kafkaStore.put(schemaKey, new SchemaValue(schema));
                    registered = true;
                    break;
                }
                if (!registered) {
                    throw new SchemaRegistryStoreException("Error while registering the schema due to generating an ID that is already in use.");
                }
            }

            // Drop soft-deleted versions of this subject that hold the same schema ID.
            for (SchemaValue deleted : deletedVersions) {
                if (!deleted.getId().equals(schema.getId())) {
                    continue;
                }
                SchemaKey key = new SchemaKey(deleted.getSubject(), deleted.getVersion());
                this.kafkaStore.delete(key);
            }
            return schema.getId();
        }
        catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while registering the schema in the backend Kafka store", e);
        }
    }

    /**
     * Verifies that the subject's effective mode allows this registration.
     *
     * <p>Read-only subjects reject every registration. A schema that carries an explicit
     * ID ({@code >= 0}) or version ({@code > 0}) is only accepted in import mode; any
     * other schema is only accepted in read-write mode.
     *
     * @param subject subject being written to
     * @param schema  schema whose ID and version decide which mode is required
     * @throws OperationNotPermittedException if the mode does not allow the registration
     * @throws SchemaRegistryStoreException   if the mode cannot be read
     */
    private void checkRegisterMode(String subject, Schema schema) throws OperationNotPermittedException, SchemaRegistryStoreException {
        // Resolve the mode once so all checks see the same value.
        Mode mode = this.getModeInScope(subject);
        if (mode == Mode.READONLY) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        boolean hasExplicitIdOrVersion = schema.getId() >= 0 || schema.getVersion() > 0;
        if (hasExplicitIdOrVersion) {
            if (mode != Mode.IMPORT) {
                throw new OperationNotPermittedException("Subject " + subject + " is not in import mode");
            }
        } else if (mode != Mode.READWRITE) {
            throw new OperationNotPermittedException("Subject " + subject + " is not in read-write mode");
        }
    }

    /**
     * Registers a schema locally when this node is master, otherwise forwards the
     * request to the master.
     *
     * <p>A schema that already exists under the subject (ignoring deleted versions) is
     * answered directly without taking the subject lock.
     *
     * @param subject          subject to register under
     * @param schema           schema to register
     * @param headerProperties headers to pass along when forwarding
     * @return the schema ID
     * @throws IdDoesNotMatchException if the schema exists with a different ID than requested
     * @throws UnknownMasterException  if this node is not master and no master is known
     * @throws SchemaRegistryException on registration or forwarding failures
     */
    public int registerOrForward(String subject, Schema schema, Map<String, String> headerProperties) throws SchemaRegistryException {
        Schema alreadyRegistered = lookUpSchemaUnderSubject(subject, schema, false);
        if (alreadyRegistered != null) {
            boolean requestedIdDiffers = schema.getId() != null
                    && schema.getId() >= 0
                    && !schema.getId().equals(alreadyRegistered.getId());
            if (requestedIdDiffers) {
                throw new IdDoesNotMatchException(alreadyRegistered.getId(), schema.getId());
            }
            return alreadyRegistered.getId();
        }

        kafkaStore.lockFor(subject).lock();
        try {
            if (isMaster()) {
                return register(subject, schema);
            }
            if (masterIdentity == null) {
                throw new UnknownMasterException("Register schema request failed since master is unknown");
            }
            return forwardRegisterRequestToMaster(subject, schema, headerProperties);
        } finally {
            kafkaStore.lockFor(subject).unlock();
        }
    }

    /**
     * Deletes one version of a subject.
     *
     * <p>A soft delete rewrites the version with its deleted flag set and, when no
     * non-deleted version remains, also removes the subject's mode and compatibility
     * settings. A permanent delete writes a null value for the key and is only allowed
     * for a version that is already soft-deleted.
     *
     * @param subject         subject owning the version
     * @param schema          schema entity identifying the version to delete
     * @param permanentDelete true for a permanent delete, false for a soft delete
     * @throws OperationNotPermittedException       if the subject is read-only
     * @throws ReferenceExistsException             if other schemas reference this version
     * @throws SchemaVersionNotSoftDeletedException if a permanent delete targets a live version
     * @throws SchemaRegistryTimeoutException       if the store times out
     * @throws SchemaRegistryStoreException         on other store errors
     */
    @Override
    public void deleteSchemaVersion(String subject, Schema schema, boolean permanentDelete) throws SchemaRegistryException {
        try {
            if (getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            SchemaKey versionKey = new SchemaKey(subject, schema.getVersion());
            if (!lookupCache.referencesSchema(versionKey).isEmpty()) {
                throw new ReferenceExistsException(versionKey.toString());
            }
            // NOTE(review): assumes the version is present in the cache; a missing entry
            // would fail on the isDeleted() call below for permanent deletes - confirm callers check.
            SchemaValue cachedValue = (SchemaValue) lookupCache.get(versionKey);
            if (permanentDelete && !cachedValue.isDeleted()) {
                throw new SchemaVersionNotSoftDeletedException(subject, schema.getVersion().toString());
            }
            kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);

            if (permanentDelete) {
                kafkaStore.put(versionKey, null);
                return;
            }

            SchemaValue softDeleted = new SchemaValue(schema);
            softDeleted.setDeleted(true);
            kafkaStore.put(versionKey, softDeleted);

            boolean subjectHasLiveVersions = getAllVersions(subject, false).hasNext();
            if (!subjectHasLiveVersions) {
                if (getMode(subject) != null) {
                    deleteMode(subject);
                }
                if (getCompatibilityLevel(subject) != null) {
                    deleteSubjectCompatibility(subject);
                }
            }
        } catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the schema for subject '" + subject + "' in the backend Kafka store", e);
        }
    }

    /**
     * Deletes a schema version locally when this node is master, otherwise forwards the
     * request to the master. The subject lock is held for the duration of the call.
     *
     * @param headerProperties headers to pass along when forwarding
     * @param subject          subject owning the version
     * @param schema           schema entity identifying the version to delete
     * @param permanentDelete  true for a permanent delete, false for a soft delete
     * @throws UnknownMasterException  if this node is not master and no master is known
     * @throws SchemaRegistryException on deletion or forwarding failures
     */
    public void deleteSchemaVersionOrForward(Map<String, String> headerProperties, String subject, Schema schema, boolean permanentDelete) throws SchemaRegistryException {
        this.kafkaStore.lockFor(subject).lock();
        try {
            if (this.isMaster()) {
                this.deleteSchemaVersion(subject, schema, permanentDelete);
            } else if (this.masterIdentity != null) {
                this.forwardDeleteSchemaVersionRequestToMaster(headerProperties, subject, schema.getVersion(), permanentDelete);
            } else {
                // Message names the operation that actually failed (was "Register schema request ...").
                throw new UnknownMasterException("Delete schema version request failed since master is unknown");
            }
        } finally {
            this.kafkaStore.lockFor(subject).unlock();
        }
    }

    /**
     * Deletes all versions of a subject and returns the affected version numbers.
     *
     * <p>A soft delete writes a single delete-subject marker carrying the highest
     * version seen and removes the subject's mode and compatibility settings. A
     * permanent delete writes a null value for every version and requires that each
     * version is already soft-deleted.
     *
     * @param subject         subject to delete
     * @param permanentDelete true for a permanent delete, false for a soft delete
     * @return the version numbers that were deleted, in iteration order
     * @throws OperationNotPermittedException if the subject is read-only
     * @throws ReferenceExistsException       if another schema references one of the versions
     * @throws SubjectNotSoftDeletedException if a permanent delete meets a live version
     * @throws SchemaRegistryTimeoutException if the store times out
     * @throws SchemaRegistryStoreException   on other store errors
     */
    @Override
    public List<Integer> deleteSubject(String subject, boolean permanentDelete) throws SchemaRegistryException {
        try {
            if (getModeInScope(subject) == Mode.READONLY) {
                throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
            }
            kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);

            List<Integer> deletedVersions = new ArrayList<Integer>();
            int deleteWatermarkVersion = 0;
            // For a permanent delete the iteration includes soft-deleted versions.
            Iterator<Schema> candidates = getAllVersions(subject, permanentDelete);
            while (candidates.hasNext()) {
                deleteWatermarkVersion = candidates.next().getVersion();
                SchemaKey versionKey = new SchemaKey(subject, deleteWatermarkVersion);
                if (!lookupCache.referencesSchema(versionKey).isEmpty()) {
                    throw new ReferenceExistsException(versionKey.toString());
                }
                if (permanentDelete) {
                    SchemaValue stored = (SchemaValue) lookupCache.get(versionKey);
                    if (!stored.isDeleted()) {
                        throw new SubjectNotSoftDeletedException(subject);
                    }
                }
                deletedVersions.add(deleteWatermarkVersion);
            }

            if (permanentDelete) {
                for (Integer version : deletedVersions) {
                    kafkaStore.put(new SchemaKey(subject, version), null);
                }
            } else {
                DeleteSubjectKey markerKey = new DeleteSubjectKey(subject);
                DeleteSubjectValue markerValue = new DeleteSubjectValue(subject, deleteWatermarkVersion);
                kafkaStore.put(markerKey, markerValue);
                if (getMode(subject) != null) {
                    deleteMode(subject);
                }
                if (getCompatibilityLevel(subject) != null) {
                    deleteSubjectCompatibility(subject);
                }
            }
            return deletedVersions;
        } catch (StoreTimeoutException te) {
            throw new SchemaRegistryTimeoutException("Write to the Kafka store timed out while", te);
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while deleting the subject in the backend Kafka store", e);
        }
    }

    /**
     * Deletes a subject locally when this node is master, otherwise forwards the
     * request to the master. The subject lock is held for the duration of the call.
     *
     * @param requestProperties headers to pass along when forwarding
     * @param subject           subject to delete
     * @param permanentDelete   true for a permanent delete, false for a soft delete
     * @return the version numbers that were deleted
     * @throws UnknownMasterException  if this node is not master and no master is known
     * @throws SchemaRegistryException on deletion or forwarding failures
     */
    public List<Integer> deleteSubjectOrForward(Map<String, String> requestProperties, String subject, boolean permanentDelete) throws SchemaRegistryException {
        this.kafkaStore.lockFor(subject).lock();
        try {
            if (this.isMaster()) {
                return this.deleteSubject(subject, permanentDelete);
            }
            if (this.masterIdentity != null) {
                return this.forwardDeleteSubjectRequestToMaster(requestProperties, subject, permanentDelete);
            }
            // Message names the operation that actually failed (was "Register schema request ...").
            throw new UnknownMasterException("Delete subject request failed since master is unknown");
        } finally {
            this.kafkaStore.lockFor(subject).unlock();
        }
    }

    /**
     * Looks for {@code schema} among the versions registered under {@code subject}.
     *
     * <p>The lookup cache is consulted first. If that misses, the subject's versions
     * are scanned in reverse order for a non-deleted stored schema that has references
     * and is deeply equal to the submitted schema when the latter has none.
     *
     * @param subject             subject to search
     * @param schema              schema to look for; canonicalized by this call
     * @param lookupDeletedSchema whether a soft-deleted version may be returned from the cache hit
     * @return the matching schema entity, or null when none is found
     * @throws SchemaRegistryException if canonicalization, parsing or the store lookup fails
     */
    @Override
    public Schema lookUpSchemaUnderSubject(String subject, Schema schema, boolean lookupDeletedSchema) throws SchemaRegistryException {
        ParsedSchema submitted = canonicalizeSchema(schema);

        SchemaIdAndSubjects cachedMatch = lookupCache.schemaIdAndSubjects(schema);
        if (cachedMatch != null && cachedMatch.hasSubject(subject)) {
            boolean usable = lookupDeletedSchema
                    || !isSubjectVersionDeleted(subject, cachedMatch.getVersion(subject));
            if (usable) {
                return new Schema(subject, Integer.valueOf(cachedMatch.getVersion(subject)), Integer.valueOf(cachedMatch.getSchemaId()), schema.getSchemaType(), schema.getReferences(), schema.getSchema());
            }
        }

        List<SchemaValue> versions = getAllSchemaValues(subject);
        Collections.reverse(versions);
        for (SchemaValue stored : versions) {
            boolean candidate = !stored.isDeleted()
                    && submitted.references().isEmpty()
                    && !stored.getReferences().isEmpty();
            if (!candidate) {
                continue;
            }
            // Only parse stored schemas that can match at all.
            Schema storedEntity = getSchemaEntityFromSchemaValue(stored);
            ParsedSchema storedParsed = parseSchema(storedEntity);
            if (submitted.deepEquals(storedParsed)) {
                return storedEntity;
            }
        }
        return null;
    }

    /**
     * Rejects an attempt to reuse an ID that already belongs to a different schema.
     *
     * <p>Nothing happens when the ID is unknown, when the cached entry is missing or not
     * a {@code SchemaValue}, or when the stored schema string equals the new one.
     *
     * @param id     schema ID about to be used
     * @param schema schema that would be stored under {@code id}
     * @throws OperationNotPermittedException if a different schema is stored under {@code id}
     * @throws SchemaRegistryException        declared for callers
     * @throws StoreException                 if the cache lookup fails
     */
    public void checkIfSchemaWithIdExist(int id, Schema schema) throws SchemaRegistryException, StoreException {
        SchemaKey existingKey = lookupCache.schemaKeyById(id);
        if (existingKey == null) {
            return;
        }
        SchemaRegistryValue existingValue = (SchemaRegistryValue) lookupCache.get(existingKey);
        // instanceof is false for null, so this also covers a missing entry.
        if (!(existingValue instanceof SchemaValue)) {
            return;
        }
        boolean sameSchema = ((SchemaValue) existingValue).getSchema().equals(schema.getSchema());
        if (!sameSchema) {
            throw new OperationNotPermittedException(String.format("Overwrite new schema with id %s is not permitted.", id));
        }
    }

    /**
     * Builds a register-schema request from {@code schema} and sends it to the master through
     * {@code masterRestService}.
     *
     * @param subject subject to register under
     * @param schema source of the schema string, type, references, version and id
     * @param headerProperties headers passed along with the forwarded request
     * @return the id returned by the master
     * @throws SchemaRegistryRequestForwardingException if the call fails with an {@code IOException}
     * @throws RestException if the master answers with a {@code RestClientException}; message,
     *     status and error code are carried over
     */
    private int forwardRegisterRequestToMaster(String subject, Schema schema, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        final UrlList masterUrls = masterRestService.getBaseUrls();

        final RegisterSchemaRequest request = new RegisterSchemaRequest();
        request.setSchema(schema.getSchema());
        request.setSchemaType(schema.getSchemaType());
        request.setReferences(schema.getReferences());
        request.setVersion(schema.getVersion());
        request.setId(schema.getId());

        log.debug(String.format("Forwarding registering schema request %s to %s", request, masterUrls));
        try {
            return masterRestService.registerSchema(headerProperties, request, subject);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the registering schema request %s to %s", request, masterUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), (Throwable) restFailure);
        }
    }

    /**
     * Sends a config update carrying {@code compatibilityLevel} to the master through
     * {@code masterRestService}.
     *
     * @param subject subject whose config is updated
     * @param compatibilityLevel level whose {@code name} is placed in the request
     * @param headerProperties headers passed along with the forwarded request
     * @throws SchemaRegistryRequestForwardingException if the call fails with an {@code IOException}
     * @throws RestException if the master answers with a {@code RestClientException}
     */
    private void forwardUpdateCompatibilityLevelRequestToMaster(String subject, CompatibilityLevel compatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        final UrlList masterUrls = masterRestService.getBaseUrls();

        final ConfigUpdateRequest request = new ConfigUpdateRequest();
        request.setCompatibilityLevel(compatibilityLevel.name);

        log.debug(String.format("Forwarding update config request %s to %s", request, masterUrls));
        try {
            masterRestService.updateConfig(headerProperties, request, subject);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the update config request %s to %s", request, masterUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), (Throwable) restFailure);
        }
    }

    /**
     * Asks the master, through {@code masterRestService}, to delete one version of a subject.
     *
     * @param headerProperties headers passed along with the forwarded request
     * @param subject subject that owns the version
     * @param version version to delete; sent as its string form
     * @param permanentDelete flag passed through unchanged to the master
     * @throws SchemaRegistryRequestForwardingException if the call fails with an {@code IOException}
     * @throws RestException if the master answers with a {@code RestClientException}
     */
    private void forwardDeleteSchemaVersionRequestToMaster(Map<String, String> headerProperties, String subject, Integer version, boolean permanentDelete) throws SchemaRegistryRequestForwardingException {
        final UrlList masterUrls = masterRestService.getBaseUrls();
        log.debug(String.format("Forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, masterUrls));
        try {
            String versionText = String.valueOf(version);
            masterRestService.deleteSchemaVersion(headerProperties, subject, versionText, permanentDelete);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding deleteSchemaVersion schema version request %s-%s to %s", subject, version, masterUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), (Throwable) restFailure);
        }
    }

    /**
     * Asks the master, through {@code masterRestService}, to delete a subject.
     *
     * @param requestProperties headers passed along with the forwarded request
     * @param subject subject to delete
     * @param permanentDelete flag passed through unchanged to the master
     * @return the list of integers returned by the master
     * @throws SchemaRegistryRequestForwardingException if the call fails with an {@code IOException}
     * @throws RestException if the master answers with a {@code RestClientException}
     */
    private List<Integer> forwardDeleteSubjectRequestToMaster(Map<String, String> requestProperties, String subject, boolean permanentDelete) throws SchemaRegistryRequestForwardingException {
        final UrlList masterUrls = masterRestService.getBaseUrls();
        log.debug(String.format("Forwarding delete subject request for  %s to %s", subject, masterUrls));
        try {
            List<Integer> deleted = masterRestService.deleteSubject(requestProperties, subject, permanentDelete);
            return deleted;
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding delete subject request %s to %s", subject, masterUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), (Throwable) restFailure);
        }
    }

    /**
     * Sends a mode update carrying {@code mode.name()} to the master through
     * {@code masterRestService}.
     *
     * @param subject subject whose mode is updated
     * @param mode mode to set
     * @param headerProperties headers passed along with the forwarded request
     * @throws SchemaRegistryRequestForwardingException if the call fails with an {@code IOException}
     * @throws RestException if the master answers with a {@code RestClientException}
     */
    private void forwardSetModeRequestToMaster(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryRequestForwardingException {
        final UrlList masterUrls = masterRestService.getBaseUrls();

        final ModeUpdateRequest request = new ModeUpdateRequest();
        request.setMode(mode.name());

        log.debug(String.format("Forwarding update mode request %s to %s", request, masterUrls));
        try {
            masterRestService.setMode(headerProperties, request, subject);
        } catch (IOException ioFailure) {
            String message = String.format("Unexpected error while forwarding the update mode request %s to %s", request, masterUrls);
            throw new SchemaRegistryRequestForwardingException(message, ioFailure);
        } catch (RestClientException restFailure) {
            throw new RestException(restFailure.getMessage(), restFailure.getStatus(), restFailure.getErrorCode(), (Throwable) restFailure);
        }
    }

    /**
     * Parses and validates {@code schema}, then overwrites its schema string and references with
     * the parsed schema's canonical string and references.
     *
     * @param schema schema to canonicalize; mutated on success
     * @return the parsed form of the schema
     * @throws InvalidSchemaException if the schema is null, has a null or blank schema string,
     *     cannot be parsed, or fails validation
     */
    private ParsedSchema canonicalizeSchema(Schema schema) throws InvalidSchemaException {
        boolean empty = schema == null || schema.getSchema() == null || schema.getSchema().trim().isEmpty();
        if (empty) {
            log.error("Empty schema");
            throw new InvalidSchemaException("Empty schema");
        }

        ParsedSchema parsed = parseSchema(schema);
        try {
            parsed.validate();
        } catch (Exception validationFailure) {
            String message = "Invalid schema " + schema;
            log.error(message, (Throwable) validationFailure);
            throw new InvalidSchemaException(message, validationFailure);
        }

        // Write the canonical form back so callers see the normalized text and references.
        schema.setSchema(parsed.canonicalString());
        schema.setReferences(parsed.references());
        return parsed;
    }

    /**
     * Parses a schema entity by delegating to the overload that takes the schema type, schema
     * string and references separately.
     *
     * @param schema entity to parse
     * @return the parsed schema
     * @throws InvalidSchemaException if parsing fails
     */
    private ParsedSchema parseSchema(Schema schema) throws InvalidSchemaException {
        String type = schema.getSchemaType();
        String text = schema.getSchema();
        return parseSchema(type, text, schema.getReferences());
    }

    /**
     * Fetches the parsed form of a schema from {@code schemaCache}, keyed by a {@code Schema}
     * built with an empty subject, version 0 and id -1.
     *
     * @param schemaType schema type placed in the cache key
     * @param schema schema string
     * @param references references placed in the cache key
     * @return the parsed schema
     * @throws InvalidSchemaException if the cache lookup failed with that cause
     * @throws RuntimeException the original cause if it was unchecked, otherwise a wrapper around
     *     the {@code ExecutionException}
     */
    private ParsedSchema parseSchema(String schemaType, String schema, List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> references) throws InvalidSchemaException {
        Schema cacheKey = new Schema("", Integer.valueOf(0), Integer.valueOf(-1), schemaType, references, schema);
        try {
            return (ParsedSchema) schemaCache.get((Object) cacheKey);
        } catch (ExecutionException wrapped) {
            // Unwrap so callers see the underlying failure rather than the cache wrapper.
            Throwable cause = wrapped.getCause();
            if (cause instanceof InvalidSchemaException) {
                throw (InvalidSchemaException) cause;
            }
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause;
            }
            throw new RuntimeException(wrapped);
        }
    }

    /**
     * Parses a schema string with the provider registered for its type.
     *
     * <p>A {@code null} type is treated as {@code "AVRO"}.
     *
     * @param schemaType schema type, or {@code null} for the default
     * @param schema schema string
     * @param references references handed to the provider
     * @return the parsed schema
     * @throws InvalidSchemaException if no provider is registered for the type or the provider
     *     returns an empty result
     */
    private ParsedSchema loadSchema(String schemaType, String schema, List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> references) throws InvalidSchemaException {
        final String effectiveType = schemaType == null ? "AVRO" : schemaType;

        SchemaProvider provider = providers.get(effectiveType);
        if (provider == null) {
            String errMsg = "Invalid schema type " + effectiveType;
            log.error(errMsg);
            throw new InvalidSchemaException(errMsg);
        }

        return (ParsedSchema) provider.parseSchema(schema, references).orElseThrow(() -> new InvalidSchemaException("Invalid schema " + schema + " with refs " + references + " of type " + effectiveType));
    }

    /**
     * Returns the schema for a subject and version, raising a not-found error when it is absent.
     *
     * @param subject subject to read
     * @param versionId version to read
     * @param returnDeletedSchema passed to both the schema lookup and the subject-existence check
     * @return the schema; never {@code null}
     * @throws SchemaRegistryException if the store cannot be read; a subject-not-found or
     *     version-not-found error from {@code Errors} is thrown when the schema is missing
     */
    public Schema validateAndGetSchema(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException {
        int version = versionId.getVersionId();
        Schema found = get(subject, version, returnDeletedSchema);
        if (found != null) {
            return found;
        }
        // Tell a missing subject apart from a missing version of an existing subject.
        if (hasSubjects(subject, returnDeletedSchema)) {
            throw Errors.versionNotFoundException(version);
        }
        throw Errors.subjectNotFoundException(subject);
    }

    /**
     * Reports whether {@link #get(String, int, boolean)} finds a schema for the subject and
     * version.
     *
     * @param subject subject to read
     * @param versionId version to read
     * @param returnDeletedSchema passed through to the lookup
     * @return {@code true} when the lookup returns a non-null schema
     * @throws SchemaRegistryException if the store cannot be read
     */
    public boolean schemaVersionExists(String subject, VersionId versionId, boolean returnDeletedSchema) throws SchemaRegistryException {
        return get(subject, versionId.getVersionId(), returnDeletedSchema) != null;
    }

    /**
     * Reads one version of a subject from the Kafka store.
     *
     * <p>When {@code version} denotes "latest" (as decided by {@code VersionId.isLatest()}), the
     * call is answered by {@link #getLatestVersion(String)} and {@code returnDeletedSchema} is not
     * consulted.
     *
     * @param subject subject to read
     * @param version version number
     * @param returnDeletedSchema whether a stored value marked deleted may be returned
     * @return the schema, or {@code null} when nothing is stored or the value is deleted and
     *     deleted schemas were not requested
     * @throws SchemaRegistryException if the store read fails
     */
    @Override
    public Schema get(String subject, int version, boolean returnDeletedSchema) throws SchemaRegistryException {
        if (new VersionId(version).isLatest()) {
            return getLatestVersion(subject);
        }
        SchemaKey key = new SchemaKey(subject, version);
        try {
            SchemaValue stored = (SchemaValue) kafkaStore.get(key);
            boolean live = stored != null && !stored.isDeleted();
            if (live || returnDeletedSchema) {
                // The converter maps a null stored value to null.
                return getSchemaEntityFromSchemaValue(stored);
            }
            return null;
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    /**
     * Reads a schema by id with no output format and without fetching the max id.
     *
     * @param id schema id
     * @return the schema string, or {@code null} when the id is unknown
     * @throws SchemaRegistryException if the store read fails
     */
    @Override
    public SchemaString get(int id) throws SchemaRegistryException {
        final String noFormat = null;
        return get(id, noFormat, false);
    }

    /**
     * Reads a schema by id and returns its string form with type and references.
     *
     * @param id schema id
     * @param format when non-null and non-blank, the stored schema is parsed and rendered with
     *     {@code formattedString(format)}; otherwise the stored schema string is returned as is
     * @param fetchMaxId whether to also set the max id obtained from {@code idGenerator}
     * @return the populated schema string, or {@code null} when the id is unknown to the lookup
     *     cache or nothing is stored under its key
     * @throws SchemaRegistryException if the store read fails or the stored schema cannot be parsed
     */
    public SchemaString get(int id, String format, boolean fetchMaxId) throws SchemaRegistryException {
        SchemaValue stored;
        try {
            SchemaKey keyForId = lookupCache.schemaKeyById(id);
            stored = keyForId == null ? null : (SchemaValue) kafkaStore.get(keyForId);
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e);
        }
        if (stored == null) {
            return null;
        }

        SchemaString result = new SchemaString();
        result.setSchemaType(stored.getSchemaType());

        // Convert storage-level references to their REST entity counterparts.
        List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> refs = null;
        if (stored.getReferences() != null) {
            refs = stored.getReferences().stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList());
        }
        result.setReferences(refs);

        boolean formatRequested = format != null && !format.trim().isEmpty();
        if (formatRequested) {
            ParsedSchema parsed = parseSchema(stored.getSchemaType(), stored.getSchema(), refs);
            result.setSchemaString(parsed.formattedString(format));
        } else {
            result.setSchemaString(stored.getSchema());
        }

        if (fetchMaxId) {
            result.setMaxId(Integer.valueOf(idGenerator.getMaxId(id)));
        }
        return result;
    }

    /**
     * Returns the sorted ids that the lookup cache reports as referencing a subject version.
     *
     * <p>When {@code versionId} denotes "latest", the version is resolved through
     * {@link #getLatestVersion(String)}. That method returns {@code null} for a subject with no
     * non-deleted versions; in that case an empty list is returned instead of failing with a
     * {@code NullPointerException}.
     *
     * @param subject subject that owns the referenced version
     * @param versionId version being referenced, possibly "latest"
     * @return ids in ascending order; empty when "latest" cannot be resolved
     * @throws SchemaRegistryException if resolving the latest version fails
     */
    public List<Integer> getReferencedBy(String subject, VersionId versionId) throws SchemaRegistryException {
        int version = versionId.getVersionId();
        if (versionId.isLatest()) {
            Schema latest = this.getLatestVersion(subject);
            if (latest == null) {
                // No live version to resolve "latest" against, so nothing can reference it.
                return new ArrayList<Integer>();
            }
            version = latest.getVersion();
        }
        SchemaKey key = new SchemaKey(subject, version);
        ArrayList<Integer> ids = new ArrayList<Integer>(this.lookupCache.referencesSchema(key));
        Collections.sort(ids);
        return ids;
    }

    /**
     * Lists the distinct subjects found among the keys of the Kafka store.
     *
     * @param returnDeletedSubjects whether subjects seen only through deleted values are included
     * @return the set of subject names
     * @throws SchemaRegistryException if the store cannot be read
     */
    @Override
    public Set<String> listSubjects(boolean returnDeletedSubjects) throws SchemaRegistryException {
        try {
            return extractUniqueSubjects(kafkaStore.getAllKeys(), returnDeletedSubjects);
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    /**
     * Lists every subject the lookup cache associates with the schema stored under {@code id}.
     *
     * <p>The stored value is converted with {@code getSchemaEntityFromSchemaValue}, which
     * tolerates a {@code null} reference list. A cache miss is reported as {@code null}, in line
     * with the other not-found cases, rather than surfacing as a {@code NullPointerException}.
     *
     * @param id schema id
     * @return the subjects, or {@code null} when the id is unknown, nothing is stored under its
     *     key, or the lookup cache has no entry for the schema
     * @throws SchemaRegistryException if the store read fails
     */
    public Set<String> listSubjectsForId(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e);
        }
        SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(this.getSchemaEntityFromSchemaValue(schema));
        if (schemaIdAndSubjects == null) {
            return null;
        }
        return schemaIdAndSubjects.allSubjects();
    }

    /**
     * Lists every subject/version pair the lookup cache associates with the schema stored under
     * {@code id}.
     *
     * <p>The stored value is converted with {@code getSchemaEntityFromSchemaValue}, which
     * tolerates a {@code null} reference list. A cache miss is reported as {@code null}, in line
     * with the other not-found cases, rather than surfacing as a {@code NullPointerException}.
     *
     * @param id schema id
     * @return the subject/version pairs, or {@code null} when the id is unknown, nothing is
     *     stored under its key, or the lookup cache has no entry for the schema
     * @throws SchemaRegistryException if the store read fails
     */
    public List<SubjectVersion> listVersionsForId(int id) throws SchemaRegistryException {
        SchemaValue schema = null;
        try {
            SchemaKey subjectVersionKey = this.lookupCache.schemaKeyById(id);
            if (subjectVersionKey == null) {
                return null;
            }
            schema = (SchemaValue)this.kafkaStore.get(subjectVersionKey);
            if (schema == null) {
                return null;
            }
        }
        catch (StoreException e2) {
            throw new SchemaRegistryStoreException("Error while retrieving schema with id " + id + " from the backend Kafka store", e2);
        }
        SchemaIdAndSubjects schemaIdAndSubjects = this.lookupCache.schemaIdAndSubjects(this.getSchemaEntityFromSchemaValue(schema));
        if (schemaIdAndSubjects == null) {
            return null;
        }
        return schemaIdAndSubjects.allSubjectVersions().entrySet().stream().map(e -> new SubjectVersion((String)e.getKey(), (Integer)e.getValue())).collect(Collectors.toList());
    }

    /**
     * Collects the subjects of all {@code SchemaKey}s whose stored value exists and passes the
     * deletion filter.
     *
     * @param allKeys keys to scan; keys that are not {@code SchemaKey}s are ignored
     * @param returnDeletedSubjects whether keys whose value is marked deleted still contribute
     * @return the distinct subject names
     * @throws StoreException if reading a value from the store fails
     */
    private Set<String> extractUniqueSubjects(Iterator<SchemaRegistryKey> allKeys, boolean returnDeletedSubjects) throws StoreException {
        Set<String> subjects = new HashSet<String>();
        while (allKeys.hasNext()) {
            SchemaRegistryKey candidate = allKeys.next();
            if (!(candidate instanceof SchemaKey)) {
                continue;
            }
            SchemaKey schemaKey = (SchemaKey) candidate;
            SchemaValue stored = (SchemaValue) kafkaStore.get(schemaKey);
            if (stored == null) {
                continue;
            }
            if (stored.isDeleted() && !returnDeletedSubjects) {
                continue;
            }
            subjects.add(schemaKey.getSubject());
        }
        return subjects;
    }

    /**
     * Asks the lookup cache whether it has subjects for the given name.
     *
     * @param subject subject name passed to the cache
     * @param lookupDeletedSubjects flag passed through unchanged to the cache
     * @return the cache's answer
     * @throws SchemaRegistryStoreException if the cache lookup fails with a {@code StoreException}
     */
    public boolean hasSubjects(String subject, boolean lookupDeletedSubjects) throws SchemaRegistryStoreException {
        try {
            boolean present = lookupCache.hasSubjects(subject, lookupDeletedSubjects);
            return present;
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    /**
     * Iterates over the sorted schemas stored for exactly one subject.
     *
     * @param subject subject to read
     * @param returnDeletedSchemas whether values marked deleted are included
     * @return iterator over the sorted schemas
     * @throws SchemaRegistryException if the store cannot be read
     */
    @Override
    public Iterator<Schema> getAllVersions(String subject, boolean returnDeletedSchemas) throws SchemaRegistryException {
        Iterator<SchemaRegistryValue> stored = allVersions(subject, false);
        List<Schema> sorted = sortSchemasByVersion(stored, returnDeletedSchemas);
        return sorted.iterator();
    }

    /**
     * Iterates over the sorted schemas stored for all subjects in the range selected by a prefix.
     *
     * @param prefix subject prefix
     * @param returnDeletedSchemas whether values marked deleted are included
     * @return iterator over the sorted schemas
     * @throws SchemaRegistryException if the store cannot be read
     */
    public Iterator<Schema> getAllVersionsWithPrefix(String prefix, boolean returnDeletedSchemas) throws SchemaRegistryException {
        Iterator<SchemaRegistryValue> stored = allVersions(prefix, true);
        List<Schema> sorted = sortSchemasByVersion(stored, returnDeletedSchemas);
        return sorted.iterator();
    }

    /**
     * Returns every stored value for exactly one subject, sorted, including deleted ones.
     *
     * @param subject subject to read
     * @return sorted, mutable list of stored values
     * @throws SchemaRegistryException if the store cannot be read
     */
    private List<SchemaValue> getAllSchemaValues(String subject) throws SchemaRegistryException {
        Iterator<SchemaRegistryValue> stored = allVersions(subject, false);
        return sortSchemaValuesByVersion(stored);
    }

    /**
     * Returns the last entry of the subject's non-deleted schemas in sorted order.
     *
     * @param subject subject to read
     * @return the last schema in sorted order, or {@code null} when there is none
     * @throws SchemaRegistryException if the store cannot be read
     */
    @Override
    public Schema getLatestVersion(String subject) throws SchemaRegistryException {
        List<Schema> live = sortSchemasByVersion(allVersions(subject, false), false);
        if (live.isEmpty()) {
            return null;
        }
        return live.get(live.size() - 1);
    }

    /**
     * Runs a range query on the Kafka store between two {@code SchemaKey}s.
     *
     * <p>The lower key is the given name at version 1. The upper key is at
     * {@code Integer.MAX_VALUE}, using the same name, or the name with the character U+FFFF
     * appended when {@code isPrefix} is set.
     *
     * @param subjectOrPrefix exact subject, or subject prefix
     * @param isPrefix whether to treat the first argument as a prefix
     * @return iterator over the stored values in the range
     * @throws SchemaRegistryException if the store query fails
     */
    private Iterator<SchemaRegistryValue> allVersions(String subjectOrPrefix, boolean isPrefix) throws SchemaRegistryException {
        try {
            String upperSubject = subjectOrPrefix;
            if (isPrefix) {
                upperSubject = subjectOrPrefix + '\uffff';
            }
            SchemaKey from = new SchemaKey(subjectOrPrefix, 1);
            SchemaKey to = new SchemaKey(upperSubject, Integer.MAX_VALUE);
            return kafkaStore.getAll(from, to);
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error from the backend Kafka store", e);
        }
    }

    /**
     * Closes the Kafka store and then, if one is set, the master elector.
     */
    @Override
    public void close() {
        log.info("Shutting down schema registry");
        kafkaStore.close();
        if (masterElector == null) {
            return;
        }
        masterElector.close();
    }

    /**
     * Writes a new compatibility level for {@code subject} to the Kafka store.
     *
     * <p>The write is refused when the mode in scope for the subject is {@code READONLY}. Before
     * writing, the method waits for the Kafka reader to reach the last offset, bounded by
     * {@code kafkaStoreTimeoutMs}.
     *
     * @param subject subject whose config key is written
     * @param newCompatibilityLevel level to store
     * @throws OperationNotPermittedException if the subject is in read-only mode
     * @throws SchemaRegistryStoreException if waiting for or writing to the store fails
     */
    public void updateCompatibilityLevel(String subject, CompatibilityLevel newCompatibilityLevel) throws SchemaRegistryStoreException, OperationNotPermittedException, UnknownMasterException {
        if (Mode.READONLY == getModeInScope(subject)) {
            throw new OperationNotPermittedException("Subject " + subject + " is in read-only mode");
        }
        ConfigKey key = new ConfigKey(subject);
        try {
            kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);
            ConfigValue value = new ConfigValue(newCompatibilityLevel);
            kafkaStore.put(key, value);
            log.debug("Wrote new compatibility level: " + newCompatibilityLevel.name + " to the Kafka data store with key " + key.toString());
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new config value to the store", e);
        }
    }

    /**
     * Updates the compatibility level locally when this node is the master, otherwise forwards
     * the request to the known master. The subject's lock from the Kafka store is held for the
     * duration of the call.
     *
     * @param subject subject whose config is updated
     * @param newCompatibilityLevel level to set
     * @param headerProperties headers passed along if the request is forwarded
     * @throws UnknownMasterException if this node is not the master and no master identity is set
     * @throws SchemaRegistryRequestForwardingException if forwarding fails
     * @throws SchemaRegistryStoreException if the local write fails
     * @throws OperationNotPermittedException if the local update is not permitted
     */
    public void updateConfigOrForward(String subject, CompatibilityLevel newCompatibilityLevel, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, UnknownMasterException, OperationNotPermittedException {
        kafkaStore.lockFor(subject).lock();
        try {
            if (isMaster()) {
                updateCompatibilityLevel(subject, newCompatibilityLevel);
            } else if (masterIdentity != null) {
                forwardUpdateCompatibilityLevelRequestToMaster(subject, newCompatibilityLevel, headerProperties);
            } else {
                throw new UnknownMasterException("Update config request failed since master is unknown");
            }
        } finally {
            kafkaStore.lockFor(subject).unlock();
        }
    }

    /**
     * Asks the Kafka cluster for its id through a short-lived {@code AdminClient}.
     *
     * <p>The client is configured from the schema registry config plus the bootstrap brokers, and
     * the lookup waits at most {@code initTimeout} milliseconds. If the waiting thread is
     * interrupted, its interrupt flag is restored before the failure is reported, so code further
     * up the stack can still observe the interruption.
     *
     * @return the cluster id reported by Kafka
     * @throws SchemaRegistryException if the lookup is interrupted, fails, or times out
     */
    public String getKafkaClusterId() throws SchemaRegistryException {
        String kafkaClusterId;
        Properties adminClientProps = new Properties();
        KafkaStore.addSchemaRegistryConfigsToClientProperties(this.config, adminClientProps);
        adminClientProps.put("bootstrap.servers", this.config.bootstrapBrokers());
        try (AdminClient adminClient = AdminClient.create((Properties)adminClientProps);){
            kafkaClusterId = (String)adminClient.describeCluster().clusterId().get((long)this.initTimeout, TimeUnit.MILLISECONDS);
        }
        catch (InterruptedException e) {
            // Catching InterruptedException clears the flag; set it again for callers.
            Thread.currentThread().interrupt();
            throw new SchemaRegistryException("Failed to get Kafka cluster ID", e);
        }
        catch (ExecutionException | TimeoutException e) {
            throw new SchemaRegistryException("Failed to get Kafka cluster ID", e);
        }
        return kafkaClusterId;
    }

    /**
     * Reads the compatibility level for {@code subject} from the lookup cache, passing
     * {@code false} as the cache's second argument and the registry default as the fallback.
     *
     * <p>NOTE(review): the second argument presumably controls whether a broader scope is
     * consulted; confirm against the lookup cache.
     *
     * @param subject subject to read
     * @return the level returned by the cache
     * @throws SchemaRegistryStoreException if the cache lookup fails
     */
    public CompatibilityLevel getCompatibilityLevel(String subject) throws SchemaRegistryStoreException {
        CompatibilityLevel level = lookupCache.compatibilityLevel(subject, false, defaultCompatibilityLevel);
        return level;
    }

    /**
     * Reads the compatibility level for {@code subject} from the lookup cache, passing
     * {@code true} as the cache's second argument and the registry default as the fallback.
     *
     * <p>NOTE(review): the second argument presumably controls whether a broader scope is
     * consulted; confirm against the lookup cache.
     *
     * @param subject subject to read
     * @return the level returned by the cache
     * @throws SchemaRegistryStoreException if the cache lookup fails
     */
    public CompatibilityLevel getCompatibilityLevelInScope(String subject) throws SchemaRegistryStoreException {
        CompatibilityLevel level = lookupCache.compatibilityLevel(subject, true, defaultCompatibilityLevel);
        return level;
    }

    /**
     * Checks a new schema against a single previous schema by delegating to the list-based
     * overload with a one-element list.
     *
     * @param subject subject whose compatibility level applies
     * @param newSchema schema being checked
     * @param latestSchema schema to check against; must not be {@code null}
     * @return whether the new schema is compatible
     * @throws InvalidSchemaException if {@code latestSchema} is {@code null} or a schema cannot be
     *     parsed
     * @throws SchemaRegistryException if the compatibility level cannot be read
     */
    @Override
    public boolean isCompatible(String subject, Schema newSchema, Schema latestSchema) throws SchemaRegistryException {
        if (latestSchema == null) {
            // Log text now matches the exception message (was misspelled "Lastest").
            log.error("Latest schema not provided");
            throw new InvalidSchemaException("Latest schema not provided");
        }
        return this.isCompatible(subject, newSchema, Collections.singletonList(latestSchema));
    }

    /**
     * Checks a new schema against a list of previous schemas under the subject's compatibility
     * level. The previous schemas are parsed first, in list order, then the new schema.
     *
     * @param subject subject whose compatibility level applies
     * @param newSchema schema being checked
     * @param previousSchemas schemas to check against; must not be {@code null}
     * @return whether the new schema is compatible
     * @throws InvalidSchemaException if {@code previousSchemas} is {@code null} or a schema cannot
     *     be parsed
     * @throws SchemaRegistryException if the compatibility level cannot be read
     */
    @Override
    public boolean isCompatible(String subject, Schema newSchema, List<Schema> previousSchemas) throws SchemaRegistryException {
        if (previousSchemas == null) {
            log.error("Previous schema not provided");
            throw new InvalidSchemaException("Previous schema not provided");
        }
        List<ParsedSchema> parsedPrevious = new ArrayList<ParsedSchema>(previousSchemas.size());
        for (Schema prior : previousSchemas) {
            parsedPrevious.add(parseSchema(prior));
        }
        ParsedSchema parsedNew = parseSchema(newSchema);
        return isCompatibleWithPrevious(subject, parsedNew, parsedPrevious);
    }

    /**
     * Asks the parsed schema whether it is compatible with the previous schemas under the
     * compatibility level in scope for {@code subject}.
     *
     * @param subject subject whose in-scope compatibility level is used
     * @param parsedSchema schema being checked
     * @param previousSchemas schemas to check against
     * @return the result of {@code parsedSchema.isCompatible}
     * @throws SchemaRegistryException if the compatibility level cannot be read
     */
    private boolean isCompatibleWithPrevious(String subject, ParsedSchema parsedSchema, List<ParsedSchema> previousSchemas) throws SchemaRegistryException {
        return parsedSchema.isCompatible(getCompatibilityLevelInScope(subject), previousSchemas);
    }

    /**
     * Deletes the mode key of {@code subject} from the Kafka store.
     *
     * @param subject subject whose mode key is deleted
     * @throws StoreException if the store delete fails
     */
    private void deleteMode(String subject) throws StoreException {
        kafkaStore.delete(new ModeKey(subject));
    }

    /**
     * Deletes the config key of {@code subject} from the Kafka store.
     *
     * @param subject subject whose config key is deleted
     * @throws StoreException if the store delete fails
     */
    private void deleteSubjectCompatibility(String subject) throws StoreException {
        kafkaStore.delete(new ConfigKey(subject));
    }

    /**
     * Reads the mode for {@code subject} from the lookup cache, passing {@code false} as the
     * cache's second argument and the registry default mode as the fallback.
     *
     * <p>NOTE(review): the second argument presumably controls whether a broader scope is
     * consulted; confirm against the lookup cache.
     *
     * @param subject subject to read
     * @return the mode returned by the cache
     * @throws SchemaRegistryStoreException if the cache lookup fails
     */
    public Mode getMode(String subject) throws SchemaRegistryStoreException {
        Mode current = lookupCache.mode(subject, false, defaultMode);
        return current;
    }

    /**
     * Reads the mode for {@code subject} from the lookup cache, passing {@code true} as the
     * cache's second argument and the registry default mode as the fallback.
     *
     * <p>NOTE(review): the second argument presumably controls whether a broader scope is
     * consulted; confirm against the lookup cache.
     *
     * @param subject subject to read
     * @return the mode returned by the cache
     * @throws SchemaRegistryStoreException if the cache lookup fails
     */
    private Mode getModeInScope(String subject) throws SchemaRegistryStoreException {
        Mode current = lookupCache.mode(subject, true, defaultMode);
        return current;
    }

    /**
     * Writes a new mode for {@code subject} to the Kafka store.
     *
     * <p>Refused outright when mode changes are disabled. When switching into {@code IMPORT} from
     * any other mode, the change is refused if the subject already has non-deleted subjects;
     * otherwise a clear-subject record is written before the mode itself.
     *
     * @param subject subject whose mode key is written
     * @param mode mode to store
     * @throws OperationNotPermittedException if mode changes are disabled, or the import
     *     precondition fails
     * @throws SchemaRegistryStoreException if waiting for or writing to the store fails
     */
    public void setMode(String subject, Mode mode) throws SchemaRegistryStoreException, OperationNotPermittedException {
        if (!allowModeChanges) {
            throw new OperationNotPermittedException("Mode changes are not allowed");
        }
        ModeKey key = new ModeKey(subject);
        try {
            kafkaStore.waitUntilKafkaReaderReachesLastOffset(subject, kafkaStoreTimeoutMs);

            boolean enteringImport = mode == Mode.IMPORT && getMode(subject) != Mode.IMPORT;
            if (enteringImport) {
                if (hasSubjects(subject, false)) {
                    throw new OperationNotPermittedException("Cannot import since found existing subjects");
                }
                kafkaStore.put(new ClearSubjectKey(subject), new ClearSubjectValue(subject));
            }

            kafkaStore.put(key, new ModeValue(mode));
            log.debug("Wrote new mode: " + mode.name() + " to the Kafka data store with key " + key.toString());
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Failed to write new mode to the store", e);
        }
    }

    /**
     * Sets the mode locally when this node is the master, otherwise forwards the request to the
     * known master. The subject's lock from the Kafka store is held for the duration of the call.
     *
     * @param subject subject whose mode is updated
     * @param mode mode to set
     * @param headerProperties headers passed along if the request is forwarded
     * @throws UnknownMasterException if this node is not the master and no master identity is set
     * @throws SchemaRegistryRequestForwardingException if forwarding fails
     * @throws SchemaRegistryStoreException if the local write fails
     * @throws OperationNotPermittedException if the local update is not permitted
     */
    public void setModeOrForward(String subject, Mode mode, Map<String, String> headerProperties) throws SchemaRegistryStoreException, SchemaRegistryRequestForwardingException, OperationNotPermittedException, UnknownMasterException {
        kafkaStore.lockFor(subject).lock();
        try {
            if (isMaster()) {
                setMode(subject, mode);
            } else if (masterIdentity != null) {
                forwardSetModeRequestToMaster(subject, mode, headerProperties);
            } else {
                throw new UnknownMasterException("Update mode request failed since master is unknown");
            }
        } finally {
            kafkaStore.lockFor(subject).unlock();
        }
    }

    /**
     * Package-private accessor for the backing Kafka store.
     *
     * @return the store instance held by this registry
     */
    KafkaStore<SchemaRegistryKey, SchemaRegistryValue> getKafkaStore() {
        return kafkaStore;
    }

    /**
     * Converts stored values to schema entities, optionally dropping deleted ones, and sorts the
     * result by the natural ordering of {@code Schema}.
     *
     * @param schemas stored values; each one is cast to {@code SchemaValue}
     * @param returnDeletedSchemas whether values marked deleted are kept
     * @return a new sorted, mutable list
     */
    private List<Schema> sortSchemasByVersion(Iterator<SchemaRegistryValue> schemas, boolean returnDeletedSchemas) {
        List<Schema> sorted = new ArrayList<Schema>();
        while (schemas.hasNext()) {
            SchemaValue stored = (SchemaValue) schemas.next();
            if (returnDeletedSchemas || !stored.isDeleted()) {
                sorted.add(getSchemaEntityFromSchemaValue(stored));
            }
        }
        Collections.sort(sorted);
        return sorted;
    }

    /**
     * Drains the iterator into a list of {@code SchemaValue}s and sorts it by their natural
     * ordering. Deleted values are kept.
     *
     * @param schemas stored values; each one is cast to {@code SchemaValue}
     * @return a new sorted, mutable list
     */
    private List<SchemaValue> sortSchemaValuesByVersion(Iterator<SchemaRegistryValue> schemas) {
        List<SchemaValue> sorted = new ArrayList<SchemaValue>();
        while (schemas.hasNext()) {
            sorted.add((SchemaValue) schemas.next());
        }
        Collections.sort(sorted);
        return sorted;
    }

    /**
     * Converts a stored {@code SchemaValue} into a REST {@code Schema} entity, mapping each
     * storage-level reference to its REST counterpart.
     *
     * @param schemaValue stored value, may be {@code null}
     * @return the entity, or {@code null} for a {@code null} input; the entity's references are
     *     {@code null} when the stored value has none
     */
    private Schema getSchemaEntityFromSchemaValue(SchemaValue schemaValue) {
        if (schemaValue == null) {
            return null;
        }
        List<SchemaReference> storedRefs = schemaValue.getReferences();
        List<io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference> restRefs = null;
        if (storedRefs != null) {
            restRefs = storedRefs.stream().map(ref -> new io.confluent.kafka.schemaregistry.client.rest.entities.SchemaReference(ref.getName(), ref.getSubject(), ref.getVersion())).collect(Collectors.toList());
        }
        return new Schema(schemaValue.getSubject(), schemaValue.getVersion(), schemaValue.getId(), schemaValue.getSchemaType(), restRefs, schemaValue.getSchema());
    }

    /**
     * Reports whether a subject version is missing from the Kafka store or marked deleted.
     *
     * @param subject subject to read
     * @param version version to read
     * @return {@code true} when nothing is stored under the key or the stored value is deleted
     * @throws SchemaRegistryException if the store read fails
     */
    private boolean isSubjectVersionDeleted(String subject, int version) throws SchemaRegistryException {
        try {
            SchemaValue stored = (SchemaValue) kafkaStore.get(new SchemaKey(subject, version));
            if (stored == null) {
                return true;
            }
            return stored.isDeleted();
        } catch (StoreException e) {
            throw new SchemaRegistryStoreException("Error while retrieving schema from the backend Kafka store", e);
        }
    }

    /**
     * Returns the configuration this registry was set up with.
     *
     * @return the registry configuration
     */
    @Override
    public SchemaRegistryConfig config() {
        return config;
    }

    /**
     * Chooses a hostname verifier from the {@code ssl.endpoint.identification.algorithm} setting.
     *
     * <p>An unset, empty or {@code "none"} value yields a verifier that accepts every hostname.
     * A value equal to {@code "https"} (ignoring case) yields {@code null}; presumably the caller
     * then keeps its default verification, which should be confirmed against the call site.
     *
     * @return an accept-all verifier, or {@code null} for {@code https}
     * @throws SchemaRegistryStoreException for any other configured value
     */
    public HostnameVerifier getHostnameVerifier() throws SchemaRegistryStoreException {
        String algorithm = config.getString("ssl.endpoint.identification.algorithm");

        boolean verificationDisabled = algorithm == null || algorithm.equals("none") || algorithm.isEmpty();
        if (verificationDisabled) {
            return (hostname, session) -> true;
        }
        if (!algorithm.equalsIgnoreCase("https")) {
            throw new SchemaRegistryStoreException("ssl.endpoint.identification.algorithm " + algorithm + " not supported");
        }
        return null;
    }

    /**
     * Simple holder pairing a URL scheme with a port number. Both fields are public and mutable.
     */
    public static class SchemeAndPort {
        /** Port number. */
        public int port;
        /** Scheme name. */
        public String scheme;

        /**
         * Creates a holder for the given scheme and port.
         *
         * @param scheme scheme name
         * @param port port number
         */
        public SchemeAndPort(String scheme, int port) {
            this.scheme = scheme;
            this.port = port;
        }
    }
}

