package org.apache.beam.fn.harness.logging;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Formatter;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.LogManager;
import java.util.logging.LogRecord;
import java.util.logging.Logger;
import java.util.logging.SimpleFormatter;
import org.apache.beam.fn.harness.control.ProcessBundleHandler;
import org.apache.beam.model.fnexecution.v1.BeamFnApi;
import org.apache.beam.model.fnexecution.v1.BeamFnLoggingGrpc;
import org.apache.beam.model.pipeline.v1.Endpoints;
import org.apache.beam.sdk.fn.stream.AdvancingPhaser;
import org.apache.beam.sdk.fn.stream.DirectStreamObserver;
import org.apache.beam.sdk.options.ExecutorOptions;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.SdkHarnessOptions;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Timestamp;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.Value;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.ManagedChannel;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientCallStreamObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.ClientResponseObserver;
import org.apache.beam.vendor.grpc.v1p54p0.io.grpc.stub.StreamObserver;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Throwables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.checkerframework.checker.nullness.qual.RequiresNonNull;
import org.checkerframework.dataflow.qual.SideEffectFree;
import org.slf4j.MDC;

/* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient.class */
public class BeamFnLoggingClient implements AutoCloseable {
    /** Name used to look up the root logger in {@link LogManager}. */
    private static final String ROOT_LOGGER_NAME = "";

    /** Capacity of {@link #bufferedLogEntries} and initial size of the batching lists. */
    private static final int MAX_BUFFERED_LOG_ENTRY_COUNT = 10000;

    /**
     * Translation from {@code java.util.logging} levels to Fn API severities. Levels without an
     * entry here (for example {@code CONFIG} or {@code FINER}) are skipped by
     * {@code LogRecordHandler.publish}.
     */
    private static final ImmutableMap<Level, BeamFnApi.LogEntry.Severity.Enum> LOG_LEVEL_MAP =
            ImmutableMap.<Level, BeamFnApi.LogEntry.Severity.Enum>builder()
                    .put(Level.SEVERE, BeamFnApi.LogEntry.Severity.Enum.ERROR)
                    .put(Level.WARNING, BeamFnApi.LogEntry.Severity.Enum.WARN)
                    .put(Level.INFO, BeamFnApi.LogEntry.Severity.Enum.INFO)
                    .put(Level.FINE, BeamFnApi.LogEntry.Severity.Enum.DEBUG)
                    .put(Level.FINEST, BeamFnApi.LogEntry.Severity.Enum.TRACE)
                    .build();

    /** Inverse of {@link #LOG_LEVEL_MAP}; used to turn buffered entries back into log records. */
    private static final ImmutableMap<BeamFnApi.LogEntry.Severity.Enum, Level> REVERSE_LOG_LEVEL_MAP =
            ImmutableMap.<BeamFnApi.LogEntry.Severity.Enum, Level>builder()
                    .putAll(LOG_LEVEL_MAP.asMultimap().inverse().entries())
                    .build();

    /** Formatter installed on the handler; only its message formatting is used when publishing. */
    private static final Formatter DEFAULT_FORMATTER = new SimpleFormatter();

    /** Sentinel value used to complete the internal futures. */
    private static final Object COMPLETED = new Object();

    private final Endpoints.ApiServiceDescriptor apiServiceDescriptor;
    private final StreamWriter streamWriter;
    private final LogRecordHandler logRecordHandler;

    /** Optional; when set, used by the handler to attach the current PTransform id to entries. */
    private ProcessBundleHandler processBundleHandler;

    /** Future of the background task that drains {@link #bufferedLogEntries} to the stream. */
    private final CompletableFuture<?> bufferedLogConsumer;

    /** Loggers configured from the harness options; reset again by {@code restoreLoggers()}. */
    private final Collection<Logger> configuredLoggers = new ArrayList<>();

    /** Bounded buffer between threads that log and the background task that sends entries. */
    private final BlockingQueue<BeamFnApi.LogEntry> bufferedLogEntries =
            new ArrayBlockingQueue<>(MAX_BUFFERED_LOG_ENTRY_COUNT);

    /**
     * Thread running the background drain task. It is written by that task and read by logging
     * threads without synchronization; readers only compare it with their own current thread.
     */
    private Thread logEntryHandlerThread = null;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient$LogRecordHandler.class */
    /**
     * {@link Handler} that converts each published {@link LogRecord} into a
     * {@link BeamFnApi.LogEntry} and places it on the enclosing client's entry buffer.
     */
    public class LogRecordHandler extends Handler {
        /** Whether the SLF4J {@link MDC} context map is copied into each entry's custom data. */
        private final boolean logMdc;

