/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Properties;
import kafka.server.ConfigType$;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkUtils$;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.DescribeConfigsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.config.ConfigResource;
import org.apache.kafka.common.errors.PolicyViolationException;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.Iterable$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001E4AAC\u0006\u0001%!A1\u0004\u0001B\u0001B\u0003%A\u0004C\u0005 \u0001\t\u0005\t\u0015!\u0003!M!A\u0001\u0006\u0001B\u0001B\u0003%\u0011\u0006C\u0003-\u0001\u0011\u0005Q\u0006C\u00043\u0001\t\u0007I\u0011B\u001a\t\r]\u0003\u0001\u0015!\u00035\u0011\u0015A\u0006\u0001\"\u0015Z\u0011\u0015i\u0006\u0001\"\u0003_\u0011\u0015\u0001\b\u0001\"\u00014\u0005q\u0019E.^:uKJd\u0015N\\6Ts:\u001cGk\u001c9jGN\u001cuN\u001c4jONT!\u0001D\u0007\u0002\t1Lgn\u001b\u0006\u0003\u001d=\taa]3sm\u0016\u0014(\"\u0001\t\u0002\u000b-\fgm[1\u0004\u0001M\u0011\u0001a\u0005\t\u0003)aq!!\u0006\f\u000e\u0003-I!aF\u0006\u0002)\rcWo\u001d;fe2Kgn[*dQ\u0016$W\u000f\\3s\u0013\tI\"D\u0001\u0007QKJLw\u000eZ5d)\u0006\u001c8N\u0003\u0002\u0018\u0017\u0005i1\r\\5f]Rl\u0015M\\1hKJ\u0004\"!F\u000f\n\u0005yY!\u0001G\"mkN$XM\u001d'j].\u001cE.[3oi6\u000bg.Y4fe\u0006q1/\u001f8d\u0013:$XM\u001d<bY6\u001b\bCA\u0011%\u001b\u0005\u0011#\"A\u0012\u0002\u000bM\u001c\u0017\r\\1\n\u0005\u0015\u0012#aA%oi&\u0011q\u0005G\u0001\u0012e\u0016\u001c8\r[3ek2,G)\u001a7bs6\u001b\u0018aB7fiJL7m\u001d\t\u0003+)J!aK\u0006\u0003%\rcWo\u001d;fe2Kgn['fiJL7m]\u0001\u0007y%t\u0017\u000e\u001e 
\u0015\t9z\u0003'\r\t\u0003+\u0001AQa\u0007\u0003A\u0002qAQa\b\u0003A\u0002\u0001BQ\u0001\u000b\u0003A\u0002%\nqaY8oM&<7/F\u00015!\u0011)$\bP$\u000e\u0003YR!a\u000e\u001d\u0002\u000f5,H/\u00192mK*\u0011\u0011HI\u0001\u000bG>dG.Z2uS>t\u0017BA\u001e7\u0005\ri\u0015\r\u001d\t\u0003{\u0011s!A\u0010\"\u0011\u0005}\u0012S\"\u0001!\u000b\u0005\u0005\u000b\u0012A\u0002\u001fs_>$h(\u0003\u0002DE\u00051\u0001K]3eK\u001aL!!\u0012$\u0003\rM#(/\u001b8h\u0015\t\u0019%\u0005E\u0002\"\u0011*K!!\u0013\u0012\u0003\r=\u0003H/[8o!\tYU+D\u0001M\u0015\tie*A\u0003bI6LgN\u0003\u0002P!\u000691\r\\5f]R\u001c(B\u0001\tR\u0015\t\u00116+\u0001\u0004ba\u0006\u001c\u0007.\u001a\u0006\u0002)\u0006\u0019qN]4\n\u0005Yc%AB\"p]\u001aLw-\u0001\u0005d_:4\u0017nZ:!\u0003\r\u0011XO\u001c\u000b\u00025B\u0011\u0011eW\u0005\u00039\n\u0012qAQ8pY\u0016\fg.\u0001\niC:$G.\u001a+pa&\u001c7i\u001c8gS\u001e\u001cHC\u0001.`\u0011\u0015\u0001\u0007\u00021\u0001b\u0003\u0019\u0011Xm];miB!!m\u00193m\u001b\u0005A\u0014BA\u001e9!\t)'.D\u0001g\u0015\t9\u0007.\u0001\u0004d_:4\u0017n\u001a\u0006\u0003SB\u000baaY8n[>t\u0017BA6g\u00059\u0019uN\u001c4jOJ+7o\\;sG\u0016\u00042!\u001c8K\u001b\u0005A\u0017BA8i\u0005-Y\u0015MZ6b\rV$XO]3\u0002!\r,(O]3oi\u000e{gNZ5h\u001b\u0006\u0004\b")
/*
 * Periodic task that copies topic configuration from the remote (source) side of a
 * cluster link onto the local mirror topics.
 *
 * Each run:
 *   1. reconciles the per-topic cache with the topic set reported by the client manager,
 *   2. issues a describeConfigs request for every cached topic through the client
 *      manager's admin client, and
 *   3. once that request completes, compares every returned Config with the cached one and,
 *      when it differs, recomputes the local topic properties and writes them through
 *      adminZkClient if they changed.
 *
 * The cache maps a topic name to the last remote Config that was processed successfully;
 * None means the topic is known but no remote Config has been processed for it yet.
 *
 * NOTE(review): the cache is a plain mutable Scala map that is touched from both run() and
 * the completion callback; this presumably relies on the scheduler serialising those calls -
 * confirm against ClusterLinkScheduler.
 */
public class ClusterLinkSyncTopicsConfigs
extends ClusterLinkScheduler.PeriodicTask {
    /** Source of the mirrored topic set, the remote admin client, the ZK admin client and link metadata. */
    private final ClusterLinkClientManager clientManager;
    /** Sensors recorded on successful and failed topic config processing. */
    private final ClusterLinkMetrics metrics;
    /** Topic name -> last processed remote config (None until the first successful processing). */
    private final scala.collection.mutable.Map<String, Option<Config>> configs;

    /** Accessor for the per-topic cache of last processed remote configs. */
    private scala.collection.mutable.Map<String, Option<Config>> configs() {
        return this.configs;
    }

    /**
     * Reconciles the cache with the current topic set and, if any topic is cached, requests
     * the remote configs and schedules {@link #handleTopicConfigs} for when they arrive.
     *
     * @return {@code true} when there is nothing to describe; {@code false} after a describe
     *         request has been issued and its completion handler scheduled.
     *         NOTE(review): presumably {@code false} tells the scheduler that the task finishes
     *         asynchronously - confirm against ClusterLinkScheduler.PeriodicTask.
     */
    @Override
    public boolean run() {
        Set<String> newTopics = this.clientManager.getTopics();
        Set oldTopics = this.configs().keySet();
        // Forget topics that are no longer reported by the client manager.
        oldTopics.diff(newTopics).foreach((Function1 & Serializable & scala.Serializable)key -> this.configs().remove(key));
        // Start tracking newly reported topics; None marks "no remote config processed yet".
        newTopics.diff((GenSet)oldTopics).foreach((Function1 & Serializable & scala.Serializable)topic -> this.configs().put(topic, (Object)None$.MODULE$));
        // One TOPIC config resource per cached topic name.
        scala.collection.immutable.Set resources = ((TraversableOnce)this.configs().keys().map((Function1 & Serializable & scala.Serializable)name -> new ConfigResource(ConfigResource.Type.TOPIC, name), Iterable$.MODULE$.canBuildFrom())).toSet();
        if (resources.nonEmpty()) {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve topic configs from source cluster");
            DescribeConfigsResult describeConfigsResult = this.clientManager.getAdmin().describeConfigs((Collection)CollectionConverters$.MODULE$.setAsJavaSetConverter((Set)resources).asJava());
            // The per-resource futures are only inspected after the aggregate future has completed.
            this.scheduleWhenComplete(describeConfigsResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.handleTopicConfigs((Map<ConfigResource, KafkaFuture<Config>>)((TraversableOnce)CollectionConverters$.MODULE$.mapAsScalaMapConverter(describeConfigsResult.values()).asScala()).toMap(Predef$.MODULE$.$conforms())));
            return false;
        }
        return true;
    }

    /**
     * Processes the outcome of a describeConfigs request, one resource at a time.
     *
     * @param result per-resource futures as returned by the describe request
     * @return always {@code true}
     */
    private boolean handleTopicConfigs(Map<ConfigResource, KafkaFuture<Config>> result) {
        result.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            ClusterLinkSyncTopicsConfigs.$anonfun$handleTopicConfigs$1(this, x0$1);
            return BoxedUnit.UNIT;
        });
        return true;
    }

    /**
     * Returns the live (not copied) cache of topic name to last processed remote config.
     */
    public scala.collection.mutable.Map<String, Option<Config>> currentConfigMap() {
        return this.configs();
    }

    /**
     * Predicate used when deciding whether a remote config needs processing.
     *
     * @return {@code true} when the cached config {@code x$1} differs from {@code curConfig$1}
     */
    public static final /* synthetic */ boolean $anonfun$handleTopicConfigs$3(Config curConfig$1, Config x$1) {
        return !x$1.equals((Object)curConfig$1);
    }

    /**
     * Processes the remote config of a single topic.
     *
     * When the remote config is new (nothing cached) or differs from the cached one, the local
     * topic properties are fetched, merged with the remote config, validated against the alter
     * config policy and, if the result differs from the current local properties, written back.
     * The cache entry is then updated and the update sensor recorded.
     *
     * A {@link PolicyViolationException} is logged and otherwise ignored: the cache is left
     * untouched and no sensor is recorded. Any other throwable is logged and recorded on the
     * failure sensor. Nothing is rethrown.
     *
     * @param $this     the enclosing task instance
     * @param future$1  completed future holding the remote config (or its failure)
     * @param topic$1   name of the mirror topic
     * @param oldConfig cached remote config for the topic, if any
     */
    public static final /* synthetic */ void $anonfun$handleTopicConfigs$2(ClusterLinkSyncTopicsConfigs $this, KafkaFuture future$1, String topic$1, Option oldConfig) {
        try {
            Config curConfig = (Config)future$1.get();
            // forall is vacuously true for None, so a topic without a cached config is always processed.
            if (oldConfig.forall((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncTopicsConfigs.$anonfun$handleTopicConfigs$3(curConfig, x$1)))) {
                $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(72).append("Detected new remote configuration for mirror topic '").append(topic$1).append("' on cluster link '").append($this.clientManager.linkData().linkName()).append("'").toString());
                Properties curProps = $this.clientManager.adminZkClient().fetchEntityConfig(ConfigType$.MODULE$.Topic(), topic$1);
                Properties newProps = ClusterLinkUtils$.MODULE$.restrictValidateTopicConfigPolicy(topic$1, ClusterLinkUtils$.MODULE$.updateMirrorProps(topic$1, curProps, curConfig), $this.clientManager.alterConfigPolicy());
                // Null-safe inequality: only write when the computed properties differ from the local ones.
                if (newProps == null ? curProps != null : !((Object)newProps).equals(curProps)) {
                    $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(67).append("Updating local configuration for mirror topic '").append(topic$1).append("' on cluster link '").append($this.clientManager.linkData().linkName()).append("'").toString());
                    // Entries present in the new properties but not in the current ones; used for logging only.
                    scala.collection.mutable.Set newTopicConfigs = (scala.collection.mutable.Set)((SetLike)CollectionConverters$.MODULE$.asScalaSetConverter(newProps.entrySet()).asScala()).diff((GenSet)CollectionConverters$.MODULE$.asScalaSetConverter(curProps.entrySet()).asScala());
                    $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(44).append("Adding configs ").append(newTopicConfigs).append(" for topic ").append(topic$1).append(" on target cluster").toString());
                    $this.clientManager.adminZkClient().changeTopicConfig(topic$1, newProps);
                }
                // The remote config is remembered even when the local properties needed no change.
                $this.configs().put((Object)topic$1, (Object)new Some((Object)curConfig));
                $this.metrics.topicConfigUpdateSensor().record();
            }
        }
        catch (PolicyViolationException e) {
            // The link name is quoted on both sides, matching the debug messages above.
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(89).append("Could not update mirror topic '").append(topic$1).append("' configuration due to policy violation on ").append("cluster link '").append($this.clientManager.linkData().linkName()).append("'").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            return;
        }
        catch (Throwable e) {
            // The link name is quoted on both sides, matching the debug messages above.
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(94).append("Error encountered while processing remote configuration for mirror topic '").append(topic$1).append("' ").append("on cluster link '").append($this.clientManager.linkData().linkName()).append("'").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            $this.metrics.topicConfigUpdateFailedSensor().record();
        }
    }

    /**
     * Handles one (resource, future) entry of a describe result.
     *
     * The topic is processed only if it is still present in the cache; an entry for a topic
     * that is no longer cached is skipped.
     *
     * @param $this the enclosing task instance
     * @param x0$1  pair of config resource and the future holding its remote config
     * @throws MatchError if {@code x0$1} is {@code null}
     */
    public static final /* synthetic */ void $anonfun$handleTopicConfigs$1(ClusterLinkSyncTopicsConfigs $this, Tuple2 x0$1) {
        if (x0$1 != null) {
            ConfigResource resource = (ConfigResource)x0$1._1();
            KafkaFuture future = (KafkaFuture)x0$1._2();
            String topic = resource.name();
            // The outer Option is the cache lookup; the inner Option (oldConfig) is the cached value.
            $this.configs().get((Object)topic).foreach((Function1 & Serializable & scala.Serializable)oldConfig -> {
                ClusterLinkSyncTopicsConfigs.$anonfun$handleTopicConfigs$2($this, future, topic, oldConfig);
                return BoxedUnit.UNIT;
            });
            return;
        }
        throw new MatchError(null);
    }

    /**
     * Creates the task with an empty config cache.
     *
     * @param clientManager  provides the scheduler, topic set, admin clients and link metadata
     * @param syncIntervalMs interval passed to the periodic task superclass
     * @param metrics        sensors recorded while processing topic configs
     */
    public ClusterLinkSyncTopicsConfigs(ClusterLinkClientManager clientManager, int syncIntervalMs, ClusterLinkMetrics metrics) {
        // The superclass constructor call comes first; its arguments only read constructor parameters.
        super(clientManager.scheduler(), "ClusterLinkSyncTopicsConfigs", syncIntervalMs);
        this.clientManager = clientManager;
        this.metrics = metrics;
        this.configs = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
    }
}

