/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import kafka.api.ApiVersion$;
import kafka.cluster.BrokerEndPoint;
import kafka.cluster.Partition;
import kafka.server.AbstractFetcherManager;
import kafka.server.InitialFetchState;
import kafka.server.KafkaConfig;
import kafka.server.ReplicaManager;
import kafka.server.ReplicaQuota;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkDestConnectionManager;
import kafka.server.link.ClusterLinkFactory;
import kafka.server.link.ClusterLinkFactory$;
import kafka.server.link.ClusterLinkFetcherManager$;
import kafka.server.link.ClusterLinkFetcherThread;
import kafka.server.link.ClusterLinkFetcherThread$;
import kafka.server.link.ClusterLinkMetadata;
import kafka.server.link.ClusterLinkMetadataThread;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.FetchState;
import kafka.server.link.MetadataListener;
import kafka.server.link.PartitionAndState;
import kafka.tier.fetcher.TierStateFetcher;
import org.apache.kafka.clients.ClientDnsLookup;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewPartitions;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.errors.InvalidPartitionsException;
import org.apache.kafka.common.message.MetadataResponseData;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.utils.Time;
import scala.Function0;
import scala.Function1;
import scala.Function2;
import scala.MatchError;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.IterableLike;
import scala.collection.Map;
import scala.collection.MapLike;
import scala.collection.Seq;
import scala.collection.Set;
import scala.collection.Set$;
import scala.collection.SetLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\r-b\u0001\u0002\u001f>\u0001\u0011C\u0001b\u0016\u0001\u0003\u0002\u0003\u0006I\u0001\u0017\u0005\tK\u0002\u0011\t\u0011)A\u0005M\"Aa\u000e\u0001B\u0001B\u0003%q\u000e\u0003\u0005s\u0001\t\u0005\t\u0015!\u0003t\u0011!1\bA!A!\u0002\u00139\b\u0002\u0003>\u0001\u0005\u0003\u0005\u000b\u0011B>\t\u0011y\u0004!\u0011!Q\u0001\n}D!\"!\u0007\u0001\u0005\u0003\u0005\u000b\u0011BA\u000e\u0011)\t\t\u0003\u0001B\u0001B\u0003%\u00111\u0005\u0005\u000b\u0003S\u0001!\u0011!Q\u0001\n\u0005-\u0002BCA\u001e\u0001\t\u0005\t\u0015!\u0003\u0002>!Q\u0011Q\t\u0001\u0003\u0002\u0003\u0006I!a\u0012\t\u000f\u0005e\u0003\u0001\"\u0001\u0002\\!I\u0011q\u000f\u0001C\u0002\u0013%\u0011\u0011\u0010\u0005\t\u0003+\u0003\u0001\u0015!\u0003\u0002|!I\u0011q\u0013\u0001C\u0002\u0013%\u0011\u0011\u0014\u0005\t\u0003W\u0003\u0001\u0015!\u0003\u0002\u001c\"I\u0011Q\u0016\u0001C\u0002\u0013%\u0011q\u0016\u0005\t\u0003o\u0003\u0001\u0015!\u0003\u00022\"Y\u0011\u0011\u0018\u0001A\u0002\u0003\u0007I\u0011BA^\u0011-\t\u0019\r\u0001a\u0001\u0002\u0004%I!!2\t\u0017\u0005E\u0007\u00011A\u0001B\u0003&\u0011Q\u0018\u0005\f\u00037\u0004\u0001\u0019!a\u0001\n\u0013\ti\u000eC\u0006\u0002f\u0002\u0001\r\u00111A\u0005\n\u0005\u001d\bbCAv\u0001\u0001\u0007\t\u0011)Q\u0005\u0003?D\u0011\"a<\u0001\u0001\u0004%I!!=\t\u0013\u0005M\b\u00011A\u0005\n\u0005U\bbBA}\u0001\u0001\u0006Ka\u001c\u0005\b\u0003{\u0004A\u0011AA\u0000\u0011!\u0011\t\u0001\u0001C\u0001{\t\r\u0001b\u0002B\u0003\u0001\u0011\u0005\u0011q 
\u0005\t\u0005\u000f\u0001A\u0011A\u001f\u0002\u0000\"A!\u0011\u0002\u0001\u0005\u0002u\u0012Y\u0001C\u0004\u0003\u001c\u0001!\tE!\b\t\u000f\te\u0002\u0001\"\u0001\u0002\u0000\"9!1\b\u0001\u0005B\tu\u0002b\u0002B%\u0001\u0011%!1\n\u0005\t\u0005G\u0002A\u0011A\u001f\u0003f!A!1\u0011\u0001\u0005\u0002u\u0012)\tC\u0004\u0003\u0010\u0002!I!a@\t\u000f\tE\u0005\u0001\"\u0001\u00020\"9!1\u0013\u0001\u0005\u0002\u0005E\b\u0002\u0003BK\u0001\u0011\u0005Q(a/\t\u0011\t]\u0005\u0001\"\u0001>\u00053CqAa*\u0001\t\u0013\u0011I\u000b\u0003\u0005\u00032\u0002!\t!\u0010BZ\u0011!\u0011\t\u0007\u0001C\u0001{\t}\u0006b\u0002Bd\u0001\u0011%\u0011q \u0005\b\u0005\u0013\u0004A\u0011\u0002Bf\u0011\u001d\u0011I\u000e\u0001C\t\u00057DqA!9\u0001\t\u0003\u0011\u0019\u000fC\u0004\u0003r\u0002!\tAa=\t\u000f\te\b\u0001\"\u0001\u0003|\"9!Q \u0001\u0005\u0002\tmx!\u0003B\u0000{\u0005\u0005\t\u0012AB\u0001\r!aT(!A\t\u0002\r\r\u0001bBA-q\u0011\u000511\u0002\u0005\n\u0007\u001bA\u0014\u0013!C\u0001\u0007\u001fA\u0011b!\n9#\u0003%\taa\n\u00033\rcWo\u001d;fe2Kgn\u001b$fi\u000eDWM]'b]\u0006<WM\u001d\u0006\u0003}}\nA\u0001\\5oW*\u0011\u0001)Q\u0001\u0007g\u0016\u0014h/\u001a:\u000b\u0003\t\u000bQa[1gW\u0006\u001c\u0001a\u0005\u0003\u0001\u000b6#\u0006c\u0001$H\u00136\tq(\u0003\u0002I\u007f\t1\u0012IY:ue\u0006\u001cGOR3uG\",'/T1oC\u001e,'\u000f\u0005\u0002K\u00176\tQ(\u0003\u0002M{\tA2\t\\;ti\u0016\u0014H*\u001b8l\r\u0016$8\r[3s)\"\u0014X-\u00193\u0011\u00059\u000bfB\u0001&P\u0013\t\u0001V(\u0001\nDYV\u001cH/\u001a:MS:\\g)Y2u_JL\u0018B\u0001*T\u000591U\r^2iKJl\u0015M\\1hKJT!\u0001U\u001f\u0011\u0005)+\u0016B\u0001,>\u0005AiU\r^1eCR\fG*[:uK:,'/\u0001\u0005mS:\\g*Y7f!\tI&M\u0004\u0002[AB\u00111LX\u0007\u00029*\u0011QlQ\u0001\u0007yI|w\u000e\u001e 
\u000b\u0003}\u000bQa]2bY\u0006L!!\u00190\u0002\rA\u0013X\rZ3g\u0013\t\u0019GM\u0001\u0004TiJLgn\u001a\u0006\u0003Cz\u000ba\u0001\\5oW&#\u0007CA4m\u001b\u0005A'BA5k\u0003\u0011)H/\u001b7\u000b\u0003-\fAA[1wC&\u0011Q\u000e\u001b\u0002\u0005+VKE)A\u0007j]&$\u0018.\u00197D_:4\u0017n\u001a\t\u0003\u0015BL!!]\u001f\u0003#\rcWo\u001d;fe2Kgn[\"p]\u001aLw-A\u000beKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0011\u0005)#\u0018BA;>\u0005\u0001\u001aE.^:uKJd\u0015N\\6EKN$8i\u001c8oK\u000e$\u0018n\u001c8NC:\fw-\u001a:\u0002\u0019\t\u0014xn[3s\u0007>tg-[4\u0011\u0005\u0019C\u0018BA=@\u0005-Y\u0015MZ6b\u0007>tg-[4\u0002\u001dI,\u0007\u000f\\5dC6\u000bg.Y4feB\u0011a\t`\u0005\u0003{~\u0012aBU3qY&\u001c\u0017-T1oC\u001e,'/A\beKN$\u0018\tZ7j]\u000ec\u0017.\u001a8u!\u0011\t\t!!\u0006\u000e\u0005\u0005\r!\u0002BA\u0003\u0003\u000f\tQ!\u00193nS:TA!!\u0003\u0002\f\u000591\r\\5f]R\u001c(b\u0001\"\u0002\u000e)!\u0011qBA\t\u0003\u0019\t\u0007/Y2iK*\u0011\u00111C\u0001\u0004_J<\u0017\u0002BA\f\u0003\u0007\u0011Q!\u00113nS:\fQ!];pi\u0006\u00042ARA\u000f\u0013\r\tyb\u0010\u0002\r%\u0016\u0004H.[2b#V|G/Y\u0001\b[\u0016$(/[2t!\rQ\u0015QE\u0005\u0004\u0003Oi$AE\"mkN$XM\u001d'j].lU\r\u001e:jGN\fA\u0001^5nKB!\u0011QFA\u001c\u001b\t\tyC\u0003\u0003\u00022\u0005M\u0012!B;uS2\u001c(\u0002BA\u001b\u0003\u0017\taaY8n[>t\u0017\u0002BA\u001d\u0003_\u0011A\u0001V5nK\u0006\u0001B\u000f\u001b:fC\u0012t\u0015-\\3Qe\u00164\u0017\u000e\u001f\t\u0006\u0003\u007f\t\t\u0005W\u0007\u0002=&\u0019\u00111\t0\u0003\r=\u0003H/[8o\u0003A!\u0018.\u001a:Ti\u0006$XMR3uG\",'\u000f\u0005\u0004\u0002@\u0005\u0005\u0013\u0011\n\t\u0005\u0003\u0017\n)&\u0004\u0002\u0002N)!\u0011qJA)\u0003\u001d1W\r^2iKJT1!a\u0015B\u0003\u0011!\u0018.\u001a:\n\t\u0005]\u0013Q\n\u0002\u0011)&,'o\u0015;bi\u00164U\r^2iKJ\fa\u0001P5oSRtDCGA/\u0003?\n\t'a\u0019\u0002f\u0005\u001d\u0014\u0011NA6\u0003[\ny'!\u001d\u0002t\u0005U\u0004C\u0001&\u0001\u0011\u00159V\u00021\u0001Y\u0011\u0015)W\u00021\u0001g\u0011\u0015qW\u00021\u0001p\u0011\u0015\u0011
X\u00021\u0001t\u0011\u00151X\u00021\u0001x\u0011\u0015QX\u00021\u0001|\u0011\u0015qX\u00021\u0001\u0000\u0011\u001d\tI\"\u0004a\u0001\u00037Aq!!\t\u000e\u0001\u0004\t\u0019\u0003C\u0004\u0002*5\u0001\r!a\u000b\t\u0013\u0005mR\u0002%AA\u0002\u0005u\u0002\"CA#\u001bA\u0005\t\u0019AA$\u0003Aa\u0017N\\6fIB\u000b'\u000f^5uS>t7/\u0006\u0002\u0002|AA\u0011QPAB\u0003\u000f\u000by)\u0004\u0002\u0002\u0000)\u0019\u0011\u0011\u00115\u0002\u0015\r|gnY;se\u0016tG/\u0003\u0003\u0002\u0006\u0006}$!E\"p]\u000e,(O]3oi\"\u000b7\u000f['baB!\u0011\u0011RAF\u001b\t\t\u0019$\u0003\u0003\u0002\u000e\u0006M\"A\u0004+pa&\u001c\u0007+\u0019:uSRLwN\u001c\t\u0004\u0015\u0006E\u0015bAAJ{\t\t\u0002+\u0019:uSRLwN\\!oIN#\u0018\r^3\u0002#1Lgn[3e!\u0006\u0014H/\u001b;j_:\u001c\b%\u0001\u000bv]\u0006\u001c8/[4oK\u0012\u0004\u0016M\u001d;ji&|gn]\u000b\u0003\u00037\u0003b!!(\u0002(\u0006\u001dUBAAP\u0015\u0011\t\t+a)\u0002\u000f5,H/\u00192mK*\u0019\u0011Q\u00150\u0002\u0015\r|G\u000e\\3di&|g.\u0003\u0003\u0002*\u0006}%aA*fi\u0006)RO\\1tg&<g.\u001a3QCJ$\u0018\u000e^5p]N\u0004\u0013AK5t)J,hnY1uS>twJ\u001c$fi\u000eD7+\u001e9q_J$X\rZ(o\u0019>\u001c\u0017\r\\\"mkN$XM]\u000b\u0003\u0003c\u0003B!a\u0010\u00024&\u0019\u0011Q\u00170\u0003\u000f\t{w\u000e\\3b]\u0006Y\u0013n\u001d+sk:\u001c\u0017\r^5p]>sg)\u001a;dQN+\b\u000f]8si\u0016$wJ\u001c'pG\u0006d7\t\\;ti\u0016\u0014\b%\u0001\u0005nKR\fG-\u0019;b+\t\ti\fE\u0002K\u0003\u007fK1!!1>\u0005M\u0019E.^:uKJd\u0015N\\6NKR\fG-\u0019;b\u00031iW\r^1eCR\fw\fJ3r)\u0011\t9-!4\u0011\t\u0005}\u0012\u0011Z\u0005\u0004\u0003\u0017t&\u0001B+oSRD\u0011\"a4\u0016\u0003\u0003\u0005\r!!0\u0002\u0007a$\u0013'A\u0005nKR\fG-\u0019;bA!\u001aa#!6\u0011\t\u0005}\u0012q[\u0005\u0004\u00033t&\u0001\u0003<pY\u0006$\u0018\u000e\\3\u0002+5,G/\u00193bi\u0006\u0014VM\u001a:fg\"$\u0006N]3bIV\u0011\u0011q\u001c\t\u0004\u0015\u0006\u0005\u0018bAAr{\tI2\t\\;ti\u0016\u0014H*\u001b8l\u001b\u0016$\u0018\rZ1uCRC'/Z1e\u0003eiW\r^1eCR\f'+\u001a4sKNDG\u000b\u001b:fC\u0012|F%Z9\u0015\t\u0005\u001d\u0017\
u0011\u001e\u0005\n\u0003\u001fD\u0012\u0011!a\u0001\u0003?\fa#\\3uC\u0012\fG/\u0019*fMJ,7\u000f\u001b+ie\u0016\fG\r\t\u0015\u00043\u0005U\u0017!E2mkN$XM\u001d'j].\u001cuN\u001c4jOV\tq.A\u000bdYV\u001cH/\u001a:MS:\\7i\u001c8gS\u001e|F%Z9\u0015\t\u0005\u001d\u0017q\u001f\u0005\t\u0003\u001f\\\u0012\u0011!a\u0001_\u0006\u00112\r\\;ti\u0016\u0014H*\u001b8l\u0007>tg-[4!Q\ra\u0012Q[\u0001\bgR\f'\u000f^;q)\t\t9-\u0001\u0005jg\u0006\u001bG/\u001b<f)\t\t\t,\u0001\nj]&$\u0018.\u00197ju\u0016lU\r^1eCR\f\u0017aE:uCJ$X*\u001a;bI\u0006$\u0018\r\u00165sK\u0006$\u0017a\u0003:fG>tg-[4ve\u0016$b!a2\u0003\u000e\tE\u0001B\u0002B\bC\u0001\u0007q.A\u0005oK^\u001cuN\u001c4jO\"9!1C\u0011A\u0002\tU\u0011aC;qI\u0006$X\rZ&fsN\u0004RAa\u0006\u0003\u001aak!!a)\n\t\u0005%\u00161U\u0001\u0014GJ,\u0017\r^3GKR\u001c\u0007.\u001a:UQJ,\u0017\r\u001a\u000b\u0006\u0013\n}!\u0011\u0006\u0005\b\u0005C\u0011\u0003\u0019\u0001B\u0012\u0003%1W\r^2iKJLE\r\u0005\u0003\u0002@\t\u0015\u0012b\u0001B\u0014=\n\u0019\u0011J\u001c;\t\u000f\t-\"\u00051\u0001\u0003.\u0005a1o\\;sG\u0016\u0014%o\\6feB!!q\u0006B\u001b\u001b\t\u0011\tDC\u0002\u00034\u0005\u000bqa\u00197vgR,'/\u0003\u0003\u00038\tE\"A\u0004\"s_.,'/\u00128e!>Lg\u000e^\u0001\tg\",H\u000fZ8x]\u0006iqN\u001c(fo6+G/\u00193bi\u0006$B!a2\u0003@!9!\u0011\t\u0013A\u0002\t\r\u0013A\u00038fo\u000ecWo\u001d;feB!\u0011\u0011\u0012B#\u0013\u0011\u00119%a\r\u0003\u000f\rcWo\u001d;fe\u0006arN\u001c'j].,G\rT3bI\u0016\u0014X\u000b\u001d3bi\u0016\u0014Vm\u001d9p]N,G\u0003\u0002B'\u0005?\"B!a2\u0003P!9!\u0011K\u0013A\u0002\tM\u0013!B3se>\u0014\b\u0003\u0002B+\u00057j!Aa\u0016\u000b\t\te\u00131G\u0001\taJ|Go\\2pY&!!Q\fB,\u0005\u0019)%O]8sg\"9!\u0011M\u0013A\u0002\u0005\u001d\u0015!\u00039beRLG/[8o\u0003u\tG\r\u001a'j].,GMR3uG\",'OR8s!\u0006\u0014H/\u001b;j_:\u001cH\u0003BAd\u0005OBqA!\u001b'\u0001\u0004\u0011Y'\u0001\u0006qCJ$\u0018\u000e^5p]N\u0004bA!\u001c\u0003x\tud\u0002\u0002B8\u0005gr1a\u0017B9\u0013\u0005y\u0016b\u0001B;=\u00069\u0001/Y2lC\u001e,\u0017\u0002\u0002B=\u0
005w\u0012\u0001\"\u0013;fe\u0006\u0014G.\u001a\u0006\u0004\u0005kr\u0006\u0003\u0002B\u0018\u0005\u007fJAA!!\u00032\tI\u0001+\u0019:uSRLwN\\\u0001!e\u0016lwN^3MS:\\W\r\u001a$fi\u000eDWM\u001d$peB\u000b'\u000f^5uS>t7\u000f\u0006\u0004\u0002H\n\u001d%1\u0012\u0005\b\u0005S:\u0003\u0019\u0001BE!\u0019\u00119B!\u0007\u0002\b\"9!QR\u0014A\u0002\u0005E\u0016A\u0004:fi\u0006Lg.T3uC\u0012\fG/Y\u0001\u0015kB$\u0017\r^3NKR\fG-\u0019;b)>\u0004\u0018nY:\u0002\u000f%\u001cX)\u001c9us\u0006i1-\u001e:sK:$8i\u001c8gS\u001e\fqbY;se\u0016tG/T3uC\u0012\fG/Y\u0001\u0017_:\u0004\u0016M\u001d;ji&|g\u000eT5oW\u001a\u000b\u0017\u000e\\;sKRA\u0011q\u0019BN\u0005?\u0013\u0019\u000bC\u0004\u0003\u001e2\u0002\r!a\"\u0002\u001dQ|\u0007/[2QCJ$\u0018\u000e^5p]\"9!\u0011\u0015\u0017A\u0002\u0005E\u0016!\u0003:fiJL\u0017M\u00197f\u0011\u0019\u0011)\u000b\fa\u00011\u00061!/Z1t_:\f1d\u001c8MS:\\g)Y5mkJ,W\u000b\u001d3bi\u0016\u0014Vm\u001d9p]N,G\u0003\u0002BV\u0005_#B!a2\u0003.\"9!\u0011K\u0017A\u0002\tM\u0003b\u0002B1[\u0001\u0007\u0011qQ\u0001\u001aG2,\u0017M\u001d)beRLG/[8o\u0019&t7NR1jYV\u0014X\r\u0006\u0004\u0002H\nU&q\u0017\u0005\b\u0005;s\u0003\u0019AAD\u0011!\u0011)K\fCA\u0002\te\u0006#BA \u0005wC\u0016b\u0001B_=\nAAHY=oC6,g\b\u0006\u0003\u0003B\n\r\u0007CBA 
\u0003\u0003\u0012i\bC\u0004\u0003F>\u0002\r!a\"\u0002\u0005Q\u0004\u0018AF7bs\n,\u0017\t\u001a3MS:\\W\r\u001a$fi\u000eDWM]:\u0002)U\u0004H-\u0019;f!\u0006\u0014H/\u001b;j_:\u001cu.\u001e8u)\u0019\t9M!4\u0003X\"9!qZ\u0019A\u0002\tE\u0017\u0001\u0006;pa&\u001c\u0007+\u0019:uSRLwN\\\"pk:$8\u000fE\u0004\u0003\u0018\tM\u0007La\t\n\t\tU\u00171\u0015\u0002\u0004\u001b\u0006\u0004\bb\u0002B\u001ac\u0001\u0007!1I\u0001\u000fa\u0006\u0014H/\u001b;j_:\u001cu.\u001e8u)\u0011\u0011\u0019C!8\t\r\t}'\u00071\u0001Y\u0003\u0015!x\u000e]5d\u0003e)\b\u000fZ1uKB\u000b'\u000f^5uS>tg)\u001a;dQN#\u0018\r^3\u0015\r\u0005\u001d'Q\u001dBt\u0011\u001d\u0011ij\ra\u0001\u0003\u000fCqA!;4\u0001\u0004\u0011Y/\u0001\u0006gKR\u001c\u0007n\u0015;bi\u0016\u00042A\u0013Bw\u0013\r\u0011y/\u0010\u0002\u000b\r\u0016$8\r[*uCR,\u0017a\u00059beRLG/[8o\r\u0016$8\r[*uCR,G\u0003\u0002B{\u0005o\u0004b!a\u0010\u0002B\t-\bb\u0002BOi\u0001\u0007\u0011qQ\u0001\u0015[&\u0014(o\u001c:QCJ$\u0018\u000e^5p]\u000e{WO\u001c;\u0015\u0005\t\r\u0012A\u00074bS2,G-T5se>\u0014\b+\u0019:uSRLwN\\\"pk:$\u0018!G\"mkN$XM\u001d'j].4U\r^2iKJl\u0015M\\1hKJ\u0004\"A\u0013\u001d\u0014\u0007a\u001a)\u0001\u0005\u0003\u0002@\r\u001d\u0011bAB\u0005=\n1\u0011I\\=SK\u001a$\"a!\u0001\u00029\u0011bWm]:j]&$He\u001a:fCR,'\u000f\n3fM\u0006,H\u000e\u001e\u00132cU\u00111\u0011\u0003\u0016\u0005\u0003{\u0019\u0019b\u000b\u0002\u0004\u0016A!1qCB\u0011\u001b\t\u0019IB\u0003\u0003\u0004\u001c\ru\u0011!C;oG\",7m[3e\u0015\r\u0019yBX\u0001\u000bC:tw\u000e^1uS>t\u0017\u0002BB\u0012\u00073\u0011\u0011#\u001e8dQ\u0016\u001c7.\u001a3WCJL\u0017M\\2f\u0003q!C.Z:tS:LG\u000fJ4sK\u0006$XM\u001d\u0013eK\u001a\fW\u000f\u001c;%cI*\"a!\u000b+\t\u0005\u001d31\u0003")
public class ClusterLinkFetcherManager
extends AbstractFetcherManager<ClusterLinkFetcherThread>
implements ClusterLinkFactory.FetcherManager,
MetadataListener {
    // Name of the cluster link; appended to log messages and fetcher thread names, and passed to ClusterLinkMetadata.
    private final String linkName;
    // Identifier of the cluster link; passed to ClusterLinkMetadata together with linkName.
    private final UUID linkId;
    // Handed to the metadata refresh thread (wrapped in Some) and to every fetcher thread that is created.
    private final ClusterLinkDestConnectionManager destConnectionManager;
    // Broker configuration; passed to ClusterLinkMetadata and to fetcher threads.
    private final KafkaConfig brokerConfig;
    // Passed through to fetcher threads.
    private final ReplicaManager replicaManager;
    // NOTE(review): not referenced in the visible part of this class; presumably used further down — confirm.
    private final Admin destAdminClient;
    // Passed through to fetcher threads.
    private final ReplicaQuota quota;
    // Link metrics; metrics.metrics() is handed to the threads created here and its sensors are recorded in onNewMetadata.
    private final ClusterLinkMetrics metrics;
    // Clock passed to the metadata refresh thread and to fetcher threads.
    private final Time time;
    // Optional prefix; when defined it is prepended (followed by ":") to fetcher thread names.
    private final Option<String> threadNamePrefix;
    // Optional tier state fetcher; passed through to fetcher threads.
    private final Option<TierStateFetcher> tierStateFetcher;
    // Per-partition link state, keyed by topic partition; iterated on every metadata update.
    private final ConcurrentHashMap<TopicPartition, PartitionAndState> linkedPartitions;
    // onNewMetadata adds partitions here after removing their fetchers.
    private final scala.collection.mutable.Set<TopicPartition> unassignedPartitions;
    // Its negation is passed to Partition.linkedLeaderOffsetsPending when a newer source leader epoch is seen.
    private final boolean isTruncationOnFetchSupportedOnLocalCluster;
    // Replaced wholesale by initializeMetadata(); volatile because it is reassigned after construction.
    private volatile ClusterLinkMetadata metadata;
    // Assigned by startMetadataThread(); shutdown() null-checks it, so it can be null before the thread is first started.
    private volatile ClusterLinkMetadataThread metadataRefreshThread;
    // Current link configuration; replaced by reconfigure().
    private volatile ClusterLinkConfig clusterLinkConfig;

    /**
     * Static forwarder to the companion object's default value for constructor parameter 12
     * (the optional tier state fetcher).
     *
     * @return whatever the companion object supplies as the default
     */
    public static Option<TierStateFetcher> $lessinit$greater$default$12() {
        ClusterLinkFetcherManager$ companion = ClusterLinkFetcherManager$.MODULE$;
        return companion.$lessinit$greater$default$12();
    }

    /**
     * Static forwarder to the companion object's default value for constructor parameter 11
     * (the optional thread name prefix).
     *
     * @return whatever the companion object supplies as the default
     */
    public static Option<String> $lessinit$greater$default$11() {
        ClusterLinkFetcherManager$ companion = ClusterLinkFetcherManager$.MODULE$;
        return companion.$lessinit$greater$default$11();
    }

    /** Accessor for the map of linked partitions and their per-partition state. */
    private ConcurrentHashMap<TopicPartition, PartitionAndState> linkedPartitions() {
        return linkedPartitions;
    }

    /** Accessor for the mutable set of partitions that currently have no fetcher assigned. */
    private scala.collection.mutable.Set<TopicPartition> unassignedPartitions() {
        return unassignedPartitions;
    }

    /** Accessor for the immutable truncation-on-fetch support flag. */
    private boolean isTruncationOnFetchSupportedOnLocalCluster() {
        return isTruncationOnFetchSupportedOnLocalCluster;
    }

    /** Reads the current (volatile) link metadata reference. */
    private ClusterLinkMetadata metadata() {
        return metadata;
    }

    /**
     * Replaces the (volatile) link metadata reference.
     *
     * @param newMetadata the metadata instance to publish
     */
    private void metadata_$eq(ClusterLinkMetadata newMetadata) {
        this.metadata = newMetadata;
    }

    /** Reads the current (volatile) metadata refresh thread reference; may be null before the thread is started. */
    private ClusterLinkMetadataThread metadataRefreshThread() {
        return metadataRefreshThread;
    }

    /**
     * Replaces the (volatile) metadata refresh thread reference.
     *
     * @param newThread the thread instance to publish
     */
    private void metadataRefreshThread_$eq(ClusterLinkMetadataThread newThread) {
        this.metadataRefreshThread = newThread;
    }

    /** Reads the current (volatile) cluster link configuration. */
    private ClusterLinkConfig clusterLinkConfig() {
        return clusterLinkConfig;
    }

    /**
     * Replaces the (volatile) cluster link configuration.
     *
     * @param newConfig the configuration to publish
     */
    private void clusterLinkConfig_$eq(ClusterLinkConfig newConfig) {
        this.clusterLinkConfig = newConfig;
    }

    /**
     * Starts this fetcher manager for the cluster link.
     *
     * <p>Link metadata is always initialized. The metadata refresh thread is started only when
     * the link is active, i.e. the current link configuration does not mark it as paused.
     */
    @Override
    public void startup() {
        // The log lambdas capture the enclosing instance. The decompiler left an undeclared
        // `$this` receiver in them, which does not compile; `this` is the intended receiver.
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting ClusterLinkFetcherManager for cluster link " + this.linkName);
        this.initializeMetadata();
        if (this.isActive()) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Starting fetcher manager metadata thread for cluster link " + this.linkName);
            this.startMetadataThread();
        }
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Startup of ClusterLinkFetcherManager for cluster link " + this.linkName + " is complete");
    }

    /**
     * Reports whether the link is active.
     *
     * @return {@code true} unless the current link configuration marks the link as paused
     */
    public boolean isActive() {
        boolean paused = Predef$.MODULE$.Boolean2boolean(this.clusterLinkConfig().clusterLinkPaused());
        return !paused;
    }

    /**
     * Creates a fresh {@link ClusterLinkMetadata} from the current link configuration, publishes
     * it, and bootstraps it with the configured bootstrap servers.
     *
     * <p>The new metadata object is published before the bootstrap addresses are parsed, so a
     * failure while parsing leaves the new (not yet bootstrapped) metadata in place.
     */
    public void initializeMetadata() {
        final ClusterLinkConfig linkConfig = this.clusterLinkConfig();
        final ClusterLinkMetadata freshMetadata = new ClusterLinkMetadata(
            this.brokerConfig,
            this.linkName,
            this.linkId,
            linkConfig.linkMode(),
            Predef$.MODULE$.Long2long(linkConfig.metadataRefreshBackoffMs()),
            Predef$.MODULE$.Long2long(linkConfig.metadataMaxAgeMs()));
        this.metadata_$eq(freshMetadata);

        List bootstrapAddresses = ClientUtils.parseAndValidateAddresses(linkConfig.bootstrapServers(), (ClientDnsLookup)linkConfig.dnsLookup());
        this.metadata().bootstrap(bootstrapAddresses);
    }

    /**
     * Creates a new metadata refresh thread from the current configuration and metadata,
     * registers this manager as a listener on it, and starts it.
     */
    public void startMetadataThread() {
        final ClusterLinkConfig linkConfig = this.clusterLinkConfig();
        final Option<ClusterLinkDestConnectionManager> destManager =
            (Option<ClusterLinkDestConnectionManager>)new Some((Object)this.destConnectionManager);
        final ClusterLinkMetadataThread refreshThread =
            new ClusterLinkMetadataThread(linkConfig, destManager, this.metadata(), this.metrics.metrics(), this.time);
        this.metadataRefreshThread_$eq(refreshThread);

        // Re-read the published reference for each call, as the accessor-based original does.
        this.metadataRefreshThread().addListener(this);
        this.metadataRefreshThread().start();
    }

    /**
     * Applies a new link configuration and decides, from the old/new active state and the set of
     * updated keys, whether the metadata thread and fetchers must be stopped and/or (re)started.
     *
     * <p>The decision is made while holding {@code lock()}; the metadata thread shutdown and the
     * restart (metadata re-initialization and thread start) happen outside that first critical
     * section.
     *
     * @param newConfig   the configuration to install as the current link configuration
     * @param updatedKeys names of the configuration keys that changed
     */
    @Override
    public void reconfigure(ClusterLinkConfig newConfig, Set<String> updatedKeys) {
        // bl  : stop the metadata refresh thread and close all fetchers.
        // bl2 : re-initialize metadata and start a new metadata refresh thread.
        boolean bl;
        boolean bl2;
        Object object = this.lock();
        synchronized (object) {
            // isActive() reads the current config, so sample it before and after installing newConfig.
            boolean oldActive = this.isActive();
            this.clusterLinkConfig_$eq(newConfig);
            boolean newActive = this.isActive();
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(56).append("Reconfiguring link ").append($this.linkName).append(" with new configs updated=").append(updatedKeys).append(" newConfig=").append(newConfig.values()).toString());
            // Residue of a Scala pattern match on (oldActive, newActive); only used for the null checks and MatchError below.
            Tuple2.mcZZ.sp sp2 = new Tuple2.mcZZ.sp(oldActive, newActive);
            if (!oldActive && !newActive) {
                // paused -> paused: nothing to do.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Not reconfiguring fetcher manager since it's paused");
                bl2 = false;
                bl = false;
            } else if (!oldActive && newActive) {
                // paused -> active: start only.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unpausing fetcher manager");
                // NOTE(review): the per-thread action lives in a synthetic method not visible here — confirm what it does.
                this.fetcherThreadMap().values().foreach((Function1 & Serializable & scala.Serializable)x$1 -> {
                    ClusterLinkFetcherManager.$anonfun$reconfigure$4(this, x$1);
                    return BoxedUnit.UNIT;
                });
                bl2 = true;
                bl = false;
            } else if (oldActive && !newActive) {
                // active -> paused: stop only.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Pausing fetcher manager");
                bl2 = false;
                bl = true;
            } else if (sp2 != null && oldActive && newActive && updatedKeys.diff(ClusterLinkConfig$.MODULE$.PeriodicMigrationProps()).isEmpty()) {
                // active -> active, and every updated key is one of PeriodicMigrationProps: fetchers are left alone.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Not reconfiguring fetcher manager since replication configs haven't changed");
                bl2 = false;
                bl = false;
            } else if (sp2 != null && oldActive && newActive && SslConfigs.RECONFIGURABLE_CONFIGS.containsAll((Collection)CollectionConverters$.MODULE$.setAsJavaSetConverter(updatedKeys).asJava())) {
                // active -> active, and every updated key is a reconfigurable SSL config: reconfigure the existing clients in place.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(58).append("Reconfiguring cluster link fetchers with updated configs: ").append(updatedKeys).toString());
                java.util.Map newConfigValues = newConfig.values();
                // Validate against the metadata thread's client first; fetcher clients are reconfigured without a separate validation call.
                this.metadataRefreshThread().clusterLinkClient().validateReconfiguration(newConfigValues);
                this.metadataRefreshThread().clusterLinkClient().reconfigure(newConfigValues);
                ((IterableLike)this.fetcherThreadMap().values().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.clusterLinkClient(), Iterable$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x$3 -> {
                    x$3.reconfigure(newConfigValues);
                    return BoxedUnit.UNIT;
                });
                bl2 = false;
                bl = false;
            } else if (sp2 != null && oldActive && newActive) {
                // active -> active with any other change: stop everything and start again.
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Recreating cluster link fetchers with updated configs: ").append(updatedKeys).toString());
                // Same value that was already installed at the top of the synchronized block.
                this.clusterLinkConfig_$eq(newConfig);
                bl2 = true;
                bl = true;
            } else {
                // All four (oldActive, newActive) combinations are handled above.
                throw new MatchError((Object)sp2);
            }
        }
        if (bl) {
            // The metadata thread is shut down without holding the lock; the lock is re-acquired only for fetcher cleanup.
            this.metadataRefreshThread().shutdown();
            Object object2 = this.lock();
            synchronized (object2) {
                if (bl2) {
                    // NOTE(review): the per-thread action lives in a synthetic method not visible here — confirm what it does.
                    this.fetcherThreadMap().values().foreach((Function1 & Serializable & scala.Serializable)x$5 -> {
                        ClusterLinkFetcherManager.$anonfun$reconfigure$12(this, x$5);
                        return BoxedUnit.UNIT;
                    });
                }
                this.closeAllFetchers();
            }
        }
        if (bl2) {
            // Rebuild metadata from the (new) current config, refresh the topic set, then start a new refresh thread.
            this.initializeMetadata();
            this.updateMetadataTopics();
            this.startMetadataThread();
        }
    }

    /**
     * Builds a fetcher thread for the given source broker.
     *
     * <p>The thread name has the form
     * {@code [<prefix>:]ClusterLinkFetcherThread-<fetcherId>-<linkName>-<sourceBrokerId>}, where
     * the prefix part is present only when a thread name prefix was configured.
     *
     * @param fetcherId    id of the fetcher within this manager
     * @param sourceBroker endpoint of the source broker the thread fetches from
     * @return the fetcher thread produced by the {@code ClusterLinkFetcherThread} companion's {@code apply}
     */
    @Override
    public ClusterLinkFetcherThread createFetcherThread(int fetcherId, BrokerEndPoint sourceBroker) {
        final String namePrefix;
        if (this.threadNamePrefix.isDefined()) {
            namePrefix = this.threadNamePrefix.get() + ":";
        } else {
            namePrefix = "";
        }
        final String threadName = namePrefix + "ClusterLinkFetcherThread-" + fetcherId + "-" + this.linkName + "-" + sourceBroker.id();
        return ClusterLinkFetcherThread$.MODULE$.apply(
            threadName,
            fetcherId,
            this.brokerConfig,
            this.clusterLinkConfig(),
            this.metadata(),
            this,
            this.destConnectionManager,
            sourceBroker,
            this.failedPartitions(),
            this.replicaManager,
            this.quota,
            this.metrics.metrics(),
            this.time,
            this.tierStateFetcher);
    }

    /**
     * Shuts this fetcher manager down: closes all fetchers first, then stops the metadata refresh
     * thread if one was ever started.
     */
    @Override
    public void shutdown() {
        // The log lambdas capture the enclosing instance. The decompiler left an undeclared
        // `$this` receiver in them, which does not compile; `this` is the intended receiver.
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Shutting down ClusterLinkFetcherManager for cluster link " + this.linkName);
        this.closeAllFetchers();
        // The refresh thread is only created by startMetadataThread(), so it may still be null.
        if (this.metadataRefreshThread() != null) {
            this.metadataRefreshThread().shutdown();
        }
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Shutdown of ClusterLinkFetcherManager for cluster link " + this.linkName + " is complete");
    }

    @Override
    public void onNewMetadata(Cluster newCluster) {
        scala.collection.mutable.Map linkedEpochChanges = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        scala.collection.mutable.Map failedLinks = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        Object object = this.lock();
        synchronized (object) {
            scala.collection.mutable.Set updatedPartitions = (scala.collection.mutable.Set)scala.collection.mutable.Set$.MODULE$.apply((Seq)Nil$.MODULE$);
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(46).append("onNewMetadata linkedPartitions ").append(this.linkedPartitions().keySet()).append(" unassigned ").append(this.unassignedPartitions()).append(" : ").append(newCluster).toString());
            scala.collection.mutable.Map updatedPartitionCounts = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
            ((MapLike)CollectionConverters$.MODULE$.mapAsScalaConcurrentMapConverter(this.linkedPartitions()).asScala()).keys().iterator().filter((Function1 & Serializable & scala.Serializable)x$6 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkFetcherManager.$anonfun$onNewMetadata$2(x$6))).foreach((Function1 & Serializable & scala.Serializable)tp -> {
                Integer sourcePartitionCount;
                String topic;
                block7: {
                    topic = tp.topic();
                    try {
                        this.metadata().maybeThrowExceptionForTopic(topic);
                    }
                    catch (Exception e) {
                        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(19).append("Metadata error for ").append(topic).toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                        if (!ClusterLinkFetcherThread$.MODULE$.LinkErrors().contains((Object)Errors.forException((Throwable)e))) break block7;
                        failedLinks.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), (Object)e.getMessage()));
                    }
                }
                if (!newCluster.topics().contains(topic)) {
                    failedLinks.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(tp), (Object)new StringBuilder(30).append("Topic ").append(topic).append(" not present in metadata").toString()));
                }
                if ((sourcePartitionCount = newCluster.partitionCountForTopic(topic)) != null) {
                    int destPartitionCount = this.partitionCount(topic);
                    if (destPartitionCount == 0) {
                        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(52).append("Partitions for linked destination topic ").append(topic).append(" are unknown").toString());
                        return BoxedUnit.UNIT;
                    }
                    if (destPartitionCount < Predef$.MODULE$.Integer2int(sourcePartitionCount)) {
                        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(49).append("Increasing partitions for linked topic ").append(topic).append(" from ").append(destPartitionCount).append(" to ").append(sourcePartitionCount).toString());
                        updatedPartitionCounts.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)topic), (Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(sourcePartitionCount))));
                        $this.metrics.linkedTopicPartitionAdditionSensor().record();
                        return BoxedUnit.UNIT;
                    }
                    if (destPartitionCount > Predef$.MODULE$.Integer2int(sourcePartitionCount)) {
                        String reason = new StringBuilder(64).append("Topic ").append(topic).append(" has ").append(destPartitionCount).append(" destination partitions, but only ").append(sourcePartitionCount).append(" source partitions.").toString();
                        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(103).append(reason).append(" This may be a transient issue or it could indicate that the source partition was").append(" deleted and recreated").toString());
                        return failedLinks.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)new TopicPartition(topic, 0)), (Object)reason));
                    }
                    return BoxedUnit.UNIT;
                }
                return BoxedUnit.UNIT;
            });
            if (updatedPartitionCounts.nonEmpty()) {
                this.updatePartitionCount((Map<String, Object>)updatedPartitionCounts, newCluster);
            }
            ((IterableLike)CollectionConverters$.MODULE$.mapAsScalaConcurrentMapConverter(this.linkedPartitions()).asScala()).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                BoxedUnit boxedUnit;
                if (x0$1 != null) {
                    TopicPartition tp = (TopicPartition)x0$1._1();
                    PartitionAndState partitionAndState = (PartitionAndState)x0$1._2();
                    Partition partition = partitionAndState.partition();
                    Metadata.LeaderAndEpoch oldLeaderAndEpoch = partitionAndState.sourceLeaderAndEpoch();
                    Metadata.LeaderAndEpoch newLeaderAndEpoch = this.metadata().currentLeader(tp);
                    Metadata.LeaderAndEpoch leaderAndEpoch = oldLeaderAndEpoch;
                    Metadata.LeaderAndEpoch leaderAndEpoch2 = Metadata.LeaderAndEpoch.noLeaderOrEpoch();
                    if (leaderAndEpoch == null ? leaderAndEpoch2 != null : !leaderAndEpoch.equals(leaderAndEpoch2)) {
                        Metadata.LeaderAndEpoch leaderAndEpoch3 = oldLeaderAndEpoch;
                        if (leaderAndEpoch3 == null ? newLeaderAndEpoch != null : !leaderAndEpoch3.equals(newLeaderAndEpoch)) {
                            updatedPartitions.$plus$eq((Object)tp);
                        }
                    }
                    Integer newEpoch = newLeaderAndEpoch.epoch.orElse(Predef$.MODULE$.int2Integer(-1));
                    int oldEpoch = BoxesRunTime.unboxToInt((Object)partition.getLinkedLeaderEpoch().getOrElse((Function0)(JFunction0.mcI.sp & Serializable & scala.Serializable)() -> -1));
                    if (Predef$.MODULE$.Integer2int(newEpoch) >= 0 && oldEpoch < Predef$.MODULE$.Integer2int(newEpoch)) {
                        partition.linkedLeaderOffsetsPending(!this.isTruncationOnFetchSupportedOnLocalCluster());
                        updatedPartitions.$plus$eq((Object)tp);
                        linkedEpochChanges.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)partition), (Object)BoxesRunTime.boxToInteger((int)Predef$.MODULE$.Integer2int(newEpoch))));
                        $this.metrics.linkedLeaderEpochChangeSensor().record();
                    }
                    if (!failedLinks.contains((Object)tp) && newLeaderAndEpoch.leader.isPresent() && Predef$.MODULE$.Integer2int(newEpoch) >= 0) {
                        if (oldEpoch > Predef$.MODULE$.Integer2int(newEpoch)) {
                            boxedUnit = failedLinks.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)new StringBuilder(41).append("Source epoch has gone backwards from ").append(oldEpoch).append(" to ").append(newEpoch).toString()));
                        } else if (Predef$.MODULE$.Integer2int(newEpoch) >= oldEpoch && partitionAndState.clearLinkFailure()) {
                            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(79).append("Clearing link failure for partition ").append(tp).append(" since newEpoch=").append(newEpoch).append(" is not less than oldEpoch=").append(oldEpoch).toString());
                            boxedUnit = BoxedUnit.UNIT;
                        } else {
                            boxedUnit = BoxedUnit.UNIT;
                        }
                    } else {
                        boxedUnit = BoxedUnit.UNIT;
                    }
                } else {
                    throw new MatchError(null);
                }
                BoxedUnit boxedUnit2 = boxedUnit;
                return boxedUnit2;
            });
            scala.collection.mutable.Set partitionsToReassign = (scala.collection.mutable.Set)updatedPartitions.diff(this.unassignedPartitions());
            if (partitionsToReassign.nonEmpty()) {
                this.removeFetcherForPartitions((Set<TopicPartition>)partitionsToReassign);
                this.unassignedPartitions().$plus$plus$eq((TraversableOnce)partitionsToReassign);
            }
            this.maybeAddLinkedFetchers();
        }
        linkedEpochChanges.foreach((Function1 & Serializable & scala.Serializable)x0$2 -> {
            ClusterLinkFetcherManager.$anonfun$onNewMetadata$12(this, x0$2);
            return BoxedUnit.UNIT;
        });
        failedLinks.foreach((Function1 & Serializable & scala.Serializable)x0$3 -> {
            ClusterLinkFetcherManager.$anonfun$onNewMetadata$14(this, x0$3);
            return BoxedUnit.UNIT;
        });
        if (failedLinks.nonEmpty()) {
            this.metadata().requestUpdate();
        }
    }

    /**
     * Reacts to the outcome of a linked leader epoch update for {@code partition}.
     *
     * <p>{@code OPERATION_NOT_ATTEMPTED}, {@code FENCED_LEADER_EPOCH} and
     * {@code INVALID_UPDATE_VERSION} are logged at debug level and trigger a metadata refresh;
     * {@code NONE} is only traced; any other error is reported as a non-retriable partition link
     * failure, followed by a metadata refresh.
     *
     * @param partition the partition whose linked leader epoch update completed
     * @param error the result reported for the update
     */
    private void onLinkedLeaderUpdateResponse(TopicPartition partition, Errors error) {
        boolean updateSkipped = Errors.OPERATION_NOT_ATTEMPTED.equals(error)
                || Errors.FENCED_LEADER_EPOCH.equals(error)
                || Errors.INVALID_UPDATE_VERSION.equals(error);
        if (updateSkipped) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "We did not update cluster link state for " + partition + " since new metadata is available");
            this.metadata().requestUpdate();
        } else if (Errors.NONE.equals(error)) {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Linked leader update successful for " + partition);
        } else {
            this.onPartitionLinkFailure(partition, false, "Unexpected error in update of linked leader epoch: " + error);
            this.metadata().requestUpdate();
        }
    }

    /**
     * Starts tracking the given partitions as linked partitions and, when this manager is
     * active, attempts to start fetchers for them.
     *
     * <p>While holding {@code lock()}, each partition is handed to the per-partition
     * registration helper, the metadata topic set is refreshed, and
     * {@code maybeAddLinkedFetchers()} is invoked only if {@code isActive()} is true.
     *
     * @param partitions the partitions to start tracking
     */
    @Override
    public void addLinkedFetcherForPartitions(Iterable<Partition> partitions) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "addLinkedFetcherForPartitions " + partitions);
        synchronized (this.lock()) {
            partitions.foreach((Function1 & Serializable & scala.Serializable)partition -> {
                ClusterLinkFetcherManager.$anonfun$addLinkedFetcherForPartitions$2(this, partition);
                return BoxedUnit.UNIT;
            });
            this.updateMetadataTopics();
            if (!this.isActive()) {
                return;
            }
            this.maybeAddLinkedFetchers();
        }
    }

    /**
     * Stops the fetchers for the given partitions and optionally forgets them entirely.
     *
     * <p>While holding {@code lock()}: fetchers are removed first. When {@code retainMetadata}
     * is false the partitions are also dropped from the unassigned set and from the linked
     * partition map before the metadata topic set is refreshed. When it is true the partitions
     * stay registered and a metadata update is requested after the topic set refresh.
     *
     * @param partitions the partitions whose fetchers should be removed
     * @param retainMetadata whether the partitions remain registered with this manager
     */
    @Override
    public void removeLinkedFetcherForPartitions(Set<TopicPartition> partitions, boolean retainMetadata) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "removeLinkedFetcherForPartitions " + partitions + " retainMetadata=" + retainMetadata);
        synchronized (this.lock()) {
            this.removeFetcherForPartitions(partitions);
            if (retainMetadata) {
                this.updateMetadataTopics();
                this.metadata().requestUpdate();
            } else {
                partitions.foreach((Function1 & Serializable & scala.Serializable)tp -> {
                    this.unassignedPartitions().remove(tp);
                    return this.linkedPartitions().remove(tp);
                });
                this.updateMetadataTopics();
            }
        }
    }

    /**
     * Pushes the set of topic names of all currently linked partitions to {@code metadata()}.
     */
    private void updateMetadataTopics() {
        // Scala view over the keys (topic partitions) of the linked partition map.
        SetLike linkedKeys = (SetLike)CollectionConverters$.MODULE$.asScalaSetConverter(this.linkedPartitions().keySet()).asScala();
        // Project every topic partition onto its topic name.
        TraversableOnce topicNames = (TraversableOnce)linkedKeys.map((Function1 & Serializable & scala.Serializable)x$7 -> x$7.topic(), scala.collection.mutable.Set$.MODULE$.canBuildFrom());
        this.metadata().setTopics((scala.collection.immutable.Set<String>)topicNames.toSet());
    }

    /**
     * Reports whether no partitions are currently registered in the linked partition map.
     * The check is performed while holding {@code lock()}.
     *
     * @return {@code true} if the linked partition map is empty
     */
    public boolean isEmpty() {
        synchronized (this.lock()) {
            return this.linkedPartitions().isEmpty();
        }
    }

    /**
     * Returns the cluster link configuration currently held by this manager.
     *
     * @return the value of {@code clusterLinkConfig()}
     */
    @Override
    public ClusterLinkConfig currentConfig() {
        ClusterLinkConfig activeConfig = this.clusterLinkConfig();
        return activeConfig;
    }

    /**
     * Returns the metadata object currently used by this manager.
     *
     * @return the value of {@code metadata()}
     */
    public ClusterLinkMetadata currentMetadata() {
        ClusterLinkMetadata activeMetadata = this.metadata();
        return activeMetadata;
    }

    /**
     * Records a link failure for a partition and, once no retry time remains, marks the
     * partition's cluster link as failed.
     *
     * <p>The failure is ignored (with a debug message) when the partition is not registered or
     * is not an active link destination leader. Otherwise the failure is recorded on the
     * partition state using the configured retry timeout for retriable failures and {@code 0}
     * for non-retriable ones. If the remaining retry time is positive only an info message is
     * logged; otherwise an error is logged and {@code failClusterLink} is invoked with a
     * callback to {@code onLinkFailureUpdateResponse}.
     *
     * @param topicPartition the partition that failed
     * @param retriable whether the configured retry timeout applies to this failure
     * @param reason human-readable failure description used in log messages
     */
    public void onPartitionLinkFailure(TopicPartition topicPartition, boolean retriable, String reason) {
        this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "onPartitionLinkFailure " + topicPartition + " retriable=" + retriable + " reason=" + reason);
        PartitionAndState linkedState = this.linkedPartitions().get(topicPartition);
        if (linkedState == null || !linkedState.partition().isActiveLinkDestinationLeader()) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Ignoring partition link failure since " + topicPartition + " is not an active link destination any more");
            return;
        }
        int retryTimeoutMs = 0;
        if (retriable) {
            retryTimeoutMs = this.clusterLinkConfig().retryTimeoutMs();
        }
        long retryRemainingMs = linkedState.onLinkFailure(this.time.milliseconds(), retryTimeoutMs);
        if (retryRemainingMs > 0L) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Cluster link failed due to: " + reason + ", will retry for " + retryRemainingMs + " ms.");
            return;
        }
        this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Mirroring of topic " + topicPartition.topic() + " stopped due to failure of partition " + topicPartition + " : " + reason + ".");
        linkedState.partition().failClusterLink((Function1<Errors, BoxedUnit>)(Function1 & Serializable & scala.Serializable)error -> {
            this.onLinkFailureUpdateResponse(topicPartition, error);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Logs the outcome of an attempt to persist the failed state of a partition's cluster link.
     *
     * <p>{@code OPERATION_NOT_ATTEMPTED}, {@code FENCED_LEADER_EPOCH} and
     * {@code INVALID_UPDATE_VERSION} produce a debug message, {@code NONE} a trace message, and
     * any other error an error-level message (emitted only when error logging is enabled).
     *
     * @param partition the partition whose failed state update completed
     * @param error the result reported for the update
     */
    private void onLinkFailureUpdateResponse(TopicPartition partition, Errors error) {
        boolean updateSkipped = Errors.OPERATION_NOT_ATTEMPTED.equals(error)
                || Errors.FENCED_LEADER_EPOCH.equals(error)
                || Errors.INVALID_UPDATE_VERSION.equals(error);
        if (updateSkipped) {
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "We did not update cluster link failed state for " + partition + " since new metadata is available. Update will be retried on next failure.");
        } else if (Errors.NONE.equals(error)) {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Link failed state was updated for " + partition + ".");
        } else if (this.logger().underlying().isErrorEnabled()) {
            this.logger().underlying().error(this.msgWithLogIdent(ClusterLinkFetcherManager.$anonfun$onLinkFailureUpdateResponse$3(partition, error)));
        }
    }

    /**
     * Clears a recorded link failure for the partition, if it is registered, and logs an info
     * message when {@code clearLinkFailure()} reports {@code true}.
     *
     * @param topicPartition the partition whose failure state should be cleared
     * @param reason lazily evaluated explanation; only invoked when the info message is built
     */
    public void clearPartitionLinkFailure(TopicPartition topicPartition, Function0<String> reason) {
        PartitionAndState linkedState = this.linkedPartitions().get(topicPartition);
        if (linkedState == null || !linkedState.clearLinkFailure()) {
            return;
        }
        this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Clearing cluster link failure for partition " + topicPartition + " due to: " + reason.apply());
    }

    /**
     * Looks up the {@code Partition} registered for a topic partition.
     *
     * @param tp the topic partition to look up
     * @return the registered partition, or an empty option when {@code tp} is not in the linked
     *         partition map
     */
    public Option<Partition> partition(TopicPartition tp) {
        PartitionAndState linkedState = this.linkedPartitions().get(tp);
        return Option$.MODULE$.apply((Object)linkedState).map((Function1 & Serializable & scala.Serializable)x$8 -> x$8.partition());
    }

    /**
     * Attempts to start fetchers for every unassigned partition.
     *
     * <p>While holding {@code lock()}: the per-partition helper collects the partitions that
     * can be assigned, fetchers are added for them, and they are removed from the unassigned
     * set. A metadata update is requested if any partition is still unassigned or if any linked
     * partition is contained in {@code failedPartitions()}.
     */
    private void maybeAddLinkedFetchers() {
        synchronized (this.lock()) {
            scala.collection.mutable.Map assignablePartitions = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
            this.unassignedPartitions().foreach((Function1 & Serializable & scala.Serializable)tp -> {
                ClusterLinkFetcherManager.$anonfun$maybeAddLinkedFetchers$1(this, assignablePartitions, tp);
                return BoxedUnit.UNIT;
            });
            this.addFetcherForPartitions((Map<TopicPartition, InitialFetchState>)assignablePartitions);
            assignablePartitions.keySet().foreach((Function1 & Serializable & scala.Serializable)elem -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkFetcherManager.$anonfun$maybeAddLinkedFetchers$3(this, elem)));
            // Only scan the linked partitions for failures when nothing is left unassigned.
            boolean metadataNeeded = this.unassignedPartitions().nonEmpty();
            if (!metadataNeeded) {
                metadataNeeded = ((IterableLike)CollectionConverters$.MODULE$.asScalaSetConverter(this.linkedPartitions().keySet()).asScala()).exists((Function1 & Serializable & scala.Serializable)topicPartition -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkFetcherManager.$anonfun$maybeAddLinkedFetchers$4(this, topicPartition)));
            }
            if (!metadataNeeded) {
                return;
            }
            this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Request metadata due to unassigned partitions: " + this.unassignedPartitions());
            this.metadata().requestUpdate();
        }
    }

    /**
     * Asks the destination admin client to grow each topic to the requested partition count
     * and logs the per-topic outcome when the corresponding future completes.
     *
     * <p>A successful update is logged at debug level. A failure that is an
     * {@code InvalidPartitionsException} is logged at debug level together with the exception;
     * any other failure is logged at error level together with the exception.
     *
     * @param topicPartitionCounts target partition count keyed by topic name
     * @param cluster not read by this method
     */
    private void updatePartitionCount(Map<String, Object> topicPartitionCounts, Cluster cluster) {
        java.util.Map newPartitions = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)topicPartitionCounts.map((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            String k = (String)x0$1._1();
            int v = x0$1._2$mcI$sp();
            Tuple2 tuple2 = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)k), (Object)NewPartitions.increaseTo((int)v));
            return tuple2;
        }, scala.collection.Map$.MODULE$.canBuildFrom())).asJava();
        this.destAdminClient.createPartitions(newPartitions).values().forEach((topic, future) -> future.whenComplete((x$9, e) -> {
            if (e == null) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(50).append("Updated destination topic partition count for ").append((String)topic).append(" to ").append(topicPartitionCounts.apply(topic)).toString());
                return;
            }
            // The decompiled failure branches referenced undeclared identifiers (x$10..x$13)
            // instead of these locals; the log calls now use the values actually computed here.
            Throwable failure = e;
            String failureMessage = new StringBuilder(59).append("Could not update destination topic partition count for ").append((String)topic).append(" to ").append(topicPartitionCounts.apply(topic)).toString();
            if (e instanceof InvalidPartitionsException) {
                this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> failureMessage, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
                return;
            }
            this.error((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> failureMessage, (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> failure);
        }));
    }

    /**
     * Returns the number of partitions the replica manager's metadata cache reports for a
     * topic on the inter-broker listener.
     *
     * @param topic the topic name to look up
     * @return the partition count of the first metadata entry, or {@code 0} when the cache
     *         returns no entry for the topic
     */
    public int partitionCount(String topic) {
        Seq<MetadataResponseData.MetadataResponseTopic> topicMetadata = this.replicaManager.metadataCache().getTopicMetadata(
                (Set<String>)((Set)Set$.MODULE$.apply((Seq)Predef$.MODULE$.wrapRefArray((Object[])new String[]{topic}))),
                this.brokerConfig.interBrokerListenerName(),
                this.replicaManager.metadataCache().getTopicMetadata$default$3(),
                this.replicaManager.metadataCache().getTopicMetadata$default$4());
        return topicMetadata.isEmpty()
                ? 0
                : ((MetadataResponseData.MetadataResponseTopic)topicMetadata.head()).partitions().size();
    }

    /**
     * Stores the given fetch state on the partition's state entry. Does nothing when the
     * partition is not in the linked partition map.
     *
     * @param topicPartition the partition whose fetch state changed
     * @param fetchState the new fetch state
     */
    public void updatePartitionFetchState(TopicPartition topicPartition, FetchState fetchState) {
        PartitionAndState linkedState = this.linkedPartitions().get(topicPartition);
        if (linkedState == null) {
            return;
        }
        linkedState.fetchState_$eq((Option<FetchState>)new Some((Object)fetchState));
    }

    /**
     * Returns the fetch state recorded for a partition.
     *
     * @param topicPartition the partition to look up
     * @return the recorded fetch state, or an empty option when the partition is not in the
     *         linked partition map or has no fetch state
     */
    @Override
    public Option<FetchState> partitionFetchState(TopicPartition topicPartition) {
        PartitionAndState linkedState = this.linkedPartitions().get(topicPartition);
        return Option$.MODULE$.apply((Object)linkedState).flatMap((Function1 & Serializable & scala.Serializable)x$15 -> x$15.fetchState());
    }

    /**
     * Counts the linked partitions whose {@code failureStartMs} is zero.
     *
     * @return the number of linked partitions with no failure start time recorded
     */
    public int mirrorPartitionCount() {
        TraversableOnce linkedStates = (TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.linkedPartitions().values()).asScala();
        return linkedStates.count((Function1 & Serializable & scala.Serializable)x$16 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkFetcherManager.$anonfun$mirrorPartitionCount$1(x$16)));
    }

    /**
     * Counts the linked partitions whose {@code failureStartMs} is non-zero.
     *
     * @return the number of linked partitions with a failure start time recorded
     */
    public int failedMirrorPartitionCount() {
        TraversableOnce linkedStates = (TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(this.linkedPartitions().values()).asScala();
        return linkedStates.count((Function1 & Serializable & scala.Serializable)x$17 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkFetcherManager.$anonfun$failedMirrorPartitionCount$1(x$17)));
    }

    /** Synthetic lambda body: adds {@code elem} to the manager's unassigned partition set. */
    public static final /* synthetic */ boolean $anonfun$reconfigure$5(ClusterLinkFetcherManager $this, TopicPartition elem) {
        boolean added = $this.unassignedPartitions().add((Object)elem);
        return added;
    }

    /**
     * Synthetic lambda body: adds every partition held by the fetcher thread {@code x$1} to the
     * manager's unassigned partition set.
     */
    public static final /* synthetic */ void $anonfun$reconfigure$4(ClusterLinkFetcherManager $this, ClusterLinkFetcherThread x$1) {
        x$1.partitionsAndOffsets().keySet().foreach((Function1 & Serializable & scala.Serializable)elem -> {
            boolean added = ClusterLinkFetcherManager.$anonfun$reconfigure$5($this, elem);
            return BoxesRunTime.boxToBoolean((boolean)added);
        });
    }

    /** Synthetic lambda body: adds {@code elem} to the manager's unassigned partition set. */
    public static final /* synthetic */ boolean $anonfun$reconfigure$13(ClusterLinkFetcherManager $this, TopicPartition elem) {
        boolean added = $this.unassignedPartitions().add((Object)elem);
        return added;
    }

    /**
     * Synthetic lambda body: adds every partition held by the fetcher thread {@code x$5} to the
     * manager's unassigned partition set.
     */
    public static final /* synthetic */ void $anonfun$reconfigure$12(ClusterLinkFetcherManager $this, ClusterLinkFetcherThread x$5) {
        x$5.partitionsAndOffsets().keySet().foreach((Function1 & Serializable & scala.Serializable)elem -> {
            boolean added = ClusterLinkFetcherManager.$anonfun$reconfigure$13($this, elem);
            return BoxesRunTime.boxToBoolean((boolean)added);
        });
    }

    /** Synthetic predicate: true when the topic partition's partition index is zero. */
    public static final /* synthetic */ boolean $anonfun$onNewMetadata$2(TopicPartition x$6) {
        int partitionIndex = x$6.partition();
        return partitionIndex == 0;
    }

    /**
     * Synthetic lambda body: applies a (partition, new epoch) pair by updating the partition's
     * linked leader epoch and routing the result to {@code onLinkedLeaderUpdateResponse}.
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$onNewMetadata$12(ClusterLinkFetcherManager $this, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        Partition linkedPartition = (Partition)x0$2._1();
        int sourceEpoch = x0$2._2$mcI$sp();
        linkedPartition.updateLinkedLeaderEpoch(sourceEpoch, (Function1<Errors, BoxedUnit>)(Function1 & Serializable & scala.Serializable)error -> {
            $this.onLinkedLeaderUpdateResponse(linkedPartition.topicPartition(), error);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Synthetic lambda body: reports a (topic partition, reason) pair as a retriable partition
     * link failure.
     *
     * @throws MatchError if the tuple is {@code null}
     */
    public static final /* synthetic */ void $anonfun$onNewMetadata$14(ClusterLinkFetcherManager $this, Tuple2 x0$3) {
        if (x0$3 == null) {
            throw new MatchError(null);
        }
        TopicPartition failedPartition = (TopicPartition)x0$3._1();
        String failureReason = (String)x0$3._2();
        $this.onPartitionLinkFailure(failedPartition, true, failureReason);
    }

    /**
     * Synthetic lambda body: registers {@code partition} in the linked partition map with a
     * fresh state entry and marks it unassigned. Unless the manager is active and truncation on
     * fetch is unsupported locally, the partition's linked-leader-offsets-pending flag is set to
     * {@code false}.
     */
    public static final /* synthetic */ void $anonfun$addLinkedFetcherForPartitions$2(ClusterLinkFetcherManager $this, Partition partition) {
        PartitionAndState freshState = new PartitionAndState(partition);
        $this.linkedPartitions().put(partition.topicPartition(), freshState);
        $this.unassignedPartitions().$plus$eq((Object)partition.topicPartition());
        boolean activeWithoutTruncationOnFetch = $this.isActive() && !$this.isTruncationOnFetchSupportedOnLocalCluster();
        if (activeWithoutTruncationOnFetch) {
            return;
        }
        partition.linkedLeaderOffsetsPending(false);
    }

    /** Synthetic helper: builds the error message for a failed update of the link failed state. */
    public static final /* synthetic */ String $anonfun$onLinkFailureUpdateResponse$3(TopicPartition partition$3, Errors error$1) {
        return "Failed to update failed state for partition " + partition$3 + " : " + error$1 + ", will retry on next failure.";
    }

    /**
     * Synthetic lambda body: decides whether an unassigned partition can get a fetcher now.
     *
     * <p>The partition is assignable only when the metadata reports both a leader and a leader
     * epoch for it and the local partition's leader epoch is at least that source epoch. In that
     * case an initial fetch state (source leader endpoint, source epoch, local log end offset) is
     * added to {@code assignablePartitions$1}, the source leader and epoch are recorded on the
     * partition state, and the partition is truncated to the initial offset.
     *
     * @throws IllegalStateException if {@code tp} is not in the linked partition map
     */
    public static final /* synthetic */ void $anonfun$maybeAddLinkedFetchers$1(ClusterLinkFetcherManager $this, scala.collection.mutable.Map assignablePartitions$1, TopicPartition tp) {
        PartitionAndState linkedState = $this.linkedPartitions().get(tp);
        if (linkedState == null) {
            throw new IllegalStateException("Linked partition not found " + tp);
        }
        Partition partition = linkedState.partition();
        Metadata.LeaderAndEpoch sourceLeaderAndEpoch = $this.metadata().currentLeader(tp);
        if (!sourceLeaderAndEpoch.leader.isPresent() || !sourceLeaderAndEpoch.epoch.isPresent()) {
            return;
        }
        Integer sourceEpoch = (Integer)sourceLeaderAndEpoch.epoch.get();
        if (partition.getLeaderEpoch() < Predef$.MODULE$.Integer2int(sourceEpoch)) {
            return;
        }
        Node sourceLeader = (Node)sourceLeaderAndEpoch.leader.get();
        BrokerEndPoint sourceEndPoint = new BrokerEndPoint(sourceLeader.id(), sourceLeader.host(), sourceLeader.port());
        InitialFetchState initialFetchState = new InitialFetchState(sourceEndPoint, Predef$.MODULE$.Integer2int(sourceEpoch), partition.localLogOrException().localLogEndOffset());
        $this.debug((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Adding fetcher for linked partition " + tp + " " + initialFetchState + ", localEpoch=" + partition.getLeaderEpoch());
        assignablePartitions$1.$plus$eq(Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)tp), (Object)initialFetchState));
        linkedState.sourceLeaderAndEpoch_$eq(sourceLeaderAndEpoch);
        partition.truncateTo(initialFetchState.initOffset(), false);
    }

    /** Synthetic lambda body: removes {@code elem} from the manager's unassigned partition set. */
    public static final /* synthetic */ boolean $anonfun$maybeAddLinkedFetchers$3(ClusterLinkFetcherManager $this, TopicPartition elem) {
        boolean removed = $this.unassignedPartitions().remove((Object)elem);
        return removed;
    }

    /** Synthetic predicate: true when {@code failedPartitions()} contains the topic partition. */
    public static final /* synthetic */ boolean $anonfun$maybeAddLinkedFetchers$4(ClusterLinkFetcherManager $this, TopicPartition topicPartition) {
        boolean failed = $this.failedPartitions().contains(topicPartition);
        return failed;
    }

    /**
     * Builds the "could not update partition count" message for a topic and passes it, together
     * with the exception, to the supplied two-argument output function.
     */
    private static final void printException$1(Function2 output, String topic$2, Map topicPartitionCounts$1, Throwable e$2) {
        String failureMessage = "Could not update destination topic partition count for " + topic$2 + " to " + topicPartitionCounts$1.apply((Object)topic$2);
        output.apply((Object)failureMessage, (Object)e$2);
    }

    /** Synthetic predicate: true when the state's {@code failureStartMs} value is zero. */
    public static final /* synthetic */ boolean $anonfun$mirrorPartitionCount$1(PartitionAndState x$16) {
        long failureStart = x$16.failureStartMs().get();
        return failureStart == 0L;
    }

    /** Synthetic predicate: true when the state's {@code failureStartMs} value is non-zero. */
    public static final /* synthetic */ boolean $anonfun$failedMirrorPartitionCount$1(PartitionAndState x$17) {
        long failureStart = x$17.failureStartMs().get();
        return failureStart != 0L;
    }

    /**
     * Creates a fetcher manager for one cluster link.
     *
     * <p>Stores the supplied collaborators in fields, invokes the superclass constructor with a
     * name built from the broker id and link name, the client id prefix {@code "ClusterLink"},
     * the configured number of cluster link fetchers and the link metric tags, and then
     * initializes the linked partition map, the unassigned partition set, the
     * truncation-on-fetch capability flag and the current link configuration.
     *
     * <p>NOTE(review): the {@code super(...)} call appears after the field assignments, which
     * standard Java does not accept as written; this looks like a decompiler rendering of the
     * original compiled constructor. Confirm the intended initialization order before
     * attempting to recompile or reorder these statements.
     *
     * @param linkName name of the cluster link; used in the manager name and metric tags
     * @param linkId identifier of the cluster link
     * @param initialConfig link configuration in effect at construction time
     * @param destConnectionManager stored for later use; not otherwise used in this constructor
     * @param brokerConfig local broker configuration (broker id, inter-broker protocol version)
     * @param replicaManager stored for later use; not otherwise used in this constructor
     * @param destAdminClient stored for later use; not otherwise used in this constructor
     * @param quota stored for later use; not otherwise used in this constructor
     * @param metrics stored for later use; not otherwise used in this constructor
     * @param time stored for later use; not otherwise used in this constructor
     * @param threadNamePrefix stored for later use; not otherwise used in this constructor
     * @param tierStateFetcher stored for later use; not otherwise used in this constructor
     */
    public ClusterLinkFetcherManager(String linkName, UUID linkId, ClusterLinkConfig initialConfig, ClusterLinkDestConnectionManager destConnectionManager, KafkaConfig brokerConfig, ReplicaManager replicaManager, Admin destAdminClient, ReplicaQuota quota, ClusterLinkMetrics metrics, Time time, Option<String> threadNamePrefix, Option<TierStateFetcher> tierStateFetcher) {
        this.linkName = linkName;
        this.linkId = linkId;
        this.destConnectionManager = destConnectionManager;
        this.brokerConfig = brokerConfig;
        this.replicaManager = replicaManager;
        this.destAdminClient = destAdminClient;
        this.quota = quota;
        this.metrics = metrics;
        this.time = time;
        this.threadNamePrefix = threadNamePrefix;
        this.tierStateFetcher = tierStateFetcher;
        super(new StringBuilder(41).append("ClusterLinkFetcherManager on broker ").append(brokerConfig.brokerId()).append(" for ").append(linkName).toString(), "ClusterLink", Predef$.MODULE$.Integer2int(initialConfig.numClusterLinkFetchers()), ClusterLinkFactory$.MODULE$.linkMetricTags(linkName));
        // Partition state keyed by topic partition.
        this.linkedPartitions = new ConcurrentHashMap();
        // Starts out as an empty mutable set.
        this.unassignedPartitions = (scala.collection.mutable.Set)scala.collection.mutable.Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        // Derived once from the broker's inter-broker protocol version.
        this.isTruncationOnFetchSupportedOnLocalCluster = ApiVersion$.MODULE$.isTruncationOnFetchSupported(brokerConfig.interBrokerProtocolVersion());
        this.clusterLinkConfig = initialConfig;
    }

    /**
     * Synthetic adapter: logs the message {@code x$10} with the exception {@code x$11} at debug
     * level and returns the unit value.
     */
    public static final /* synthetic */ Object $anonfun$updatePartitionCount$5$adapted(ClusterLinkFetcherManager $this, String x$10, Throwable x$11) {
        Function0<String> messageSupplier = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> x$10;
        Function0<Throwable> causeSupplier = (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> x$11;
        $this.debug(messageSupplier, causeSupplier);
        return BoxedUnit.UNIT;
    }

    /**
     * Synthetic adapter: logs the message {@code x$12} with the exception {@code x$13} at error
     * level and returns the unit value.
     */
    public static final /* synthetic */ Object $anonfun$updatePartitionCount$8$adapted(ClusterLinkFetcherManager $this, String x$12, Throwable x$13) {
        Function0<String> messageSupplier = (Function0<String>)(Function0 & Serializable & scala.Serializable)() -> x$12;
        Function0<Throwable> causeSupplier = (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> x$13;
        $this.error(messageSupplier, causeSupplier);
        return BoxedUnit.UNIT;
    }
}

