package io.trino.plugin.exchange.filesystem.azure;

import com.azure.core.http.rest.PagedResponse;
import com.azure.core.util.BinaryData;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobServiceAsyncClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.batch.BlobBatchAsyncClient;
import com.azure.storage.blob.batch.BlobBatchClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.DeleteSnapshotsOptionType;
import com.azure.storage.blob.models.DownloadRetryOptions;
import com.azure.storage.blob.models.ListBlobsOptions;
import com.azure.storage.blob.specialized.BlockBlobAsyncClient;
import com.azure.storage.common.policy.RequestRetryOptions;
import com.azure.storage.common.policy.RetryPolicyType;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import com.google.common.base.Verify;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.FluentFuture;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import com.google.inject.Inject;
import io.airlift.concurrent.MoreFutures;
import io.airlift.slice.SizeOf;
import io.airlift.slice.Slice;
import io.airlift.slice.SliceInput;
import io.airlift.slice.Slices;
import io.trino.annotation.NotThreadSafe;
import io.trino.plugin.exchange.filesystem.ExchangeSourceFile;
import io.trino.plugin.exchange.filesystem.ExchangeStorageReader;
import io.trino.plugin.exchange.filesystem.ExchangeStorageWriter;
import io.trino.plugin.exchange.filesystem.FileStatus;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeFutures;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeManager;
import io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage;
import jakarta.annotation.PreDestroy;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.Queue;
import java.util.UUID;
import reactor.core.publisher.Flux;

/* loaded from: input_file:io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage.class */
public class AzureBlobFileSystemExchangeStorage implements FileSystemExchangeStorage {
    private final int blockSize;
    private final BlobServiceAsyncClient blobServiceAsyncClient;

    @ThreadSafe
    /* loaded from: input_file:io/trino/plugin/exchange/filesystem/azure/AzureBlobFileSystemExchangeStorage$AzureExchangeStorageReader.class */
    private static class AzureExchangeStorageReader implements ExchangeStorageReader {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(AzureExchangeStorageReader.class);
        private final BlobServiceAsyncClient blobServiceAsyncClient;

        @GuardedBy("this")
        private final Queue<ExchangeSourceFile> sourceFiles;
        private final int blockSize;
        private final int bufferSize;

        @GuardedBy("this")
        private ExchangeSourceFile currentFile;

        @GuardedBy("this")
        private long fileOffset;

        @GuardedBy("this")
        private SliceInput sliceInput;
        private volatile boolean closed;
        private volatile long bufferRetainedSize;

        @GuardedBy("this")
        private int sliceSize = -1;
        private volatile ListenableFuture<Void> inProgressReadFuture = Futures.immediateVoidFuture();

