/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.concurrent.ExecutionException;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkConfig;
import kafka.server.link.ClusterLinkConfig$;
import kafka.server.link.ClusterLinkFilterInfo;
import kafka.server.link.ClusterLinkMetadataManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import kafka.server.link.ClusterLinkSyncOffsets$;
import kafka.server.link.ClusterLinkUtils$;
import kafka.zk.ClusterLinkData;
import org.apache.kafka.clients.admin.AlterConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ConfluentAdmin;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsOptions;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsResult;
import org.apache.kafka.clients.admin.ListConsumerGroupOffsetsSpec;
import org.apache.kafka.clients.admin.ListConsumerGroupsResult;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.GroupAuthorizationException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Option;
import scala.Option$;
import scala.Predef;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenTraversableOnce;
import scala.collection.Iterable;
import scala.collection.Iterable$;
import scala.collection.Map;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.Nil$;
import scala.collection.immutable.Set;
import scala.collection.immutable.Set$;
import scala.collection.mutable.Map$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\t%r!\u0002\u0015*\u0011\u0003\u0001d!\u0002\u001a*\u0011\u0003\u0019\u0004\"\u0002\u001e\u0002\t\u0003Y\u0004b\u0002\u001f\u0002\u0005\u0004%\t!\u0010\u0005\u0007\u0003\u0006\u0001\u000b\u0011\u0002 \t\u000f\t\u000b!\u0019!C\u0001\u0007\"1\u0011+\u0001Q\u0001\n\u0011CqAU\u0001C\u0002\u0013\u00051\u000b\u0003\u0004X\u0003\u0001\u0006I\u0001V\u0003\u00051\u0006\u0001\u0011L\u0002\u00033S\u0001i\bBCA\u0006\u0015\t\u0015\r\u0011\"\u0001\u0002\u000e!Q\u0011Q\u0003\u0006\u0003\u0002\u0003\u0006I!a\u0004\t\u0015\u0005]!B!b\u0001\n\u0003\tI\u0002\u0003\u0006\u0002\")\u0011\t\u0011)A\u0005\u00037A!\"a\t\u000b\u0005\u0003\u0005\u000b\u0011BA\u0013\u0011)\t\tD\u0003BC\u0002\u0013\u0005\u00111\u0007\u0005\u000b\u0003\u0003R!\u0011!Q\u0001\n\u0005U\u0002BCA\"\u0015\t\u0005\t\u0015!\u0003\u0002F!1!H\u0003C\u0001\u0003\u0017B\u0011\"!\u0017\u000b\u0001\u0004%I!a\u0017\t\u0013\u0005\r$\u00021A\u0005\n\u0005\u0015\u0004\u0002CA9\u0015\u0001\u0006K!!\u0018\t\u0011\u0005M$\u00021A\u0005\nuB\u0011\"!\u001e\u000b\u0001\u0004%I!a\u001e\t\u000f\u0005m$\u0002)Q\u0005}!9\u0011Q\u0010\u0006\u0005R\u0005}\u0004bBAD\u0015\u0011%\u0011\u0011\u0012\u0005\b\u0003+SA\u0011BAL\u0011\u001d\tYK\u0003C\u0005\u0003[Cq!!-\u000b\t\u0013\t\u0019\fC\u0004\u0002>*!I!a0\t\u000f\u0005%'\u0002\"\u0003\u0002L\"9\u0011\u0011\u001c\u0006\u0005\n\u0005m\u0007bBAr\u0015\u0011%\u0011Q\u001d\u0005\b\u0003WTA\u0011BAw\u0011\u001d\t9P\u0003C\u0005\u0003sDqAa\u0005\u000b\t\u0013\u0011)\u0002C\u0004\u0003\u0018)!IA!\u0006\t\u000f\u0005E&\u0002\"\u0005\u0003\u001a\u000512\t\\;ti\u0016\u0014H*\u001b8l'ft7m\u00144gg\u0016$8O\u0003\u0002+W\u0005!A.\u001b8l\u0015\taS&\u0001\u0004tKJ4XM\u001d\u0006\u0002]\u0005)1.\u00194lC\u000e\u0001\u0001CA\u0019\u0002\u001b\u0005I#AF\"mkN$XM\u001d'j].\u001c\u0016P\\2PM\u001a\u001cX\r^:\u0014\u0005\u0005!\u0004CA\u001b9\u001b\u00051$\"A\u001c\u0002\u000bM\u001c\u0017\r\\1\n\u0005e2$AB!osJ+g-\u0001\u0004=S:LGO\u0010\u000b\u0002a\u0005\u0001C*[:u\u0007>t7/^7
fe\u001e\u0013x.\u001e9PM\u001a\u001cX\r\u001e\"bi\u000eD7+\u001b>f+\u0005q\u0004CA\u001b@\u0013\t\u0001eGA\u0002J]R\f\u0011\u0005T5ti\u000e{gn];nKJ<%o\\;q\u001f\u001a47/\u001a;CCR\u001c\u0007nU5{K\u0002\n1\u0004T5ti>3gm]3ug\u001a{'/\u00117m!\u0006\u0014H/\u001b;j_:\u001cX#\u0001#\u0011\u0005\u0015{U\"\u0001$\u000b\u0005\u001dC\u0015!B1e[&t'BA%K\u0003\u001d\u0019G.[3oiNT!AL&\u000b\u00051k\u0015AB1qC\u000eDWMC\u0001O\u0003\ry'oZ\u0005\u0003!\u001a\u0013A\u0004T5ti\u000e{gn];nKJ<%o\\;q\u001f\u001a47/\u001a;t'B,7-\u0001\u000fMSN$xJ\u001a4tKR\u001chi\u001c:BY2\u0004\u0016M\u001d;ji&|gn\u001d\u0011\u0002%1K7\u000f^(gMN,Go](qi&|gn]\u000b\u0002)B\u0011Q)V\u0005\u0003-\u001a\u0013q\u0004T5ti\u000e{gn];nKJ<%o\\;q\u001f\u001a47/\u001a;t\u001fB$\u0018n\u001c8t\u0003Ma\u0015n\u001d;PM\u001a\u001cX\r^:PaRLwN\\:!\u00055yeMZ:fi\u001a+H/\u001e:fgB!!,\u00193h\u001d\tYv\f\u0005\u0002]m5\tQL\u0003\u0002__\u00051AH]8pizJ!\u0001\u0019\u001c\u0002\rA\u0013X\rZ3g\u0013\t\u00117MA\u0002NCBT!\u0001\u0019\u001c\u0011\u0005i+\u0017B\u00014d\u0005\u0019\u0019FO]5oOB\u0019\u0001n[7\u000e\u0003%T!A\u001b&\u0002\r\r|W.\\8o\u0013\ta\u0017NA\u0006LC\u001a\\\u0017MR;ukJ,\u0007\u0003\u00028ti^l\u0011a\u001c\u0006\u0003aF\fA!\u001e;jY*\t!/\u0001\u0003kCZ\f\u0017B\u00012p!\tAW/\u0003\u0002wS\nqAk\u001c9jGB\u000b'\u000f^5uS>t\u0007C\u0001=|\u001b\u0005I(B\u0001>I\u0003!\u0019wN\\:v[\u0016\u0014\u0018B\u0001?z\u0005EyeMZ:fi\u0006sG-T3uC\u0012\fG/Y\n\u0003\u0015y\u00042a`A\u0003\u001d\r\t\u0014\u0011A\u0005\u0004\u0003\u0007I\u0013\u0001F\"mkN$XM\u001d'j].\u001c6\r[3ek2,'/\u0003\u0003\u0002\b\u0005%!\u0001\u0004)fe&|G-[2UCN\\'bAA\u0002S\u0005i1\r\\5f]Rl\u0015M\\1hKJ,\"!a\u0004\u0011\u0007E\n\t\"C\u0002\u0002\u0014%\u0012\u0001d\u00117vgR,'\u000fT5oW\u000ec\u0017.\u001a8u\u001b\u0006t\u0017mZ3s\u00039\u0019G.[3oi6\u000bg.Y4fe\u0002\nq\"\\3uC\u0012\fG/Y'b]\u0006<WM]\u000b\u0003\u00037\u00012!MA\u000f\u0013\r\ty\"\u000b\u0002\u001b\u00072,8\u000f^3s\u0019&t7.T3uC\u0012\fG/Y'b]\u0006<WM]\u0001\u0011[\u0016$\u
0018\rZ1uC6\u000bg.Y4fe\u0002\n\u0001\u0002\\5oW\u0012\u000bG/\u0019\t\u0005\u0003O\ti#\u0004\u0002\u0002*)\u0019\u00111F\u0017\u0002\u0005i\\\u0017\u0002BA\u0018\u0003S\u0011qb\u00117vgR,'\u000fT5oW\u0012\u000bG/Y\u0001\u0011I\u0016\u001cH/\u00113nS:4\u0015m\u0019;pef,\"!!\u000e\u0011\u000bU\n9$a\u000f\n\u0007\u0005ebGA\u0005Gk:\u001cG/[8oaA\u0019Q)!\u0010\n\u0007\u0005}bI\u0001\bD_:4G.^3oi\u0006#W.\u001b8\u0002#\u0011,7\u000f^!e[&tg)Y2u_JL\b%A\u0004nKR\u0014\u0018nY:\u0011\u0007E\n9%C\u0002\u0002J%\u0012!c\u00117vgR,'\u000fT5oW6+GO]5dgRa\u0011QJA(\u0003#\n\u0019&!\u0016\u0002XA\u0011\u0011G\u0003\u0005\b\u0003\u0017\u0019\u0002\u0019AA\b\u0011\u001d\t9b\u0005a\u0001\u00037Aq!a\t\u0014\u0001\u0004\t)\u0003C\u0004\u00022M\u0001\r!!\u000e\t\u000f\u0005\r3\u00031\u0001\u0002F\u000511m\u001c8gS\u001e,\"!!\u0018\u0011\u0007E\ny&C\u0002\u0002b%\u0012\u0011c\u00117vgR,'\u000fT5oW\u000e{gNZ5h\u0003)\u0019wN\u001c4jO~#S-\u001d\u000b\u0005\u0003O\ni\u0007E\u00026\u0003SJ1!a\u001b7\u0005\u0011)f.\u001b;\t\u0013\u0005=T#!AA\u0002\u0005u\u0013a\u0001=%c\u000591m\u001c8gS\u001e\u0004\u0013\u0001\u0005;bg.\u001cx*\u001e;ti\u0006tG-\u001b8h\u0003Q!\u0018m]6t\u001fV$8\u000f^1oI&twm\u0018\u0013fcR!\u0011qMA=\u0011!\ty\u0007GA\u0001\u0002\u0004q\u0014!\u0005;bg.\u001cx*\u001e;ti\u0006tG-\u001b8hA\u0005\u0019!/\u001e8\u0015\u0005\u0005\u0005\u0005cA\u001b\u0002\u0004&\u0019\u0011Q\u0011\u001c\u0003\u000f\t{w\u000e\\3b]\u0006!b-\u001b7uKJ\u001cuN\\:v[\u0016\u0014xI]8vaN$B!a#\u0002\u0012B!!,!$e\u0013\r\tyi\u0019\u0002\u0004'\u0016$\bbBAJ7\u0001\u0007\u00111R\u0001\u0007OJ|W\u000f]:\u000291L7\u000f\u001e#fgR\u001cuN\\:v[\u0016\u0014xI]8va>3gm]3ugR!\u0011\u0011TAU!\r\tY*\u0003\b\u0004\u0003;\u0003a\u0002BAP\u0003OsA!!)\u0002&:\u0019A,a)\n\u00039J!\u0001L\u0017\n\u0005)Z\u0003bBAJ9\u0001\u0007\u00111R\u0001\u001fY&\u001cHoU8ve\u000e,7i\u001c8tk6,'o\u0012:pkB|eMZ:fiN$B!!'\u00020\"9\u00111S\u000fA\u0002\u0005-\u0015\u0001\u00077jgR\u001cuN\\:v[\u0016\u0014xI]8va>3gm]3ugRA\u0011\u0011TA[\u000
3o\u000bI\fC\u0004\u0002\u0014z\u0001\r!a#\t\r\u001ds\u0002\u0019AA\u001e\u0011\u001d\tYL\ba\u0001\u0003\u0003\u000b\u0001\"[:T_V\u00148-Z\u0001\u000eI>\f5/\u001f8d\u0007>lW.\u001b;\u0015\r\u0005\u0005\u0015\u0011YAc\u0011\u001d\t\u0019m\ba\u0001\u00033\u000b1c]8ve\u000e,wJ\u001a4tKR4U\u000f^;sKNDq!a2 \u0001\u0004\tI*A\teKN$xJ\u001a4tKR4U\u000f^;sKN\fa&Y:z]\u000e\u001cu.\\7ji\u000e{gn];nKJ|eMZ:fiN$v\u000eR3ti&t\u0017\r^5p]\u000ecWo\u001d;feR1\u0011QZAk\u0003/\u0004RAW1e\u0003\u001f\u00042!RAi\u0013\r\t\u0019N\u0012\u0002 \u00032$XM]\"p]N,X.\u001a:He>,\bo\u00144gg\u0016$8OU3tk2$\bbBAbA\u0001\u0007\u0011\u0011\u0014\u0005\b\u0003\u000f\u0004\u0003\u0019AAM\u0003!B\u0017M\u001c3mK\u0012+7\u000f^5oCRLwN\\\"mkN$XM]\"p[6LG\u000f^3e\u001f\u001a47/\u001a;t)\u0019\t\t)!8\u0002b\"9\u0011q\\\u0011A\u0002\u00055\u0017!D2p[6LGOU3tk2$8\u000fC\u0004\u0002D\u0006\u0002\r!!'\u0002\u0015MDw.\u001e7e'ft7\r\u0006\u0003\u0002\u0002\u0006\u001d\bBBAuE\u0001\u0007A-A\u0003u_BL7-A\u0004pM\u001a\u001cX\r^:\u0015\r\u0005=\u0018\u0011_A{!\u0011Q\u0016\r^<\t\r\u0005M8\u00051\u0001h\u0003\u00191W\u000f^;sK\"9\u00111X\u0012A\u0002\u0005\u0005\u0015!E8gMN,GOR3uG\"4\u0015-\u001b7fIR1\u0011qMA~\u0003{Dq!a/%\u0001\u0004\t\t\tC\u0004\u0002\u0000\u0012\u0002\rA!\u0001\u0002\u0003\u0015\u0004BAa\u0001\u0003\u000e9!!Q\u0001B\u0005\u001d\ra&qA\u0005\u0002o%\u0019!1\u0002\u001c\u0002\u000fA\f7m[1hK&!!q\u0002B\t\u0005%!\u0006N]8xC\ndWMC\u0002\u0003\fY\n1b]8ve\u000e,\u0017\tZ7j]V\u0011\u00111H\u0001\nI\u0016\u001cH/\u00113nS:$bAa\u0007\u0003\"\t\r\u0002cA#\u0003\u001e%\u0019!q\u0004$\u0003=1K7\u000f^\"p]N,X.\u001a:He>,\bo\u00144gg\u0016$8OU3tk2$\bBB$(\u0001\u0004\tY\u0004C\u0004\u0003&\u001d\u0002\rAa\n\u0002\u0015\u001d\u0014x.\u001e9Ta\u0016\u001c7\u000f\u0005\u0003og\u0012$\u0005")
public class ClusterLinkSyncOffsets
extends ClusterLinkScheduler.PeriodicTask {
    // Supplies the current link config, the source-cluster admin client and the scheduler.
    private final ClusterLinkClientManager clientManager;
    // Consulted for link-coordinator status and for whether a topic is an active mirror.
    private final ClusterLinkMetadataManager metadataManager;
    // Link descriptor; this class reads its link name and tenant prefix.
    private final ClusterLinkData linkData;
    // Applied each time an admin client for the destination cluster is needed.
    private final Function0<ConfluentAdmin> destAdminFactory;
    // Sensors recorded on list/commit failures and on successful offset commits.
    private final ClusterLinkMetrics metrics;
    // Snapshot of the link configuration; re-read from the client manager at the start of run().
    private ClusterLinkConfig config;
    // Count of asynchronous operations still pending; run() and its callbacks return (count == 0).
    private int tasksOutstanding;

    /**
     * Static forwarder to the companion object's {@code ListOffsetsOptions} value.
     *
     * @return the options instance held by {@code ClusterLinkSyncOffsets$.MODULE$}
     */
    public static ListConsumerGroupOffsetsOptions ListOffsetsOptions() {
        ClusterLinkSyncOffsets$ companion = ClusterLinkSyncOffsets$.MODULE$;
        return companion.ListOffsetsOptions();
    }

    /**
     * Static forwarder to the companion object's {@code ListOffsetsForAllPartitions} value.
     *
     * @return the spec instance held by {@code ClusterLinkSyncOffsets$.MODULE$}
     */
    public static ListConsumerGroupOffsetsSpec ListOffsetsForAllPartitions() {
        ClusterLinkSyncOffsets$ companion = ClusterLinkSyncOffsets$.MODULE$;
        return companion.ListOffsetsForAllPartitions();
    }

    /**
     * Static forwarder to the companion object's {@code ListConsumerGroupOffsetBatchSize} value,
     * which is used as the chunk size when groups are split into batches for offset listing.
     *
     * @return the batch size held by {@code ClusterLinkSyncOffsets$.MODULE$}
     */
    public static int ListConsumerGroupOffsetBatchSize() {
        ClusterLinkSyncOffsets$ companion = ClusterLinkSyncOffsets$.MODULE$;
        return companion.ListConsumerGroupOffsetBatchSize();
    }

    /** @return the client manager passed to the constructor */
    public ClusterLinkClientManager clientManager() {
        return clientManager;
    }

    /** @return the metadata manager passed to the constructor */
    public ClusterLinkMetadataManager metadataManager() {
        return metadataManager;
    }

    /** @return the factory passed to the constructor that yields the destination admin client */
    public Function0<ConfluentAdmin> destAdminFactory() {
        return destAdminFactory;
    }

    /** @return the most recently stored link configuration snapshot */
    private ClusterLinkConfig config() {
        return config;
    }

    /**
     * Scala-style setter for the stored link configuration.
     *
     * @param x$1 the configuration to store
     */
    private void config_$eq(ClusterLinkConfig x$1) {
        config = x$1;
    }

    /** @return the current value of the pending-task counter */
    private int tasksOutstanding() {
        return tasksOutstanding;
    }

    /**
     * Scala-style setter for the pending-task counter.
     *
     * @param x$1 the new counter value
     */
    private void tasksOutstanding_$eq(int x$1) {
        tasksOutstanding = x$1;
    }

    /**
     * Entry point of the periodic task.
     *
     * <p>Refreshes the stored config from the client manager. If this broker is the link
     * coordinator for the link and consumer offset sync is enabled, it lists the consumer groups
     * on the source cluster and registers a completion callback that filters the groups, lists
     * their offsets on both clusters and chains {@link #doAsyncCommit}. The pending-task counter
     * is incremented once for the list-groups request and decremented when its callback starts.
     *
     * @return {@code true} when the pending-task counter is zero on exit, {@code false} otherwise
     */
    @Override
    public boolean run() {
        this.config_$eq(this.clientManager().currentConfig());
        if (this.metadataManager().isLinkCoordinator(this.linkData.linkName(), true) && this.config().consumerOffsetSyncEnable()) {
            if (this.tasksOutstanding() != 0) {
                // A previous run left the counter non-zero; log it and start from a clean slate.
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(91).append("Number of outstanding tasks was ").append(this.tasksOutstanding()).append(" at the beginning of run. Resetting to 0 and continuing on.").toString());
                this.tasksOutstanding_$eq(0);
            }
            if (this.config().consumerGroupFilters().isEmpty()) {
                this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(91).append(ClusterLinkConfig$.MODULE$.ConsumerOffsetSyncEnableProp()).append(" is true but no consumer group filters are specified. No consumer offsets will be migrated.").toString());
            } else {
                this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve consumer groups from source cluster");
                ListConsumerGroupsResult listConsumerGroupsResult = this.sourceAdmin().listConsumerGroups();
                this.scheduleWhenComplete(listConsumerGroupsResult.all(), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
                    boolean bl;
                    // The list-groups request has completed: balance the increment made below.
                    this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
                    try {
                        Set<String> filteredGroups = this.filterConsumerGroups((Set<String>)((TraversableOnce)((TraversableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)listConsumerGroupsResult.all().get()).asScala()).map((Function1 & Serializable & scala.Serializable)result -> result.groupId(), Iterable$.MODULE$.canBuildFrom())).toSet());
                        scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> destOffsetFutures = this.listDestConsumerGroupOffsets(filteredGroups);
                        scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> sourceOffsetFutures = this.listSourceConsumerGroupOffsets(filteredGroups);
                        if (!sourceOffsetFutures.nonEmpty()) {
                            // Nothing to read from the source, so there is nothing to commit.
                            return true;
                        }
                        Iterable futures = (Iterable)sourceOffsetFutures.values().$plus$plus((GenTraversableOnce)destOffsetFutures.values(), Iterable$.MODULE$.canBuildFrom());
                        this.scheduleWhenComplete(KafkaFuture.allOf((KafkaFuture[])((KafkaFuture[])futures.toSeq().toArray(ClassTag$.MODULE$.apply(KafkaFuture.class)))), (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.doAsyncCommit(sourceOffsetFutures, destOffsetFutures));
                        bl = false;
                    }
                    catch (Throwable e) {
                        this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to list consumer group offsets. Offsets will not be migrated.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
                        // Instance lambda: the enclosing object is `this` (the decompiler emitted an undefined `$this`).
                        this.metrics.listConsumerGroupsFromSourceFailedSensor().record();
                        bl = true;
                    }
                    return bl;
                });
                // Account for the list-groups request registered above.
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        return this.tasksOutstanding() == 0;
    }

    /*
     * WARNING - void declaration
     */
    private Set<String> filterConsumerGroups(Set<String> groups) {
        void var5_5;
        void var4_4;
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(55).append("Filtering consumer groups ").append(groups).append(" to match consumer group JSON").toString());
        Option<String> clusterLinkPrefix = Predef$.MODULE$.Boolean2boolean(this.config().clusterLinkPrefixConsumerGroupEnable()) ? this.config().clusterLinkPrefix() : None$.MODULE$;
        Tuple2<Set<String>, Seq<ClusterLinkFilterInfo>> tuple2 = ClusterLinkUtils$.MODULE$.doFilter(groups, this.config().consumerGroupFilters(), this.linkData.tenantPrefix(), clusterLinkPrefix);
        if (tuple2 == null) {
            throw new MatchError(null);
        }
        Set filtered = (Set)tuple2._1();
        Seq unusedFilters = (Seq)tuple2._2();
        void filtered2 = var4_4;
        var5_5.foreach((Function1 & Serializable & scala.Serializable)unusedFilter -> {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(199).append("The filter ").append(unusedFilter).append(" does not match any consumer group. This filter may not be ").append("required or the groups it referred to may not have the correct DESCRIBE ACL ").append("for the cluster link principal on the source cluster.").toString());
            return BoxedUnit.UNIT;
        });
        this.trace((Function0<String>)((Function0 & Serializable & scala.Serializable)() -> ClusterLinkSyncOffsets.$anonfun$filterConsumerGroups$4((Set)filtered2)));
        return filtered2;
    }

    /**
     * Lists committed offsets for the given groups using the destination admin client.
     *
     * @param groups consumer group ids to query
     * @return per-group futures of the committed offsets
     */
    private scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> listDestConsumerGroupOffsets(Set<String> groups) {
        ConfluentAdmin destinationAdmin = this.destAdmin();
        return this.listConsumerGroupOffsets(groups, destinationAdmin, false);
    }

    /**
     * Lists committed offsets for the given groups using the source admin client.
     *
     * @param groups consumer group ids to query
     * @return per-group futures of the committed offsets
     */
    private scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> listSourceConsumerGroupOffsets(Set<String> groups) {
        ConfluentAdmin sourceClusterAdmin = this.sourceAdmin();
        return this.listConsumerGroupOffsets(groups, sourceClusterAdmin, true);
    }

    /**
     * Issues offset-listing requests for the given groups in batches and collects the resulting
     * per-group futures.
     *
     * <p>Any throwable raised while issuing the requests is reported through
     * {@link #offsetFetchFailed}; whatever futures were collected before the failure are still
     * returned.
     *
     * @param groups   consumer group ids to query
     * @param admin    admin client to issue the requests with
     * @param isSource {@code true} when {@code admin} targets the source cluster; selects the log
     *                 wording and which failure sensor is recorded
     * @return an immutable snapshot of the collected group-to-future map
     */
    private scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> listConsumerGroupOffsets(Set<String> groups, ConfluentAdmin admin, boolean isSource) {
        final String clusterLabel;
        if (isSource) {
            clusterLabel = "source";
        } else {
            clusterLabel = "destination";
        }
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(74).append("Listing consumer group offsets on ").append(clusterLabel).append(" cluster for following consumer groups: ").append(groups).toString());
        // Mutable accumulator filled by the per-batch helper.
        scala.collection.mutable.Map futuresByGroup = (scala.collection.mutable.Map)Map$.MODULE$.apply((Seq)Nil$.MODULE$);
        try {
            int batchSize = ClusterLinkSyncOffsets$.MODULE$.ListConsumerGroupOffsetBatchSize();
            groups.grouped(batchSize).foreach((Function1 & Serializable & scala.Serializable)batch -> {
                ClusterLinkSyncOffsets.$anonfun$listConsumerGroupOffsets$2(this, clusterLabel, admin, futuresByGroup, batch);
                return BoxedUnit.UNIT;
            });
        }
        catch (Throwable failure) {
            this.offsetFetchFailed(isSource, failure);
        }
        return futuresByGroup.toMap(Predef$.MODULE$.$conforms());
    }

    /**
     * Callback run once all offset-listing futures have completed.
     *
     * <p>Subtracts one pending task per listing future (source and destination), starts the
     * destination commits and, if any commit was started, registers the handler for their results.
     *
     * @param sourceOffsetFutures completed per-group offset futures from the source cluster
     * @param destOffsetFutures   completed per-group offset futures from the destination cluster
     * @return {@code true} when the pending-task counter is zero on exit
     */
    private boolean doAsyncCommit(scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> sourceOffsetFutures, scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> destOffsetFutures) {
        int pendingBefore = this.tasksOutstanding();
        int finishedListings = sourceOffsetFutures.size() + destOffsetFutures.size();
        this.tasksOutstanding_$eq(pendingBefore - finishedListings);
        scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> startedCommits = this.asyncCommitConsumerOffsetsToDestinationCluster(sourceOffsetFutures, destOffsetFutures);
        if (!startedCommits.isEmpty()) {
            this.handleDestinationClusterCommittedOffsets(startedCommits, sourceOffsetFutures);
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Starts an offset commit on the destination cluster for every source group that has offsets
     * passing the commit filter.
     *
     * <p>For each source group the destination offsets are resolved first (empty when the group
     * has no destination future), then the source offsets are filtered through
     * {@code $anonfun$asyncCommitConsumerOffsetsToDestinationCluster$2}. Each started commit
     * increments the pending-task counter.
     *
     * @param sourceOffsetFutures completed per-group offset futures from the source cluster
     * @param destOffsetFutures   completed per-group offset futures from the destination cluster
     * @return the commit result for every group a commit was started for
     */
    private scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> asyncCommitConsumerOffsetsToDestinationCluster(scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> sourceOffsetFutures, scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> destOffsetFutures) {
        return (scala.collection.immutable.Map)sourceOffsetFutures.flatMap((Function1 & Serializable & scala.Serializable)x0$1 -> {
            if (x0$1 == null) {
                throw new MatchError(null);
            }
            String group = (String)x0$1._1();
            KafkaFuture groupFuture = (KafkaFuture)x0$1._2();

            // Resolve what is already committed on the destination for this group.
            scala.collection.immutable.Map<TopicPartition, OffsetAndMetadata> destOffsets;
            Option destFuture = destOffsetFutures.get((Object)group);
            if (destFuture instanceof Some) {
                KafkaFuture future = (KafkaFuture)((Some)destFuture).value();
                destOffsets = this.offsets((KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>)future, false);
            } else if (None$.MODULE$.equals(destFuture)) {
                destOffsets = Predef$.MODULE$.Map().empty();
            } else {
                throw new MatchError((Object)destFuture);
            }

            scala.collection.immutable.Map offsetsToCommit = (scala.collection.immutable.Map)this.offsets((KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>)groupFuture, true).filter((Function1 & Serializable & scala.Serializable)x0$2 -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncOffsets.$anonfun$asyncCommitConsumerOffsetsToDestinationCluster$2(this, destOffsets, x0$2)));
            if (!offsetsToCommit.nonEmpty()) {
                // Nothing left to commit for this group: contribute no entry.
                return Option$.MODULE$.option2Iterable((Option)None$.MODULE$);
            }
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(63).append("Committing offsets on destination cluster for consumer group ").append(group).append(": ").append(offsetsToCommit).toString());
            this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            return Option$.MODULE$.option2Iterable((Option)new Some((Object)Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)group), (Object)this.destAdmin().alterConsumerGroupOffsets(group, (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter((Map)offsetsToCommit).asJava()))));
        }, scala.collection.immutable.Map$.MODULE$.canBuildFrom());
    }

    /**
     * Registers a callback that runs once every started commit has completed and then processes
     * each commit result through {@code $anonfun$handleDestinationClusterCommittedOffsets$3}.
     *
     * @param commitResults       per-group results of the started commits
     * @param sourceOffsetFutures completed per-group offset futures from the source cluster
     * @return {@code true} when the pending-task counter is zero at the time of return
     */
    private boolean handleDestinationClusterCommittedOffsets(scala.collection.immutable.Map<String, AlterConsumerGroupOffsetsResult> commitResults, scala.collection.immutable.Map<String, KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>>> sourceOffsetFutures) {
        TraversableOnce perGroupFutures = (TraversableOnce)commitResults.values().map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.all(), Iterable$.MODULE$.canBuildFrom());
        KafkaFuture[] pendingCommits = (KafkaFuture[])((KafkaFuture[])perGroupFutures.toSeq().toArray(ClassTag$.MODULE$.apply(KafkaFuture.class)));
        KafkaFuture allCommitFutures = KafkaFuture.allOf(pendingCommits);
        this.scheduleWhenComplete(allCommitFutures, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> {
            commitResults.foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
                ClusterLinkSyncOffsets.$anonfun$handleDestinationClusterCommittedOffsets$3(this, sourceOffsetFutures, x0$1);
                return BoxedUnit.UNIT;
            });
            return this.tasksOutstanding() == 0;
        });
        return this.tasksOutstanding() == 0;
    }

    /**
     * Decides whether offsets for a topic are eligible for syncing.
     *
     * @param topic topic name
     * @return the metadata manager's {@code isActiveMirrorTopic} answer for {@code topic}
     */
    private boolean shouldSync(String topic) {
        ClusterLinkMetadataManager manager = this.metadataManager();
        return manager.isActiveMirrorTopic(topic);
    }

    /**
     * Reads the result of an offset-listing future as an immutable Scala map.
     *
     * <p>An {@link ExecutionException} from the future is reported through
     * {@link #offsetFetchFailed} and turned into an empty map; other throwables propagate.
     *
     * @param future   offset-listing future to read
     * @param isSource {@code true} when the future belongs to the source cluster
     * @return the partition-to-offset map, or an empty map on {@code ExecutionException}
     */
    private scala.collection.immutable.Map<TopicPartition, OffsetAndMetadata> offsets(KafkaFuture<java.util.Map<TopicPartition, OffsetAndMetadata>> future, boolean isSource) {
        try {
            java.util.Map fetched = (java.util.Map)future.get();
            TraversableLike scalaView = (TraversableLike)CollectionConverters$.MODULE$.mapAsScalaMapConverter(fetched).asScala();
            TraversableOnce pairs = (TraversableOnce)scalaView.map((Function1 & Serializable & scala.Serializable)e -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(e._1()), e._2()), Map$.MODULE$.canBuildFrom());
            return pairs.toMap(Predef$.MODULE$.$conforms());
        }
        catch (ExecutionException fetchFailure) {
            this.offsetFetchFailed(isSource, fetchFailure);
            return Predef$.MODULE$.Map().empty();
        }
    }

    /**
     * Logs an offset-listing failure and records the matching failure sensor.
     *
     * <p>Source failures are logged at warn level, destination failures at info level.
     *
     * @param isSource {@code true} when the failure happened against the source cluster
     * @param e        the failure to attach to the log entry
     */
    private void offsetFetchFailed(boolean isSource, Throwable e) {
        if (!isSource) {
            this.info((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to list consumer group offsets on destination cluster. All available source offsets will be committed.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            this.metrics.listConsumerGroupOffsetsFromDestinationFailedSensor().record();
        } else {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to list consumer group offsets on source cluster. Offsets will not be migrated.", (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> e);
            this.metrics.listConsumerGroupOffsetsFromSourceFailedSensor().record();
        }
    }

    /** @return the admin client obtained from the client manager's {@code getAdmin()} */
    private ConfluentAdmin sourceAdmin() {
        ClusterLinkClientManager manager = this.clientManager();
        return manager.getAdmin();
    }

    /** @return the admin client produced by applying the destination admin factory */
    private ConfluentAdmin destAdmin() {
        Function0<ConfluentAdmin> factory = this.destAdminFactory();
        return (ConfluentAdmin)factory.apply();
    }

    /**
     * Issues a single offset-listing request using the companion object's shared options.
     *
     * @param admin      admin client to issue the request with
     * @param groupSpecs per-group listing specs
     * @return the result handle returned by the admin client
     */
    public ListConsumerGroupOffsetsResult listConsumerGroupOffsets(ConfluentAdmin admin, java.util.Map<String, ListConsumerGroupOffsetsSpec> groupSpecs) {
        ListConsumerGroupOffsetsOptions sharedOptions = ClusterLinkSyncOffsets$.MODULE$.ListOffsetsOptions();
        return admin.listConsumerGroupOffsets(groupSpecs, sharedOptions);
    }

    /**
     * Builds the trace message listing the groups that survived filtering.
     *
     * @param filtered$1 the filtered group set
     * @return the formatted message
     */
    public static final /* synthetic */ String $anonfun$filterConsumerGroups$4(Set filtered$1) {
        return "Filtered consumer groups: " + filtered$1;
    }

    /**
     * Stores one group's offset future in the accumulator and counts it as a pending task.
     *
     * @param $this          the enclosing task instance
     * @param groupFutures$1 mutable accumulator of group-to-future entries
     * @param group$1        consumer group id
     * @param offsets        the group's offset-listing future
     */
    public static final /* synthetic */ void $anonfun$listConsumerGroupOffsets$6(ClusterLinkSyncOffsets $this, scala.collection.mutable.Map groupFutures$1, String group$1, KafkaFuture offsets) {
        Tuple2 entry = Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc((Object)group$1), (Object)offsets);
        groupFutures$1.$plus$eq(entry);
        int pending = $this.tasksOutstanding();
        $this.tasksOutstanding_$eq(pending + 1);
    }

    /**
     * Looks up one group's future in a batch result and, when it is non-null, hands it to
     * {@code $anonfun$listConsumerGroupOffsets$6} for registration.
     *
     * @param $this          the enclosing task instance
     * @param result$1       result handle of the batch listing request
     * @param groupFutures$1 mutable accumulator of group-to-future entries
     * @param group          consumer group id
     */
    public static final /* synthetic */ void $anonfun$listConsumerGroupOffsets$5(ClusterLinkSyncOffsets $this, ListConsumerGroupOffsetsResult result$1, scala.collection.mutable.Map groupFutures$1, String group) {
        // Option.apply maps a null future to None, so foreach is skipped in that case.
        Option maybeFuture = Option$.MODULE$.apply((Object)result$1.partitionsToOffsetAndMetadata(group));
        maybeFuture.foreach((Function1 & Serializable & scala.Serializable)offsets -> {
            ClusterLinkSyncOffsets.$anonfun$listConsumerGroupOffsets$6($this, groupFutures$1, group, offsets);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Issues one offset-listing request for a batch of groups and registers every group's future.
     *
     * @param $this           the enclosing task instance
     * @param targetCluster$1 cluster label used in the trace message
     * @param admin$1         admin client to issue the request with
     * @param groupFutures$1  mutable accumulator of group-to-future entries
     * @param batch           the groups in this batch
     */
    public static final /* synthetic */ void $anonfun$listConsumerGroupOffsets$2(ClusterLinkSyncOffsets $this, String targetCluster$1, ConfluentAdmin admin$1, scala.collection.mutable.Map groupFutures$1, Set batch) {
        $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(74).append("Listing consumer group offsets on ").append(targetCluster$1).append(" cluster for following consumer groups: ").append(batch).toString());
        // Every group in the batch is paired with the shared all-partitions spec.
        TraversableOnce specPairs = (TraversableOnce)batch.map((Function1 & Serializable & scala.Serializable)g -> Predef.ArrowAssoc$.MODULE$.$minus$greater$extension(Predef$.MODULE$.ArrowAssoc(g), (Object)ClusterLinkSyncOffsets$.MODULE$.ListOffsetsForAllPartitions()), Set$.MODULE$.canBuildFrom());
        Map specsByGroup = (Map)specPairs.toMap(Predef$.MODULE$.$conforms());
        java.util.Map groupMap = (java.util.Map)CollectionConverters$.MODULE$.mapAsJavaMapConverter(specsByGroup).asJava();
        ListConsumerGroupOffsetsResult batchResult = $this.listConsumerGroupOffsets(admin$1, groupMap);
        batch.foreach((Function1 & Serializable & scala.Serializable)group -> {
            ClusterLinkSyncOffsets.$anonfun$listConsumerGroupOffsets$5($this, batchResult, groupFutures$1, group);
            return BoxedUnit.UNIT;
        });
    }

    /**
     * Tests whether the destination already holds the same offset value as the source.
     *
     * @param sourceOffset$1 offset read from the source cluster
     * @param destOffset     offset read from the destination cluster
     * @return {@code true} when both {@code offset()} values are equal
     */
    public static final /* synthetic */ boolean $anonfun$asyncCommitConsumerOffsetsToDestinationCluster$4(OffsetAndMetadata sourceOffset$1, OffsetAndMetadata destOffset) {
        long onDestination = destOffset.offset();
        long onSource = sourceOffset$1.offset();
        return onDestination == onSource;
    }

    /**
     * Commit filter for a single source (partition, offset) entry.
     *
     * <p>An entry is rejected when its topic fails {@code shouldSync}, or when the destination
     * already has an offset for the partition with the same {@code offset()} value.
     *
     * @param $this         the enclosing task instance
     * @param destOffsets$1 offsets currently committed on the destination for the group
     * @param x0$2          the (TopicPartition, OffsetAndMetadata) entry from the source
     * @return {@code true} when the entry should be committed to the destination
     * @throws MatchError if {@code x0$2} is {@code null}
     */
    public static final /* synthetic */ boolean $anonfun$asyncCommitConsumerOffsetsToDestinationCluster$2(ClusterLinkSyncOffsets $this, scala.collection.immutable.Map destOffsets$1, Tuple2 x0$2) {
        if (x0$2 == null) {
            throw new MatchError(null);
        }
        TopicPartition tp = (TopicPartition)x0$2._1();
        OffsetAndMetadata sourceOffset = (OffsetAndMetadata)x0$2._2();
        if (!$this.shouldSync(tp.topic())) {
            $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Not committing offsets for ").append(tp).append(" since the topic is not an active mirror.").toString());
            return false;
        }
        boolean alreadyCommitted = destOffsets$1.get((Object)tp).exists((Function1 & Serializable & scala.Serializable)destOffset -> BoxesRunTime.boxToBoolean((boolean)ClusterLinkSyncOffsets.$anonfun$asyncCommitConsumerOffsetsToDestinationCluster$4(sourceOffset, destOffset)));
        if (alreadyCommitted) {
            $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(82).append("Not committing offsets for ").append(tp).append(" since offset=").append(sourceOffset).append(" is already committed on the destination.").toString());
            return false;
        }
        return true;
    }

    /**
     * Processes the outcome of one group's destination commit.
     *
     * <p>Decrements the pending-task counter, then waits on the commit result. On success the
     * commit sensor is recorded once per source partition whose topic passes {@code shouldSync}.
     * On failure a warning is logged (with ACL guidance for group/topic authorization errors) and
     * the commit-failed sensor is recorded.
     *
     * @param $this                 the enclosing task instance
     * @param sourceOffsetFutures$2 completed per-group offset futures from the source cluster
     * @param x0$1                  the (group, AlterConsumerGroupOffsetsResult) entry
     * @throws MatchError if {@code x0$1} is {@code null}, or if an {@code ExecutionException}
     *                    carries a {@code null} cause
     */
    public static final /* synthetic */ void $anonfun$handleDestinationClusterCommittedOffsets$3(ClusterLinkSyncOffsets $this, scala.collection.immutable.Map sourceOffsetFutures$2, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        String group = (String)x0$1._1();
        AlterConsumerGroupOffsetsResult commitResult = (AlterConsumerGroupOffsetsResult)x0$1._2();
        try {
            $this.tasksOutstanding_$eq($this.tasksOutstanding() - 1);
            // Surfaces any commit failure as an exception handled below.
            commitResult.all().get();
            java.util.Map<TopicPartition, OffsetAndMetadata> sourceOffsets = (java.util.Map<TopicPartition, OffsetAndMetadata>)((KafkaFuture)sourceOffsetFutures$2.apply((Object)group)).get();
            sourceOffsets.forEach((tp, x$3) -> {
                if ($this.shouldSync(tp.topic())) {
                    $this.metrics.consumerOffsetCommitSensor().record();
                    $this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(60).append("Committed offsets on destination cluster for consumer group ").append(group).toString());
                }
            });
        }
        catch (ExecutionException executionException) {
            Throwable cause = executionException.getCause();
            if (cause == null) {
                // Mirrors the Scala pattern match, which has no case for a null cause.
                throw new MatchError(null);
            }
            if (cause instanceof GroupAuthorizationException) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(277).append("Unable to commit offsets for consumer group ").append(group).append(" on the destination cluster, due to authorization issues.").append(" Please add READ ACLs for the consumer group. This action is taken by the inter-broker principal defined in the broker ").append("configuration so ACLs should be added for this principal.").toString());
            } else if (cause instanceof TopicAuthorizationException) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(284).append("Unable to commit offsets for consumer group ").append(group).append(" on the destination cluster, due to authorization issues.").append(" Please add READ ACLs for the topics being migrated. This action is taken by the inter-broker principal defined in the broker ").append("configuration so ACLs should be added for this principal.").toString());
            } else {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Unable to commit offsets for consumer group ").append(group).append(" on destination cluster.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> cause);
            }
            $this.metrics.consumerOffsetCommitFailedSensor().record();
        }
        catch (Throwable ex) {
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> new StringBuilder(68).append("Unable to commit offsets for consumer group ").append(group).append(" on destination cluster.").toString(), (Function0<Throwable>)(Function0 & Serializable & scala.Serializable)() -> ex);
            $this.metrics.consumerOffsetCommitFailedSensor().record();
        }
    }

    /**
     * Creates the periodic offset-sync task for one cluster link.
     *
     * <p>The superclass is initialised with the client manager's scheduler, the task name
     * {@code "ClusterLinkSyncOffsets"} and the {@code consumerOffsetSyncMs} value of the client
     * manager's current config.
     *
     * @param clientManager    supplies the scheduler, the current config and the source admin client
     * @param metadataManager  consulted for coordinator status and active-mirror checks
     * @param linkData         link descriptor (link name, tenant prefix)
     * @param destAdminFactory factory applied whenever a destination admin client is needed
     * @param metrics          sensors recorded by this task
     */
    public ClusterLinkSyncOffsets(ClusterLinkClientManager clientManager, ClusterLinkMetadataManager metadataManager, ClusterLinkData linkData, Function0<ConfluentAdmin> destAdminFactory, ClusterLinkMetrics metrics) {
        // NOTE(review): the decompiler kept the bytecode order, in which constructor-parameter
        // fields are stored before the super-constructor call. Compilers without flexible
        // constructor bodies reject statements before super(...) — confirm the target Java level
        // before reordering.
        this.clientManager = clientManager;
        this.metadataManager = metadataManager;
        this.linkData = linkData;
        this.destAdminFactory = destAdminFactory;
        this.metrics = metrics;
        super(clientManager.scheduler(), "ClusterLinkSyncOffsets", Predef$.MODULE$.Integer2int(clientManager.currentConfig().consumerOffsetSyncMs()));
        // Initial config snapshot; run() replaces it on every invocation.
        this.config = clientManager.currentConfig();
        this.tasksOutstanding = 0;
    }
}

