/*
 * Decompiled with CFR 0.152.
 */
package kafka.tier.store;

import com.google.auth.Credentials;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.ReadChannel;
import com.google.cloud.WriteChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.storage.Bucket;
import com.google.cloud.storage.Storage;
import com.google.cloud.storage.StorageException;
import com.google.cloud.storage.StorageOptions;
import com.google.common.collect.Lists;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Map;
import java.util.Optional;
import kafka.tier.exceptions.TierObjectStoreFatalException;
import kafka.tier.exceptions.TierObjectStoreRetriableException;
import kafka.tier.store.GcsTierObjectStoreConfig;
import kafka.tier.store.TierObjectStore;
import kafka.tier.store.TierObjectStoreResponse;
import kafka.tier.store.TierObjectStoreUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link TierObjectStore} implementation backed by a Google Cloud Storage bucket.
 *
 * <p>Objects are addressed by the key returned from
 * {@link TierObjectStoreUtils#keyPath} for a given object metadata / file type pair.
 * {@link StorageException}s raised by the GCS client are surfaced as
 * {@link TierObjectStoreRetriableException}; any other failure is surfaced as
 * {@link TierObjectStoreFatalException}.
 */
public class GcsTierObjectStore implements TierObjectStore {
    private static final Logger log = LoggerFactory.getLogger(GcsTierObjectStore.class);
    /** OAuth scope requested when credentials are loaded from an explicit credentials file. */
    private static final String CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform";

    private final String clusterId;
    private final int brokerId;
    private final String bucket;
    private final int writeChunkSize;
    private final int readChunkSize;
    private final Storage storage;

    /**
     * Creates a store using a GCS client built from the given configuration.
     *
     * @param config GCS tier object store configuration
     * @throws TierObjectStoreFatalException if the credentials file cannot be read, or the
     *         configured bucket cannot be accessed or does not exist
     */
    public GcsTierObjectStore(GcsTierObjectStoreConfig config) {
        this(GcsTierObjectStore.storage(config), config);
    }

    /**
     * Creates a store on top of an already constructed GCS client and verifies that the
     * configured bucket is reachable.
     *
     * @param storage GCS client to use for all operations
     * @param config  GCS tier object store configuration
     * @throws TierObjectStoreFatalException if the configured bucket cannot be accessed or
     *         does not exist
     */
    GcsTierObjectStore(Storage storage, GcsTierObjectStoreConfig config) {
        this.clusterId = config.clusterId;
        this.brokerId = config.brokerId;
        this.storage = storage;
        this.bucket = config.gcsBucket;
        this.writeChunkSize = config.gcsWriteChunkSize;
        this.readChunkSize = config.gcsReadChunkSize;
        this.expectBucket(this.bucket, config.gcsRegion);
    }

    /**
     * Opens a stream over the object stored for {@code objectMetadata} / {@code fileType}.
     *
     * <p>NOTE(review): {@code byteOffsetEnd} is validated but not applied to the returned
     * stream; the stream starts at {@code byteOffsetStart} (or 0) and is not truncated at
     * {@code byteOffsetEnd}. Confirm callers do not rely on the end bound being enforced.
     *
     * @param objectMetadata  metadata identifying the tiered object
     * @param fileType        which file of the tiered object to fetch
     * @param byteOffsetStart offset to start reading from, or {@code null} to start at 0
     * @param byteOffsetEnd   end offset, or {@code null}; must not be set without a start
     * @return a response wrapping an input stream positioned at the start offset
     * @throws IllegalStateException if the offset range is inverted, or an end offset is
     *         given without a start offset
     * @throws TierObjectStoreRetriableException if the GCS client raises a {@link StorageException}
     * @throws TierObjectStoreFatalException on any other failure
     */
    @Override
    public TierObjectStoreResponse getObject(TierObjectStore.ObjectMetadata objectMetadata, TierObjectStore.FileType fileType, Integer byteOffsetStart, Integer byteOffsetEnd) {
        String key = TierObjectStoreUtils.keyPath(objectMetadata, fileType);
        BlobId blobId = BlobId.of(this.bucket, key);
        if (byteOffsetStart != null && byteOffsetEnd != null && byteOffsetStart > byteOffsetEnd) {
            throw new IllegalStateException("Invalid range of byteOffsetStart and byteOffsetEnd");
        }
        if (byteOffsetStart == null && byteOffsetEnd != null) {
            throw new IllegalStateException("Cannot specify a byteOffsetEnd without specifying a byteOffsetStart");
        }
        log.debug("Fetching object from gcs://{}/{}, with range of {} to {}", this.bucket, key, byteOffsetStart, byteOffsetEnd);
        try {
            long startOffset = byteOffsetStart == null ? 0L : byteOffsetStart.longValue();
            ReadChannel reader = this.storage.reader(blobId);
            // The response takes ownership of the reader and closes it if construction fails.
            return new GcsTierObjectStoreResponse(reader, startOffset, this.readChunkSize);
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to fetch segment " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when fetching segment " + objectMetadata, e);
        }
    }

    /**
     * Uploads all files belonging to a segment. The segment, offset index and timestamp
     * index are always uploaded; the remaining files are uploaded only when present.
     * Uploads happen sequentially and stop at the first failure.
     *
     * @param objectMetadata            metadata identifying the tiered object
     * @param segmentData               segment file
     * @param offsetIndexData           offset index file
     * @param timestampIndexData        timestamp index file
     * @param producerStateSnapshotData optional producer state snapshot file
     * @param transactionIndexData      optional transaction index contents
     * @param epochState                optional epoch state file
     * @throws TierObjectStoreRetriableException if the GCS client raises a {@link StorageException}
     * @throws TierObjectStoreFatalException on any other failure (including local I/O errors)
     */
    @Override
    public void putSegment(TierObjectStore.ObjectMetadata objectMetadata, File segmentData, File offsetIndexData, File timestampIndexData, Optional<File> producerStateSnapshotData, Optional<ByteBuffer> transactionIndexData, Optional<File> epochState) {
        Map<String, String> metadata = TierObjectStoreUtils.createSegmentMetadata(objectMetadata, this.clusterId, this.brokerId);
        try {
            this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.SEGMENT), metadata, segmentData);
            this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.OFFSET_INDEX), metadata, offsetIndexData);
            this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TIMESTAMP_INDEX), metadata, timestampIndexData);
            if (producerStateSnapshotData.isPresent()) {
                this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.PRODUCER_STATE), metadata, producerStateSnapshotData.get());
            }
            if (transactionIndexData.isPresent()) {
                this.putBuf(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.TRANSACTION_INDEX), metadata, transactionIndexData.get());
            }
            if (epochState.isPresent()) {
                this.putFile(TierObjectStoreUtils.keyPath(objectMetadata, TierObjectStore.FileType.EPOCH_STATE), metadata, epochState.get());
            }
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to upload segment " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when uploading segment " + objectMetadata, e);
        }
    }

    /**
     * Issues a single batch delete for the keys of every {@link TierObjectStore.FileType}
     * of the given object. The per-blob results returned by the client are not inspected.
     *
     * @param objectMetadata metadata identifying the tiered object
     * @throws TierObjectStoreRetriableException if the GCS client raises a {@link StorageException}
     * @throws TierObjectStoreFatalException on any other failure
     */
    @Override
    public void deleteSegment(TierObjectStore.ObjectMetadata objectMetadata) {
        Collection<BlobId> blobIds = new ArrayList<>();
        for (TierObjectStore.FileType type : TierObjectStore.FileType.values()) {
            blobIds.add(BlobId.of(this.bucket, TierObjectStoreUtils.keyPath(objectMetadata, type)));
        }
        // Parameterized so the id list is only rendered when debug logging is enabled.
        log.debug("Deleting {}", blobIds);
        try {
            this.storage.delete(blobIds);
        } catch (StorageException e) {
            throw new TierObjectStoreRetriableException("Failed to delete segment " + objectMetadata, e);
        } catch (Exception e) {
            throw new TierObjectStoreFatalException("Unknown exception when deleting segment " + objectMetadata, e);
        }
    }

    /** No-op: this class does not release any resources on close. */
    @Override
    public void close() {
    }

    /**
     * Streams the contents of {@code file} into the blob at {@code key}.
     *
     * @param key      object key within the configured bucket
     * @param metadata user metadata to attach to the blob
     * @param file     local file to upload
     * @throws IOException if the file cannot be opened or read, shrinks while it is being
     *         uploaded, or the write channel fails
     */
    private void putFile(String key, Map<String, String> metadata, File file) throws IOException {
        BlobId blobId = BlobId.of(this.bucket, key);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setMetadata(metadata).build();
        log.debug("Uploading object to gcs://{}/{}", this.bucket, key);
        // Open the local file before the remote writer: if the file cannot be opened we must
        // not have a writer whose close() (run by try-with-resources) could commit an empty object.
        try (FileChannel fileChannel = FileChannel.open(file.toPath(), StandardOpenOption.READ);
             WriteChannel writer = this.storage.writer(blobInfo)) {
            if (this.writeChunkSize > 0) {
                writer.setChunkSize(this.writeChunkSize);
            }
            long fileLength = fileChannel.size();
            long position = 0L;
            while (position < fileLength) {
                long transferred = fileChannel.transferTo(position, fileLength - position, writer);
                // transferTo returns 0 once position is at/after the current end of file; if the
                // file shrank underneath us, fail instead of looping forever.
                if (transferred <= 0L && position >= fileChannel.size()) {
                    throw new IOException("File " + file + " was truncated during upload: expected "
                            + fileLength + " bytes but only " + position + " were available");
                }
                position += transferred;
            }
        }
    }

    /**
     * Writes the remaining bytes of {@code buf} into the blob at {@code key}.
     *
     * @param key      object key within the configured bucket
     * @param metadata user metadata to attach to the blob
     * @param buf      buffer to upload; its position is advanced to its limit
     * @throws IOException if the write channel fails
     */
    private void putBuf(String key, Map<String, String> metadata, ByteBuffer buf) throws IOException {
        BlobId blobId = BlobId.of(this.bucket, key);
        BlobInfo blobInfo = BlobInfo.newBuilder(blobId).setMetadata(metadata).build();
        log.debug("Uploading object gcs://{}/{}", this.bucket, key);
        try (WriteChannel writer = this.storage.writer(blobInfo)) {
            if (this.writeChunkSize > 0) {
                writer.setChunkSize(this.writeChunkSize);
            }
            while (buf.hasRemaining()) {
                writer.write(buf);
            }
        }
    }

    /**
     * Builds the GCS client. When a credentials file path is configured, credentials are
     * loaded from it and scoped to {@link #CLOUD_PLATFORM_SCOPE}; otherwise the client
     * library's default instance is used.
     *
     * @param config GCS tier object store configuration
     * @return a GCS client
     * @throws TierObjectStoreFatalException if the credentials file cannot be opened or read
     */
    private static Storage storage(GcsTierObjectStoreConfig config) {
        if (!config.gcsCredFilePath.isPresent()) {
            return StorageOptions.getDefaultInstance().getService();
        }
        // try-with-resources so the credentials file handle is released once parsed.
        try (InputStream credentialsStream = new FileInputStream(config.gcsCredFilePath.get())) {
            GoogleCredentials credentials = GoogleCredentials.fromStream(credentialsStream)
                    .createScoped(Lists.newArrayList(CLOUD_PLATFORM_SCOPE));
            return StorageOptions.newBuilder().setCredentials(credentials).build().getService();
        } catch (IOException e) {
            throw new TierObjectStoreFatalException("Error in opening GCS credentials file", e);
        }
    }

    /**
     * Verifies that {@code bucket} exists and is accessible. A region mismatch is only
     * logged as a warning, not treated as an error.
     *
     * @param bucket         bucket name to look up
     * @param expectedRegion region the bucket is expected to be located in
     * @throws TierObjectStoreFatalException if the lookup fails or the bucket is not found
     */
    private void expectBucket(String bucket, String expectedRegion) throws TierObjectStoreFatalException {
        Bucket bucketObj;
        try {
            // Only the location field is requested; nothing else of the bucket is used.
            bucketObj = this.storage.get(bucket, Storage.BucketGetOption.fields(Storage.BucketField.LOCATION));
        } catch (StorageException e) {
            throw new TierObjectStoreFatalException("Unable to access bucket " + bucket, e);
        }
        if (bucketObj == null) {
            throw new TierObjectStoreFatalException("Configured bucket " + bucket + " does not exist or could not be found");
        }
        String actualRegion = bucketObj.getLocation();
        if (!expectedRegion.equalsIgnoreCase(actualRegion)) {
            log.warn("Bucket region {} does not match expected region {}", actualRegion, expectedRegion);
        }
    }

    /**
     * Response exposing a GCS {@link ReadChannel} as an {@link InputStream}. Closing the
     * response closes the stream obtained from the channel.
     */
    private static class GcsTierObjectStoreResponse implements TierObjectStoreResponse {
        private final InputStream inputStream;

        /**
         * Configures and positions {@code channel}, then wraps it in an input stream. The
         * channel is closed before the exception propagates if configuring it fails.
         *
         * @param channel     read channel for the blob
         * @param startOffset offset to seek to before reading
         * @param chunkSize   read chunk size; applied only when positive
         * @throws IOException if seeking fails
         */
        GcsTierObjectStoreResponse(ReadChannel channel, long startOffset, int chunkSize) throws IOException {
            try {
                if (chunkSize > 0) {
                    channel.setChunkSize(chunkSize);
                }
                channel.seek(startOffset);
            } catch (Exception e) {
                // No caller ever sees this object on failure, so release the channel here.
                channel.close();
                throw e;
            }
            this.inputStream = Channels.newInputStream(channel);
        }

        @Override
        public void close() throws IOException {
            this.inputStream.close();
        }

        @Override
        public InputStream getInputStream() {
            return this.inputStream;
        }
    }
}

