package org.apache.beam.sdk.io.gcp.bigquery;

import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.Exceptions;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import io.grpc.Status;
import java.io.IOException;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryServices;
import org.apache.beam.sdk.io.gcp.bigquery.RetryManager;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiDynamicDestinations;
import org.apache.beam.sdk.io.gcp.bigquery.StorageApiFlushAndFinalizeDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Distribution;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Max;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.util.ShardedKey;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.Cache;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords.class */
public class StorageApiWritesShardedRecords<DestinationT, ElementT> extends PTransform<PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>>, PCollection<Void>> {
    private static final Logger LOG = LoggerFactory.getLogger(StorageApiWritesShardedRecords.class);

    /** Idle period used to initialise {@link #streamIdleTime}. */
    private static final Duration DEFAULT_STREAM_IDLE_TIME = Duration.standardHours(1);

    /** Pool on which client close/unpin calls are run so they never block the caller. */
    private static final ExecutorService closeWriterExecutor = Executors.newCachedThreadPool();

    /**
     * Append clients keyed by stream name. Entries expire five minutes after their last access;
     * every removed client is closed asynchronously on {@link #closeWriterExecutor}.
     */
    private static final Cache<String, BigQueryServices.StreamAppendClient> APPEND_CLIENTS =
            CacheBuilder.newBuilder()
                    .expireAfterAccess(5, TimeUnit.MINUTES)
                    .<String, BigQueryServices.StreamAppendClient>removalListener(evicted -> {
                        BigQueryServices.StreamAppendClient client = evicted.getValue();
                        runAsyncIgnoreFailure(closeWriterExecutor, Objects.requireNonNull(client)::close);
                    })
                    .build();

    /** Resolves destinations to tables and message converters; also supplies the side inputs. */
    private final StorageApiDynamicDestinations<ElementT, DestinationT> dynamicDestinations;
    private final BigQueryIO.Write.CreateDisposition createDisposition;
    private final String kmsKey;
    /** Factory for the dataset service used by the write and flush steps. */
    private final BigQueryServices bqServices;
    private final Coder<DestinationT> destinationCoder;
    /** Idle time handed to every {@link WriteRecordsDoFn} created by {@code expand}. */
    private final Duration streamIdleTime = DEFAULT_STREAM_IDLE_TIME;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$ThrowingRunnable.class */
    /**
     * A {@link Runnable}-like task whose body may throw a checked exception.
     *
     * <p>Used for best-effort background work such as closing or unpinning append clients.
     */
    @FunctionalInterface
    public interface ThrowingRunnable {
        /**
         * Executes the task.
         *
         * @throws Exception if the task fails
         */
        void run() throws Exception;
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$WriteRecordsDoFn.class */
    public class WriteRecordsDoFn extends DoFn<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> {
        // Cache that hands out the message converter for a destination (see process()).
        private TwoLevelMessageConverterCache<DestinationT, ElementT> messageConverters;
        // Offset applied to the processing-time idle timer each time it is (re)armed.
        private final Duration streamIdleTime;
        // Incremented by the serialized row count of every split queued for append.
        private final Counter recordsAppended = Metrics.counter(WriteRecordsDoFn.class, "recordsAppended");
        // Incremented whenever getOrCreateStream() has to create a new write stream.
        private final Counter streamsCreated = Metrics.counter(WriteRecordsDoFn.class, "streamsCreated");
        // Incremented each time the idle timer fires.
        private final Counter streamsIdle = Metrics.counter(WriteRecordsDoFn.class, "idleStreamsFinalized");
        // Incremented on every invocation of the append error handler.
        private final Counter appendFailures = Metrics.counter(WriteRecordsDoFn.class, "appendFailures");
        // Incremented when a failed append is retried on a newly created stream.
        private final Counter appendOffsetFailures = Metrics.counter(WriteRecordsDoFn.class, "appendOffsetFailures");
        // Incremented by the row count of every split whose append succeeded.
        private final Counter flushesScheduled = Metrics.counter(WriteRecordsDoFn.class, "flushesScheduled");
        // Wall-clock milliseconds spent in one process() call.
        private final Distribution appendLatencyDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendLatencyDistributionMs");
        // Number of rows in each split.
        private final Distribution appendSizeDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendSizeDistribution");
        // Number of splits produced for one input element.
        private final Distribution appendSplitDistribution = Metrics.distribution(WriteRecordsDoFn.class, "appendSplitDistribution");
        // Destination -> table lookup cache; replaced with an empty map at the start of each bundle.
        private Map<DestinationT, TableDestination> destinations = Maps.newHashMap();
        // Lazily created by getDatasetService() and closed in onTeardown().
        private transient BigQueryServices.DatasetService datasetServiceInternal = null;

