package com.microsoft.azure.kusto.ingest;

import com.azure.core.util.BinaryData;
import com.azure.data.tables.TableAsyncClient;
import com.azure.data.tables.implementation.models.TableServiceErrorException;
import com.azure.data.tables.models.TableEntity;
import com.azure.storage.blob.BlobAsyncClient;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.queue.QueueAsyncClient;
import com.microsoft.azure.kusto.data.Ensure;
import com.microsoft.azure.kusto.ingest.utils.IngestionUtils;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.MethodHandles;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import org.apache.commons.codec.binary.Base64;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.Exceptions;
import reactor.core.publisher.Mono;

/* loaded from: input_file:com/microsoft/azure/kusto/ingest/AzureStorageClient.class */
public class AzureStorageClient {
    private static final Logger log = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> postMessageToQueue(QueueAsyncClient queueAsyncClient, String str) {
        Ensure.argIsNotNull(queueAsyncClient, "queueAsyncClient");
        Ensure.stringIsNotBlank(str, "content");
        return queueAsyncClient.sendMessage(BinaryData.fromBytes(Base64.encodeBase64(str.getBytes()))).then();
    }

    public Mono<Void> azureTableInsertEntity(TableAsyncClient tableAsyncClient, TableEntity tableEntity) throws TableServiceErrorException {
        Ensure.argIsNotNull(tableAsyncClient, "tableAsyncClient");
        Ensure.argIsNotNull(tableEntity, "tableEntity");
        return tableAsyncClient.createEntity(tableEntity);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Void> uploadLocalFileToBlob(File file, String str, BlobContainerAsyncClient blobContainerAsyncClient, boolean z) throws IOException {
        log.debug("uploadLocalFileToBlob: filePath: {}, blobName: {}, storageUri: {}", new Object[]{file.getPath(), str, blobContainerAsyncClient.getBlobContainerUrl()});
        Ensure.fileExists(file, "sourceFile");
        Ensure.stringIsNotBlank(str, "blobName");
        Ensure.argIsNotNull(blobContainerAsyncClient, "asyncContainer");
        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(str);
        return z ? compressAndUploadFileToBlob(file, blobAsyncClient) : uploadFileToBlob(file, blobAsyncClient);
    }

    Mono<Void> compressAndUploadFileToBlob(File file, BlobAsyncClient blobAsyncClient) throws IOException {
        Ensure.fileExists(file, "sourceFile");
        Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");
        return Mono.defer(() -> {
            try {
                return IngestionUtils.compressStream(Files.newInputStream(file.toPath(), new OpenOption[0]), false).flatMap(byteArrayInputStream -> {
                    return blobAsyncClient.getBlockBlobAsyncClient().upload(BinaryData.fromStream(byteArrayInputStream, Long.valueOf(byteArrayInputStream.available())), true);
                });
            } catch (IOException e) {
                throw Exceptions.propagate(e);
            }
        }).then();
    }

    Mono<Void> uploadFileToBlob(File file, BlobAsyncClient blobAsyncClient) throws IOException {
        Ensure.argIsNotNull(blobAsyncClient, "blob");
        Ensure.fileExists(file, "sourceFile");
        return blobAsyncClient.uploadFromFile(file.getPath());
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public Mono<Integer> uploadStreamToBlob(InputStream inputStream, String str, BlobContainerAsyncClient blobContainerAsyncClient, boolean z) {
        Ensure.argIsNotNull(inputStream, "inputStream");
        Ensure.stringIsNotBlank(str, "blobName");
        Ensure.argIsNotNull(blobContainerAsyncClient, "asyncContainer");
        log.debug("uploadStreamToBlob: blobName: {}, storageUri: {}", str, blobContainerAsyncClient.getBlobContainerUrl());
        BlobAsyncClient blobAsyncClient = blobContainerAsyncClient.getBlobAsyncClient(str);
        return z ? compressAndUploadStream(inputStream, blobAsyncClient) : uploadStream(inputStream, blobAsyncClient);
    }

    Mono<Integer> uploadStream(InputStream inputStream, BlobAsyncClient blobAsyncClient) {
        Ensure.argIsNotNull(inputStream, "inputStream");
        Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");
        IngestionUtils.IntegerHolder integerHolder = new IngestionUtils.IntegerHolder();
        return IngestionUtils.toByteArray(inputStream).flatMap(bArr -> {
            integerHolder.add(bArr.length);
            return blobAsyncClient.getBlockBlobAsyncClient().upload(BinaryData.fromBytes(bArr), true);
        }).map(blockBlobItem -> {
            return Integer.valueOf(integerHolder.getValue());
        });
    }

    Mono<Integer> compressAndUploadStream(InputStream inputStream, BlobAsyncClient blobAsyncClient) {
        Ensure.argIsNotNull(inputStream, "inputStream");
        Ensure.argIsNotNull(blobAsyncClient, "blobAsyncClient");
        IngestionUtils.IntegerHolder integerHolder = new IngestionUtils.IntegerHolder();
        return IngestionUtils.compressStream(inputStream, false).flatMap(byteArrayInputStream -> {
            integerHolder.add(byteArrayInputStream.available());
            return blobAsyncClient.getBlockBlobAsyncClient().upload(BinaryData.fromStream(byteArrayInputStream, Long.valueOf(byteArrayInputStream.available())), true);
        }).map(blockBlobItem -> {
            return Integer.valueOf(integerHolder.getValue());
        });
    }
}
