package io.trino.plugin.deltalake;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.slice.Slices;
import io.airlift.units.DataSize;
import io.trino.filesystem.Location;
import io.trino.filesystem.TrinoFileSystemFactory;
import io.trino.plugin.base.classloader.ClassLoaderSafeConnectorSplitSource;
import io.trino.plugin.deltalake.DeltaLakeAnalyzeProperties;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesSplitSource;
import io.trino.plugin.deltalake.functions.tablechanges.TableChangesTableFunctionHandle;
import io.trino.plugin.deltalake.transactionlog.AddFileEntry;
import io.trino.plugin.deltalake.transactionlog.DeltaLakeSchemaSupport;
import io.trino.plugin.deltalake.transactionlog.TransactionLogAccess;
import io.trino.plugin.deltalake.transactionlog.TransactionLogParser;
import io.trino.spi.SplitWeight;
import io.trino.spi.connector.ColumnHandle;
import io.trino.spi.connector.ConnectorSession;
import io.trino.spi.connector.ConnectorSplitManager;
import io.trino.spi.connector.ConnectorSplitSource;
import io.trino.spi.connector.ConnectorTableHandle;
import io.trino.spi.connector.ConnectorTransactionHandle;
import io.trino.spi.connector.Constraint;
import io.trino.spi.connector.DynamicFilter;
import io.trino.spi.connector.FixedSplitSource;
import io.trino.spi.function.table.ConnectorTableFunctionHandle;
import io.trino.spi.predicate.Domain;
import io.trino.spi.predicate.NullableValue;
import io.trino.spi.predicate.TupleDomain;
import io.trino.spi.type.TypeManager;
import java.io.IOException;
import java.net.URI;
import java.net.URLDecoder;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

/* loaded from: input_file:io/trino/plugin/deltalake/DeltaLakeSplitManager.class */
/**
 * Produces {@link ConnectorSplitSource}s for Delta Lake table scans and for the
 * table-changes table function.
 *
 * <p>For table scans, the active files of the table snapshot are filtered against the
 * constraints carried by the table handle and the engine-supplied {@link Constraint}, and each
 * surviving file is turned into one or more {@link DeltaLakeSplit}s.
 */
public class DeltaLakeSplitManager implements ConnectorSplitManager {
    private final TypeManager typeManager;
    private final TransactionLogAccess transactionLogAccess;
    private final ExecutorService executor;
    private final int maxInitialSplits;
    private final int maxSplitsPerSecond;
    private final int maxOutstandingSplits;
    private final double minimumAssignedSplitWeight;
    private final TrinoFileSystemFactory fileSystemFactory;

    /**
     * Creates the split manager; split limits and the minimum split weight are read from the config.
     *
     * @throws NullPointerException if any argument is null
     */
    @Inject
    public DeltaLakeSplitManager(TypeManager typeManager, TransactionLogAccess transactionLogAccess, ExecutorService executorService, DeltaLakeConfig deltaLakeConfig, TrinoFileSystemFactory trinoFileSystemFactory) {
        this.typeManager = Objects.requireNonNull(typeManager, "typeManager is null");
        this.transactionLogAccess = Objects.requireNonNull(transactionLogAccess, "transactionLogAccess is null");
        this.executor = Objects.requireNonNull(executorService, "executor is null");
        this.maxInitialSplits = deltaLakeConfig.getMaxInitialSplits();
        this.maxSplitsPerSecond = deltaLakeConfig.getMaxSplitsPerSecond();
        this.maxOutstandingSplits = deltaLakeConfig.getMaxOutstandingSplits();
        this.minimumAssignedSplitWeight = deltaLakeConfig.getMinimumAssignedSplitWeight();
        this.fileSystemFactory = Objects.requireNonNull(trinoFileSystemFactory, "fileSystemFactory is null");
    }

