package org.apache.seatunnel.connectors.seatunnel.file.sink;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.UUID;
import java.util.stream.Collectors;
import org.apache.commons.cli.HelpFormatter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
import org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;

/* loaded from: input_file:org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.class */
public class BaseFileSinkWriter implements SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> {
    private final WriteStrategy writeStrategy;
    private final FileSystemUtils fileSystemUtils;

    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String str, List<FileSinkState> list) {
        this.writeStrategy = writeStrategy;
        this.fileSystemUtils = writeStrategy.getFileSystemUtils();
        int indexOfSubtask = context.getIndexOfSubtask();
        String uuidPrefix = !list.isEmpty() ? list.get(0).getUuidPrefix() : UUID.randomUUID().toString().replaceAll(HelpFormatter.DEFAULT_OPT_PREFIX, "").substring(0, 10);
        writeStrategy.init(hadoopConf, str, uuidPrefix, indexOfSubtask);
        if (list.isEmpty()) {
            writeStrategy.beginTransaction(1L);
            return;
        }
        try {
            List<String> findTransactionList = findTransactionList(str, uuidPrefix);
            FileSinkAggregatedCommitter fileSinkAggregatedCommitter = new FileSinkAggregatedCommitter(this.fileSystemUtils);
            HashMap hashMap = new HashMap();
            list.forEach(fileSinkState -> {
            });
            for (String str2 : findTransactionList) {
                if (hashMap.containsKey(str2)) {
                    FileSinkState fileSinkState2 = (FileSinkState) hashMap.get(str2);
                    fileSinkAggregatedCommitter.commit(Collections.singletonList(fileSinkAggregatedCommitter.combine(Collections.singletonList(new FileCommitInfo(fileSinkState2.getNeedMoveFiles(), fileSinkState2.getPartitionDirAndValuesMap(), fileSinkState2.getTransactionDir())))));
                } else {
                    writeStrategy.abortPrepare(str2);
                }
            }
            writeStrategy.beginTransaction(Long.valueOf(list.get(0).getCheckpointId().longValue() + 1));
        } catch (IOException e) {
            throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, String.format("Try to process these fileStates %s failed", list), e);
        }
    }

    private List<String> findTransactionList(String str, String str2) throws IOException {
        return (List) this.fileSystemUtils.dirList(AbstractWriteStrategy.getTransactionDirPrefix(this.writeStrategy.getFileSinkConfig().getTmpPath(), str, str2)).stream().map((v0) -> {
            return v0.getName();
        }).collect(Collectors.toList());
    }

    public BaseFileSinkWriter(WriteStrategy writeStrategy, HadoopConf hadoopConf, SinkWriter.Context context, String str) {
        this(writeStrategy, hadoopConf, context, str, Collections.emptyList());
        writeStrategy.beginTransaction(1L);
    }

    public void write(SeaTunnelRow seaTunnelRow) throws IOException {
        try {
            this.writeStrategy.write(seaTunnelRow);
        } catch (FileConnectorException e) {
            throw new FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, String.format("Write this data [%s] to file failed", seaTunnelRow), e);
        }
    }

    public Optional<FileCommitInfo> prepareCommit() throws IOException {
        return this.writeStrategy.prepareCommit();
    }

    public void abortPrepare() {
        this.writeStrategy.abortPrepare();
    }

    public List<FileSinkState> snapshotState(long j) throws IOException {
        return this.writeStrategy.snapshotState(j);
    }

    public void close() throws IOException {
    }
}