        // Name of the write stream currently used for this key; null or empty means "none".
        @DoFn.StateId("streamName")
        private final StateSpec<ValueState<String>> streamNameSpec = StateSpecs.value();

        // Offset at which the next append to the current stream starts.
        @DoFn.StateId("streamOffset")
        private final StateSpec<ValueState<Long>> streamOffsetSpec = StateSpecs.value();

        // Processing-time timer that finalizes the stream once it fires (see onTimer()).
        @DoFn.TimerId("idleTimer")
        private final TimerSpec idleTimer = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

        /* JADX INFO: Access modifiers changed from: package-private */
        /* renamed from: org.apache.beam.sdk.io.gcp.bigquery.StorageApiWritesShardedRecords$WriteRecordsDoFn$1AppendRowsContext, reason: invalid class name */
        /* loaded from: input_file:org/apache/beam/sdk/io/gcp/bigquery/StorageApiWritesShardedRecords$WriteRecordsDoFn$1AppendRowsContext.class */
        public class C1AppendRowsContext extends RetryManager.Operation.Context<AppendRowsResponse> {
            final ShardedKey<DestinationT> key;
            String streamName = ChangeStreamsConstants.DEFAULT_CHANGE_STREAM_NAME;
            BigQueryServices.StreamAppendClient client = null;
            long offset = -1;
            long numRows = 0;
            long tryIteration = 0;

            C1AppendRowsContext(ShardedKey<DestinationT> shardedKey) {
                this.key = shardedKey;
            }

            public String toString() {
                return "Context: key=" + this.key + " streamName=" + this.streamName + " offset=" + this.offset + " numRows=" + this.numRows + " tryIteration: " + this.tryIteration;
            }
        }

        /**
         * Creates the writer DoFn.
         *
         * @param str value passed to the {@link TwoLevelMessageConverterCache} constructor
         * @param duration offset used whenever the idle timer is armed
         */
        public WriteRecordsDoFn(String str, Duration duration) {
            this.streamIdleTime = duration;
            this.messageConverters = new TwoLevelMessageConverterCache<>(str);
        }

        /** Starts every bundle with an empty destination-to-table lookup cache. */
        @DoFn.StartBundle
        public void startBundle() throws IOException {
            destinations = Maps.newHashMap();
        }

        /**
         * Returns the stream name held in state, creating a new BUFFERED write stream when the
         * state is null or empty. The idle timer is re-armed on every call.
         *
         * @param str table identifier passed to {@code createWriteStream}
         * @param valueState state holding the current stream name
         * @param valueState2 state holding the next append offset; reset to 0 for a new stream
         * @param timer idle timer to push out by the configured idle time
         * @param datasetService service used to create the stream
         * @return the existing or newly created stream name
         */
        String getOrCreateStream(String str, ValueState<String> valueState, ValueState<Long> valueState2, Timer timer, BigQueryServices.DatasetService datasetService) throws IOException, InterruptedException {
            final String streamName;
            String persisted = valueState.read();
            if (!Strings.isNullOrEmpty(persisted)) {
                streamName = persisted;
            } else {
                streamName = datasetService.createWriteStream(str, WriteStream.Type.BUFFERED).getName();
                valueState.write(streamName);
                valueState2.write(0L);
                streamsCreated.inc();
            }
            // Reset the idle deadline whether or not a stream had to be created.
            timer.offset(streamIdleTime).withNoOutputTimestamp().setRelative();
            return streamName;
        }

