package org.apache.seatunnel.engine.server.task.flow;

import java.io.IOException;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.seatunnel.api.table.type.Record;
import org.apache.seatunnel.api.transform.Collector;
import org.apache.seatunnel.api.transform.SeaTunnelTransform;
import org.apache.seatunnel.engine.core.dag.actions.TransformChainAction;
import org.apache.seatunnel.engine.server.checkpoint.ActionStateKey;
import org.apache.seatunnel.engine.server.checkpoint.ActionSubtaskState;
import org.apache.seatunnel.engine.server.checkpoint.CheckpointBarrier;
import org.apache.seatunnel.engine.server.task.SeaTunnelTask;
import org.apache.seatunnel.engine.server.task.record.Barrier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/seatunnel/engine/server/task/flow/TransformFlowLifeCycle.class */
public class TransformFlowLifeCycle<T> extends ActionFlowLifeCycle implements OneInputFlowLifeCycle<Record<?>> {
    private static final Logger log = LoggerFactory.getLogger((Class<?>) TransformFlowLifeCycle.class);
    private final TransformChainAction<T> action;
    private final List<SeaTunnelTransform<T>> transform;
    private final Collector<Record<?>> collector;

    public TransformFlowLifeCycle(TransformChainAction<T> transformChainAction, SeaTunnelTask seaTunnelTask, Collector<Record<?>> collector, CompletableFuture<Void> completableFuture) {
        super(transformChainAction, seaTunnelTask, completableFuture);
        this.action = transformChainAction;
        this.transform = transformChainAction.getTransforms();
        this.collector = collector;
    }

    @Override // org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void open() throws Exception {
        super.open();
        for (SeaTunnelTransform<T> seaTunnelTransform : this.transform) {
            try {
                seaTunnelTransform.open();
            } catch (Exception e) {
                log.error("Open transform: {} failed, cause: {}", seaTunnelTransform.getPluginName(), e.getMessage(), e);
            }
        }
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.seatunnel.engine.server.task.flow.OneInputFlowLifeCycle
    public void received(Record<?> record) {
        if (record.getData() instanceof Barrier) {
            CheckpointBarrier checkpointBarrier = (CheckpointBarrier) record.getData();
            if (checkpointBarrier.prepareClose()) {
                this.prepareClose = true;
            }
            if (checkpointBarrier.snapshot()) {
                this.runningTask.addState(checkpointBarrier, ActionStateKey.of(this.action), Collections.emptyList());
            }
            this.runningTask.ack(checkpointBarrier);
            this.collector.collect(record);
            return;
        }
        if (this.prepareClose.booleanValue()) {
            return;
        }
        Object data = record.getData();
        Object obj = data;
        Iterator<SeaTunnelTransform<T>> it = this.transform.iterator();
        while (true) {
            if (!it.hasNext()) {
                break;
            }
            SeaTunnelTransform<T> next = it.next();
            obj = next.map(data);
            log.debug("Transform[{}] input row {} and output row {}", next, data, obj);
            if (obj == null) {
                log.trace("Transform[{}] filtered data row {}", next, data);
                break;
            }
            data = obj;
        }
        if (obj != null) {
            this.collector.collect(new Record<>(obj));
        }
    }

    @Override // org.apache.seatunnel.engine.server.checkpoint.Stateful
    public void restoreState(List<ActionSubtaskState> list) throws Exception {
    }

    @Override // org.apache.seatunnel.engine.server.task.flow.AbstractFlowLifeCycle, org.apache.seatunnel.engine.server.task.flow.FlowLifeCycle
    public void close() throws IOException {
        for (SeaTunnelTransform<T> seaTunnelTransform : this.transform) {
            try {
                seaTunnelTransform.close();
            } catch (Exception e) {
                log.error("Close transform: {} failed, cause: {}", seaTunnelTransform.getPluginName(), e.getMessage(), e);
            }
        }
        super.close();
    }
}
