package org.apache.nifi.processor.util.listen;

import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.remote.io.socket.NetworkUtils;

/* loaded from: input_file:org/apache/nifi/processor/util/listen/AbstractListenEventProcessor.class */
public abstract class AbstractListenEventProcessor<E extends Event> extends AbstractProcessor {
    public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder().name("Port").description("The port to listen on for communication.").required(true).expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY).addValidator(StandardValidators.PORT_VALIDATOR).build();
    public static final PropertyDescriptor CHARSET = new PropertyDescriptor.Builder().name("Character Set").description("Specifies the character set of the received data.").required(true).defaultValue("UTF-8").addValidator(StandardValidators.CHARACTER_SET_VALIDATOR).build();
    public static final PropertyDescriptor RECV_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Receive Buffer Size").description("The size of each buffer used to receive messages. Adjust this value appropriately based on the expected size of the incoming messages.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("65507 B").required(true).build();
    public static final PropertyDescriptor MAX_SOCKET_BUFFER_SIZE = new PropertyDescriptor.Builder().name("Max Size of Socket Buffer").description("The maximum size of the socket buffer that should be used. This is a suggestion to the Operating System to indicate how big the socket buffer should be. If this value is set too low, the buffer may fill up before the data can be read, and incoming data will be dropped.").addValidator(StandardValidators.DATA_SIZE_VALIDATOR).defaultValue("1 MB").required(true).build();
    public static final PropertyDescriptor MAX_MESSAGE_QUEUE_SIZE = new PropertyDescriptor.Builder().name("Max Size of Message Queue").description("The maximum size of the internal queue used to buffer messages being transferred from the underlying channel to the processor. Setting this value higher allows more messages to be buffered in memory during surges of incoming messages, but increases the total memory used by the processor.").addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR).defaultValue("10000").required(true).build();
    public static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder().name("Max Number of TCP Connections").description("The maximum number of concurrent TCP connections to accept.").addValidator(StandardValidators.createLongValidator(1, 65535, true)).defaultValue("2").required(true).build();
    public static final Relationship REL_SUCCESS = new Relationship.Builder().name("success").description("Messages received successfully will be sent out this relationship.").build();
    public static final int POLL_TIMEOUT_MS = 20;
    protected Set<Relationship> relationships;
    protected List<PropertyDescriptor> descriptors;
    protected volatile int port;
    protected volatile Charset charset;
    protected volatile ChannelDispatcher dispatcher;
    protected volatile BlockingQueue<E> events;
    protected volatile BlockingQueue<E> errorEvents = new LinkedBlockingQueue();

    protected void init(ProcessorInitializationContext processorInitializationContext) {
        ArrayList arrayList = new ArrayList();
        arrayList.add(ListenerProperties.NETWORK_INTF_NAME);
        arrayList.add(PORT);
        arrayList.add(RECV_BUFFER_SIZE);
        arrayList.add(MAX_MESSAGE_QUEUE_SIZE);
        arrayList.add(MAX_SOCKET_BUFFER_SIZE);
        arrayList.add(CHARSET);
        arrayList.addAll(getAdditionalProperties());
        this.descriptors = Collections.unmodifiableList(arrayList);
        HashSet hashSet = new HashSet();
        hashSet.add(REL_SUCCESS);
        hashSet.addAll(getAdditionalRelationships());
        this.relationships = Collections.unmodifiableSet(hashSet);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<Relationship> getAdditionalRelationships() {
        return Collections.EMPTY_LIST;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public List<PropertyDescriptor> getAdditionalProperties() {
        return Collections.EMPTY_LIST;
    }

    public final Set<Relationship> getRelationships() {
        return this.relationships;
    }

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return this.descriptors;
    }

    @OnScheduled
    public void onScheduled(ProcessContext processContext) throws IOException {
        this.charset = Charset.forName(processContext.getProperty(CHARSET).getValue());
        this.port = processContext.getProperty(PORT).evaluateAttributeExpressions().asInteger().intValue();
        this.events = new LinkedBlockingQueue(processContext.getProperty(MAX_MESSAGE_QUEUE_SIZE).asInteger().intValue());
        InetAddress interfaceAddress = NetworkUtils.getInterfaceAddress(processContext.getProperty(ListenerProperties.NETWORK_INTF_NAME).evaluateAttributeExpressions().getValue());
        int intValue = processContext.getProperty(MAX_SOCKET_BUFFER_SIZE).asDataSize(DataUnit.B).intValue();
        this.dispatcher = createDispatcher(processContext, this.events);
        this.dispatcher.open(interfaceAddress, this.port, intValue);
        Thread thread = new Thread(this.dispatcher);
        thread.setName(getClass().getName() + " [" + getIdentifier() + "]");
        thread.setDaemon(true);
        thread.start();
    }

    protected abstract ChannelDispatcher createDispatcher(ProcessContext processContext, BlockingQueue<E> blockingQueue) throws IOException;

    public final int getDispatcherPort() {
        if (this.dispatcher == null) {
            return 0;
        }
        return this.dispatcher.getPort();
    }

    public int getErrorQueueSize() {
        return this.errorEvents.size();
    }

    public int getQueueSize() {
        if (this.events == null) {
            return 0;
        }
        return this.events.size();
    }

    @OnStopped
    public void closeDispatcher() {
        if (this.dispatcher != null) {
            this.dispatcher.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public E getMessage(boolean z, boolean z2, ProcessSession processSession) {
        E e = null;
        if (z2) {
            e = this.errorEvents.poll();
        }
        if (e != null) {
            return e;
        }
        try {
            E poll = z ? this.events.poll(getLongPollTimeout(), TimeUnit.MILLISECONDS) : this.events.poll();
            if (poll != null) {
                processSession.adjustCounter("Messages Received", 1L, false);
            }
            return poll;
        } catch (InterruptedException e2) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    protected long getLongPollTimeout() {
        return 20L;
    }
}
