/*
 * Decompiled with CFR 0.152.
 */
package kafka.server.link;

import java.io.Serializable;
import java.util.Collection;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import kafka.controller.KafkaController;
import kafka.server.link.AclFiltersJson;
import kafka.server.link.AclJson$;
import kafka.server.link.ClusterLinkClientManager;
import kafka.server.link.ClusterLinkMetrics;
import kafka.server.link.ClusterLinkScheduler;
import org.apache.kafka.clients.admin.DescribeAclsResult;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.errors.AuthorizationException;
import org.apache.kafka.server.authorizer.AclCreateResult;
import org.apache.kafka.server.authorizer.AclDeleteResult;
import org.apache.kafka.server.authorizer.Authorizer;
import scala.Function0;
import scala.Function1;
import scala.MatchError;
import scala.None$;
import scala.Predef$;
import scala.Some;
import scala.Tuple2;
import scala.collection.GenSet;
import scala.collection.IterableLike;
import scala.collection.Seq;
import scala.collection.TraversableLike;
import scala.collection.TraversableOnce;
import scala.collection.immutable.List;
import scala.collection.immutable.List$;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.Buffer;
import scala.collection.mutable.Buffer$;
import scala.collection.mutable.ListBuffer;
import scala.collection.mutable.ListBuffer$;
import scala.collection.mutable.Set;
import scala.collection.mutable.Set$;
import scala.jdk.CollectionConverters$;
import scala.reflect.ClassTag$;
import scala.reflect.ScalaSignature;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;
import scala.runtime.java8.JFunction0;

