package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.ByteString;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords.class */
public class StorageApiWriteUnshardedRecords<DestinationT, ElementT> extends PTransform<PCollection<KV<DestinationT, byte[]>>, PCollection<Void>> {
    // Configuration captured at construction time and handed to WriteRecordsDoFn in expand().
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    private final String kmsKey;
    private final BigQueryServices bqServices;
    private final Coder<DestinationT> destinationCoder;

    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWriteUnshardedRecords.class);

    // Executor on which stream-client close()/unpin() calls are run, so callers never block on them.
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

    // Stream append clients keyed by stream name. Entries expire after five minutes without access;
    // whenever an entry is removed its client is closed asynchronously on closeWriterExecutor.
    private static final Cache<String, BigQueryServices.StreamAppendClient> APPEND_CLIENTS = CacheBuilder.newBuilder()
            .expireAfterAccess(5, TimeUnit.MINUTES)
            .removalListener(removalNotification -> {
                BigQueryServices.StreamAppendClient evictedClient = (BigQueryServices.StreamAppendClient) removalNotification.getValue();
                Objects.requireNonNull(evictedClient);
                runAsyncIgnoreFailure(closeWriterExecutor, evictedClient::close);
            })
            .build();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$ThrowingRunnable.class */
    /**
     * A {@link Runnable}-like task whose body is allowed to throw checked exceptions.
     *
     * <p>Marked as a functional interface because it is implemented with lambdas and method
     * references (for example {@code client::close}).
     */
    @FunctionalInterface
    public interface ThrowingRunnable {
        /**
         * Executes the task.
         *
         * @throws Exception if the task fails for any reason
         */
        void run() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$WriteRecordsDoFn.class */
    public static class WriteRecordsDoFn<DestinationT, ElementT> extends DoFn<KV<DestinationT, byte[]>, KV<String, String>> {
        // Cache of message converters; queried per destination in createDestinationState().
        private final TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;

        // Created lazily by initializeDatasetService() on the first element and closed in teardown().
        @Nullable
        private transient BigQueryServices.DatasetService datasetService;
        // shouldFlush() reports true once more than this many records are buffered...
        private static final int FLUSH_THRESHOLD_RECORDS = 100;
        // ...or once more than this many payload bytes (2 MiB) are buffered.
        private static final int FLUSH_THRESHOLD_RECORD_BYTES = 2097152;
        private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
        private final BigQueryServices bqServices;
        private final Coder<DestinationT> destinationCoder;
        private final BigQueryIO.Write.CreateDisposition createDisposition;
        private final String kmsKey;
        // When false, each DestinationState creates its own PENDING write stream and finishBundle()
        // emits a (table, stream) pair for it.
        private final boolean useDefaultStream;
        // Per-bundle state for every destination seen; replaced in startBundle() and nulled out in
        // finishBundle()/teardown().
        private Map<DestinationT, WriteRecordsDoFn<DestinationT, ElementT>.DestinationState> destinations = Maps.newHashMap();
        // Number of records and payload bytes buffered across all destinations since the last flushAll().
        private int numPendingRecords = 0;
        private int numPendingRecordBytes = 0;

        /* JADX INFO: Access modifiers changed from: package-private */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWriteUnshardedRecords$WriteRecordsDoFn$DestinationState.class */
        public class DestinationState {
            // Table identifier this state writes to; also the base of the default stream name.
            private final String tableUrn;
            // Supplies the schema descriptor used when opening a stream append client.
            private final StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter;

            // Used to create write streams and stream append clients in getWriteStream().
            @Nullable
            private transient BigQueryServices.DatasetService datasetService;
            private final boolean useDefaultStream;
            // Name of the stream currently written to; overwritten by getWriteStream() before use.
            // NOTE(review): the initializer references a Spanner change-streams constant, which looks
            // like a decompiler constant-matching artifact (presumably an empty string) - confirm.
            private String streamName = ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;

            // Client for the current stream; null until getWriteStream() opens one and again after
            // invalidateWriteStream().
            @Nullable
            private BigQueryServices.StreamAppendClient streamAppendClient = null;
            // Offset passed to the next append on a non-default stream; reset to 0 whenever a new
            // stream client is obtained.
            private long currentOffset = 0;
            private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
            private final Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
            // Serialized rows buffered by addMessage() and drained by flush().
            private List<ByteString> pendingMessages = Lists.newArrayList();

            /**
             * Creates the write state for a single destination table.
             *
             * @param tableUrn identifier of the table to write to
             * @param messageConverter converter providing the schema descriptor for the table
             * @param datasetService service used to create streams and append clients
             * @param useDefaultStream whether to append to the table's default stream instead of a
             *     dedicated PENDING stream
             */
            public DestinationState(String tableUrn, StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter, BigQueryServices.DatasetService datasetService, boolean useDefaultStream) {
                this.useDefaultStream = useDefaultStream;
                this.datasetService = datasetService;
                this.messageConverter = messageConverter;
                this.tableUrn = tableUrn;
            }

            /**
             * Releases this state's pin on its stream append client, if it holds one. The unpin call
             * is submitted to the shared close executor, so this method does not wait for it.
             */
            void teardown() {
                BigQueryServices.StreamAppendClient pinnedClient = this.streamAppendClient;
                if (pinnedClient == null) {
                    return;
                }
                StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(StorageApiWriteUnshardedRecords.closeWriterExecutor, pinnedClient::unpin);
            }

            /**
             * Builds the name of the table's default stream: the table URN with any partition
             * decorator stripped, followed by {@code /streams/_default}.
             */
            String getDefaultStreamName() {
                String undecoratedTable = BigQueryHelpers.stripPartitionDecorator(this.tableUrn);
                return undecoratedTable + "/streams/_default";
            }

            /**
             * Returns the append client for this destination's stream, opening it on first use.
             *
             * <p>When no client is held, the stream name is resolved (the default stream, or a newly
             * created PENDING stream), a client for it is fetched from the shared cache (creating it
             * if absent), the client is pinned, and the append offset is reset to zero.
             *
             * @return the pinned stream append client
             * @throws RuntimeException wrapping any failure, including a missing dataset service
             */
            BigQueryServices.StreamAppendClient getWriteStream() {
                if (this.streamAppendClient != null) {
                    return this.streamAppendClient;
                }
                try {
                    // The service field is @Nullable and is needed on both paths (stream creation and
                    // the cache loader), so validate it once up front instead of only on one of them.
                    final BigQueryServices.DatasetService service = Preconditions.checkNotNull(this.datasetService);
                    if (this.useDefaultStream) {
                        this.streamName = getDefaultStreamName();
                    } else {
                        this.streamName = service.createWriteStream(this.tableUrn, WriteStream.Type.PENDING).getName();
                    }
                    // Lookup and pin happen under the same lock that invalidateWriteStream() takes.
                    synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                        this.streamAppendClient = StorageApiWriteUnshardedRecords.APPEND_CLIENTS.get(
                                this.streamName,
                                () -> service.getStreamAppendClient(this.streamName, this.messageConverter.getSchemaDescriptor()));
                        this.streamAppendClient.pin();
                    }
                    this.currentOffset = 0L;
                    return this.streamAppendClient;
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }

            /**
             * Drops the current stream append client, if any: asynchronously unpins it, evicts the
             * stream's entry from the shared client cache, and clears the local reference so the next
             * {@code getWriteStream()} call opens a fresh client.
             */
            void invalidateWriteStream() {
                if (this.streamAppendClient == null) {
                    return;
                }
                synchronized (StorageApiWriteUnshardedRecords.APPEND_CLIENTS) {
                    BigQueryServices.StreamAppendClient staleClient = Objects.requireNonNull(this.streamAppendClient);
                    StorageApiWriteUnshardedRecords.runAsyncIgnoreFailure(StorageApiWriteUnshardedRecords.closeWriterExecutor, staleClient::unpin);
                    StorageApiWriteUnshardedRecords.APPEND_CLIENTS.invalidate(this.streamName);
                }
                this.streamAppendClient = null;
            }

            /**
             * Buffers one serialized row for the next flush. The bytes are copied, so the caller's
             * array is not retained.
             *
             * @param bArr the serialized row payload
             */
            void addMessage(byte[] bArr) throws Exception {
                ByteString serializedRow = ByteString.copyFrom(bArr);
                this.pendingMessages.add(serializedRow);
            }

            /**
             * Moves all buffered rows into a single append operation registered on the given retry
             * manager. Nothing is registered when the buffer is empty. The append itself happens when
             * the retry manager runs the operation.
             *
             * @param retryManager manager that will execute (and retry) the append
             */
            void flush(RetryManager<AppendRowsResponse, RetryManager.Operation.Context<AppendRowsResponse>> retryManager) throws Exception {
                if (this.pendingMessages.isEmpty()) {
                    return;
                }
                final ProtoRows.Builder rowsBuilder = ProtoRows.newBuilder();
                for (ByteString serializedRow : this.pendingMessages) {
                    rowsBuilder.addSerializedRows(serializedRow);
                }
                final ProtoRows rows = rowsBuilder.build();
                this.pendingMessages.clear();

                retryManager.addOperation(
                        operationContext -> {
                            try {
                                // Obtain the client first: opening a new stream resets currentOffset.
                                BigQueryServices.StreamAppendClient client = getWriteStream();
                                long appendOffset = -1;
                                if (!this.useDefaultStream) {
                                    // Explicit offsets are only tracked for dedicated streams.
                                    appendOffset = this.currentOffset;
                                    this.currentOffset += rowsBuilder.getSerializedRowsCount();
                                }
                                return client.appendRows(appendOffset, rows);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        },
                        failedContexts -> {
                            // On failure: log the first error, discard the stream client, and ask for
                            // every operation to be retried.
                            RetryManager.Operation.Context firstFailure = (RetryManager.Operation.Context) Iterables.getFirst(failedContexts, (Object) null);
                            StorageApiWriteUnshardedRecords.LOG.info("Append to stream " + this.streamName + " failed with error " + firstFailure.getError());
                            invalidateWriteStream();
                            this.appendFailures.inc();
                            return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                        },
                        succeededContext -> {
                            this.recordsAppended.inc(rows.getSerializedRowsCount());
                        },
                        new RetryManager.Operation.Context<>());
            }
        }

        /* JADX INFO: Access modifiers changed from: package-private */
        /**
         * Creates the DoFn that buffers rows per destination and appends them to BigQuery streams.
         *
         * @param operationName name used to construct the message converter cache
         * @param dynamicDestinations resolves tables and schemas for destinations
         * @param bqServices factory for the BigQuery dataset service
         * @param destinationCoder coder for destination values
         * @param createDisposition table creation behaviour passed to table creation
         * @param kmsKey KMS key passed to table creation
         * @param useDefaultStream whether to write to each table's default stream
         */
        public WriteRecordsDoFn(String operationName, StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, BigQueryServices bqServices, Coder<DestinationT> destinationCoder, BigQueryIO.Write.CreateDisposition createDisposition, String kmsKey, boolean useDefaultStream) {
            this.dynamicDestinations = dynamicDestinations;
            this.bqServices = bqServices;
            this.destinationCoder = destinationCoder;
            this.createDisposition = createDisposition;
            this.kmsKey = kmsKey;
            this.useDefaultStream = useDefaultStream;
            this.messageConverters = new TwoLevelMessageConverterCache<>(operationName);
        }

        /**
         * Reports whether the buffered data has exceeded either the record-count threshold or the
         * byte-size threshold.
         */
        boolean shouldFlush() {
            boolean tooManyRecords = this.numPendingRecords > FLUSH_THRESHOLD_RECORDS;
            boolean tooManyBytes = this.numPendingRecordBytes > FLUSH_THRESHOLD_RECORD_BYTES;
            return tooManyRecords || tooManyBytes;
        }

        /** Flushes every destination's buffer, but only when a flush threshold has been exceeded. */
        void flushIfNecessary() throws Exception {
            if (!shouldFlush()) {
                return;
            }
            flushAll();
        }

        /**
         * Registers one append operation per destination with a fresh retry manager, runs them all,
         * and then resets the pending record and byte counters.
         */
        void flushAll() throws Exception {
            RetryManager<AppendRowsResponse, RetryManager.Operation.Context<AppendRowsResponse>> appendRetries =
                    new RetryManager<>(Duration.standardSeconds(1L), Duration.standardSeconds(10L), 1000);
            for (WriteRecordsDoFn<DestinationT, ElementT>.DestinationState state : this.destinations.values()) {
                state.flush(appendRetries);
            }
            appendRetries.run(true);
            this.numPendingRecordBytes = 0;
            this.numPendingRecords = 0;
        }

        /**
         * Creates the dataset service from the pipeline options the first time it is needed; later
         * calls are no-ops.
         */
        private void initializeDatasetService(PipelineOptions pipelineOptions) {
            if (this.datasetService != null) {
                return;
            }
            BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
            this.datasetService = this.bqServices.getDatasetService(bigQueryOptions);
        }

        /** Starts each bundle with zeroed pending counters and an empty destination map. */
        @DoFn.StartBundle
        public void startBundle() throws IOException {
            this.numPendingRecordBytes = 0;
            this.numPendingRecords = 0;
            this.destinations = Maps.newHashMap();
        }

        /**
         * Builds the write state for a destination seen for the first time in this bundle: resolves
         * its table, creates the table if the create disposition calls for it, and looks up the
         * message converter.
         *
         * @throws IllegalArgumentException if the dynamic destinations return a null table
         * @throws RuntimeException wrapping any failure during table creation or converter lookup
         */
        WriteRecordsDoFn<DestinationT, ElementT>.DestinationState createDestinationState(DoFn<KV<DestinationT, byte[]>, KV<String, String>>.ProcessContext processContext, DestinationT destinationt, BigQueryServices.DatasetService datasetService) {
            TableDestination resolvedTable = this.dynamicDestinations.getTable(destinationt);
            Preconditions.checkArgument(resolvedTable != null, "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", this.dynamicDestinations, destinationt);
            try {
                // Table creation runs before the converter lookup, matching the argument order of
                // the DestinationState constructor.
                String tableUrn = CreateTableHelpers.possiblyCreateTable(
                                processContext,
                                resolvedTable,
                                () -> this.dynamicDestinations.getSchema(destinationt),
                                this.createDisposition,
                                this.destinationCoder,
                                this.kmsKey,
                                this.bqServices)
                        .getTableUrn();
                StorageApiDynamicDestinations.MessageConverter<ElementT> converter =
                        this.messageConverters.get(destinationt, this.dynamicDestinations, datasetService);
                return new DestinationState(tableUrn, converter, datasetService, this.useDefaultStream);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }

        /**
         * Buffers one element for its destination. Destination state is created on first sight of
         * the key, and buffers filled by earlier elements are flushed first if a threshold was
         * exceeded.
         */
        @DoFn.ProcessElement
        public void process(DoFn<KV<DestinationT, byte[]>, KV<String, String>>.ProcessContext processContext, PipelineOptions pipelineOptions, @DoFn.Element KV<DestinationT, byte[]> kv) throws Exception {
            initializeDatasetService(pipelineOptions);
            this.dynamicDestinations.setSideInputAccessorFromProcessContext(processContext);

            DestinationT destination = kv.getKey();
            DestinationState state = this.destinations.computeIfAbsent(
                    destination,
                    newDestination -> createDestinationState(processContext, newDestination, this.datasetService));

            // Flush before buffering, so the threshold check only counts previously added records.
            flushIfNecessary();

            byte[] payload = kv.getValue();
            state.addMessage(payload);
            this.numPendingRecords += 1;
            this.numPendingRecordBytes += payload.length;
        }

        /**
         * Flushes everything still buffered, then tears down each destination's state. When
         * dedicated streams are in use, a (table URN, stream name) pair is emitted per destination in
         * the global window, timestamped one millisecond before the maximum timestamp.
         */
        @DoFn.FinishBundle
        public void finishBundle(DoFn<KV<DestinationT, byte[]>, KV<String, String>>.FinishBundleContext finishBundleContext) throws Exception {
            flushAll();
            Iterator<WriteRecordsDoFn<DestinationT, ElementT>.DestinationState> states = this.destinations.values().iterator();
            while (states.hasNext()) {
                WriteRecordsDoFn<DestinationT, ElementT>.DestinationState state = states.next();
                if (!this.useDefaultStream) {
                    KV<String, String> tableAndStream = KV.of(state.tableUrn, state.streamName);
                    finishBundleContext.output(tableAndStream, BoundedWindow.TIMESTAMP_MAX_VALUE.minus(Duration.millis(1L)), GlobalWindow.INSTANCE);
                }
                state.teardown();
            }
            this.destinations.clear();
            // startBundle() installs a fresh map before the next bundle.
            this.destinations = null;
        }

        /**
         * Drops the destination map and closes the dataset service if one was created.
         *
         * @throws RuntimeException wrapping any failure while closing the service
         */
        @DoFn.Teardown
        public void teardown() {
            this.destinations = null;
            if (this.datasetService == null) {
                return;
            }
            try {
                this.datasetService.close();
                this.datasetService = null;
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a best-effort task to the given executor and returns without waiting for it.
     *
     * <p>Any exception thrown by the task is logged and otherwise ignored; it is never propagated
     * to the caller or to the executor.
     *
     * @param executorService executor on which the task runs
     * @param throwingRunnable task to run; may throw checked exceptions
     */
    public static void runAsyncIgnoreFailure(ExecutorService executorService, ThrowingRunnable throwingRunnable) {
        executorService.submit(() -> {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                // Deliberately not rethrown: callers use this for cleanup (close/unpin) whose failure
                // must not fail the bundle. Log it so that such failures remain diagnosable.
                LOG.warn("Exception happened while executing async task. Ignoring.", e);
            }
        });
    }

    /**
     * Creates the transform.
     *
     * @param dynamicDestinations resolves tables and schemas for destinations
     * @param createDisposition table creation behaviour forwarded to the write DoFn
     * @param kmsKey KMS key forwarded to the write DoFn
     * @param bqServices factory for BigQuery service clients
     * @param destinationCoder coder for destination values
     */
    public StorageApiWriteUnshardedRecords(StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations, BigQueryIO.Write.CreateDisposition createDisposition, String kmsKey, BigQueryServices bqServices, Coder<DestinationT> destinationCoder) {
        this.destinationCoder = destinationCoder;
        this.bqServices = bqServices;
        this.kmsKey = kmsKey;
        this.createDisposition = createDisposition;
        this.dynamicDestinations = dynamicDestinations;
    }

    /**
     * Expands into three steps: "Write Records" appends rows through {@link WriteRecordsDoFn} and
     * emits (table URN, stream name) pairs, "Reshuffle" redistributes those pairs, and
     * "Finalize writes" hands them to {@code StorageApiFinalizeWritesDoFn}.
     *
     * <p>The write DoFn is created with {@code useDefaultStream = false}, so each destination
     * writes to its own PENDING stream.
     *
     * @param pCollection destination-keyed serialized rows
     * @return the output of the finalize step
     */
    @Override
    public PCollection<Void> expand(PCollection<KV<DestinationT, byte[]>> pCollection) {
        String operationName = pCollection.getName() + "/" + getName();
        // Parameterized rather than raw: a raw DoFn erases the ParDo/apply return types, and the
        // chained setCoder(...) call below would no longer type-check.
        WriteRecordsDoFn<DestinationT, ElementT> writeRecordsFn = new WriteRecordsDoFn<>(
                operationName,
                this.dynamicDestinations,
                this.bqServices,
                this.destinationCoder,
                this.createDisposition,
                this.kmsKey,
                false);
        return pCollection
                .apply("Write Records", ParDo.of(writeRecordsFn).withSideInputs(this.dynamicDestinations.getSideInputs()))
                .setCoder(KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))
                .apply("Reshuffle", Reshuffle.of())
                .apply("Finalize writes", ParDo.of(new StorageApiFinalizeWritesDoFn(this.bqServices)));
    }
}
