package fish.payara.cloud.connectors.kafka.inbound;

import fish.payara.cloud.connectors.kafka.api.OnRecord;
import fish.payara.cloud.connectors.kafka.api.OnRecords;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.resource.ResourceException;
import javax.resource.spi.UnavailableException;
import javax.resource.spi.endpoint.MessageEndpoint;
import javax.transaction.xa.XAResource;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/* loaded from: input_file:fish/payara/cloud/connectors/kafka/inbound/KafkaSynchWorker.class */
public class KafkaSynchWorker implements KafkaWorker {
    private static final Logger LOGGER = Logger.getLogger(KafkaSynchWorker.class.getName());
    private final EndpointKey key;
    private KafkaConsumer consumer;
    private AtomicBoolean ok = new AtomicBoolean(true);
    private final List<Method> onRecordMethods = new ArrayList();
    private final List<Method> onRecordsMethods = new ArrayList();

    public KafkaSynchWorker(EndpointKey endpointKey) {
        this.key = endpointKey;
        Class endpointClass = endpointKey.getMef().getEndpointClass();
        for (Method method : endpointClass.getMethods()) {
            if (method.isAnnotationPresent(OnRecord.class)) {
                if (method.getParameterCount() == 1 && ConsumerRecord.class.isAssignableFrom(method.getParameterTypes()[0])) {
                    this.onRecordMethods.add(method);
                } else {
                    LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecord.class.getSimpleName(), ConsumerRecord.class.getSimpleName(), endpointClass.getName(), method.getName()});
                }
            }
            if (method.isAnnotationPresent(OnRecords.class)) {
                if (method.getParameterCount() == 1 && ConsumerRecords.class.isAssignableFrom(method.getParameterTypes()[0])) {
                    this.onRecordsMethods.add(method);
                } else {
                    LOGGER.log(Level.WARNING, "@{0} annotated MDBs must have only one parameter of type {1}. {2}#{3} endpoint will be ignored.", new Object[]{OnRecords.class.getSimpleName(), ConsumerRecords.class.getSimpleName(), endpointClass.getName(), method.getName()});
                }
            }
        }
    }

    public void run() {
        try {
            this.consumer = new KafkaConsumer(this.key.getSpec().getConsumerProperties());
            this.consumer.subscribe(Arrays.asList(this.key.getSpec().getTopics().split(",")));
            MessageEndpoint createEndpoint = this.key.getMef().createEndpoint((XAResource) null);
            while (this.ok.get()) {
                ConsumerRecords<Object, Object> poll = this.consumer.poll(Duration.of(this.key.getSpec().getPollInterval().longValue(), ChronoUnit.MILLIS));
                if (!poll.isEmpty()) {
                    for (Method method : this.onRecordsMethods) {
                        OnRecords onRecords = (OnRecords) method.getAnnotation(OnRecords.class);
                        try {
                            deliverRecords(createEndpoint, method, poll);
                        } catch (UnavailableException e) {
                            Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, (String) null, e);
                        }
                        if (!onRecords.matchOtherMethods()) {
                            return;
                        }
                    }
                    Iterator it = poll.iterator();
                    while (it.hasNext()) {
                        ConsumerRecord<Object, Object> consumerRecord = (ConsumerRecord) it.next();
                        for (Method method2 : this.onRecordMethods) {
                            OnRecord onRecord = (OnRecord) method2.getAnnotation(OnRecord.class);
                            if (onRecord.topics().length == 0 || Arrays.binarySearch(onRecord.topics(), consumerRecord.topic()) >= 0) {
                                try {
                                    deliverRecord(createEndpoint, method2, consumerRecord);
                                } catch (UnavailableException e2) {
                                    Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, (String) null, e2);
                                }
                                if (!onRecord.matchOtherMethods()) {
                                    break;
                                }
                            }
                        }
                    }
                    if (this.key.getSpec().getCommitEachPoll().booleanValue()) {
                        this.consumer.commitSync();
                    }
                }
            }
            this.consumer.close();
            createEndpoint.release();
        } catch (UnavailableException e3) {
            Logger.getLogger(KafkaSynchWorker.class.getName()).log(Level.SEVERE, (String) null, e3);
        }
    }

    private void deliverRecords(MessageEndpoint messageEndpoint, Method method, ConsumerRecords<Object, Object> consumerRecords) throws UnavailableException {
        try {
            messageEndpoint.beforeDelivery(method);
            method.invoke(messageEndpoint, consumerRecords);
            messageEndpoint.afterDelivery();
        } catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException e) {
            Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, (String) null, e);
        }
    }

    private void deliverRecord(MessageEndpoint messageEndpoint, Method method, ConsumerRecord<Object, Object> consumerRecord) throws UnavailableException {
        try {
            messageEndpoint.beforeDelivery(method);
            method.invoke(messageEndpoint, consumerRecord);
            messageEndpoint.afterDelivery();
        } catch (IllegalAccessException | IllegalArgumentException | NoSuchMethodException | InvocationTargetException | ResourceException e) {
            Logger.getLogger(KafkaResourceAdapter.class.getName()).log(Level.SEVERE, (String) null, e);
        }
    }

    @Override // fish.payara.cloud.connectors.kafka.inbound.KafkaWorker
    public void stop() {
        this.ok.set(false);
    }

    @Override // fish.payara.cloud.connectors.kafka.inbound.KafkaWorker
    public boolean isStopped() {
        return !this.ok.get();
    }

    public void release() {
        stop();
    }
}
