/*
 * Decompiled with CFR 0.152.
 */
package io.confluent.ksql.logging.processing;

import io.confluent.common.logging.StructuredLogger;
import io.confluent.ksql.logging.processing.ProcessingLogConfig;
import io.confluent.ksql.logging.processing.ProcessingLogMessageSchema;
import io.confluent.ksql.logging.processing.ProcessingLogger;
import java.util.function.Function;
import java.util.function.Supplier;
import org.apache.kafka.connect.data.SchemaAndValue;

/**
 * {@link ProcessingLogger} implementation that forwards error messages to a
 * {@link StructuredLogger}.
 *
 * <p>Messages are handed to the underlying logger as a {@link Supplier}, so the
 * caller-provided message factory is only invoked when that supplier's
 * {@code get()} is called, not when {@link #error(Function)} is called.
 */
public class ProcessingLoggerImpl implements ProcessingLogger {
    private final StructuredLogger inner;
    private final ProcessingLogConfig config;

    /**
     * Creates a logger that writes to {@code inner}.
     *
     * @param config the processing log configuration passed to every message factory
     * @param inner the structured logger that receives the message suppliers
     */
    public ProcessingLoggerImpl(ProcessingLogConfig config, StructuredLogger inner) {
        this.config = config;
        this.inner = inner;
    }

    /**
     * Logs an error whose payload is built by {@code msgFactory}.
     *
     * <p>The factory is not invoked here; it is wrapped in a supplier that applies
     * it to this logger's config and validates the resulting schema.
     *
     * @param msgFactory builds the message from the processing log config
     */
    @Override
    public void error(Function<ProcessingLogConfig, SchemaAndValue> msgFactory) {
        // ProcessingLogMessage is already a Supplier<SchemaAndValue>; no (raw) cast needed.
        this.inner.error(new ProcessingLogMessage(this.config, msgFactory));
    }

    /**
     * Supplier that builds a processing log message on demand and rejects any
     * message whose schema is not
     * {@link ProcessingLogMessageSchema#PROCESSING_LOG_SCHEMA}.
     */
    private static final class ProcessingLogMessage implements Supplier<SchemaAndValue> {
        private final ProcessingLogConfig config;
        private final Function<ProcessingLogConfig, SchemaAndValue> msgFactory;

        ProcessingLogMessage(ProcessingLogConfig config, Function<ProcessingLogConfig, SchemaAndValue> msgFactory) {
            this.config = config;
            this.msgFactory = msgFactory;
        }

        /**
         * Applies the message factory to the config and validates the result.
         *
         * @return the message produced by the factory
         * @throws RuntimeException if the message's schema is {@code null} or does not
         *     equal {@link ProcessingLogMessageSchema#PROCESSING_LOG_SCHEMA}
         */
        @Override
        public SchemaAndValue get() {
            SchemaAndValue msg = this.msgFactory.apply(this.config);
            // A SchemaAndValue may carry a null schema; treat that as an invalid schema
            // rather than failing with a bare NullPointerException.
            if (msg.schema() != null
                    && msg.schema().equals(ProcessingLogMessageSchema.PROCESSING_LOG_SCHEMA)) {
                return msg;
            }
            throw new RuntimeException("Received message with invalid schema");
        }
    }
}

