/*
 * Decompiled with CFR 0.152.
 */
package test.org.apache.spark.sql.connector;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.deploy.SparkHadoopUtil;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
import org.apache.spark.sql.connector.SimpleCounter;
import org.apache.spark.sql.connector.TestingV2Source;
import org.apache.spark.sql.connector.catalog.SupportsWrite;
import org.apache.spark.sql.connector.catalog.Table;
import org.apache.spark.sql.connector.catalog.TableCapability;
import org.apache.spark.sql.connector.read.InputPartition;
import org.apache.spark.sql.connector.read.PartitionReader;
import org.apache.spark.sql.connector.read.PartitionReaderFactory;
import org.apache.spark.sql.connector.read.ScanBuilder;
import org.apache.spark.sql.connector.write.BatchWrite;
import org.apache.spark.sql.connector.write.DataWriter;
import org.apache.spark.sql.connector.write.DataWriterFactory;
import org.apache.spark.sql.connector.write.LogicalWriteInfo;
import org.apache.spark.sql.connector.write.PhysicalWriteInfo;
import org.apache.spark.sql.connector.write.SupportsTruncate;
import org.apache.spark.sql.connector.write.Write;
import org.apache.spark.sql.connector.write.WriteBuilder;
import org.apache.spark.sql.connector.write.WriterCommitMessage;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
import org.apache.spark.util.SerializableConfiguration;
import test.org.apache.spark.sql.connector.JavaSimpleBatchTable;
import test.org.apache.spark.sql.connector.JavaSimpleScanBuilder;

