package org.apache.spark.eventhubs.client;

import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventhubs.ReceiverDisconnectedException;
import java.util.concurrent.CompletionException;
import java.util.concurrent.RejectedExecutionException;
import org.apache.spark.SparkEnv$;
import org.apache.spark.eventhubs.EventHubsConf;
import org.apache.spark.eventhubs.EventHubsUtils$;
import org.apache.spark.eventhubs.NameAndPartition;
import org.apache.spark.eventhubs.PartitionPerformanceReceiver$;
import org.apache.spark.eventhubs.package$;
import org.apache.spark.internal.Logging;
import org.apache.spark.rpc.RpcEndpointRef;
import org.apache.spark.util.RpcUtils$;
import org.slf4j.Logger;
import scala.Function0;
import scala.Predef$;
import scala.StringContext;
import scala.collection.Iterator;
import scala.collection.immutable.Nil$;
import scala.collection.mutable.HashMap;
import scala.runtime.BoxesRunTime;

/* compiled from: CachedEventHubsReceiver.scala */
/* loaded from: input_file:org/apache/spark/eventhubs/client/CachedEventHubsReceiver$.class */
/**
 * Singleton (compiled Scala {@code object}) that keeps a cache of
 * {@link CachedEventHubsReceiver} instances and serves {@link #receive} calls from it.
 *
 * <p>Receivers are stored in a mutable map keyed by the lower-cased concatenation of the
 * connection string, the consumer group and the partition id (see {@link #key}). All accesses
 * to that map made by this class happen while holding the map's own monitor.
 */
public final class CachedEventHubsReceiver$ implements CachedReceiver, Logging {
    /** The singleton instance; assigned by the private constructor, which the static initializer invokes. */
    public static CachedEventHubsReceiver$ MODULE$;
    /** {@link System#nanoTime()} reading captured when the singleton was constructed. */
    private final long startRecieverTimeNs;
    /** Receiver cache; guarded by {@code synchronized (receivers)} everywhere it is touched in this class. */
    private final HashMap<String, CachedEventHubsReceiver> receivers;
    /** Driver endpoint reference created once in the constructor. */
    private final RpcEndpointRef partitionPerformanceReceiverRef;
    /** Backing field for the logger managed by the {@link Logging} trait. */
    private transient Logger org$apache$spark$internal$Logging$$log_;

    static {
        // The constructor publishes the new instance through MODULE$.
        new CachedEventHubsReceiver$();
    }

    // ---------------------------------------------------------------------------------------
    // Logging trait forwarders: each method delegates to the trait's static implementation.
    // ---------------------------------------------------------------------------------------

    public String logName() {
        return Logging.logName$(this);
    }

    public Logger log() {
        return Logging.log$(this);
    }

    public void logInfo(Function0<String> function0) {
        Logging.logInfo$(this, function0);
    }

    public void logDebug(Function0<String> function0) {
        Logging.logDebug$(this, function0);
    }

    public void logTrace(Function0<String> function0) {
        Logging.logTrace$(this, function0);
    }

    public void logWarning(Function0<String> function0) {
        Logging.logWarning$(this, function0);
    }

    public void logError(Function0<String> function0) {
        Logging.logError$(this, function0);
    }

    public void logInfo(Function0<String> function0, Throwable th) {
        Logging.logInfo$(this, function0, th);
    }

    public void logDebug(Function0<String> function0, Throwable th) {
        Logging.logDebug$(this, function0, th);
    }

    public void logTrace(Function0<String> function0, Throwable th) {
        Logging.logTrace$(this, function0, th);
    }

    public void logWarning(Function0<String> function0, Throwable th) {
        Logging.logWarning$(this, function0, th);
    }

    public void logError(Function0<String> function0, Throwable th) {
        Logging.logError$(this, function0, th);
    }

    public boolean isTraceEnabled() {
        return Logging.isTraceEnabled$(this);
    }

    public void initializeLogIfNecessary(boolean z) {
        Logging.initializeLogIfNecessary$(this, z);
    }