        public AzureExchangeStorageReader(BlobServiceAsyncClient blobServiceAsyncClient, List<ExchangeSourceFile> list, int i, int i2) {
            this.blobServiceAsyncClient = (BlobServiceAsyncClient) Objects.requireNonNull(blobServiceAsyncClient, "blobServiceAsyncClient is null");
            this.sourceFiles = new ArrayDeque((Collection) Objects.requireNonNull(list, "sourceFiles is null"));
            this.blockSize = i;
            this.bufferSize = i2 + i;
            fillBuffer();
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public synchronized Slice read() throws IOException {
            if (this.closed || !this.inProgressReadFuture.isDone()) {
                return null;
            }
            try {
                MoreFutures.getFutureValue(this.inProgressReadFuture);
                if (this.sliceSize < 0) {
                    this.sliceSize = this.sliceInput.readInt();
                }
                Slice readSlice = this.sliceInput.readSlice(this.sliceSize);
                if (this.sliceInput.available() > 4) {
                    this.sliceSize = this.sliceInput.readInt();
                    if (this.sliceInput.available() < this.sliceSize) {
                        fillBuffer();
                    }
                } else {
                    this.sliceSize = -1;
                    fillBuffer();
                }
                return readSlice;
            } catch (RuntimeException e) {
                throw new IOException(e);
            }
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public ListenableFuture<Void> isBlocked() {
            return this.inProgressReadFuture;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public synchronized long getRetainedSize() {
            return INSTANCE_SIZE + this.bufferRetainedSize;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader
        public boolean isFinished() {
            return this.closed;
        }

        @Override // io.trino.plugin.exchange.filesystem.ExchangeStorageReader, java.io.Closeable, java.lang.AutoCloseable
        public synchronized void close() {
            if (this.closed) {
                return;
            }
            this.closed = true;
            this.currentFile = null;
            this.sliceInput = null;
            this.bufferRetainedSize = 0L;
            this.inProgressReadFuture.cancel(true);
            this.inProgressReadFuture = Futures.immediateVoidFuture();
        }

        @GuardedBy("this")
        private void fillBuffer() {
            if (this.currentFile == null || this.fileOffset == this.currentFile.getFileSize()) {
                this.currentFile = this.sourceFiles.poll();
                if (this.currentFile == null) {
                    close();
                    return;
                }
                this.fileOffset = 0L;
            }
            byte[] bArr = new byte[this.bufferSize];
            int i = 0;
            if (this.sliceInput != null) {
                int available = this.sliceInput.available();
                this.sliceInput.readBytes(bArr, 0, available);
                i = 0 + available;
            }
            ImmutableList.Builder builder = ImmutableList.builder();
            while (true) {
                long fileSize = this.currentFile.getFileSize();
                int length = (bArr.length - i) / this.blockSize;
                if (length == 0) {
                    if (bArr.length - i < fileSize - this.fileOffset) {
                        break;
                    } else {
                        length = 1;
                    }
                }
                BlockBlobAsyncClient blockBlobAsyncClient = this.blobServiceAsyncClient.getBlobContainerAsyncClient(AzureBlobFileSystemExchangeStorage.getContainerName(this.currentFile.getFileUri())).getBlobAsyncClient(AzureBlobFileSystemExchangeStorage.getPath(this.currentFile.getFileUri())).getBlockBlobAsyncClient();
                for (int i2 = 0; i2 < length && this.fileOffset < fileSize; i2++) {
                    int min = (int) Math.min(this.blockSize, fileSize - this.fileOffset);
                    int i3 = i;
                    builder.add(FluentFuture.from(MoreFutures.toListenableFuture(blockBlobAsyncClient.downloadWithResponse(new BlobRange(this.fileOffset, Long.valueOf(min)), (DownloadRetryOptions) null, (BlobRequestConditions) null, false).toFuture())).transformAsync(blobDownloadAsyncResponse -> {
                        return MoreFutures.toListenableFuture(((Flux) blobDownloadAsyncResponse.getValue()).collectList().toFuture());
                    }, MoreExecutors.directExecutor()).transform(list -> {
                        int i4 = i3;
                        Iterator it = list.iterator();
                        while (it.hasNext()) {
                            ByteBuffer byteBuffer = (ByteBuffer) it.next();
                            int remaining = byteBuffer.remaining();
                            if (byteBuffer.hasArray()) {
                                System.arraycopy(byteBuffer.array(), byteBuffer.arrayOffset() + byteBuffer.position(), bArr, i4, remaining);
                            } else {
                                byteBuffer.asReadOnlyBuffer().get(bArr, i4, remaining);
                            }
                            i4 += remaining;
                        }
                        return null;
                    }, MoreExecutors.directExecutor()));
                    i += min;
                    this.fileOffset += min;
                }
                if (this.fileOffset == fileSize) {
                    this.currentFile = this.sourceFiles.poll();
                    if (this.currentFile == null) {
                        break;
                    } else {
                        this.fileOffset = 0L;
                    }
                }
            }
            this.inProgressReadFuture = MoreFutures.asVoid(Futures.allAsList(builder.build()));
            this.sliceInput = Slices.wrappedBuffer(bArr, 0, i).getInput();
            this.bufferRetainedSize = this.sliceInput.getRetainedSize();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Writer that uploads slices to a single block blob.
     *
     * <p>A first slice shorter than {@code blockSize} is uploaded directly as the whole blob and
     * no further writes are accepted afterwards. Every other slice is staged as an individual
     * block under a random id, and {@link #finish()} commits the staged block list.
     */
    @NotThreadSafe
    public static class AzureExchangeStorageWriter implements ExchangeStorageWriter {
        private static final int INSTANCE_SIZE = SizeOf.instanceSize(AzureExchangeStorageWriter.class);

        private final BlockBlobAsyncClient blockBlobAsyncClient;
        private final int blockSize;
        /** One future per staged block, in write order. */
        private final List<ListenableFuture<Void>> multiPartUploadFutures = new ArrayList<>();
        /** Ids of the staged blocks, in the order they are committed. */
        private final List<String> blockIds = new ArrayList<>();
        /** Set once a direct (single request) upload has been started. */
        private ListenableFuture<Void> directUploadFuture;
        private volatile boolean closed;

        /**
         * @param blockBlobAsyncClient client bound to the target blob; must not be null
         * @param i block size in bytes; slices shorter than this may be uploaded directly
         * @throws NullPointerException if {@code blockBlobAsyncClient} is null
         */
        public AzureExchangeStorageWriter(BlockBlobAsyncClient blockBlobAsyncClient, int i) {
            this.blockBlobAsyncClient = Objects.requireNonNull(blockBlobAsyncClient, "blockBlobAsyncClient is null");
            this.blockSize = i;
        }

        /**
         * Uploads one slice, either directly or as a staged block.
         *
         * @param slice data to upload
         * @return future of the upload, or an already completed future if the writer is closed
         * @throws IllegalStateException if a direct upload was already started
         */
        @Override
        public ListenableFuture<Void> write(Slice slice) {
            Preconditions.checkState(directUploadFuture == null, "Direct upload already started");
            if (closed) {
                return Futures.immediateVoidFuture();
            }

            int length = slice.length();
            boolean nothingStaged = multiPartUploadFutures.isEmpty();
            if (length < blockSize && nothingStaged) {
                // Small first slice: send it as the complete blob in one request.
                directUploadFuture = FileSystemExchangeFutures.translateFailures(
                        MoreFutures.toListenableFuture(blockBlobAsyncClient.upload(Flux.just(slice.toByteBuffer()), length).toFuture()));
                return directUploadFuture;
            }

            String blockId = Base64.getEncoder().encodeToString(UUID.randomUUID().toString().getBytes(StandardCharsets.UTF_8));
            ListenableFuture<Void> stagedBlock = MoreFutures.toListenableFuture(
                    blockBlobAsyncClient.stageBlock(blockId, Flux.just(slice.toByteBuffer()), length).toFuture());
            multiPartUploadFutures.add(stagedBlock);
            blockIds.add(blockId);
            return FileSystemExchangeFutures.translateFailures(stagedBlock);
        }

        /**
         * Completes the blob. With staged blocks, waits for all of them and commits the block
         * list; the writer is marked closed once the commit succeeds.
         *
         * @return the direct upload future, the commit future, or a completed future when the
         *         writer is closed or nothing was written
         */
        @Override
        public ListenableFuture<Void> finish() {
            if (closed) {
                return Futures.immediateVoidFuture();
            }
            if (multiPartUploadFutures.isEmpty()) {
                return directUploadFuture != null ? directUploadFuture : Futures.immediateVoidFuture();
            }

            ListenableFuture<List<Void>> allStaged = Futures.allAsList(multiPartUploadFutures);
            ListenableFuture<Void> commitFuture = FileSystemExchangeFutures.translateFailures(Futures.transformAsync(
                    allStaged,
                    ignored -> MoreFutures.toListenableFuture(blockBlobAsyncClient.commitBlockList(blockIds).toFuture()),
                    MoreExecutors.directExecutor()));
            Futures.addCallback(commitFuture, new FutureCallback<Void>() {
                @Override
                public void onSuccess(Void result) {
                    AzureExchangeStorageWriter.this.closed = true;
                }

                @Override
                public void onFailure(Throwable th) {
                    // Nothing to do here; the failure is visible through the returned future.
                }
            }, MoreExecutors.directExecutor());
            return commitFuture;
        }

        /**
         * Marks the writer closed and cancels whatever uploads were started.
         *
         * @return an already completed future
         */
        @Override
        public ListenableFuture<Void> abort() {
            if (closed) {
                return Futures.immediateVoidFuture();
            }
            closed = true;

            if (multiPartUploadFutures.isEmpty()) {
                if (directUploadFuture != null) {
                    directUploadFuture.cancel(true);
                }
            } else {
                // Direct and staged uploads are mutually exclusive.
                Verify.verify(directUploadFuture == null);
                for (ListenableFuture<Void> stagedBlock : multiPartUploadFutures) {
                    stagedBlock.cancel(true);
                }
            }
            return Futures.immediateVoidFuture();
        }

        /** Returns the shallow instance size plus the estimated size of the block id list. */
        @Override
        public long getRetainedSize() {
            long blockIdsSize = SizeOf.estimatedSizeOf(blockIds, SizeOf::estimatedSizeOf);
            return INSTANCE_SIZE + blockIdsSize;
        }
    }

    /**
     * Builds the asynchronous blob service client from the exchange configuration.
     *
     * <p>The client uses an exponential retry policy with the configured retry count. It
     * authenticates with the configured connection string when one is present and otherwise
     * with a credential built by {@link DefaultAzureCredentialBuilder}.
     *
     * @param exchangeAzureConfig source of the block size, retry count and connection string
     * @throws ArithmeticException if the configured block size in bytes does not fit in an int
     */
    @Inject
    public AzureBlobFileSystemExchangeStorage(ExchangeAzureConfig exchangeAzureConfig) {
        this.blockSize = Math.toIntExact(exchangeAzureConfig.getAzureStorageBlockSize().toBytes());

        BlobServiceClientBuilder clientBuilder = new BlobServiceClientBuilder();
        clientBuilder.retryOptions(new RequestRetryOptions(
                RetryPolicyType.EXPONENTIAL,
                Integer.valueOf(exchangeAzureConfig.getMaxErrorRetries()),
                (Integer) null,
                (Long) null,
                (Long) null,
                (String) null));

        exchangeAzureConfig.getAzureStorageConnectionString().ifPresentOrElse(
                clientBuilder::connectionString,
                () -> clientBuilder.credential(new DefaultAzureCredentialBuilder().build()));

        this.blobServiceAsyncClient = clientBuilder.buildAsyncClient();
    }

    /**
     * Does nothing: this implementation performs no work for directory creation.
     * NOTE(review): presumably because blob paths need no explicit directories — confirm.
     *
     * @param uri directory that would be created; ignored
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public void createDirectories(URI uri) throws IOException {
    }

    /**
     * Creates a reader over the given source files that shares this storage's blob client
     * and block size.
     *
     * @param list files to read, in order
     * @param i size added to the block size to obtain the reader's buffer capacity
     * @return a new reader that has already started downloading
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ExchangeStorageReader createExchangeStorageReader(List<ExchangeSourceFile> list, int i) {
        ExchangeStorageReader reader = new AzureExchangeStorageReader(blobServiceAsyncClient, list, blockSize, i);
        return reader;
    }

    /**
     * Creates a writer for the blob addressed by {@code uri}. The container is taken from the
     * URI's user-info component and the blob name from its path.
     *
     * @param uri absolute URI of the blob to write
     * @return a new writer using this storage's block size
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ExchangeStorageWriter createExchangeStorageWriter(URI uri) {
        BlobContainerAsyncClient containerClient = blobServiceAsyncClient.getBlobContainerAsyncClient(getContainerName(uri));
        BlockBlobAsyncClient blockBlobClient = containerClient.getBlobAsyncClient(getPath(uri)).getBlockBlobAsyncClient();
        return new AzureExchangeStorageWriter(blockBlobClient, blockSize);
    }

    /**
     * Uploads an empty string as the content of the blob addressed by {@code uri}.
     *
     * @param uri absolute URI of the blob to create
     * @return future of the upload, with failures passed through {@code translateFailures}
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<Void> createEmptyFile(URI uri) {
        BlobContainerAsyncClient containerClient = blobServiceAsyncClient.getBlobContainerAsyncClient(getContainerName(uri));
        return FileSystemExchangeFutures.translateFailures(MoreFutures.toListenableFuture(
                containerClient.getBlobAsyncClient(getPath(uri))
                        .upload(BinaryData.fromString(""))
                        .toFuture()));
    }

    /**
     * Deletes every blob found under the given directories.
     *
     * <p>A listing is started for each directory and the listings are grouped by container.
     * Once all listings of a container are complete, the URLs of the listed blobs are collected
     * and handed to {@code deleteObjects}.
     *
     * @param list directory URIs; each must satisfy {@code isDirectory}
     * @return future that completes when all deletions have completed, with failures passed
     *         through {@code translateFailures}
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<Void> deleteRecursively(List<URI> list) {
        ImmutableMultimap.Builder<String, ListenableFuture<List<PagedResponse<BlobItem>>>> listingsBuilder = ImmutableMultimap.builder();
        for (URI directory : list) {
            listingsBuilder.put(getContainerName(directory), listObjectsRecursively(directory));
        }
        ImmutableMultimap<String, ListenableFuture<List<PagedResponse<BlobItem>>>> listingsByContainer = listingsBuilder.build();

        ImmutableList.Builder<ListenableFuture<List<Void>>> deletions = ImmutableList.builder();
        for (String containerName : listingsByContainer.keySet()) {
            BlobContainerAsyncClient containerClient = blobServiceAsyncClient.getBlobContainerAsyncClient(containerName);
            deletions.add(Futures.transformAsync(
                    Futures.allAsList(listingsByContainer.get(containerName)),
                    listings -> {
                        // listings: one entry per directory, each holding that directory's pages
                        ImmutableList.Builder<String> blobUrls = ImmutableList.builder();
                        for (List<PagedResponse<BlobItem>> pages : listings) {
                            for (PagedResponse<BlobItem> page : pages) {
                                for (BlobItem blobItem : page.getValue()) {
                                    blobUrls.add(containerClient.getBlobAsyncClient(blobItem.getName()).getBlobUrl());
                                }
                            }
                        }
                        return deleteObjects(blobUrls.build());
                    },
                    MoreExecutors.directExecutor()));
        }
        return FileSystemExchangeFutures.translateFailures(Futures.allAsList(deletions.build()));
    }

    /**
     * Lists the blobs under a directory and reports each one with its content length.
     *
     * <p>Entries flagged as prefixes are skipped. Each file URI reuses the scheme, user-info,
     * host and fragment of {@code uri} and takes {@code "/" + blob name} as its path.
     *
     * @param uri directory URI; must satisfy {@code isDirectory}
     * @return future holding one {@link FileStatus} per listed blob
     * @throws IllegalArgumentException (through the returned future) if a file URI cannot be built
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public ListenableFuture<List<FileStatus>> listFilesRecursively(URI uri) {
        return Futures.transform(listObjectsRecursively(uri), pages -> {
            ImmutableList.Builder<FileStatus> files = ImmutableList.builder();
            for (PagedResponse<BlobItem> page : pages) {
                for (BlobItem blobItem : page.getValue()) {
                    // isPrefix() is a boxed Boolean that may be null; compare by value, not identity.
                    if (Boolean.TRUE.equals(blobItem.isPrefix())) {
                        continue;
                    }
                    try {
                        URI fileUri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(), -1, "/" + blobItem.getName(), null, uri.getFragment());
                        files.add(new FileStatus(fileUri.toString(), blobItem.getProperties().getContentLength().longValue()));
                    } catch (URISyntaxException e) {
                        throw new IllegalArgumentException(e);
                    }
                }
            }
            return files.build();
        }, MoreExecutors.directExecutor());
    }

    /**
     * Returns the write buffer size, which is this storage's block size in bytes.
     *
     * @return the block size in bytes
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage
    public int getWriteBufferSize() {
        return blockSize;
    }

    /**
     * Does nothing: no resources are released by this method.
     */
    @Override // io.trino.plugin.exchange.filesystem.FileSystemExchangeStorage, java.lang.AutoCloseable
    @PreDestroy
    public void close() throws IOException {
    }

    /**
     * Starts a listing of the blobs whose names begin with the path of {@code uri} and collects
     * all result pages.
     *
     * @param uri directory URI (its string form must end with the path separator)
     * @return future holding every page of the listing
     * @throws IllegalArgumentException if {@code uri} is not a directory URI
     */
    private ListenableFuture<List<PagedResponse<BlobItem>>> listObjectsRecursively(URI uri) {
        Preconditions.checkArgument(isDirectory(uri), "listObjectsRecursively called on file uri %s", uri);
        BlobContainerAsyncClient containerClient = blobServiceAsyncClient.getBlobContainerAsyncClient(getContainerName(uri));
        ListBlobsOptions options = new ListBlobsOptions().setPrefix(getPath(uri));
        return MoreFutures.toListenableFuture(
                containerClient.listBlobsByHierarchy((String) null, options)
                        .byPage()
                        .collectList()
                        .toFuture());
    }

    /**
     * Deletes the given blobs, including their snapshots, through the blob batch client.
     *
     * @param list blob URLs to delete
     * @return future that completes when every batch has completed
     */
    private ListenableFuture<List<Void>> deleteObjects(List<String> list) {
        BlobBatchAsyncClient batchClient = new BlobBatchClientBuilder(blobServiceAsyncClient).buildAsyncClient();
        ImmutableList.Builder<ListenableFuture<Void>> batchFutures = ImmutableList.builder();
        // Requests are issued in chunks of 256 URLs.
        // NOTE(review): presumably the per-batch sub-request limit — confirm against Azure docs.
        for (List<String> batch : Lists.partition(list, 256)) {
            batchFutures.add(MoreFutures.toListenableFuture(
                    batchClient.deleteBlobs(batch, DeleteSnapshotsOptionType.INCLUDE).then().toFuture()));
        }
        return Futures.allAsList(batchFutures.build());
    }

    /**
     * Extracts the container name, which is carried in the user-info component of the URI.
     *
     * @param uri URI addressing a blob or directory
     * @return the user-info component, or {@code null} if the URI has none
     */
    private static String getContainerName(URI uri) {
        final String containerName = uri.getUserInfo();
        return containerName;
    }

    /**
     * Returns the path of {@code uri} with at most one leading and one trailing path separator
     * removed (for example {@code /a/b/} becomes {@code a/b} if the separator is {@code /}).
     * A missing path is treated as the empty string.
     *
     * @param uri absolute URI
     * @return the trimmed path, possibly empty
     * @throws IllegalArgumentException if {@code uri} is not absolute
     */
    private static String getPath(URI uri) {
        Preconditions.checkArgument(uri.isAbsolute(), "Uri is not absolute: %s", uri);
        final String separator = FileSystemExchangeManager.PATH_SEPARATOR;
        String rawPath = Strings.nullToEmpty(uri.getPath());

        int begin = rawPath.startsWith(separator) ? separator.length() : 0;
        String withoutLeading = rawPath.substring(begin);

        // The trailing check runs on the already shortened string, as the two steps are sequential.
        int trailing = withoutLeading.endsWith(separator) ? separator.length() : 0;
        return withoutLeading.substring(0, withoutLeading.length() - trailing);
    }

    /**
     * Tells whether {@code uri} denotes a directory, i.e. whether its string form ends with the
     * path separator.
     *
     * @param uri URI to test
     * @return {@code true} if the URI string ends with the path separator
     */
    private static boolean isDirectory(URI uri) {
        String text = uri.toString();
        return text.endsWith(FileSystemExchangeManager.PATH_SEPARATOR);
    }
}