        LogRecordHandler(boolean logMdc) {
            this.logMdc = logMdc;
        }

        /**
         * Builds a log entry from {@code logRecord} and enqueues it.
         *
         * <p>Records whose level has no severity mapping are ignored. When called on the thread
         * that drains the buffer, the entry is offered without blocking and is dropped if the
         * buffer is full; on any other thread the call blocks until space is available.
         *
         * @param logRecord the record to forward
         * @throws RuntimeException if the calling thread is interrupted while waiting for space;
         *     the interrupt flag is restored before throwing
         */
        @Override // java.util.logging.Handler
        public void publish(LogRecord logRecord) {
            BeamFnApi.LogEntry.Severity.Enum severity = BeamFnLoggingClient.LOG_LEVEL_MAP.get(logRecord.getLevel());
            if (severity == null) {
                // Levels without a severity mapping are not forwarded.
                return;
            }

            long millis = logRecord.getMillis();
            BeamFnApi.LogEntry.Builder entry = BeamFnApi.LogEntry.newBuilder()
                    .setSeverity(severity)
                    .setMessage(getFormatter().formatMessage(logRecord))
                    .setThread(Integer.toString(logRecord.getThreadID()))
                    .setTimestamp(Timestamp.newBuilder()
                            .setSeconds(millis / 1000)
                            .setNanos(((int) (millis % 1000)) * 1000000));

            String instructionId = BeamFnLoggingMDC.getInstructionId();
            if (instructionId != null) {
                entry.setInstructionId(instructionId);
            }
            Throwable thrown = logRecord.getThrown();
            if (thrown != null) {
                entry.setTrace(Throwables.getStackTraceAsString(thrown));
            }
            String loggerName = logRecord.getLoggerName();
            if (loggerName != null) {
                entry.setLogLocation(loggerName);
            }

            // Read the field once so the null check and the use see the same handler.
            ProcessBundleHandler bundleHandler = BeamFnLoggingClient.this.processBundleHandler;
            if (instructionId != null && bundleHandler != null) {
                ProcessBundleHandler.BundleProcessor processor =
                        bundleHandler.getBundleProcessorCache().find(instructionId);
                if (processor != null) {
                    String transformId = processor.getStateTracker().getCurrentThreadsPTransformId();
                    if (transformId != null) {
                        entry.setTransformId(transformId);
                    }
                }
            }

            if (this.logMdc) {
                Map<String, String> mdc = MDC.getCopyOfContextMap();
                if (mdc != null) {
                    Struct.Builder customData = entry.getCustomDataBuilder();
                    mdc.forEach((key, value) ->
                            customData.putFields(key, Value.newBuilder().setStringValue(value).build()));
                }
            }

            BeamFnApi.LogEntry logEntry = entry.build();
            if (Thread.currentThread() == BeamFnLoggingClient.this.logEntryHandlerThread) {
                // The draining thread must never block on its own buffer.
                dropIfBufferFull(logEntry);
                return;
            }
            try {
                BeamFnLoggingClient.this.bufferedLogEntries.put(logEntry);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RuntimeException(e);
            }
        }

        /**
         * Offers {@code logEntry} to the buffer without blocking.
         *
         * @return {@code true} if the entry was enqueued, {@code false} if the buffer was full and
         *     the entry was dropped
         */
        private boolean dropIfBufferFull(BeamFnApi.LogEntry logEntry) {
            return BeamFnLoggingClient.this.bufferedLogEntries.offer(logEntry);
        }

        /** No-op: this handler keeps no state of its own to flush. */
        @Override // java.util.logging.Handler
        public void flush() {
        }

