/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.store;

import com.azure.core.credential.TokenCredential;
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.identity.DefaultAzureCredential;
import com.azure.identity.DefaultAzureCredentialBuilder;
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
import com.azure.storage.blob.models.AccessTier;
import com.azure.storage.blob.models.BlobErrorCode;
import com.azure.storage.blob.models.BlobHttpHeaders;
import com.azure.storage.blob.models.BlobRange;
import com.azure.storage.blob.models.BlobRequestConditions;
import com.azure.storage.blob.models.BlobStorageException;
import com.azure.storage.blob.models.ParallelTransferOptions;
import com.azure.storage.blob.specialized.BlobInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.Optional;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.AutoAbortingGenericInputStream;
import kafka.tier.store.AzureBlockBlobTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import org.apache.kafka.common.utils.ByteBufferInputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TierObjectStore} implementation that stores tiered objects as block blobs in a
 * single Azure Blob Storage container.
 *
 * <p>The container named in the configuration must already exist: the constructor verifies
 * this and fails with a {@link TierObjectStoreFatalException} otherwise. Blob keys are
 * derived from the object metadata and the configured prefix.
 */
public class AzureBlockBlobTierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(AzureBlockBlobTierObjectStore.class);
    private final BlobServiceClient blobServiceClient;
    private final BlobContainerClient blobContainerClient;
    private final Optional<String> clusterIdOpt;
    private final Optional<Integer> brokerIdOpt;
    private final String container;
    private final String prefix;
    private final int drainThreshold;

    /**
     * Builds the Azure clients from {@code config} and verifies that the target container exists.
     *
     * @param config store configuration (credentials, endpoint, container, prefix, drain threshold)
     * @throws TierObjectStoreFatalException if the configured container does not exist
     */
    public AzureBlockBlobTierObjectStore(AzureBlockBlobTierObjectStoreConfig config) {
        this.clusterIdOpt = config.clusterIdOpt;
        this.brokerIdOpt = config.brokerIdOpt;
        this.container = config.container;
        this.prefix = config.azureBlobPrefix;
        this.drainThreshold = config.drainThreshold;
        this.blobServiceClient = createServiceClient(config);
        this.blobContainerClient = createContainerClient(this.blobServiceClient, config);
    }

    /** Identifies this store as the Azure block blob backend. */
    @Override
    public TierObjectStore.Backend getBackend() {
        return TierObjectStore.Backend.AzureBlockBlob;
    }

    /**
     * Opens a stream over (a range of) the blob for the given object and file type.
     *
     * <p>When both offsets are given, {@code byteOffsetEnd - byteOffsetStart} bytes are requested
     * starting at {@code byteOffsetStart}. When only the start is given, the blob is read from
     * that offset onward. When neither is given, the blob is read from offset 0.
     * Exceptions raised by the Azure SDK while opening the stream are not wrapped here.
     *
     * @param objectMetadata metadata used to derive the blob key
     * @param fileType       which file of the object to fetch
     * @param byteOffsetStart first byte to read, or {@code null} for the beginning of the blob
     * @param byteOffsetEnd   end of the range, or {@code null} for no upper bound
     * @return a response wrapping the blob's input stream; the caller must close it
     * @throws IllegalStateException if the start is greater than the end, or an end is given
     *                               without a start
     */
    @Override
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectStoreMetadata objectMetadata, TierObjectStore.FileType fileType, Integer byteOffsetStart, Integer byteOffsetEnd) {
        String key = this.keyPath(objectMetadata, fileType);
        BlobClient blob = this.blobContainerClient.getBlobClient(key);
        if (byteOffsetStart != null && byteOffsetEnd != null && byteOffsetStart > byteOffsetEnd) {
            throw new IllegalStateException("Invalid range of byteOffsetStart and byteOffsetEnd");
        }
        if (byteOffsetStart == null && byteOffsetEnd != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from {}/{}, with range of {} to {}", this.container, key, byteOffsetStart, byteOffsetEnd);

        long start = byteOffsetStart == null ? 0L : byteOffsetStart.longValue();
        BlobRange range;
        long streamSize;
        if (byteOffsetEnd != null) {
            // Bounded read: the same length drives both the service-side range and the
            // size handed to the auto-aborting stream wrapper.
            streamSize = byteOffsetEnd.longValue() - start;
            range = new BlobRange(start, streamSize);
        } else {
            // Unbounded read: no count on the range, and the stream size is unknown.
            streamSize = Long.MAX_VALUE;
            range = new BlobRange(start);
        }
        InputStream inputStream = blob.getBlockBlobClient().openInputStream(range, new BlobRequestConditions());
        return new AzureBlockBlobTierObjectStoreResponse(inputStream, this.drainThreshold, streamSize);
    }

    /**
     * Uploads all files belonging to a segment: segment data, offset index, timestamp index,
     * and, when present, the producer state snapshot, transaction index and epoch state.
     *
     * <p>Files are uploaded one after another in that order. No cleanup of already-uploaded
     * blobs is attempted here if a later upload fails.
     *
     * @throws TierObjectStoreRetriableException if an upload fails with an {@link UncheckedIOException}
     * @throws TierObjectStoreFatalException     if an upload fails with any other exception
     */
    @Override
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<File> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<ByteBuffer> epochState) {
        Map<String, String> metadata = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            this.putFile(this.keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), metadata, segmentData);
            this.putFile(this.keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), metadata, offsetIndexData);
            this.putFile(this.keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), metadata, timestampIndexData);
            producerStateSnapshotData.ifPresent(file -> this.putFile(this.keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), metadata, file));
            transactionIndexData.ifPresent(buf -> this.putBuf(this.keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), metadata, buf));
            epochState.ifPresent(buf -> this.putBuf(this.keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), metadata, buf));
        } catch (UncheckedIOException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment: " + objectMetadata, e);
        }
    }

    /**
     * Uploads a single file as the blob for the given object and file type.
     *
     * @throws TierObjectStoreRetriableException if the upload fails with an {@link UncheckedIOException}
     * @throws TierObjectStoreFatalException     if the upload fails with any other exception
     */
    @Override
    public void putObject(TierObjectStore.ObjectStoreMetadata objectMetadata, File file, TierObjectStore.FileType fileType) {
        Map<String, String> metadata = objectMetadata.objectMetadata(this.clusterIdOpt, this.brokerIdOpt);
        try {
            this.putFile(this.keyPath(objectMetadata, fileType), metadata, file);
        } catch (UncheckedIOException e) {
            throw new TierObjectStoreRetriableException(String.format("Failed to upload object %s, file %s, type %s", objectMetadata, file, fileType), e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException(String.format("Failed to upload object %s, file %s, type %s", objectMetadata, file, fileType), e);
        }
    }

    /**
     * Deletes the blob for every {@link TierObjectStore.FileType} of the given segment.
     * Blobs that the service reports as not found are skipped.
     *
     * @throws TierObjectStoreRetriableException if a delete fails with a storage error other
     *                                           than "blob not found"
     * @throws TierObjectStoreFatalException     if a delete fails with any other exception
     */
    @Override
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        for (TierObjectStore.FileType type : TierObjectStore.FileType.values()) {
            String key = this.keyPath(objectMetadata, type);
            try {
                BlobClient blob = this.blobContainerClient.getBlobClient(key);
                log.debug("Deleting {}", key);
                blob.delete();
            } catch (BlobStorageException be) {
                // Constant-first comparison: getErrorCode() may be null when the response
                // carries no error code, and an NPE thrown from inside this catch clause
                // would escape unwrapped and hide the underlying storage failure.
                if (BlobErrorCode.BLOB_NOT_FOUND.equals(be.getErrorCode())) {
                    continue;
                }
                throw new TierObjectStoreRetriableException("Failed to delete file " + key, be);
            } catch (Exception e) {
                throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e);
            }
        }
    }

    /** Intentionally a no-op; this class does not release any resources here. */
    @Override
    public void close() {
    }

    /** Uploads {@code file} to the blob at {@code key} in the HOT access tier with the given metadata. */
    private void putFile(String key, Map<String, String> metadata, File file) {
        BlobClient blobClient = this.blobContainerClient.getBlobClient(key);
        // All-null transfer options leave block size, concurrency and progress reporting
        // to the SDK; the trailing null passes no timeout.
        ParallelTransferOptions transferOptions = new ParallelTransferOptions(null, null, null);
        blobClient.uploadFromFile(file.getPath(), transferOptions, new BlobHttpHeaders(), metadata, AccessTier.HOT, new BlobRequestConditions(), null);
    }

    /**
     * Uploads the remaining bytes of {@code buf} to the blob at {@code key} in the HOT access tier.
     * The buffer is duplicated first, so the caller's position and limit are left untouched.
     */
    private void putBuf(String key, Map<String, String> metadata, ByteBuffer buf) {
        BlobClient blobClient = this.blobContainerClient.getBlobClient(key);
        InputStream data = new ByteBufferInputStream(buf.duplicate());
        long length = buf.remaining();
        blobClient.getBlockBlobClient().uploadWithResponse(data, length, new BlobHttpHeaders(), metadata, AccessTier.HOT, null, new BlobRequestConditions(), null, null);
    }

    /**
     * Creates the service client using, in order of preference: a connection string, a client
     * secret credential (both from the explicit credentials config), or the default Azure
     * credential when no credentials config is present.
     */
    private static BlobServiceClient createServiceClient(AzureBlockBlobTierObjectStoreConfig config) {
        if (!config.azureCredentialsConfig.isPresent()) {
            TokenCredential defaultCredential = new DefaultAzureCredentialBuilder().build();
            // NOTE(review): endpoint.get() throws if the endpoint is absent — presumably config
            // validation guarantees it for token-based auth; confirm.
            return new BlobServiceClientBuilder()
                    .endpoint(config.endpoint.get())
                    .credential(defaultCredential)
                    .buildClient();
        }

        AzureBlockBlobTierObjectStoreConfig.AzureCredentialsConfig credentialsConfig = config.azureCredentialsConfig.get();
        if (credentialsConfig.connectionStringAuthMethod()) {
            return new BlobServiceClientBuilder()
                    .connectionString(credentialsConfig.connectionString())
                    .buildClient();
        }

        TokenCredential secretCredential = new ClientSecretCredentialBuilder()
                .clientId(credentialsConfig.azureClientId())
                .tenantId(credentialsConfig.azureTenantId())
                .clientSecret(credentialsConfig.azureClientSecret())
                .build();
        return new BlobServiceClientBuilder()
                .endpoint(config.endpoint.get())
                .credential(secretCredential)
                .buildClient();
    }

    /**
     * Returns a client for the configured container.
     *
     * @throws TierObjectStoreFatalException if the container does not exist
     */
    private static BlobContainerClient createContainerClient(BlobServiceClient blobServiceClient, AzureBlockBlobTierObjectStoreConfig config) {
        BlobContainerClient containerClient = blobServiceClient.getBlobContainerClient(config.container);
        if (!containerClient.exists()) {
            throw new TierObjectStoreFatalException("Container " + config.container + " does not exist or could not be found");
        }
        return containerClient;
    }

    /** Derives the blob key for the given object and file type under the configured prefix. */
    private String keyPath(TierObjectStore.ObjectStoreMetadata objectMetadata, TierObjectStore.FileType fileType) {
        return objectMetadata.toPath(this.prefix, fileType);
    }

    /**
     * Response that exposes the blob stream wrapped in an {@link AutoAbortingGenericInputStream}
     * configured with the store's drain threshold and the expected stream size.
     */
    private static class AzureBlockBlobTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        AzureBlockBlobTierObjectStoreResponse(InputStream inputStream, int drainThreshold, long streamSize) {
            this.inputStream = new AutoAbortingGenericInputStream(inputStream, drainThreshold, streamSize);
        }

        /** Closes the wrapped stream. */
        @Override
        public void close() throws IOException {
            this.inputStream.close();
        }

        /** Returns the wrapped stream; the same instance on every call. */
        @Override
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }
}