    public boolean initializeLogIfNecessary(boolean z, boolean z2) {
        return Logging.initializeLogIfNecessary$(this, z, z2);
    }

    public boolean initializeLogIfNecessary$default$2() {
        return Logging.initializeLogIfNecessary$default$2$(this);
    }

    public Logger org$apache$spark$internal$Logging$$log_() {
        return this.org$apache$spark$internal$Logging$$log_;
    }

    public void org$apache$spark$internal$Logging$$log__$eq(Logger logger) {
        this.org$apache$spark$internal$Logging$$log_ = logger;
    }

    /** @return the {@link System#nanoTime()} value recorded at construction */
    private long startRecieverTimeNs() {
        return this.startRecieverTimeNs;
    }

    /** @return the driver endpoint reference created at construction */
    public RpcEndpointRef partitionPerformanceReceiverRef() {
        return this.partitionPerformanceReceiverRef;
    }

    /**
     * Builds the cache key for a receiver.
     *
     * @param eventHubsConf    configuration supplying the connection string and consumer group
     * @param nameAndPartition supplies the partition id
     * @return the three values concatenated in that order and lower-cased
     */
    private String key(EventHubsConf eventHubsConf, NameAndPartition nameAndPartition) {
        return (eventHubsConf.connectionString() + eventHubsConf.consumerGroup() + nameAndPartition.partitionId()).toLowerCase();
    }

    /**
     * Looks up (or creates and caches) the receiver for the given configuration and partition,
     * then asks it for events.
     *
     * <p>If the call fails with a {@link CompletionException} whose cause is a
     * {@link RejectedExecutionException} reporting a closed ReactorDispatcher, the cached
     * receiver's client is closed, a new receiver is created and stored under the same key,
     * and the request is issued once more on the new receiver. Every other
     * {@link CompletionException} is rethrown unchanged; when its cause is a
     * {@link ReceiverDisconnectedException} an explanatory message is logged first.
     *
     * @param eventHubsConf    Event Hubs configuration
     * @param nameAndPartition target event hub name and partition
     * @param j                requested sequence number (logged as {@code requestSeqNo})
     * @param i                batch size
     * @return the iterator produced by the receiver
     * @throws CompletionException when the failure is not the closed-dispatcher case, or when
     *                             the retry on the new receiver fails as well
     */
    @Override // org.apache.spark.eventhubs.client.CachedReceiver
    public Iterator<EventData> receive(EventHubsConf eventHubsConf, NameAndPartition nameAndPartition, long j, int i) {
        final long taskId = EventHubsUtils$.MODULE$.getTaskId();
        logInfo(() -> interpolate(new String[]{"(TID ", ") EventHubsCachedReceiver look up. For namespaceUri ", " "}, taskId, eventHubsConf.namespaceUri())
                + interpolate(new String[]{"EventHubNameAndPartition ", " consumer group ", ". "}, nameAndPartition, consumerGroupOrDefault(eventHubsConf))
                + interpolate(new String[]{"requestSeqNo: ", ", batchSize: ", ""}, j, i));

        final CachedEventHubsReceiver cached;
        synchronized (this.receivers) {
            cached = (CachedEventHubsReceiver) this.receivers.getOrElseUpdate(
                    key(eventHubsConf, nameAndPartition),
                    () -> apply(eventHubsConf, nameAndPartition, j));
        }

        try {
            return cached.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(j, i);
        } catch (CompletionException e) {
            final Throwable cause = e.getCause();

            if (isClosedReactorDispatcher(cause)) {
                logInfo(() -> receiveFailurePrefix(taskId, eventHubsConf, nameAndPartition)
                        + interpolate(new String[]{"failed with ", ". Try to recreate the entire CachedEventHubsReceiver instance in order to "}, e)
                        + "use a fresh EventHubClient from the underlying java SDK, then try receiving events again.");
                cached.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$client().close();
                final CachedEventHubsReceiver fresh = apply(eventHubsConf, nameAndPartition, j);
                synchronized (this.receivers) {
                    this.receivers.update(key(eventHubsConf, nameAndPartition), fresh);
                    // NOTE(review): the retry runs while the cache monitor is held, unlike the
                    // first attempt above. Kept as-is to preserve behaviour; confirm whether
                    // this serialization is intended before narrowing the lock.
                    return fresh.org$apache$spark$eventhubs$client$CachedEventHubsReceiver$$receive(j, i);
                }
            }

            if (cause instanceof ReceiverDisconnectedException) {
                logInfo(() -> receiveFailurePrefix(taskId, eventHubsConf, nameAndPartition)
                        + "failed because another receiver for the same <NS-EH-CG-Part> combo has been created and caused this one "
                        + interpolate(new String[]{"to get disconnected. The full error is: ", ". Throw the exception so that the driver can "}, e)
                        + "retry the task.");
            }
            throw e;
        }
    }