@ScalaSignature(bytes="\u0006\u0001\u0005]c\u0001\u0002\t\u0012\u0001aA\u0001\"\t\u0001\u0003\u0006\u0004%\tA\t\u0005\tM\u0001\u0011\t\u0011)A\u0005G!Aq\u0005\u0001B\u0001B\u0003%\u0001\u0006\u0003\u0005.\u0001\t\u0005\t\u0015!\u0003/\u0011\u0015\t\u0004\u0001\"\u00013\u0011\u001d9\u0004A1A\u0005\naBa\u0001\u0015\u0001!\u0002\u0013I\u0004bB)\u0001\u0001\u0004%IA\u0015\u0005\b/\u0002\u0001\r\u0011\"\u0003Y\u0011\u0019q\u0006\u0001)Q\u0005'\")q\f\u0001C)A\")A\r\u0001C\u0005K\")q\u000f\u0001C\u0005q\"9\u00111\u0007\u0001\u0005\n\u0005U\u0002BBA+\u0001\u0011\u0005\u0001HA\nDYV\u001cH/\u001a:MS:\\7+\u001f8d\u0003\u000ed7O\u0003\u0002\u0013'\u0005!A.\u001b8l\u0015\t!R#\u0001\u0004tKJ4XM\u001d\u0006\u0002-\u0005)1.\u00194lC\u000e\u00011C\u0001\u0001\u001a!\tQbD\u0004\u0002\u001c95\t\u0011#\u0003\u0002\u001e#\u0005!2\t\\;ti\u0016\u0014H*\u001b8l'\u000eDW\rZ;mKJL!a\b\u0011\u0003\u0019A+'/[8eS\u000e$\u0016m]6\u000b\u0005u\t\u0012!D2mS\u0016tG/T1oC\u001e,'/F\u0001$!\tYB%\u0003\u0002&#\tA2\t\\;ti\u0016\u0014H*\u001b8l\u00072LWM\u001c;NC:\fw-\u001a:\u0002\u001d\rd\u0017.\u001a8u\u001b\u0006t\u0017mZ3sA\u0005Q1m\u001c8ue>dG.\u001a:\u0011\u0005%ZS\"\u0001\u0016\u000b\u0005\u001d*\u0012B\u0001\u0017+\u0005=Y\u0015MZ6b\u0007>tGO]8mY\u0016\u0014\u0018aB7fiJL7m\u001d\t\u00037=J!\u0001M\t\u0003%\rcWo\u001d;fe2Kgn['fiJL7m]\u0001\u0007y%t\u0017\u000e\u001e \u0015\tM\"TG\u000e\t\u00037\u0001AQ!I\u0003A\u0002\rBQaJ\u0003A\u0002!BQ!L\u0003A\u00029\nQbY;se\u0016tG/Q2m'\u0016$X#A\u001d\u0011\u0007i\n5)D\u0001<\u0015\taT(A\u0004nkR\f'\r\\3\u000b\u0005yz\u0014AC2pY2,7\r^5p]*\t\u0001)A\u0003tG\u0006d\u0017-\u0003\u0002Cw\t\u00191+\u001a;\u0011\u0005\u0011sU\"A#\u000b\u0005\u0019;\u0015aA1dY*\u0011\u0001*S\u0001\u0007G>lWn\u001c8\u000b\u0005YQ%BA&M\u0003\u0019\t\u0007/Y2iK*\tQ*A\u0002pe\u001eL!aT#\u0003\u0015\u0005\u001bGNQ5oI&tw-\u0001\bdkJ\u0014XM\u001c;BG2\u001cV\r\u001e\u0011\u0002!Q\f7o[:PkR\u001cH/\u00198eS:<W#A*\u0011\u0005Q+V\"A 
\n\u0005Y{$aA%oi\u0006!B/Y:lg>+Ho\u001d;b]\u0012LgnZ0%KF$\"!\u0017/\u0011\u0005QS\u0016BA.@\u0005\u0011)f.\u001b;\t\u000fuK\u0011\u0011!a\u0001'\u0006\u0019\u0001\u0010J\u0019\u0002#Q\f7o[:PkR\u001cH/\u00198eS:<\u0007%A\u0002sk:$\u0012!\u0019\t\u0003)\nL!aY \u0003\u000f\t{w\u000e\\3b]\u0006QQ\u000f\u001d3bi\u0016\f5\r\\:\u0015\u0005\u00054\u0007\"B4\r\u0001\u0004A\u0017A\u00034viV\u0014X\rT5tiB\u0019!([6\n\u0005)\\$A\u0003'jgR\u0014UO\u001a4feB\u0019A.\\8\u000e\u0003\u001dK!A\\$\u0003\u0017-\u000bgm[1GkR,(/\u001a\t\u0004aV\u001cU\"A9\u000b\u0005I\u001c\u0018\u0001B;uS2T\u0011\u0001^\u0001\u0005U\u00064\u0018-\u0003\u0002wc\nQ1i\u001c7mK\u000e$\u0018n\u001c8\u0002;\u0005$G-Q2mg\u0006sG\rT8h\u0007J,\u0017\r^5p]^\u000b'O\\5oON$b!Y=\u0002*\u0005=\u0002\"\u0002>\u000e\u0001\u0004Y\u0018aE1dY\u000e\u0013X-\u0019;f%\u0016\u001cX\u000f\u001c;MSN$\b#\u0002?\u0002\n\u0005=abA?\u0002\u00069\u0019a0a\u0001\u000e\u0003}T1!!\u0001\u0018\u0003\u0019a$o\\8u}%\t\u0001)C\u0002\u0002\b}\nq\u0001]1dW\u0006<W-\u0003\u0003\u0002\f\u00055!\u0001\u0002'jgRT1!a\u0002@!\u0019\t\t\"a\u0006\u0002\u001c5\u0011\u00111\u0003\u0006\u0004\u0003+\t\u0018AC2p]\u000e,(O]3oi&!\u0011\u0011DA\n\u0005E\u0019u.\u001c9mKR\f'\r\\3GkR,(/\u001a\t\u0005\u0003;\t)#\u0004\u0002\u0002 )!\u0011\u0011EA\u0012\u0003)\tW\u000f\u001e5pe&TXM\u001d\u0006\u0003)%KA!a\n\u0002 
\ty\u0011i\u00197De\u0016\fG/\u001a*fgVdG\u000fC\u0004\u0002,5\u0001\r!!\f\u0002\u001d\r\u0014X-\u0019;fI\u0006\u001bG\u000eT5tiB!A0!\u0003D\u0011\u0019\t\t$\u0004a\u0001s\u0005Y\u0011\r\u001a3fI\u0006\u001bGnU3u\u0003\u0001\"W\r\\3uK\u0006\u001bGn]!oI2{w\rR3mKRLwN\\,be:LgnZ:\u0015\u000f\u0005\f9$!\u0012\u0002J!9\u0011\u0011\b\bA\u0002\u0005m\u0012aE1dY\u0012+G.\u001a;f%\u0016\u001cX\u000f\u001c;MSN$\b#\u0002?\u0002\n\u0005u\u0002CBA\t\u0003/\ty\u0004\u0005\u0003\u0002\u001e\u0005\u0005\u0013\u0002BA\"\u0003?\u0011q\"Q2m\t\u0016dW\r^3SKN,H\u000e\u001e\u0005\u0007\u0003\u000fr\u0001\u0019A\u001d\u0002\u001b\u0011,G.\u001a;fI\u0006\u001bGnU3u\u0011\u001d\tYE\u0004a\u0001\u0003\u001b\nA\u0003Z3mKR,G-Q2m\r&dG/\u001a:MSN$\b#\u0002?\u0002\n\u0005=\u0003c\u0001#\u0002R%\u0019\u00111K#\u0003!\u0005\u001bGNQ5oI&twMR5mi\u0016\u0014\u0018\u0001E4fi\u000e+(O]3oi\u0006\u001bGnU3u\u0001")
/*
 * Periodic task that retrieves ACLs from the source cluster of a cluster link and applies the
 * difference (additions and deletions) to the local authorizer.
 *
 * One cycle is: run() issues describeAcls calls -> updateAcls() diffs the result against
 * currentAclSet -> createAcls / deleteAcls are issued on the authorizer -> the completion
 * callbacks fold the successful changes back into currentAclSet.
 *
 * tasksOutstanding counts the asynchronous stages that have been scheduled but have not yet run
 * their callback. Every stage returns (tasksOutstanding == 0); presumably the scheduler treats
 * true as "this run is finished" -- confirm against ClusterLinkScheduler.PeriodicTask.
 *
 * NOTE(review): tasksOutstanding and currentAclSet are plain, unsynchronized fields; this looks
 * like it relies on the scheduler invoking every callback on one thread -- confirm.
 */
