package io.confluent.ksql.execution.pull;

import com.google.common.collect.ImmutableList;
import io.confluent.ksql.rest.entity.ConsistencyToken;
import io.confluent.ksql.rest.entity.KsqlErrorMessage;
import io.confluent.ksql.rest.entity.KsqlHostInfoEntity;
import io.confluent.ksql.rest.entity.StreamedRow;
import io.confluent.ksql.schema.ksql.LogicalSchema;
import io.confluent.ksql.util.ConsistencyOffsetVector;
import io.confluent.ksql.util.KsqlException;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.Function;

/* loaded from: input_file:io/confluent/ksql/execution/pull/StreamedRowTranslator.class */
/**
 * Translates a chunk of {@link StreamedRow}s into {@link PullQueryRow}s.
 *
 * <p>Each incoming row is handled according to which of its optional parts is set, checked in
 * this order:
 * <ol>
 *   <li>header: its schema must equal the expected schema, no output row is produced;</li>
 *   <li>error message: translation is aborted with a {@link KsqlException};</li>
 *   <li>data row: converted into a {@link PullQueryRow};</li>
 *   <li>consistency token: merged into the offset vector, when one was supplied;</li>
 *   <li>final message: skipped.</li>
 * </ol>
 * A row carrying none of these is rejected with a {@link KsqlException}.
 */
public class StreamedRowTranslator implements Function<List<StreamedRow>, List<PullQueryRow>> {
    private final LogicalSchema expectedSchema;
    private final Optional<ConsistencyOffsetVector> offsetVector;

    /**
     * @param expectedSchema the schema every header row must match, also attached to every
     *                       produced {@link PullQueryRow}
     * @param offsetVector   when present, the vector that received consistency tokens are
     *                       merged into; when empty, such tokens are ignored
     */
    public StreamedRowTranslator(LogicalSchema expectedSchema, Optional<ConsistencyOffsetVector> offsetVector) {
        this.expectedSchema = expectedSchema;
        this.offsetVector = offsetVector;
    }

    /**
     * Converts the data rows of the given chunk, preserving their order.
     *
     * @param rows the chunk to translate; may be {@code null} or empty
     * @return an empty immutable list for a {@code null} or empty chunk, otherwise a list
     *         holding one {@link PullQueryRow} per data row of the chunk
     * @throws KsqlException if a header schema differs from the expected schema, if a row
     *                       carries an error message, or if a row has no recognised content
     */
    @Override // java.util.function.Function
    public List<PullQueryRow> apply(List<StreamedRow> rows) {
        if (rows == null || rows.isEmpty()) {
            return ImmutableList.of();
        }
        final List<PullQueryRow> translated = new ArrayList<>(rows.size());
        // Indexed loop: the position within the chunk is reported when a row is malformed.
        for (int i = 0; i < rows.size(); i++) {
            final StreamedRow row = rows.get(i);
            if (row.getHeader().isPresent()) {
                handleHeader(row);
                continue;
            }
            if (row.getErrorMessage().isPresent()) {
                throw new KsqlException(row.getErrorMessage().get().getMessage());
            }
            if (!row.getRow().isPresent()) {
                handleNonDataRows(row, i, this.offsetVector);
                continue;
            }
            final StreamedRow.DataRow dataRow = row.getRow().get();
            translated.add(new PullQueryRow(dataRow.getColumns(), this.expectedSchema, row.getSourceHost(), Optional.empty()));
        }
        return translated;
    }

    /** Checks the schema of a header row against the expected schema. */
    private void handleHeader(StreamedRow row) {
        row.getHeader().ifPresent(header ->
            validateSchema(this.expectedSchema, header.getSchema(), row.getSourceHost()));
    }

    /**
     * Handles a row that is neither a header, an error nor a data row.
     *
     * @param row          the row to inspect
     * @param index        position of the row within the chunk, used in the error message
     * @param offsetVector vector to merge a received consistency token into, if present
     * @throws KsqlException if the row has neither a consistency token nor a final message
     */
    private static void handleNonDataRows(StreamedRow row, int index, Optional<ConsistencyOffsetVector> offsetVector) {
        if (row.getConsistencyToken().isPresent()) {
            // Without a vector to merge into, the token is dropped.
            if (offsetVector.isPresent()) {
                final ConsistencyToken token = row.getConsistencyToken().get();
                offsetVector.get().merge(ConsistencyOffsetVector.deserialize(token.getConsistencyToken()));
            }
            return;
        }
        if (!row.getFinalMessage().isPresent()) {
            throw new KsqlException("Missing row data on row " + index + " of chunk");
        }
        // A final message carries nothing to translate, so it is skipped.
    }

    /**
     * Ensures that the schema announced by a host equals the expected one.
     *
     * @param expected   the schema this translator was created with
     * @param actual     the schema taken from the received header
     * @param sourceHost the host the header came from; reported as "unknown" when absent
     * @throws KsqlException if the two schemas are not equal
     */
    private void validateSchema(LogicalSchema expected, LogicalSchema actual, Optional<KsqlHostInfoEntity> sourceHost) {
        if (actual.equals(expected)) {
            return;
        }
        throw new KsqlException(String.format("Schemas %s from host %s differs from schema %s",
            actual, sourceHost.map(KsqlHostInfoEntity::getHost).orElse("unknown"), expected));
    }
}
