package org.apache.seatunnel.engine.imap.storage.file;

import java.io.IOException;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.seatunnel.engine.imap.storage.api.IMapStorage;
import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException;
import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
import org.apache.seatunnel.engine.imap.storage.file.common.WALReader;
import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor;
import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALEventType;
import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture;
import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache;
import org.apache.seatunnel.engine.serializer.api.Serializer;
import org.apache.seatunnel.engine.serializer.protobuf.ProtoStuffSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.class */
public class IMapFileStorage implements IMapStorage {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) IMapFileStorage.class);
    private static final String STORAGE_TYPE_KEY = "storage.type";
    public FileSystem fs;
    public String namespace;
    public String region;
    public String businessName;
    public String clusterName;
    public long writDataTimeoutMilliseconds;
    WALDisruptor walDisruptor;
    Serializer serializer;
    private String businessRootPath = null;
    public static final int DEFAULT_ARCHIVE_WAIT_TIME_MILLISECONDS = 60000;
    public static final int DEFAULT_QUERY_LIST_SIZE = 256;
    public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 60000;
    private Configuration conf;
    private FileConfiguration fileConfiguration;

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public void initialize(Map<String, Object> map) {
        checkInitStorageProperties(map);
        String valueOf = String.valueOf(map.getOrDefault(STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString()));
        this.fileConfiguration = FileConfiguration.valueOf(valueOf.toUpperCase());
        Configuration buildConfiguration = this.fileConfiguration.getConfiguration().buildConfiguration(map);
        this.conf = buildConfiguration;
        this.namespace = (String) map.getOrDefault("namespace", FileConstants.DEFAULT_IMAP_NAMESPACE);
        this.businessName = (String) map.get(FileConstants.FileInitProperties.BUSINESS_KEY);
        this.clusterName = (String) map.get(FileConstants.FileInitProperties.CLUSTER_NAME);
        this.writDataTimeoutMilliseconds = ((Long) map.getOrDefault(FileConstants.FileInitProperties.WRITE_DATA_TIMEOUT_MILLISECONDS_KEY, 60000L)).longValue();
        this.region = String.valueOf(System.nanoTime());
        this.businessRootPath = this.namespace + "/" + this.clusterName + "/" + this.businessName + "/";
        try {
            this.fs = FileSystem.get(buildConfiguration);
            this.fs.setWriteChecksum(false);
            this.serializer = new ProtoStuffSerializer();
            this.walDisruptor = new WALDisruptor(this.fs, FileConfiguration.valueOf(valueOf.toUpperCase()), this.businessRootPath + this.region + "/", this.serializer);
        } catch (IOException e) {
            throw new IMapStorageException("Failed to get file system", e);
        }
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public boolean store(Object obj, Object obj2) {
        try {
            return queryExecuteStatus(sendToDisruptorQueue(parseToIMapFileData(obj, obj2), WALEventType.APPEND));
        } catch (IOException e) {
            log.error("parse to IMapFileData error, key is {}, value is {}", obj, obj2, e);
            return false;
        }
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public Set<Object> storeAll(Map<Object, Object> map) {
        HashMap hashMap = new HashMap(map.size());
        HashSet hashSet = new HashSet();
        map.forEach((obj, obj2) -> {
            try {
                hashMap.put(Long.valueOf(sendToDisruptorQueue(parseToIMapFileData(obj, obj2), WALEventType.APPEND)), obj);
            } catch (IOException e) {
                log.error("parse to IMapFileData error", (Throwable) e);
                hashSet.add(obj);
            }
        });
        return batchQueryExecuteFailsStatus(hashMap, hashSet);
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public boolean delete(Object obj) {
        try {
            return queryExecuteStatus(sendToDisruptorQueue(buildDeleteIMapFileData(obj), WALEventType.APPEND));
        } catch (IOException e) {
            log.error("parse to IMapFileData error, key is {} ", obj, e);
            return false;
        }
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public Set<Object> deleteAll(Collection<Object> collection) {
        HashMap hashMap = new HashMap(collection.size());
        HashSet hashSet = new HashSet();
        collection.forEach(obj -> {
            try {
                IMapFileData buildDeleteIMapFileData = buildDeleteIMapFileData(obj);
                long sendToDisruptorQueue = sendToDisruptorQueue(buildDeleteIMapFileData, WALEventType.APPEND);
                this.walDisruptor.tryAppendPublish(buildDeleteIMapFileData, sendToDisruptorQueue);
                hashMap.put(Long.valueOf(sendToDisruptorQueue), buildDeleteIMapFileData);
            } catch (IOException e) {
                log.error("parse to IMapFileData error", (Throwable) e);
                hashSet.add(obj);
            }
        });
        return batchQueryExecuteFailsStatus(hashMap, hashSet);
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public Map<Object, Object> loadAll() {
        try {
            return new WALReader(this.fs, this.fileConfiguration, this.serializer).loadAllData(new Path(this.businessRootPath), new HashSet());
        } catch (IOException e) {
            throw new IMapStorageException("load all data error", e);
        }
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public Set<Object> loadAllKeys() {
        try {
            return new WALReader(this.fs, this.fileConfiguration, this.serializer).loadAllKeys(new Path(this.businessRootPath));
        } catch (IOException e) {
            throw new IMapStorageException(e, "load all keys error parent path is {}", e, this.businessRootPath);
        }
    }

    @Override // org.apache.seatunnel.engine.imap.storage.api.IMapStorage
    public void destroy(boolean z) {
        log.info("start destroy IMapFileStorage, businessName is {}, cluster name is {}", this.businessName, this.region);
        try {
            this.walDisruptor.close();
        } catch (IOException e) {
            log.error("close walDisruptor error", (Throwable) e);
        }
        if (z) {
            try {
                this.fs.delete(new Path(this.businessRootPath), true);
            } catch (IOException e2) {
                log.error("destroy IMapFileStorage error,businessName is {}, cluster name is {}", this.businessName, this.region, e2);
            }
        }
    }

    private IMapFileData parseToIMapFileData(Object obj, Object obj2) throws IOException {
        return IMapFileData.builder().key(this.serializer.serialize(obj)).keyClassName(obj.getClass().getName()).value(this.serializer.serialize(obj2)).valueClassName(obj2.getClass().getName()).timestamp(System.currentTimeMillis()).deleted(false).build();
    }

    private IMapFileData buildDeleteIMapFileData(Object obj) throws IOException {
        return IMapFileData.builder().key(this.serializer.serialize(obj)).keyClassName(obj.getClass().getName()).timestamp(System.currentTimeMillis()).deleted(true).build();
    }

    private long sendToDisruptorQueue(IMapFileData iMapFileData, WALEventType wALEventType) {
        long requestId = RequestFutureCache.getRequestId();
        RequestFutureCache.put(requestId, new RequestFuture());
        this.walDisruptor.tryPublish(iMapFileData, wALEventType, Long.valueOf(requestId));
        return requestId;
    }

    private boolean queryExecuteStatus(long j) {
        return queryExecuteStatus(j, this.writDataTimeoutMilliseconds);
    }

    private boolean queryExecuteStatus(long j, long j2) {
        RequestFuture requestFuture = RequestFutureCache.get(Long.valueOf(j));
        try {
            try {
                if (!requestFuture.isDone()) {
                    if (!Boolean.TRUE.equals(requestFuture.get(j2, TimeUnit.MILLISECONDS))) {
                        RequestFutureCache.remove(Long.valueOf(j));
                        return false;
                    }
                }
                RequestFutureCache.remove(Long.valueOf(j));
                return true;
            } catch (Exception e) {
                log.error("wait for write status error", (Throwable) e);
                RequestFutureCache.remove(Long.valueOf(j));
                return false;
            }
        } catch (Throwable th) {
            RequestFutureCache.remove(Long.valueOf(j));
            throw th;
        }
    }

    private Set<Object> batchQueryExecuteFailsStatus(Map<Long, Object> map, Set<Object> set) {
        for (Map.Entry<Long, Object> entry : map.entrySet()) {
            RequestFuture requestFuture = RequestFutureCache.get(entry.getKey());
            try {
                try {
                    r9 = requestFuture.isDone() || Boolean.TRUE.equals(requestFuture.get());
                    RequestFutureCache.remove(entry.getKey());
                } catch (Exception e) {
                    log.error("wait for write status error", (Throwable) e);
                    RequestFutureCache.remove(entry.getKey());
                }
                if (!r9) {
                    set.add(entry.getValue());
                }
            } catch (Throwable th) {
                RequestFutureCache.remove(entry.getKey());
                throw th;
            }
        }
        return set;
    }

    private void checkInitStorageProperties(Map<String, Object> map) {
        if (map == null || map.isEmpty()) {
            throw new IllegalArgumentException("init file storage properties is empty");
        }
        for (String str : Arrays.asList(FileConstants.FileInitProperties.BUSINESS_KEY, FileConstants.FileInitProperties.CLUSTER_NAME)) {
            if (!map.containsKey(str)) {
                throw new IllegalArgumentException("init file storage properties is not contains " + str);
            }
        }
    }
}