public class JavaSimpleWritableDataSource
implements TestingV2Source {
    @Override
    public Table getTable(CaseInsensitiveStringMap options) {
        return new MyTable(options);
    }

    static class MyTable
    extends JavaSimpleBatchTable
    implements SupportsWrite {
        private final String path;
        private final Configuration conf = SparkHadoopUtil.get().conf();

        MyTable(CaseInsensitiveStringMap options) {
            this.path = options.get((Object)"path");
        }

        public ScanBuilder newScanBuilder(CaseInsensitiveStringMap options) {
            return new MyScanBuilder(new Path(this.path).toUri().toString(), this.conf);
        }

        public WriteBuilder newWriteBuilder(LogicalWriteInfo info) {
            return new MyWriteBuilder(this.path, info);
        }

        @Override
        public Set<TableCapability> capabilities() {
            return EnumSet.of(TableCapability.BATCH_READ, TableCapability.BATCH_WRITE, TableCapability.TRUNCATE);
        }
    }

    static class JavaCSVDataWriter
    implements DataWriter<InternalRow> {
        private final FileSystem fs;
        private final Path file;
        private final FSDataOutputStream out;

        JavaCSVDataWriter(FileSystem fs, Path file) throws IOException {
            this.fs = fs;
            this.file = file;
            this.out = fs.create(file);
        }

        public void write(InternalRow record) throws IOException {
            this.out.writeBytes(String.format("%d,%d\n", record.getInt(0), record.getInt(1)));
        }

        public WriterCommitMessage commit() throws IOException {
            this.out.close();
            return null;
        }

        public void abort() throws IOException {
            try {
                this.out.close();
            }
            finally {
                this.fs.delete(this.file, false);
            }
        }

        public void close() {
        }
    }

    static class JavaCSVDataWriterFactory
    implements DataWriterFactory {
        private final String path;
        private final String jobId;
        private final SerializableConfiguration conf;

        JavaCSVDataWriterFactory(String path, String jobId, SerializableConfiguration conf) {
            this.path = path;
            this.jobId = jobId;
            this.conf = conf;
        }

        public DataWriter<InternalRow> createWriter(int partitionId, long taskId) {
            try {
                Path jobPath = new Path(new Path(this.path, "_temporary"), this.jobId);
                Path filePath = new Path(jobPath, String.format("%s-%d-%d", this.jobId, partitionId, taskId));
                FileSystem fs = filePath.getFileSystem(this.conf.value());
                return new JavaCSVDataWriter(fs, filePath);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class JavaCSVReaderFactory
    implements PartitionReaderFactory {
        private final SerializableConfiguration conf;

        JavaCSVReaderFactory(SerializableConfiguration conf) {
            this.conf = conf;
        }

        public PartitionReader<InternalRow> createReader(InputPartition partition) {
            String path = ((JavaCSVInputPartitionReader)partition).getPath();
            final Path filePath = new Path(path);
            try {
                final FileSystem fs = filePath.getFileSystem(this.conf.value());
                return new PartitionReader<InternalRow>(){
                    private final FSDataInputStream inputStream;
                    private final Iterator<String> lines;
                    private String currentLine;
                    {
                        this.inputStream = fs.open(filePath);
                        this.lines = new BufferedReader(new InputStreamReader((InputStream)this.inputStream)).lines().iterator();
                        this.currentLine = "";
                    }

                    public boolean next() {
                        if (this.lines.hasNext()) {
                            this.currentLine = this.lines.next();
                            return true;
                        }
                        return false;
                    }

                    public InternalRow get() {
                        Object[] objects = Arrays.stream(this.currentLine.split(",")).map(String::trim).map(Integer::parseInt).toArray();
                        return new GenericInternalRow(objects);
                    }

                    public void close() throws IOException {
                        this.inputStream.close();
                    }
                };
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class JavaCSVInputPartitionReader
    implements InputPartition {
        private String path;

        JavaCSVInputPartitionReader(String path) {
            this.path = path;
        }

        public String getPath() {
            return this.path;
        }

        public void setPath(String path) {
            this.path = path;
        }
    }

    static class MyBatchWrite
    implements BatchWrite {
        private final String queryId;
        private final String path;
        private final Configuration conf;

        MyBatchWrite(String queryId, String path, Configuration conf) {
            this.queryId = queryId;
            this.path = path;
            this.conf = conf;
        }

        public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
            SimpleCounter.resetCounter();
            return new JavaCSVDataWriterFactory(this.path, this.queryId, new SerializableConfiguration(this.conf));
        }

        public void onDataWriterCommit(WriterCommitMessage message) {
            SimpleCounter.increaseCounter();
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void commit(WriterCommitMessage[] messages) {
            Path finalPath = new Path(this.path);
            Path jobPath = new Path(new Path(finalPath, "_temporary"), this.queryId);
            try {
                FileSystem fs = jobPath.getFileSystem(this.conf);
                FileStatus[] fileStatuses = fs.listStatus(jobPath);
                try {
                    for (FileStatus status : fileStatuses) {
                        Path dest;
                        Path file = status.getPath();
                        if (fs.rename(file, dest = new Path(finalPath, file.getName()))) continue;
                        throw new IOException(String.format("failed to rename(%s, %s)", file, dest));
                    }
                }
                finally {
                    fs.delete(jobPath, true);
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        public void abort(WriterCommitMessage[] messages) {
            try {
                Path jobPath = new Path(new Path(this.path, "_temporary"), this.queryId);
                FileSystem fs = jobPath.getFileSystem(this.conf);
                fs.delete(jobPath, true);
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
    }

    static class MyWrite
    implements Write {
        private final String path;
        private final String queryId;
        private final boolean needTruncate;

        MyWrite(String path, String queryId, boolean needTruncate) {
            this.path = path;
            this.queryId = queryId;
            this.needTruncate = needTruncate;
        }

        public BatchWrite toBatch() {
            Path hadoopPath = new Path(this.path);
            Configuration hadoopConf = SparkHadoopUtil.get().conf();
            try {
                FileSystem fs = hadoopPath.getFileSystem(hadoopConf);
                if (this.needTruncate) {
                    fs.delete(hadoopPath, true);
                }
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
            String pathStr = hadoopPath.toUri().toString();
            return new MyBatchWrite(this.queryId, pathStr, hadoopConf);
        }
    }

    static class MyWriteBuilder
    implements WriteBuilder,
    SupportsTruncate {
        private final String path;
        private final String queryId;
        private boolean needTruncate = false;

        MyWriteBuilder(String path, LogicalWriteInfo info) {
            this.path = path;
            this.queryId = info.queryId();
        }

        public WriteBuilder truncate() {
            this.needTruncate = true;
            return this;
        }

        public Write build() {
            return new MyWrite(this.path, this.queryId, this.needTruncate);
        }
    }

    static class MyScanBuilder
    extends JavaSimpleScanBuilder {
        private final String path;
        private final Configuration conf;

        MyScanBuilder(String path, Configuration conf) {
            this.path = path;
            this.conf = conf;
        }

        public InputPartition[] planInputPartitions() {
            Path dataPath = new Path(this.path);
            try {
                FileSystem fs = dataPath.getFileSystem(this.conf);
                if (fs.exists(dataPath)) {
                    return (InputPartition[])Arrays.stream(fs.listStatus(dataPath)).filter(status -> {
                        String name = status.getPath().getName();
                        return !name.startsWith("_") && !name.startsWith(".");
                    }).map(f -> new JavaCSVInputPartitionReader(f.getPath().toUri().toString())).toArray(InputPartition[]::new);
                }
                return new InputPartition[0];
            }
            catch (IOException e) {
                throw new RuntimeException(e);
            }
        }

        @Override
        public PartitionReaderFactory createReaderFactory() {
            SerializableConfiguration serializableConf = new SerializableConfiguration(this.conf);
            return new JavaCSVReaderFactory(serializableConf);
        }
    }
}