        /** No-op: this handler holds no resources of its own. */
        @Override // java.util.logging.Handler
        public synchronized void close() {
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient$StreamWriter.class */
    public static class StreamWriter {
        private final ManagedChannel channel;
        private final StreamObserver<BeamFnApi.LogEntry.List> outboundObserver;
        private final LogControlObserver inboundObserver;
        private final CompletableFuture<Object> softClosing = new CompletableFuture<>();
        private final CompletableFuture<Object> inboundObserverCompletion = new CompletableFuture<>();
        private final AdvancingPhaser streamPhaser = new AdvancingPhaser(1);

        /* loaded from: input_file:org/apache/beam/fn/harness/logging/BeamFnLoggingClient$StreamWriter$LogControlObserver.class */
        private class LogControlObserver implements ClientResponseObserver<BeamFnApi.LogEntry, BeamFnApi.LogControl> {
            private LogControlObserver() {
            }

            public void beforeStart(ClientCallStreamObserver<BeamFnApi.LogEntry> clientCallStreamObserver) {
                AdvancingPhaser advancingPhaser = StreamWriter.this.streamPhaser;
                Objects.requireNonNull(advancingPhaser);
                clientCallStreamObserver.setOnReadyHandler(advancingPhaser::arrive);
            }

            public void onNext(BeamFnApi.LogControl logControl) {
            }

            public void onError(Throwable th) {
                StreamWriter.this.inboundObserverCompletion.completeExceptionally(th);
                StreamWriter.this.hardClose();
            }

            public void onCompleted() {
                StreamWriter.this.inboundObserverCompletion.complete(BeamFnLoggingClient.COMPLETED);
                StreamWriter.this.hardClose();
            }
        }

        public StreamWriter(ManagedChannel managedChannel) {
            this.channel = managedChannel;
            BeamFnLoggingGrpc.BeamFnLoggingStub newStub = BeamFnLoggingGrpc.newStub(managedChannel);
            this.inboundObserver = new LogControlObserver();
            this.outboundObserver = new DirectStreamObserver(this.streamPhaser, newStub.logging(this.inboundObserver));
        }

        /* JADX WARN: Finally extract failed */
        public void drainQueueToStream(BlockingQueue<BeamFnApi.LogEntry> blockingQueue) {
            try {
                try {
                    ArrayList arrayList = new ArrayList(BeamFnLoggingClient.MAX_BUFFERED_LOG_ENTRY_COUNT);
                    while (!this.streamPhaser.isTerminated()) {
                        BeamFnApi.LogEntry poll = blockingQueue.poll(1L, TimeUnit.SECONDS);
                        if (poll != null) {
                            BeamFnApi.LogEntry.List.Builder addLogEntries = BeamFnApi.LogEntry.List.newBuilder().addLogEntries(poll);
                            blockingQueue.drainTo(arrayList);
                            addLogEntries.addAllLogEntries(arrayList);
                            this.outboundObserver.onNext(addLogEntries.m789build());
                            arrayList.clear();
                        } else if (this.softClosing.isDone()) {
                            break;
                        }
                    }
                    if (this.inboundObserverCompletion.isDone()) {
                        try {
                            this.inboundObserverCompletion.get();
                            throw new IllegalStateException("Logging stream terminated unexpectedly with success before it was closed by the client.");
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            throw new RuntimeException(e);
                        } catch (ExecutionException e2) {
                            throw new IllegalStateException("Logging stream terminated unexpectedly before it was closed by the client with error: " + e2.getCause());
                        }
                    }
                    if (0 == 0) {
                        this.outboundObserver.onCompleted();
                    } else {
                        this.outboundObserver.onError((Throwable) null);
                    }
                    this.channel.shutdown();
                    boolean z = false;
                    try {
                        try {
                            z = this.channel.awaitTermination(10L, TimeUnit.SECONDS);
                            if (z) {
                                return;
                            }
                            this.channel.shutdownNow();
                        } catch (InterruptedException e3) {
                            Thread.currentThread().interrupt();
                            if (z) {
                                return;
                            }
                            this.channel.shutdownNow();
                        }
                    } catch (Throwable th) {
                        if (!z) {
                            this.channel.shutdownNow();
                        }
                        throw th;
                    }
                } finally {
                    RuntimeException runtimeException = new RuntimeException(e);
                }
            } catch (Throwable th2) {
                if (0 == 0) {
                    this.outboundObserver.onCompleted();
                } else {
                    this.outboundObserver.onError((Throwable) null);
                }
                this.channel.shutdown();
                boolean z2 = false;
                try {
                    try {
                        z2 = this.channel.awaitTermination(10L, TimeUnit.SECONDS);
                        if (!z2) {
                            this.channel.shutdownNow();
                        }
                    } catch (InterruptedException e4) {
                        Thread.currentThread().interrupt();
                        if (!z2) {
                            this.channel.shutdownNow();
                        }
                        throw th2;
                    }
                    throw th2;
                } catch (Throwable th3) {
                    if (!z2) {
                        this.channel.shutdownNow();
                    }
                    throw th3;
                }
            }
        }

        public void softClose() {
            this.softClosing.complete(BeamFnLoggingClient.COMPLETED);
        }

        public void hardClose() {
            this.streamPhaser.forceTermination();
        }
    }

    /**
     * Creates a logging client for the given endpoint and starts it.
     *
     * @param pipelineOptions options viewed as {@link SdkHarnessOptions} (for the MDC flag and
     *     logger configuration) and as {@link ExecutorOptions} (for the executor)
     * @param apiServiceDescriptor descriptor of the logging service
     * @param function factory producing the channel for {@code apiServiceDescriptor}
     * @return the constructed client
     */
    public static BeamFnLoggingClient createAndStart(PipelineOptions pipelineOptions, Endpoints.ApiServiceDescriptor apiServiceDescriptor, Function<Endpoints.ApiServiceDescriptor, ManagedChannel> function) {
        // Locals are evaluated in the same order as the original argument list.
        ManagedChannel loggingChannel = function.apply(apiServiceDescriptor);
        StreamWriter writer = new StreamWriter(loggingChannel);
        boolean includeMdc = pipelineOptions.as(SdkHarnessOptions.class).getLogMdc();
        ScheduledExecutorService executor = pipelineOptions.as(ExecutorOptions.class).getScheduledExecutorService();
        SdkHarnessOptions harnessOptions = pipelineOptions.as(SdkHarnessOptions.class);
        return new BeamFnLoggingClient(apiServiceDescriptor, writer, includeMdc, executor, harnessOptions);
    }

    /**
     * Creates the client and starts the background task that installs the log handler and
     * drains buffered entries to the stream. Blocks until the handler is installed or the
     * background task has ended.
     *
     * @param apiServiceDescriptor descriptor of the logging service (used by {@code toString})
     * @param streamWriter writer that sends buffered entries
     * @param logMdc whether the handler copies the MDC context map into each entry
     * @param scheduledExecutorService executor that runs the background task
     * @param sdkHarnessOptions options from which loggers are configured
     * @throws RuntimeException if the background task fails during startup, or if the calling
     *     thread is interrupted while waiting (the interrupt flag is restored)
     */
    private BeamFnLoggingClient(Endpoints.ApiServiceDescriptor apiServiceDescriptor, StreamWriter streamWriter, boolean logMdc, ScheduledExecutorService scheduledExecutorService, SdkHarnessOptions sdkHarnessOptions) {
        this.apiServiceDescriptor = apiServiceDescriptor;
        this.streamWriter = streamWriter;
        this.logRecordHandler = new LogRecordHandler(logMdc);
        this.logRecordHandler.setLevel(Level.ALL);
        this.logRecordHandler.setFormatter(DEFAULT_FORMATTER);

        CompletableFuture<Object> rootLoggerInitialized = new CompletableFuture<>();
        this.bufferedLogConsumer = CompletableFuture.supplyAsync(() -> {
            try {
                // Recorded so the handler can tell when it is publishing from this thread.
                this.logEntryHandlerThread = Thread.currentThread();
                installLogging(sdkHarnessOptions);
                rootLoggerInitialized.complete(COMPLETED);
                streamWriter.drainQueueToStream(this.bufferedLogEntries);
            } finally {
                // Runs on success and failure: put logging back, then replay whatever is still
                // buffered through the restored loggers.
                restoreLoggers();
                flushFinalLogs();
            }
            return COMPLETED;
        }, scheduledExecutorService);

        try {
            // Returns once logging is installed, or fails if the task ended exceptionally first.
            CompletableFuture.anyOf(this.bufferedLogConsumer, rootLoggerInitialized).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } catch (ExecutionException e) {
            // Keep the cause attached so its stack trace is not lost.
            throw new RuntimeException("Error starting background log thread " + e.getCause(), e.getCause());
        }
    }

    /**
     * Resets the global {@link LogManager}, strips every handler from the root logger, records
     * the loggers configured from {@code sdkHarnessOptions}, and installs this client's handler
     * on the root logger.
     *
     * @param sdkHarnessOptions options from which loggers are configured
     */
    @RequiresNonNull.List({@RequiresNonNull({"logRecordHandler"}), @RequiresNonNull({"configuredLoggers"})})
    private void installLogging(SdkHarnessOptions sdkHarnessOptions) {
        LogManager manager = LogManager.getLogManager();
        manager.reset();

        Logger root = manager.getLogger(ROOT_LOGGER_NAME);
        Handler[] existingHandlers = root.getHandlers();
        for (int i = 0; i < existingHandlers.length; i++) {
            root.removeHandler(existingHandlers[i]);
        }

        this.configuredLoggers.addAll(SdkHarnessOptions.getConfiguredLoggerFromOptions(sdkHarnessOptions));
        root.addHandler(this.logRecordHandler);
    }

    /**
     * Closes the client. Requests a soft close and waits up to ten seconds for the background
     * task to finish; on timeout, hard-closes the stream and waits for the task without a limit.
     *
     * @throws Exception the cause of the background task's failure when that cause is an
     *     {@link Exception}; otherwise the {@link ExecutionException} itself
     */
    @Override // java.lang.AutoCloseable
    public void close() throws Exception {
        Preconditions.checkNotNull(this.bufferedLogConsumer, "BeamFnLoggingClient not fully started");
        try {
            this.streamWriter.softClose();
            try {
                this.bufferedLogConsumer.get(10L, TimeUnit.SECONDS);
            } catch (TimeoutException drainTimedOut) {
                this.streamWriter.hardClose();
                this.bufferedLogConsumer.get();
            }
        } catch (ExecutionException failure) {
            Throwable cause = failure.getCause();
            if (cause instanceof Exception) {
                throw (Exception) cause;
            }
            throw failure;
        }
    }

    /**
     * Sets the handler that the log record handler consults to find the bundle processor for
     * the current instruction id and attach the current thread's PTransform id to log entries.
     *
     * @param processBundleHandler the handler to consult; while unset, no transform id is attached
     */
    public void setProcessBundleHandler(ProcessBundleHandler processBundleHandler) {
        this.processBundleHandler = processBundleHandler;
    }

    /**
     * Undoes {@code installLogging}: clears the level of every configured logger, detaches this
     * client's handler from them and from the root logger, and re-reads the {@link LogManager}
     * configuration. A failure to re-read is reported on standard output and otherwise ignored.
     */
    @RequiresNonNull.List({@RequiresNonNull({"configuredLoggers"}), @RequiresNonNull({"logRecordHandler"})})
    private void restoreLoggers() {
        LogRecordHandler installedHandler = this.logRecordHandler;
        this.configuredLoggers.forEach(configured -> {
            configured.setLevel(null);
            configured.removeHandler(installedHandler);
        });
        this.configuredLoggers.clear();

        LogManager manager = LogManager.getLogManager();
        manager.getLogger(ROOT_LOGGER_NAME).removeHandler(installedHandler);
        try {
            manager.readConfiguration();
        } catch (IOException ioFailure) {
            System.out.print("Unable to restore log managers from configuration: " + ioFailure);
        }
    }

    /**
     * Drains every entry still buffered and logs each one through the root logger as a
     * reconstructed {@link LogRecord}.
     */
    @RequiresNonNull({"bufferedLogEntries"})
    void flushFinalLogs() {
        ArrayList<BeamFnApi.LogEntry> remaining = new ArrayList<>(MAX_BUFFERED_LOG_ENTRY_COUNT);
        this.bufferedLogEntries.drainTo(remaining);
        for (BeamFnApi.LogEntry entry : remaining) {
            LogRecord rebuilt = toLogRecord(entry);
            LogManager.getLogManager().getLogger(ROOT_LOGGER_NAME).log(rebuilt);
        }
    }

    /** Rebuilds a {@link LogRecord} from the fields stored in {@code entry}. */
    private static LogRecord toLogRecord(BeamFnApi.LogEntry entry) {
        Level level = REVERSE_LOG_LEVEL_MAP.get(entry.getSeverity());
        LogRecord rebuilt = new LogRecord(level, entry.getMessage());
        rebuilt.setLoggerName(entry.getLogLocation());

        Timestamp stamp = entry.getTimestamp();
        rebuilt.setMillis((stamp.getSeconds() * 1000) + (stamp.getNanos() / 1000000));
        rebuilt.setThreadID(Integer.parseInt(entry.getThread()));

        String trace = entry.getTrace();
        if (!trace.isEmpty()) {
            // Only the trace text was kept, so it becomes the message of a new Throwable.
            rebuilt.setThrown(new Throwable(trace));
        }
        return rebuilt;
    }

    /**
     * Returns the future of the background task that drains log entries to the stream.
     *
     * @return the background task's future
     * @throws NullPointerException if the future has not been assigned
     */
    public CompletableFuture<?> terminationFuture() {
        // checkNotNull hands back its first argument when it is non-null.
        return Preconditions.checkNotNull(this.bufferedLogConsumer, "BeamFnLoggingClient not fully started");
    }

    /** Returns a description of this client that includes its API service descriptor. */
    @Override
    @SideEffectFree
    public String toString() {
        MoreObjects.ToStringHelper description = MoreObjects.toStringHelper(BeamFnLoggingClient.class);
        description.add("apiServiceDescriptor", this.apiServiceDescriptor);
        return description.toString();
    }
}
