/*
 * Decompiled with CFR 0.152.
 */
package org.apache.paimon.flink.lookup;

import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import javax.annotation.Nullable;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowData;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.RocksDBOptions;
import org.apache.paimon.flink.lookup.LookupTable;
import org.apache.paimon.flink.lookup.RocksDBStateFactory;
import org.apache.paimon.flink.lookup.TableStreamingReader;
import org.apache.paimon.flink.utils.TableScanUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.predicate.Predicate;
import org.apache.paimon.predicate.PredicateBuilder;
import org.apache.paimon.predicate.PredicateFilter;
import org.apache.paimon.reader.RecordReaderIterator;
import org.apache.paimon.shade.guava30.com.google.common.primitives.Ints;
import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.OutOfRangeException;
import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.FileIOUtils;
import org.apache.paimon.utils.TypeUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class FileStoreLookupFunction
implements Serializable,
Closeable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(FileStoreLookupFunction.class);
    private final Table table;
    private final List<String> projectFields;
    private final List<String> joinKeys;
    @Nullable
    private final Predicate predicate;
    private transient Duration refreshInterval;
    private transient File path;
    private transient RocksDBStateFactory stateFactory;
    private transient LookupTable lookupTable;
    private transient long nextLoadTime;
    private transient TableStreamingReader streamingReader;

    public FileStoreLookupFunction(Table table, int[] projection, int[] joinKeyIndex, @Nullable Predicate predicate) {
        TableScanUtils.streamingReadingValidate(table);
        this.table = table;
        this.joinKeys = Arrays.stream(joinKeyIndex).mapToObj(i -> table.rowType().getFieldNames().get(projection[i])).collect(Collectors.toList());
        this.projectFields = Arrays.stream(projection).mapToObj(i -> table.rowType().getFieldNames().get(i)).collect(Collectors.toList());
        for (String field : table.primaryKeys()) {
            if (this.projectFields.contains(field)) continue;
            this.projectFields.add(field);
        }
        this.predicate = predicate;
    }

    public void open(FunctionContext context) throws Exception {
        String tmpDirectory = FileStoreLookupFunction.getTmpDirectory(context);
        this.open(tmpDirectory);
    }

    void open(String tmpDirectory) throws Exception {
        this.path = new File(tmpDirectory, "lookup-" + UUID.randomUUID());
        this.open();
    }

    private void open() throws Exception {
        Options options = Options.fromMap(this.table.options());
        this.refreshInterval = options.get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL);
        this.stateFactory = new RocksDBStateFactory(this.path.toString(), options);
        List<String> fieldNames = this.table.rowType().getFieldNames();
        int[] projection = this.projectFields.stream().mapToInt(fieldNames::indexOf).toArray();
        RowType rowType = TypeUtils.project(this.table.rowType(), projection);
        PredicateFilter recordFilter = this.createRecordFilter(projection);
        this.lookupTable = LookupTable.create(this.stateFactory, rowType, this.table.primaryKeys(), this.joinKeys, recordFilter, options.get(RocksDBOptions.LOOKUP_CACHE_ROWS));
        this.nextLoadTime = -1L;
        this.streamingReader = new TableStreamingReader(this.table, projection, this.predicate);
        this.refresh();
    }

    private PredicateFilter createRecordFilter(int[] projection) {
        Predicate adjustedPredicate = null;
        if (this.predicate != null) {
            adjustedPredicate = PredicateBuilder.transformFieldMapping(this.predicate, IntStream.range(0, this.table.rowType().getFieldCount()).map(i -> Ints.indexOf(projection, i)).toArray()).orElse(null);
        }
        return new PredicateFilter(TypeUtils.project(this.table.rowType(), projection), adjustedPredicate);
    }

    public Collection<RowData> lookup(RowData keyRow) {
        try {
            this.checkRefresh();
            List<InternalRow> results = this.lookupTable.get(new FlinkRowWrapper(keyRow));
            ArrayList<RowData> rows = new ArrayList<RowData>(results.size());
            for (InternalRow matchedRow : results) {
                rows.add(new FlinkRowData(matchedRow));
            }
            return rows;
        }
        catch (OutOfRangeException e) {
            this.reopen();
            return this.lookup(keyRow);
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void reopen() {
        try {
            this.close();
            this.open();
        }
        catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private void checkRefresh() throws Exception {
        if (this.nextLoadTime > System.currentTimeMillis()) {
            return;
        }
        if (this.nextLoadTime > 0L) {
            LOG.info("Lookup table has refreshed after {} second(s), refreshing", (Object)(this.refreshInterval.toMillis() / 1000L));
        }
        this.refresh();
        this.nextLoadTime = System.currentTimeMillis() + this.refreshInterval.toMillis();
    }

    private void refresh() throws Exception {
        while (true) {
            RecordReaderIterator<InternalRow> batch = new RecordReaderIterator<InternalRow>(this.streamingReader.nextBatch());
            Throwable throwable = null;
            try {
                if (!batch.hasNext()) {
                    return;
                }
                this.lookupTable.refresh(batch);
                continue;
            }
            catch (Throwable throwable2) {
                throwable = throwable2;
                throw throwable2;
            }
            finally {
                if (batch == null) continue;
                if (throwable != null) {
                    try {
                        batch.close();
                    }
                    catch (Throwable throwable3) {
                        throwable.addSuppressed(throwable3);
                    }
                    continue;
                }
                batch.close();
                continue;
            }
            break;
        }
    }

    @Override
    public void close() throws IOException {
        if (this.stateFactory != null) {
            this.stateFactory.close();
            this.stateFactory = null;
        }
        if (this.path != null) {
            FileIOUtils.deleteDirectoryQuietly(this.path);
        }
    }

    private static String getTmpDirectory(FunctionContext context) {
        try {
            Field field = context.getClass().getDeclaredField("context");
            field.setAccessible(true);
            StreamingRuntimeContext runtimeContext = FileStoreLookupFunction.extractStreamingRuntimeContext(field.get(context));
            String[] tmpDirectories = runtimeContext.getTaskManagerRuntimeInfo().getTmpDirectories();
            return tmpDirectories[ThreadLocalRandom.current().nextInt(tmpDirectories.length)];
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException(e);
        }
    }

    private static StreamingRuntimeContext extractStreamingRuntimeContext(Object runtimeContext) throws NoSuchFieldException, IllegalAccessException {
        if (runtimeContext instanceof StreamingRuntimeContext) {
            return (StreamingRuntimeContext)runtimeContext;
        }
        Field field = runtimeContext.getClass().getDeclaredField("runtimeContext");
        field.setAccessible(true);
        return FileStoreLookupFunction.extractStreamingRuntimeContext(field.get(runtimeContext));
    }
}