        /**
         * Returns the dataset service for this DoFn instance, creating it from the pipeline
         * options on first use.
         */
        private BigQueryServices.DatasetService getDatasetService(PipelineOptions pipelineOptions) throws IOException {
            if (this.datasetServiceInternal != null) {
                return this.datasetServiceInternal;
            }
            BigQueryOptions bigQueryOptions = pipelineOptions.as(BigQueryOptions.class);
            this.datasetServiceInternal = StorageApiWritesShardedRecords.this.bqServices.getDatasetService(bigQueryOptions);
            return this.datasetServiceInternal;
        }

        /**
         * Closes the dataset service if one was created. A failure to close is rethrown wrapped
         * in a {@link RuntimeException}, in which case the reference is left in place.
         */
        @DoFn.Teardown
        public void onTeardown() {
            if (this.datasetServiceInternal == null) {
                return;
            }
            try {
                this.datasetServiceInternal.close();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
            this.datasetServiceInternal = null;
        }

        /**
         * Appends one keyed batch of payloads to the write stream tracked in this key's state.
         *
         * <p>The payloads are split into {@link ProtoRows} chunks; one retryable append operation
         * is registered per chunk. Before running, every operation is assigned the current stream,
         * a pinned append client and a consecutive offset range. On success a non-finalizing flush
         * operation for the last written offset is emitted. On failure all clients are released;
         * if the error indicates an offset mismatch or an unusable stream, a finalizing operation
         * for the old stream is emitted and the failed operations are moved to a new stream.
         * Every failure path asks the retry manager to retry all operations.
         *
         * @param processContext context used to bind side inputs for the dynamic destinations
         * @param pipelineOptions options from which the dataset service is obtained
         * @param kv sharded destination key and the payloads to append
         * @param valueState state holding the current stream name
         * @param valueState2 state holding the offset of the next append
         * @param timer idle timer, re-armed after a successful run
         * @param outputReceiver receives (stream name, flush/finalize operation) pairs
         * @throws Exception if the appends ultimately fail or a collaborator throws
         */
        @DoFn.ProcessElement
        public void process(DoFn<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>, KV<String, StorageApiFlushAndFinalizeDoFn.Operation>>.ProcessContext processContext, PipelineOptions pipelineOptions, @DoFn.Element KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>> kv, @DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, @DoFn.TimerId("idleTimer") Timer timer, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> outputReceiver) throws Exception {
            final StorageApiDynamicDestinations<ElementT, DestinationT> destinationsFn = StorageApiWritesShardedRecords.this.dynamicDestinations;
            destinationsFn.setSideInputAccessorFromProcessContext(processContext);

            final DestinationT destination = kv.getKey().getKey();
            final String tableUrn = this.destinations.computeIfAbsent(destination, dest -> {
                TableDestination table = destinationsFn.getTable(dest);
                Preconditions.checkArgument(table != null, "DynamicDestinations.getTable() may not return null, but %s returned null for destination %s", destinationsFn, dest);
                return table;
            }).getTableUrn();

            final BigQueryServices.DatasetService datasetService = getDatasetService(pipelineOptions);
            final StorageApiDynamicDestinations.MessageConverter<ElementT> messageConverter = this.messageConverters.get(destination, destinationsFn, datasetService);
            // Held in a reference because the schema-refresh callback below may replace it.
            final AtomicReference<StorageApiDynamicDestinations.DescriptorWrapper> descriptorRef = new AtomicReference<>(messageConverter.getSchemaDescriptor());

            // 1048576 is the split-size argument (presumably a byte limit per chunk -- confirm
            // against SplittingIterable). The callback runs when a payload's schema hash does not
            // match: it refreshes the converter and drops the cached client for the current stream.
            final SplittingIterable messages = new SplittingIterable(kv.getValue(), 1048576L, descriptorRef.get(), expectedHash -> {
                try {
                    LOG.info("Schema does not match. Querying BigQuery for the current table schema.");
                    messageConverter.refreshSchema(expectedHash);
                    descriptorRef.set(messageConverter.getSchemaDescriptor());
                    String staleStream = valueState.read();
                    if (staleStream != null) {
                        APPEND_CLIENTS.invalidate(staleStream);
                    }
                    return descriptorRef.get();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            });

            // Assigns stream, pinned client and offset range to each given context. When the flag
            // is true the stored stream name is blanked first so that a new stream is created.
            final BiConsumer<Iterable<C1AppendRowsContext>, Boolean> assignStream = (pending, startNewStream) -> {
                try {
                    if (startNewStream) {
                        valueState.write("");
                    }
                    String streamName = getOrCreateStream(tableUrn, valueState, valueState2, timer, datasetService);
                    BigQueryServices.StreamAppendClient appendClient = APPEND_CLIENTS.get(streamName, () -> datasetService.getStreamAppendClient(streamName, descriptorRef.get().descriptor));
                    for (C1AppendRowsContext ctx : pending) {
                        ctx.streamName = streamName;
                        appendClient.pin();
                        ctx.client = appendClient;
                        ctx.offset = valueState2.read().longValue();
                        ctx.tryIteration++;
                        valueState2.write(ctx.offset + ctx.numRows);
                    }
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            };

            // Evicts the cached client for the current stream and unpins every client still held.
            final Consumer<Iterable<C1AppendRowsContext>> releaseClients = toRelease -> {
                String cachedStream = valueState.read();
                // The cache rejects null keys, so guard as the schema-refresh callback does.
                if (cachedStream != null) {
                    APPEND_CLIENTS.invalidate(cachedStream);
                }
                for (C1AppendRowsContext ctx : toRelease) {
                    if (ctx.client != null) {
                        // The method reference binds the client before the field is cleared.
                        runAsyncIgnoreFailure(closeWriterExecutor, ctx.client::unpin);
                        ctx.client = null;
                    }
                }
            };

            final Instant startTime = Instant.now();
            final ArrayList<C1AppendRowsContext> contexts = Lists.newArrayList();
            final RetryManager<AppendRowsResponse, C1AppendRowsContext> retryManager = new RetryManager<>(Duration.standardSeconds(1L), Duration.standardSeconds(10L), 1000);
            int numSplits = 0;
            for (ProtoRows rows : messages) {
                numSplits++;
                final long rowCount = rows.getSerializedRowsCount();
                C1AppendRowsContext context = new C1AppendRowsContext(kv.getKey());
                context.numRows = rowCount;
                contexts.add(context);
                retryManager.addOperation(
                        // Run: append this chunk at the offset assigned to its context.
                        ctx -> {
                            try {
                                BigQueryServices.StreamAppendClient appendClient = APPEND_CLIENTS.get(ctx.streamName, () -> datasetService.getStreamAppendClient(ctx.streamName, descriptorRef.get().descriptor));
                                return appendClient.appendRows(ctx.offset, rows);
                            } catch (Exception e) {
                                throw new RuntimeException(e);
                            }
                        },
                        // Error: release clients; switch to a new stream if the old one is unusable.
                        failed -> {
                            C1AppendRowsContext first = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(Iterables.getFirst(failed, null));
                            LOG.error("Got error " + first.getError() + " closing " + first.streamName);
                            releaseClients.accept(contexts);
                            this.appendFailures.inc();
                            Throwable error = org.apache.beam.sdk.util.Preconditions.checkStateNotNull(first.getError());
                            Status.Code code = Status.fromThrowable(error).getCode();
                            boolean offsetMismatch = code == Status.Code.OUT_OF_RANGE || code == Status.Code.ALREADY_EXISTS;
                            boolean streamUnusable = error instanceof Exceptions.StreamFinalizedException || code == Status.Code.INVALID_ARGUMENT || code == Status.Code.NOT_FOUND || code == Status.Code.FAILED_PRECONDITION;
                            if (offsetMismatch || streamUnusable) {
                                this.appendOffsetFailures.inc();
                                LOG.warn("Append to " + first + " failed with " + first.getError() + " Will retry with a new stream");
                                // Finalize the old stream up to the last offset known to precede this append.
                                outputReceiver.output(KV.of(first.streamName, new StorageApiFlushAndFinalizeDoFn.Operation(first.offset - 1, true)));
                                assignStream.accept(failed, true);
                            }
                            return RetryManager.RetryType.RETRY_ALL_OPERATIONS;
                        },
                        // Success: schedule a flush up to the last row written by this chunk.
                        ctx -> {
                            outputReceiver.output(KV.of(ctx.streamName, new StorageApiFlushAndFinalizeDoFn.Operation((ctx.offset + ctx.numRows) - 1, false)));
                            this.flushesScheduled.inc(rowCount);
                        },
                        context);
                this.recordsAppended.inc(rowCount);
                this.appendSizeDistribution.update(context.numRows);
            }

            assignStream.accept(contexts, false);
            try {
                retryManager.run(true);
            } finally {
                // Unpin exactly once, whether the run succeeded or threw.
                for (C1AppendRowsContext ctx : contexts) {
                    if (ctx.client != null) {
                        runAsyncIgnoreFailure(closeWriterExecutor, ctx.client::unpin);
                    }
                }
            }
            this.appendSplitDistribution.update(numSplits);
            this.appendLatencyDistribution.update(java.time.Duration.between(startTime, Instant.now()).toMillis());
            timer.offset(this.streamIdleTime).withNoOutputTimestamp().setRelative();
        }

        /**
         * Emits a finalizing operation for the stream held in state, then clears the stream name
         * and offset state and evicts the stream's cached append client. Does nothing when no
         * stream name is stored.
         *
         * @param valueState state holding the current stream name
         * @param valueState2 state holding the next append offset (treated as 0 when unset)
         * @param outputReceiver receives the (stream name, finalize operation) pair
         */
        private void finalizeStream(@DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> outputReceiver) {
            String streamName = valueState.read();
            // Null and empty both mean "no stream"; no placeholder constant is needed.
            if (Strings.isNullOrEmpty(streamName)) {
                return;
            }
            long nextOffset = MoreObjects.firstNonNull(valueState2.read(), 0L);
            // The last written offset is one before the next append position.
            outputReceiver.output(KV.of(streamName, new StorageApiFlushAndFinalizeDoFn.Operation(nextOffset - 1, true)));
            valueState.clear();
            valueState2.clear();
            APPEND_CLIENTS.invalidate(streamName);
        }

        /** Idle timer callback: finalizes the current stream, then counts the idle event. */
        @DoFn.OnTimer("idleTimer")
        public void onTimer(@DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> outputReceiver) {
            this.finalizeStream(valueState, valueState2, outputReceiver);
            streamsIdle.inc();
        }

        /** Window expiration callback: finalizes whatever stream is still held in state. */
        @DoFn.OnWindowExpiration
        public void onWindowExpiration(@DoFn.StateId("streamName") @DoFn.AlwaysFetched ValueState<String> valueState, @DoFn.StateId("streamOffset") @DoFn.AlwaysFetched ValueState<Long> valueState2, DoFn.OutputReceiver<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> outputReceiver) {
            this.finalizeStream(valueState, valueState2, outputReceiver);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /** Invalidates every entry of the shared append-client cache. */
    public static void clearCache() {
        StorageApiWritesShardedRecords.APPEND_CLIENTS.invalidateAll();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Submits a best-effort task to the given executor. An exception thrown by the task never
     * reaches the caller; it is logged so that failed close/unpin calls remain diagnosable.
     *
     * @param executorService executor on which the task is run
     * @param throwingRunnable task to run; may throw
     */
    public static void runAsyncIgnoreFailure(ExecutorService executorService, ThrowingRunnable throwingRunnable) {
        executorService.submit(() -> {
            try {
                throwingRunnable.run();
            } catch (Exception e) {
                // Deliberately not rethrown: callers rely on this being fire-and-forget.
                LOG.warn("Exception happened while executing async task. Ignoring.", e);
            }
        });
    }

    /**
     * Creates the transform; every argument is stored as-is.
     *
     * @param storageApiDynamicDestinations resolves destinations to tables and converters
     * @param createDisposition stored create disposition
     * @param str stored KMS key
     * @param bigQueryServices factory for the BigQuery services used when writing and flushing
     * @param coder stored destination coder
     */
    public StorageApiWritesShardedRecords(StorageApiDynamicDestinations<ElementT, DestinationT> storageApiDynamicDestinations, BigQueryIO.Write.CreateDisposition createDisposition, String str, BigQueryServices bigQueryServices, Coder<DestinationT> coder) {
        this.bqServices = bigQueryServices;
        this.dynamicDestinations = storageApiDynamicDestinations;
        this.destinationCoder = coder;
        this.createDisposition = createDisposition;
        this.kmsKey = str;
    }

    /**
     * Builds the write pipeline: appends records with {@link WriteRecordsDoFn}, then, per stream
     * name, combines the emitted operations with {@code Max} under a repeated one-second
     * processing-time trigger and passes the result to {@link StorageApiFlushAndFinalizeDoFn}.
     *
     * @param pCollection sharded destination keys with the payloads to write
     * @return the output of the flush-and-finalize step
     * @throws RuntimeException if no schema is registered for the operation type
     */
    @Override
    public PCollection<Void> expand(PCollection<KV<ShardedKey<DestinationT>, Iterable<StorageApiWritePayload>>> pCollection) {
        String operationName = pCollection.getName() + "/" + getName();
        PCollection<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>> written =
                pCollection.apply(
                        "Write Records",
                        ParDo.of(new WriteRecordsDoFn(operationName, this.streamIdleTime))
                                .withSideInputs(this.dynamicDestinations.getSideInputs()));

        // The operation coder is derived from the pipeline's schema registry.
        SchemaCoder<StorageApiFlushAndFinalizeDoFn.Operation> operationCoder;
        try {
            SchemaRegistry schemaRegistry = pCollection.getPipeline().getSchemaRegistry();
            operationCoder =
                    SchemaCoder.of(
                            schemaRegistry.getSchema(StorageApiFlushAndFinalizeDoFn.Operation.class),
                            TypeDescriptor.of(StorageApiFlushAndFinalizeDoFn.Operation.class),
                            schemaRegistry.getToRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class),
                            schemaRegistry.getFromRowFunction(StorageApiFlushAndFinalizeDoFn.Operation.class));
        } catch (NoSuchSchemaException e) {
            throw new RuntimeException(e);
        }

        return written
                .setCoder(KvCoder.of(StringUtf8Coder.of(), operationCoder))
                .apply(
                        Window.<KV<String, StorageApiFlushAndFinalizeDoFn.Operation>>configure()
                                .triggering(
                                        Repeatedly.forever(
                                                AfterProcessingTime.pastFirstElementInPane()
                                                        .plusDelayOf(Duration.standardSeconds(1L))))
                                .discardingFiredPanes())
                .apply("maxFlushPosition", Combine.perKey(Max.naturalOrder(new StorageApiFlushAndFinalizeDoFn.Operation(-1L, false))))
                .apply("Flush and finalize writes", ParDo.of(new StorageApiFlushAndFinalizeDoFn(this.bqServices)));
    }
}
