package org.apache.flink.table.planner.runtime.batch.sql;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Stream;
import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.factories.TestManagedTableFactory;
import org.apache.flink.table.planner.runtime.utils.BatchTestBase;
import org.apache.flink.table.utils.PartitionPathUtils;
import org.assertj.core.api.Assertions;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/* loaded from: input_file:org/apache/flink/table/planner/runtime/batch/sql/CompactManagedTableITCase.class */
public class CompactManagedTableITCase extends BatchTestBase {
    private final ObjectIdentifier tableIdentifier = ObjectIdentifier.of(tEnv().getCurrentCatalog(), tEnv().getCurrentDatabase(), "MyTable");
    private final Map<CatalogPartitionSpec, List<RowData>> collectedElements = new HashMap();
    private Path rootPath;
    private AtomicReference<Map<CatalogPartitionSpec, List<Path>>> referenceOfManagedTableFileEntries;

    @Override // org.apache.flink.table.planner.runtime.utils.BatchTestBase
    @Before
    public void before() throws Exception {
        super.before();
        TestManagedTableFactory.MANAGED_TABLES.put(this.tableIdentifier, new AtomicReference());
        this.referenceOfManagedTableFileEntries = new AtomicReference<>();
        TestManagedTableFactory.MANAGED_TABLE_FILE_ENTRIES.put(this.tableIdentifier, this.referenceOfManagedTableFileEntries);
        try {
            this.rootPath = new Path(new Path(createTempFolder().getPath()), this.tableIdentifier.asSummaryString());
            this.rootPath.getFileSystem().mkdirs(this.rootPath);
        } catch (IOException e) {
            Assertions.fail(String.format("Failed to create dir for %s", this.rootPath), e);
        }
    }

    @Override // org.apache.flink.table.planner.runtime.utils.BatchTestBase
    @After
    public void after() {
        super.after();
        tEnv().executeSql("DROP TABLE MyTable");
        this.collectedElements.clear();
        try {
            this.rootPath.getFileSystem().delete(this.rootPath, true);
        } catch (IOException e) {
            Assertions.fail(String.format("Failed to delete dir for %s", this.rootPath), e);
        }
    }

    @Test
    public void testCompactPartitionOnNonPartitionedTable() {
        tEnv().executeSql("CREATE TABLE MyTable (id BIGINT, content STRING)");
        Assertions.assertThatThrownBy(() -> {
            tEnv().executeSql("ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT");
        }).isInstanceOf(ValidationException.class).hasMessageContaining(String.format("Table %s is not partitioned.", this.tableIdentifier));
    }