public class ClusterLinkSyncAcls
extends ClusterLinkScheduler.PeriodicTask {
    private final ClusterLinkClientManager clientManager;
    private final KafkaController controller;
    private final ClusterLinkMetrics metrics;
    /** ACL bindings this task has successfully applied so far. */
    private final Set<AclBinding> currentAclSet;
    /** Number of scheduled asynchronous stages whose callback has not run yet. */
    private int tasksOutstanding;

    /**
     * @return the client manager supplied at construction time
     */
    public ClusterLinkClientManager clientManager() {
        return this.clientManager;
    }

    private Set<AclBinding> currentAclSet() {
        return this.currentAclSet;
    }

    private int tasksOutstanding() {
        return this.tasksOutstanding;
    }

    private void tasksOutstanding_$eq(int x$1) {
        this.tasksOutstanding = x$1;
    }

    /**
     * Starts one synchronization cycle. Nothing is requested unless the controller is active.
     * For every configured ACL filter a describeAcls request is issued; filters whose request
     * fails are logged and skipped. When at least one request was issued, {@code updateAcls} is
     * scheduled to run once all of them complete.
     *
     * @return {@code true} when no asynchronous stage is outstanding after this call
     */
    @Override
    public boolean run() {
        if (this.controller.isActive()) {
            AclFiltersJson aclFilterJson = (AclFiltersJson)this.clientManager().currentConfig().aclFilters().get();
            ListBuffer<AclBindingFilter> aclFilterList = AclJson$.MODULE$.toAclBindingFilters(aclFilterJson);
            ListBuffer describeAclsResultList = (ListBuffer)ListBuffer$.MODULE$.apply((Seq)Nil$.MODULE$);
            aclFilterList.foreach((Function1 & Serializable & scala.Serializable)aclFilter -> {
                Optional<DescribeAclsResult> describeAclsResult = this.describeSourceAcls((AclBindingFilter)aclFilter);
                if (describeAclsResult.isPresent()) {
                    return describeAclsResultList.$plus$eq((Object)describeAclsResult.get());
                }
                return BoxedUnit.UNIT;
            });
            if (describeAclsResultList.nonEmpty()) {
                ListBuffer futureList = (ListBuffer)describeAclsResultList.map((Function1 & Serializable & scala.Serializable)result -> ((DescribeAclsResult)result).values(), ListBuffer$.MODULE$.canBuildFrom());
                KafkaFuture future = KafkaFuture.allOf((KafkaFuture[])((KafkaFuture[])futureList.toArray(ClassTag$.MODULE$.apply(KafkaFuture.class))));
                this.scheduleWhenComplete(future, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> this.updateAcls((ListBuffer<KafkaFuture<Collection<AclBinding>>>)futureList));
                this.tasksOutstanding_$eq(this.tasksOutstanding() + 1);
            }
        }
        return this.tasksOutstanding() == 0;
    }

    /**
     * Issues a describeAcls request for a single filter through the client manager's admin client.
     *
     * @param aclFilter filter selecting the ACLs to describe
     * @return the pending describe result, or empty when the request could not be issued (the
     *         failure is logged at warn level) or the admin client returned {@code null}
     */
    private Optional<DescribeAclsResult> describeSourceAcls(AclBindingFilter aclFilter) {
        try {
            this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Attempting to retrieve ACLs from source cluster");
            return Optional.ofNullable(this.clientManager().getAdmin().describeAcls(aclFilter));
        }
        catch (AuthorizationException authorizationException) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unable to retrieve ACLs on source cluster. Please enable DESCRIBE ACLs on the source cluster to proceed with ACL migration.");
            return Optional.empty();
        }
        catch (Throwable e) {
            this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected error encountered while trying to retrieve ACLs on source cluster: " + e);
            return Optional.empty();
        }
    }

    /**
     * Callback for the completed describe requests: unions all described bindings, diffs them
     * against {@code currentAclSet} and, when an authorizer is available, issues the resulting
     * create and delete requests.
     *
     * @param futureList completed describe futures, one per filter that was issued
     * @return {@code true} when no asynchronous stage is outstanding after this call
     */
    private boolean updateAcls(ListBuffer<KafkaFuture<Collection<AclBinding>>> futureList) {
        this.trace((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Updating ACLs on target cluster");
        Set describeAclResultSet = (Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        // NOTE(review): if any future.get() throws, the decrement at the end of this method is
        // skipped and tasksOutstanding stays elevated -- confirm how scheduleWhenComplete treats
        // exceptions thrown by its callback.
        futureList.foreach((Function1 & Serializable & scala.Serializable)future -> (Set)describeAclResultSet.$plus$plus$eq((TraversableOnce)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter((Collection)future.get()).asScala()));
        // Applied locally but no longer described -> delete; described but not applied -> add.
        Set deletedAcls = (Set)this.currentAclSet().diff((GenSet)describeAclResultSet);
        Set addedAcls = (Set)describeAclResultSet.diff(this.currentAclSet());
        this.clientManager().getAuthorizer().foreach((Function1 & Serializable & scala.Serializable)auth -> {
            ClusterLinkSyncAcls.$anonfun$updateAcls$3(this, addedAcls, deletedAcls, auth);
            return BoxedUnit.UNIT;
        });
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        return this.tasksOutstanding() == 0;
    }

    /**
     * Callback for completed createAcls requests. Pairs each result with the binding it was
     * issued for, records metrics, and adds the bindings that remain in {@code addedAclSet}
     * (failed ones are removed from it while the results are processed) to {@code currentAclSet}.
     *
     * @param aclCreateResultList completed create futures, positionally aligned with {@code createdAclList}
     * @param createdAclList      bindings that were submitted for creation
     * @param addedAclSet         mutable set of bindings to add; failed bindings are removed from it
     * @return {@code true} when no asynchronous stage is outstanding after this call
     */
    private boolean addAclsAndLogCreationWarnings(List<CompletableFuture<AclCreateResult>> aclCreateResultList, List<AclBinding> createdAclList, Set<AclBinding> addedAclSet) {
        ((List)aclCreateResultList.zip(createdAclList, List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            ClusterLinkSyncAcls.$anonfun$addAclsAndLogCreationWarnings$1(this, addedAclSet, x0$1);
            return BoxedUnit.UNIT;
        });
        this.currentAclSet().$plus$plus$eq(addedAclSet);
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        return this.tasksOutstanding() == 0;
    }

    /**
     * Callback for completed deleteAcls requests. Pairs each result with the filter it was issued
     * for, records metrics, and removes the bindings that remain in {@code deletedAclSet}
     * (failed ones are removed from it while the results are processed) from {@code currentAclSet}.
     *
     * @param aclDeleteResultList  completed delete futures, positionally aligned with {@code deletedAclFilterList}
     * @param deletedAclSet        mutable set of bindings to delete; failed bindings are removed from it
     * @param deletedAclFilterList filters that were submitted for deletion
     * @return {@code true} when no asynchronous stage is outstanding after this call
     */
    private boolean deleteAclsAndLogDeletionWarnings(List<CompletableFuture<AclDeleteResult>> aclDeleteResultList, Set<AclBinding> deletedAclSet, List<AclBindingFilter> deletedAclFilterList) {
        ((List)aclDeleteResultList.zip(deletedAclFilterList, List$.MODULE$.canBuildFrom())).foreach((Function1 & Serializable & scala.Serializable)x0$1 -> {
            ClusterLinkSyncAcls.$anonfun$deleteAclsAndLogDeletionWarnings$1(this, deletedAclSet, x0$1);
            return BoxedUnit.UNIT;
        });
        this.currentAclSet().$minus$minus$eq(deletedAclSet);
        this.tasksOutstanding_$eq(this.tasksOutstanding() - 1);
        return this.tasksOutstanding() == 0;
    }

    /**
     * @return the live, mutable set of ACL bindings tracked by this task (not a copy)
     */
    public Set<AclBinding> getCurrentAclSet() {
        return this.currentAclSet();
    }

    /**
     * Synthetic lambda body used by {@code updateAcls}: issues the create and delete requests on
     * the given authorizer and schedules the matching completion callbacks. Each successfully
     * scheduled request increments {@code tasksOutstanding}; a failure while issuing a request is
     * logged and that request is skipped.
     */
    public static final /* synthetic */ void $anonfun$updateAcls$3(ClusterLinkSyncAcls $this, Set addedAcls$1, Set deletedAcls$1, Authorizer auth) {
        if (addedAcls$1.nonEmpty()) {
            try {
                // Snapshot as a list so results can later be zipped with the submitted bindings.
                List addedAclsList = addedAcls$1.toList();
                Buffer createdAclResults = (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(auth.createAcls(null, (java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)addedAclsList).asJava(), Optional.empty())).asScala()).map((Function1 & Serializable & scala.Serializable)x$1 -> x$1.toCompletableFuture(), Buffer$.MODULE$.canBuildFrom());
                CompletableFuture<Void> createdAclsFuture = CompletableFuture.allOf((CompletableFuture[])createdAclResults.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class)));
                $this.scheduleWhenComplete(createdAclsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.addAclsAndLogCreationWarnings((List<CompletableFuture<AclCreateResult>>)createdAclResults.toList(), (List<AclBinding>)addedAclsList, (Set<AclBinding>)addedAcls$1));
                $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
            }
            catch (Throwable e) {
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected error encountered while trying to create ACLs on destination cluster: " + e);
            }
        }
        if (deletedAcls$1.nonEmpty()) {
            try {
                List deleteAclsFilterList = ((TraversableOnce)deletedAcls$1.map((Function1 & Serializable & scala.Serializable)acl -> acl.toFilter(), Set$.MODULE$.canBuildFrom())).toList();
                Buffer deletedAclResults = (Buffer)((TraversableLike)CollectionConverters$.MODULE$.asScalaBufferConverter(auth.deleteAcls(null, (java.util.List)CollectionConverters$.MODULE$.seqAsJavaListConverter((Seq)deleteAclsFilterList).asJava(), Optional.empty())).asScala()).map((Function1 & Serializable & scala.Serializable)x$2 -> x$2.toCompletableFuture(), Buffer$.MODULE$.canBuildFrom());
                CompletableFuture<Void> deletedAclsFuture = CompletableFuture.allOf((CompletableFuture[])deletedAclResults.toArray(ClassTag$.MODULE$.apply(CompletableFuture.class)));
                $this.scheduleWhenComplete(deletedAclsFuture, (Function0<Object>)(JFunction0.mcZ.sp & Serializable & scala.Serializable)() -> $this.deleteAclsAndLogDeletionWarnings((List<CompletableFuture<AclDeleteResult>>)deletedAclResults.toList(), (Set<AclBinding>)deletedAcls$1, (List<AclBindingFilter>)deleteAclsFilterList));
                $this.tasksOutstanding_$eq($this.tasksOutstanding() + 1);
            }
            catch (Throwable e) {
                // This is the delete path: the message previously said "create", which misdirected triage.
                $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected error encountered while trying to delete ACLs on destination cluster: " + e);
            }
        }
    }

    /**
     * Synthetic lambda body used by {@code addAclsAndLogCreationWarnings}: handles one
     * (create future, binding) pair. Success records the "added" sensor; a result carrying an
     * exception, or a failure to obtain the result, logs a warning, removes the binding from
     * {@code addedAcls$2} and records the "add failed" sensor.
     *
     * @throws MatchError when the pair is {@code null}
     */
    public static final /* synthetic */ void $anonfun$addAclsAndLogCreationWarnings$1(ClusterLinkSyncAcls $this, Set addedAcls$2, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        CompletableFuture future = (CompletableFuture)x0$1._1();
        AclBinding createdList = (AclBinding)x0$1._2();
        try {
            AclCreateResult createdAcl = (AclCreateResult)future.get();
            if (!createdAcl.exception().isPresent()) {
                $this.metrics.aclsAddedSensor().record();
                return;
            }
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Encountered the following exception while trying to create ACL: " + createdAcl.exception().get());
            // Drop the failed binding so the caller does not add it to currentAclSet.
            addedAcls$2.$minus$eq((Object)createdList);
            $this.metrics.aclsAddFailedSensor().record();
        }
        catch (Throwable e) {
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected error encountered while trying to create ACL: " + e);
            addedAcls$2.$minus$eq((Object)createdList);
            $this.metrics.aclsAddFailedSensor().record();
        }
    }

    /**
     * Synthetic lambda body: handles the delete outcome of a single binding. A result carrying an
     * exception logs a warning, removes the binding from {@code deletedAcls$2} and records the
     * "delete failed" sensor; otherwise the "deleted" sensor is recorded.
     */
    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$2(ClusterLinkSyncAcls $this, Set deletedAcls$2, AclDeleteResult.AclBindingDeleteResult aclBindingDeleteResult) {
        if (aclBindingDeleteResult.exception().isPresent()) {
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Encountered the following exception while trying to delete ACL: " + aclBindingDeleteResult.exception().get());
            // Drop the failed binding so the caller keeps it in currentAclSet.
            deletedAcls$2.$minus$eq((Object)aclBindingDeleteResult.aclBinding());
            $this.metrics.aclsDeleteFailedSensor().record();
            return;
        }
        $this.metrics.aclsDeletedSensor().record();
    }

    /**
     * Synthetic lambda body used by {@code deleteAclsAndLogDeletionWarnings}: handles one
     * (delete future, filter) pair by processing every per-binding result of the future. When the
     * result cannot be obtained, a warning is logged, every binding in {@code deletedAcls$2} that
     * the filter matches is removed from that set, and the "delete failed" sensor is recorded once.
     *
     * @throws MatchError when the pair is {@code null}
     */
    public static final /* synthetic */ void $anonfun$deleteAclsAndLogDeletionWarnings$1(ClusterLinkSyncAcls $this, Set deletedAcls$2, Tuple2 x0$1) {
        if (x0$1 == null) {
            throw new MatchError(null);
        }
        CompletableFuture future = (CompletableFuture)x0$1._1();
        AclBindingFilter filter = (AclBindingFilter)x0$1._2();
        try {
            Collection aclBindingDeleteResultList = ((AclDeleteResult)future.get()).aclBindingDeleteResults();
            ((IterableLike)CollectionConverters$.MODULE$.collectionAsScalaIterableConverter(aclBindingDeleteResultList).asScala()).foreach((Function1 & Serializable & scala.Serializable)aclBindingDeleteResult -> {
                ClusterLinkSyncAcls.$anonfun$deleteAclsAndLogDeletionWarnings$2($this, deletedAcls$2, aclBindingDeleteResult);
                return BoxedUnit.UNIT;
            });
        }
        catch (Throwable e) {
            $this.warn((Function0<String>)(Function0 & Serializable & scala.Serializable)() -> "Unexpected error encountered while trying to delete ACL: " + e);
            deletedAcls$2.$minus$minus$eq((TraversableOnce)deletedAcls$2.filter((Function1 & Serializable & scala.Serializable)x$1 -> BoxesRunTime.boxToBoolean((boolean)filter.matches(x$1))));
            $this.metrics.aclsDeleteFailedSensor().record();
        }
    }

    /**
     * Creates the task, registered under the name "ClusterLinkSyncAcls" with the client manager's
     * scheduler and the interval taken from the current config's {@code aclSyncMs}.
     *
     * @param clientManager provides the scheduler, link configuration, admin client and authorizer
     * @param controller    consulted in {@code run()}; ACLs are only synced while it is active
     * @param metrics       sensors recording ACL add/delete successes and failures
     */
    public ClusterLinkSyncAcls(ClusterLinkClientManager clientManager, KafkaController controller, ClusterLinkMetrics metrics) {
        // The superclass constructor call has to come first in Java source; the decompiler had
        // emitted the field stores ahead of it, mirroring the Scala bytecode order.
        // NOTE(review): assumes the PeriodicTask constructor does not read these fields -- confirm.
        super(clientManager.scheduler(), "ClusterLinkSyncAcls", Predef$.MODULE$.Integer2int(clientManager.currentConfig().aclSyncMs()));
        this.clientManager = clientManager;
        this.controller = controller;
        this.metrics = metrics;
        this.currentAclSet = (Set)Set$.MODULE$.apply((Seq)Nil$.MODULE$);
        this.tasksOutstanding = 0;
    }
}