    /**
     * Returns the split source for a table scan.
     *
     * <p>When either the enforced partition constraint or the non-partition constraint is
     * "none", no file can match and an empty source is returned without touching the
     * transaction log. In that case a {@link FixedSplitSource} with an explicit (empty) table
     * execute info list is used when the handle asks for scanned files to be recorded.
     *
     * @param connectorTableHandle must be a {@link DeltaLakeTableHandle}
     */
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableHandle connectorTableHandle, DynamicFilter dynamicFilter, Constraint constraint) {
        DeltaLakeTableHandle deltaLakeTableHandle = (DeltaLakeTableHandle) connectorTableHandle;

        if (deltaLakeTableHandle.getEnforcedPartitionConstraint().isNone() || deltaLakeTableHandle.getNonPartitionConstraint().isNone()) {
            if (deltaLakeTableHandle.isRecordScannedFiles()) {
                return new FixedSplitSource(ImmutableList.of(), ImmutableList.of());
            }
            return FixedSplitSource.emptySplitSource();
        }

        DeltaLakeSplitSource splitSource = new DeltaLakeSplitSource(
                deltaLakeTableHandle.getSchemaTableName(),
                getSplits(deltaLakeTableHandle, connectorSession, deltaLakeTableHandle.getMaxScannedFileSize(), dynamicFilter.getColumnsCovered(), constraint),
                this.executor,
                this.maxSplitsPerSecond,
                this.maxOutstandingSplits,
                dynamicFilter,
                DeltaLakeSessionProperties.getDynamicFilteringWaitTimeout(connectorSession),
                deltaLakeTableHandle.isRecordScannedFiles());
        return new ClassLoaderSafeConnectorSplitSource(splitSource, DeltaLakeSplitManager.class.getClassLoader());
    }

    /**
     * Returns the split source for a table function invocation.
     *
     * @throws UnsupportedOperationException if the handle is not a {@link TableChangesTableFunctionHandle}
     */
    public ConnectorSplitSource getSplits(ConnectorTransactionHandle connectorTransactionHandle, ConnectorSession connectorSession, ConnectorTableFunctionHandle connectorTableFunctionHandle) {
        if (!(connectorTableFunctionHandle instanceof TableChangesTableFunctionHandle)) {
            throw new UnsupportedOperationException("Unrecognized function: " + connectorTableFunctionHandle);
        }
        return new TableChangesSplitSource(connectorSession, this.fileSystemFactory, (TableChangesTableFunctionHandle) connectorTableFunctionHandle);
    }

    /**
     * Builds the stream of splits for the active files of the table snapshot.
     *
     * <p>The returned stream is lazy; the per-file filters below run as it is consumed. A file
     * is skipped when any of the following holds, checked in this order:
     * <ol>
     * <li>an analyze handle is present, its mode is not {@code FULL_REFRESH}, and the entry is not a data change</li>
     * <li>the file path is outside the domain of the path column</li>
     * <li>the file was not modified after the analyze handle's "files modified after" instant</li>
     * <li>the file is larger than {@code maxScannedFileSize}</li>
     * <li>the partition values do not satisfy the enforced partition constraint</li>
     * <li>the file statistics do not overlap the non-partition constraint</li>
     * <li>the constraint's predicate rejects the deserialized partition values</li>
     * </ol>
     *
     * @throws RuntimeException wrapping an {@link IOException} raised while reading the transaction log
     */
    private Stream<DeltaLakeSplit> getSplits(DeltaLakeTableHandle tableHandle, ConnectorSession session, Optional<DataSize> maxScannedFileSize, Set<ColumnHandle> columnsCoveredByDynamicFilter, Constraint constraint) {
        try {
            List<AddFileEntry> activeFiles = this.transactionLogAccess.getActiveFiles(
                    this.transactionLogAccess.loadSnapshot(tableHandle.getSchemaTableName(), tableHandle.getLocation(), session),
                    session);
            TupleDomain<DeltaLakeColumnHandle> enforcedPartitionConstraint = tableHandle.getEnforcedPartitionConstraint();
            TupleDomain<DeltaLakeColumnHandle> nonPartitionConstraint = tableHandle.getNonPartitionConstraint();
            Domain pathDomain = getPathDomain(nonPartitionConstraint);

            // Files are only cut into several splits for reads (no write type) that may project a regular data column
            boolean splittable = tableHandle.getWriteType().isEmpty() && mayAnyDataColumnProjected(tableHandle);
            AtomicInteger remainingInitialSplits = new AtomicInteger(this.maxInitialSplits);
            Optional<Instant> filesModifiedAfter = tableHandle.getAnalyzeHandle().flatMap(handle -> handle.getFilesModifiedAfter());
            Optional<Long> maxScannedFileSizeInBytes = maxScannedFileSize.map(DataSize::toBytes);

            // Columns referenced by the non-partition constraint or covered by the dynamic filter
            Set<String> predicatedColumnNames = Stream.concat(
                            nonPartitionConstraint.getDomains().orElseThrow().keySet().stream(),
                            columnsCoveredByDynamicFilter.stream().map(DeltaLakeColumnHandle.class::cast))
                    .map(DeltaLakeColumnHandle::getBaseColumnName)
                    .collect(ImmutableSet.toImmutableSet());
            // Raw List on purpose: the element type is whatever extractSchema returns and is not
            // named anywhere in this file. NOTE(review): parameterize once that type is confirmed.
            List predicatedColumns = DeltaLakeSchemaSupport.extractSchema(tableHandle.getMetadataEntry(), this.typeManager).stream()
                    .filter(columnMetadata -> predicatedColumnNames.contains(columnMetadata.getName()))
                    .collect(ImmutableList.toImmutableList());

            return activeFiles.stream().flatMap(addFileEntry -> {
                if (tableHandle.getAnalyzeHandle().isPresent()
                        && tableHandle.getAnalyzeHandle().get().getAnalyzeMode() != DeltaLakeAnalyzeProperties.AnalyzeMode.FULL_REFRESH
                        && !addFileEntry.isDataChange()) {
                    return Stream.empty();
                }

                String splitPath = buildSplitPath(Location.of(tableHandle.getLocation()), addFileEntry).toString();
                if (!pathMatchesPredicate(pathDomain, splitPath)) {
                    return Stream.empty();
                }

                if (filesModifiedAfter.isPresent() && addFileEntry.getModificationTime() <= filesModifiedAfter.get().toEpochMilli()) {
                    return Stream.empty();
                }

                if (maxScannedFileSizeInBytes.isPresent() && addFileEntry.getSize() > maxScannedFileSizeInBytes.get()) {
                    return Stream.empty();
                }

                if (!partitionMatchesPredicate(addFileEntry.getCanonicalPartitionValues(), enforcedPartitionConstraint.getDomains().orElseThrow())) {
                    return Stream.empty();
                }

                TupleDomain<DeltaLakeColumnHandle> statisticsPredicate = DeltaLakeMetadata.createStatisticsPredicate(
                        addFileEntry,
                        predicatedColumns,
                        tableHandle.getMetadataEntry().getLowercasePartitionColumns());
                if (!nonPartitionConstraint.overlaps(statisticsPredicate)) {
                    return Stream.empty();
                }

                if (constraint.predicate().isPresent()) {
                    Map<String, Optional<String>> partitionValues = addFileEntry.getCanonicalPartitionValues();
                    // Only base columns that have a partition value for this file are handed to the predicate
                    Map<ColumnHandle, NullableValue> deserializedValues = constraint.getPredicateColumns().orElseThrow().stream()
                            .map(DeltaLakeColumnHandle.class::cast)
                            .filter(column -> column.isBaseColumn() && partitionValues.containsKey(column.getBaseColumnName()))
                            .collect(ImmutableMap.toImmutableMap(
                                    Function.identity(),
                                    column -> new NullableValue(
                                            column.getBaseType(),
                                            TransactionLogParser.deserializePartitionValue(column, partitionValues.get(column.getBaseColumnName())))));
                    if (!constraint.predicate().get().test(deserializedValues)) {
                        return Stream.empty();
                    }
                }

                return splitsForFile(session, addFileEntry, splitPath, addFileEntry.getCanonicalPartitionValues(), statisticsPredicate, splittable, remainingInitialSplits).stream();
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Returns true when no projection is recorded on the handle, or when at least one projected
     * column has column type {@code REGULAR}.
     */
    private static boolean mayAnyDataColumnProjected(DeltaLakeTableHandle tableHandle) {
        if (tableHandle.getProjectedColumns().isEmpty()) {
            return true;
        }
        return tableHandle.getProjectedColumns().get().stream()
                .map(column -> column.getColumnType())
                .anyMatch(DeltaLakeColumnType.REGULAR::equals);
    }

    /**
     * Checks a file's partition values against per-column domains.
     *
     * @param map partition values of the file, looked up by each column's base physical column name
     * @param map2 domain to satisfy for each partition column
     * @return true if every domain includes the deserialized partition value of its column
     */
    public static boolean partitionMatchesPredicate(Map<String, Optional<String>> map, Map<DeltaLakeColumnHandle, Domain> map2) {
        for (Map.Entry<DeltaLakeColumnHandle, Domain> entry : map2.entrySet()) {
            DeltaLakeColumnHandle column = entry.getKey();
            Object partitionValue = TransactionLogParser.deserializePartitionValue(column, map.get(column.getBasePhysicalColumnName()));
            if (!entry.getValue().includesNullableValue(partitionValue)) {
                return false;
            }
        }
        return true;
    }

    /**
     * Returns the domain the constraint places on the path column, or the "all" domain of the
     * path column's base type when the constraint has no domains or none for that column.
     */
    private static Domain getPathDomain(TupleDomain<DeltaLakeColumnHandle> constraint) {
        return constraint.getDomains()
                .flatMap(domains -> Optional.ofNullable(domains.get(DeltaLakeColumnHandle.pathColumnHandle())))
                .orElseGet(() -> Domain.all(DeltaLakeColumnHandle.pathColumnHandle().getBaseType()));
    }

    /** Tests whether the UTF-8 encoded path is included in the path domain. */
    private static boolean pathMatchesPredicate(Domain pathDomain, String path) {
        return pathDomain.includesNullableValue(Slices.utf8Slice(path));
    }

    /**
     * Creates the splits for a single data file.
     *
     * <p>A non-splittable file yields exactly one split covering the whole file, with standard
     * weight and the record count taken from the file statistics when available. A splittable
     * file is cut into consecutive ranges; while {@code remainingInitialSplits} is positive the
     * session's max initial split size is used (and the counter decremented), afterwards the
     * regular max split size. Each range is weighted by its size relative to the split size in
     * effect, clamped to {@code [minimumAssignedSplitWeight, 1.0]}.
     *
     * @throws IllegalArgumentException if the split size configured in the session is not positive
     */
    private List<DeltaLakeSplit> splitsForFile(ConnectorSession session, AddFileEntry addFileEntry, String splitPath, Map<String, Optional<String>> partitionKeys, TupleDomain<DeltaLakeColumnHandle> statisticsPredicate, boolean splittable, AtomicInteger remainingInitialSplits) {
        long fileSize = addFileEntry.getSize();

        if (!splittable) {
            return ImmutableList.of(new DeltaLakeSplit(
                    splitPath,
                    0L,
                    fileSize,
                    fileSize,
                    addFileEntry.getStats().flatMap(stats -> stats.getNumRecords()),
                    addFileEntry.getModificationTime(),
                    SplitWeight.standard(),
                    statisticsPredicate,
                    partitionKeys));
        }

        ImmutableList.Builder<DeltaLakeSplit> splits = ImmutableList.builder();
        long currentOffset = 0;
        while (currentOffset < fileSize) {
            long maxSplitSize;
            // get() is checked first so the counter is not decremented once it is exhausted
            if (remainingInitialSplits.get() > 0 && remainingInitialSplits.getAndDecrement() > 0) {
                maxSplitSize = DeltaLakeSessionProperties.getMaxInitialSplitSize(session).toBytes();
            } else {
                maxSplitSize = DeltaLakeSessionProperties.getMaxSplitSize(session).toBytes();
            }
            if (maxSplitSize <= 0) {
                // A non-positive size would never advance the offset and loop forever
                throw new IllegalArgumentException("Split size must be positive: " + maxSplitSize);
            }

            long splitSize = Math.min(maxSplitSize, fileSize - currentOffset);
            // Floating-point division: integer division would collapse every partial split to ratio 0
            double proportion = Math.min(Math.max((double) splitSize / maxSplitSize, this.minimumAssignedSplitWeight), 1.0d);
            splits.add(new DeltaLakeSplit(
                    splitPath,
                    currentOffset,
                    splitSize,
                    fileSize,
                    Optional.empty(),
                    addFileEntry.getModificationTime(),
                    SplitWeight.fromProportion(proportion),
                    statisticsPredicate,
                    partitionKeys));
            currentOffset += splitSize;
        }
        return splits.build();
    }

    /**
     * Resolves the location of a data file by appending the path component of the entry's URI
     * to the table location.
     *
     * <p>For {@code abfs} and {@code abfss} table locations the path is additionally URL-decoded.
     * A literal {@code +} is first rewritten to {@code %2B} because {@link URLDecoder} would
     * otherwise decode it to a space.
     */
    private static Location buildSplitPath(Location tableLocation, AddFileEntry addFileEntry) {
        String path = URI.create(addFileEntry.getPath()).getPath();
        Optional<String> scheme = tableLocation.scheme();
        if (scheme.isPresent() && (scheme.get().equals("abfs") || scheme.get().equals("abfss"))) {
            path = URLDecoder.decode(path.replace("+", "%2B"), StandardCharsets.UTF_8);
        }
        return tableLocation.appendPath(path);
    }
}