    @Test
    public void testCompactPartitionOnNonExistedPartitionKey() {
        tEnv().executeSql("CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)");
        Assertions.assertThatThrownBy(() -> {
            tEnv().executeSql("ALTER TABLE MyTable PARTITION (saeson = 'summer') COMPACT");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Partition column 'saeson' not defined in the table schema. Available ordered partition columns: ['season']");
    }

    @Test
    public void testCompactPartitionOnNonExistedPartitionValue() throws Exception {
        prepare("CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)", Collections.singletonList(of("season", "'spring'")));
        Assertions.assertThatThrownBy(() -> {
            tEnv().executeSql("ALTER TABLE MyTable PARTITION (season = 'summer') COMPACT");
        }).isInstanceOf(ValidationException.class).hasMessageContaining("Cannot resolve partition spec CatalogPartitionSpec{{season=summer}}");
    }

    @Test
    public void testCompactNonPartitionedTable() throws Exception {
        prepare("CREATE TABLE MyTable (id BIGINT, content STRING)", Collections.emptyList());
        CatalogPartitionSpec catalogPartitionSpec = new CatalogPartitionSpec(Collections.emptyMap());
        executeAndCheck(catalogPartitionSpec, Collections.singleton(catalogPartitionSpec));
    }

    @Test
    public void testCompactSinglePartitionedTable() throws Exception {
        prepare("CREATE TABLE MyTable (\n  id BIGINT,\n  content STRING,\n  season STRING\n) PARTITIONED BY (season)", Arrays.asList(of("season", "'spring'"), of("season", "'summer'")));
        HashSet hashSet = new HashSet();
        CatalogPartitionSpec catalogPartitionSpec = new CatalogPartitionSpec(of("season", "'summer'"));
        hashSet.add(new CatalogPartitionSpec(of("season", "summer")));
        executeAndCheck(catalogPartitionSpec, hashSet);
        CatalogPartitionSpec catalogPartitionSpec2 = new CatalogPartitionSpec(Collections.emptyMap());
        hashSet.add(new CatalogPartitionSpec(of("season", "spring")));
        executeAndCheck(catalogPartitionSpec2, hashSet);
    }

    @Test
    public void testCompactMultiPartitionedTable() throws Exception {
        prepare("CREATE TABLE MyTable (  id BIGINT,\n  content STRING,\n  season STRING,\n  `month` INT\n) PARTITIONED BY (season, `month`)", Arrays.asList(of("season", "'spring'", "`month`", "2"), of("season", "'spring'", "`month`", "3"), of("season", "'spring'", "`month`", "4"), of("season", "'summer'", "`month`", "5"), of("season", "'summer'", "`month`", "6"), of("season", "'summer'", "`month`", "7"), of("season", "'summer'", "`month`", "8"), of("season", "'autumn'", "`month`", "8"), of("season", "'autumn'", "`month`", "9"), of("season", "'autumn'", "`month`", "10"), of("season", "'winter'", "`month`", "11"), of("season", "'winter'", "`month`", "12"), of("season", "'winter'", "`month`", "1")));
        HashSet hashSet = new HashSet();
        CatalogPartitionSpec catalogPartitionSpec = new CatalogPartitionSpec(of("season", "'spring'", "`month`", "2"));
        hashSet.add(new CatalogPartitionSpec(of("season", "spring", "month", "2")));
        executeAndCheck(catalogPartitionSpec, hashSet);
        CatalogPartitionSpec catalogPartitionSpec2 = new CatalogPartitionSpec(of("`month`", "3", "season", "'spring'"));
        hashSet.add(new CatalogPartitionSpec(of("season", "spring", "month", "3")));
        executeAndCheck(catalogPartitionSpec2, hashSet);
        CatalogPartitionSpec catalogPartitionSpec3 = new CatalogPartitionSpec(of("season", "'winter'"));
        hashSet.add(new CatalogPartitionSpec(of("season", "winter", "month", "1")));
        hashSet.add(new CatalogPartitionSpec(of("season", "winter", "month", "11")));
        hashSet.add(new CatalogPartitionSpec(of("season", "winter", "month", "12")));
        executeAndCheck(catalogPartitionSpec3, hashSet);
        CatalogPartitionSpec catalogPartitionSpec4 = new CatalogPartitionSpec(of("`month`", "5"));
        hashSet.add(new CatalogPartitionSpec(of("season", "summer", "month", "5")));
        executeAndCheck(catalogPartitionSpec4, hashSet);
        CatalogPartitionSpec catalogPartitionSpec5 = new CatalogPartitionSpec(of("`month`", "8"));
        hashSet.add(new CatalogPartitionSpec(of("season", "summer", "month", "8")));
        hashSet.add(new CatalogPartitionSpec(of("season", "autumn", "month", "8")));
        executeAndCheck(catalogPartitionSpec5, hashSet);
        CatalogPartitionSpec catalogPartitionSpec6 = new CatalogPartitionSpec(Collections.emptyMap());
        hashSet.add(new CatalogPartitionSpec(of("season", "spring", "month", "4")));
        hashSet.add(new CatalogPartitionSpec(of("season", "summer", "month", "6")));
        hashSet.add(new CatalogPartitionSpec(of("season", "summer", "month", "7")));
        hashSet.add(new CatalogPartitionSpec(of("season", "autumn", "month", "9")));
        hashSet.add(new CatalogPartitionSpec(of("season", "autumn", "month", "10")));
        executeAndCheck(catalogPartitionSpec6, hashSet);
    }

    private void prepare(String str, List<LinkedHashMap<String, String>> list) throws Exception {
        prepareMirrorTables(str);
        prepareFileEntries(list);
        scanFileEntries();
    }

    private void prepareMirrorTables(String str) {
        tEnv().executeSql(str);
        String format = String.format("CREATE TABLE HelperSink WITH (  'connector' = 'filesystem',   'format' = 'testcsv',   'path' = '%s' )LIKE MyTable (EXCLUDING OPTIONS)", this.rootPath.getPath());
        tEnv().executeSql("CREATE TABLE HelperSource (id BIGINT, content STRING ) WITH (  'connector' = 'datagen',   'rows-per-second' = '5',   'fields.id.kind' = 'sequence',   'fields.id.start' = '0',   'fields.id.end' = '200',   'fields.content.kind' = 'random',   'number-of-rows' = '50')");
        tEnv().executeSql(format);
    }

    private void prepareFileEntries(List<LinkedHashMap<String, String>> list) throws Exception {
        tEnv().executeSql(prepareInsertDML(list)).await();
    }

    private static String prepareInsertDML(List<LinkedHashMap<String, String>> list) {
        StringBuilder sb = new StringBuilder("INSERT INTO HelperSink\n");
        if (list.isEmpty()) {
            return sb.append("SELECT id,\n  content\nFROM HelperSource\n").toString();
        }
        for (int i = 0; i < list.size(); i++) {
            sb.append("SELECT id,\n  content,\n");
            int i2 = 0;
            for (Map.Entry<String, String> entry : list.get(i).entrySet()) {
                sb.append("  ");
                sb.append(entry.getValue());
                sb.append(" AS ");
                sb.append(entry.getKey());
                if (i2 < list.get(i).size() - 1) {
                    sb.append(",\n");
                } else {
                    sb.append("\n");
                }
                i2++;
            }
            sb.append("FROM HelperSource\n");
            if (i < list.size() - 1) {
                sb.append("UNION ALL\n");
            }
        }
        return sb.toString();
    }

    private void scanFileEntries() throws IOException {
        HashMap hashMap = new HashMap();
        Stream<java.nio.file.Path> walk = Files.walk(Paths.get(this.rootPath.getPath(), new String[0]), new FileVisitOption[0]);
        Throwable th = null;
        try {
            try {
                walk.filter(path -> {
                    return Files.isRegularFile(path, new LinkOption[0]);
                }).forEach(path2 -> {
                    Path path2 = new Path(path2.toString());
                    CatalogPartitionSpec catalogPartitionSpec = new CatalogPartitionSpec(PartitionPathUtils.extractPartitionSpecFromPath(path2));
                    List list = (List) hashMap.getOrDefault(catalogPartitionSpec, new ArrayList());
                    list.add(path2);
                    hashMap.put(catalogPartitionSpec, list);
                    List<RowData> orDefault = this.collectedElements.getOrDefault(catalogPartitionSpec, new ArrayList());
                    orDefault.addAll(readElementsFromFile(path2.toFile()));
                    this.collectedElements.put(catalogPartitionSpec, orDefault);
                });
                if (walk != null) {
                    if (0 != 0) {
                        try {
                            walk.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        walk.close();
                    }
                }
                this.referenceOfManagedTableFileEntries.set(hashMap);
            } finally {
            }
        } catch (Throwable th3) {
            if (walk != null) {
                if (th != null) {
                    try {
                        walk.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    walk.close();
                }
            }
            throw th3;
        }
    }

    private static List<RowData> readElementsFromFile(File file) {
        ArrayList arrayList = new ArrayList();
        try {
            BufferedReader bufferedReader = new BufferedReader(new FileReader(file));
            Throwable th = null;
            while (true) {
                try {
                    try {
                        String readLine = bufferedReader.readLine();
                        if (readLine == null) {
                            break;
                        }
                        arrayList.add(GenericRowData.of(new Object[]{StringData.fromString(readLine)}));
                    } finally {
                    }
                } finally {
                }
            }
            if (bufferedReader != null) {
                if (0 != 0) {
                    try {
                        bufferedReader.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                } else {
                    bufferedReader.close();
                }
            }
        } catch (IOException e) {
            Assertions.fail("This should not happen");
        }
        return arrayList;
    }

    private LinkedHashMap<String, String> of(String... strArr) {
        Assertions.assertThat(strArr != null && strArr.length % 2 == 0).isTrue();
        LinkedHashMap<String, String> linkedHashMap = new LinkedHashMap<>();
        for (int i = 0; i < strArr.length - 1; i += 2) {
            linkedHashMap.put(strArr[i], strArr[i + 1]);
        }
        return linkedHashMap;
    }

    private static String prepareCompactSql(CatalogPartitionSpec catalogPartitionSpec) {
        Map partitionSpec = catalogPartitionSpec.getPartitionSpec();
        StringBuilder sb = new StringBuilder();
        int i = 0;
        for (Map.Entry entry : partitionSpec.entrySet()) {
            if (i == 0) {
                sb.append(" PARTITION (");
            }
            sb.append((String) entry.getKey());
            sb.append(" = ");
            sb.append((String) entry.getValue());
            if (i < partitionSpec.size() - 1) {
                sb.append(", ");
            }
            if (i == partitionSpec.size() - 1) {
                sb.append(")");
            }
            i++;
        }
        return String.format("ALTER TABLE MyTable%s COMPACT", sb);
    }

    private void executeAndCheck(CatalogPartitionSpec catalogPartitionSpec, Set<CatalogPartitionSpec> set) throws ExecutionException, InterruptedException {
        String prepareCompactSql = prepareCompactSql(catalogPartitionSpec);
        tEnv().executeSql(prepareCompactSql).await();
        Map<CatalogPartitionSpec, Long> checkFileAndElements = checkFileAndElements(set);
        tEnv().executeSql(prepareCompactSql).await();
        checkModifiedTime(checkFileAndElements, checkFileAndElements(set));
    }

    private Map<CatalogPartitionSpec, Long> checkFileAndElements(Set<CatalogPartitionSpec> set) {
        HashMap hashMap = new HashMap();
        this.referenceOfManagedTableFileEntries.get().forEach((catalogPartitionSpec, list) -> {
            if (!set.contains(catalogPartitionSpec)) {
                list.forEach(path -> {
                    Assertions.assertThat(path.getName()).startsWith("part-");
                    Assertions.assertThat(this.collectedElements.get(catalogPartitionSpec)).containsAll(readElementsFromFile(new File(path.getPath())));
                });
                return;
            }
            Assertions.assertThat(list).hasSize(1);
            Path path2 = (Path) list.get(0);
            Assertions.assertThat(path2.getName()).startsWith("compact-");
            Assertions.assertThat(readElementsFromFile(new File(path2.getPath()))).hasSameElementsAs(this.collectedElements.get(catalogPartitionSpec));
            hashMap.put(catalogPartitionSpec, Long.valueOf(getLastModifiedTime(path2)));
        });
        return hashMap;
    }

    private void checkModifiedTime(Map<CatalogPartitionSpec, Long> map, Map<CatalogPartitionSpec, Long> map2) {
        map.forEach((catalogPartitionSpec, l) -> {
            Assertions.assertThat((Long) map2.get(catalogPartitionSpec)).isEqualTo(l).isNotEqualTo(-1L);
        });
    }

    private static long getLastModifiedTime(Path path) {
        try {
            return path.getFileSystem().getFileStatus(path).getModificationTime();
        } catch (IOException e) {
            Assertions.fail("This should not happen");
            return -1L;
        }
    }
}