    /**
     * Tells whether {@code cause} is the "ReactorDispatcher instance is closed" rejection.
     *
     * <p>{@link Throwable#getMessage()} may return {@code null}; a missing message is treated
     * as "not this case" so the caller rethrows the original exception instead of failing
     * with a {@link NullPointerException} that would hide it.
     */
    private static boolean isClosedReactorDispatcher(Throwable cause) {
        if (!(cause instanceof RejectedExecutionException)) {
            return false;
        }
        final String message = cause.getMessage();
        return message != null && message.contains("ReactorDispatcher instance is closed");
    }

    /** Common leading part of the two receive-failure log messages. */
    private static String receiveFailurePrefix(long taskId, EventHubsConf eventHubsConf, NameAndPartition nameAndPartition) {
        return interpolate(new String[]{"(TID ", ") EventHubsCachedReceiver receive execution for namespaceUri ", " "}, taskId, eventHubsConf.namespaceUri())
                + interpolate(new String[]{"EventHubNameAndPartition ", " consumer group ", " "}, nameAndPartition, consumerGroupOrDefault(eventHubsConf));
    }

    /** Returns the configured consumer group, or the package-level default when none is set. */
    private static Object consumerGroupOrDefault(EventHubsConf eventHubsConf) {
        return eventHubsConf.consumerGroup().getOrElse(() -> package$.MODULE$.DefaultConsumerGroup());
    }

    /** Applies Scala's {@code s} string interpolator to the given literal parts and arguments. */
    private static String interpolate(String[] parts, Object... args) {
        return new StringContext(Predef$.MODULE$.wrapRefArray(parts)).s(Predef$.MODULE$.genericWrapArray(args));
    }

    /**
     * Factory for a new {@link CachedEventHubsReceiver}; does not touch the cache.
     *
     * @param eventHubsConf    Event Hubs configuration
     * @param nameAndPartition target event hub name and partition
     * @param j                sequence number passed through to the receiver's constructor
     * @return a newly constructed receiver
     */
    public CachedEventHubsReceiver apply(EventHubsConf eventHubsConf, NameAndPartition nameAndPartition, long j) {
        return new CachedEventHubsReceiver(eventHubsConf, nameAndPartition, j);
    }

    /**
     * Initializes the singleton: publishes it via {@link #MODULE$}, initializes the
     * {@link Logging} trait, records the construction time, creates the empty receiver cache
     * and builds the driver endpoint reference from the current {@code SparkEnv}'s conf and
     * RPC environment.
     */
    private CachedEventHubsReceiver$() {
        MODULE$ = this;
        Logging.$init$(this);
        this.startRecieverTimeNs = System.nanoTime();
        this.receivers = new HashMap<>();
        this.partitionPerformanceReceiverRef = RpcUtils$.MODULE$.makeDriverRef(PartitionPerformanceReceiver$.MODULE$.ENDPOINT_NAME(), SparkEnv$.MODULE$.get().conf(), SparkEnv$.MODULE$.get().rpcEnv());
    }
}
