package org.apache.nifi.processor.util.listen.dispatcher;

import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.SocketOption;
import java.net.StandardSocketOptions;
import java.nio.ByteBuffer;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.Iterator;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.io.IOUtils;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.util.listen.event.Event;
import org.apache.nifi.processor.util.listen.event.EventFactory;
import org.apache.nifi.processor.util.listen.event.EventFactoryUtil;
import org.apache.nifi.processor.util.listen.event.EventQueue;

/* loaded from: input_file:org/apache/nifi/processor/util/listen/dispatcher/DatagramChannelDispatcher.class */
public class DatagramChannelDispatcher<E extends Event<DatagramChannel>> implements ChannelDispatcher {
    private final EventFactory<E> eventFactory;
    private final ByteBufferSource bufferSource;
    private final EventQueue<E> events;
    private final ComponentLog logger;
    private final String sendingHost;
    private final Integer sendingPort;
    private Selector selector;
    private DatagramChannel datagramChannel;
    private volatile boolean stopped;

    public DatagramChannelDispatcher(EventFactory<E> eventFactory, ByteBufferSource byteBufferSource, BlockingQueue<E> blockingQueue, ComponentLog componentLog) {
        this(eventFactory, byteBufferSource, blockingQueue, componentLog, null, null);
    }

    public DatagramChannelDispatcher(EventFactory<E> eventFactory, ByteBufferSource byteBufferSource, BlockingQueue<E> blockingQueue, ComponentLog componentLog, String str, Integer num) {
        this.stopped = false;
        this.eventFactory = eventFactory;
        this.bufferSource = byteBufferSource;
        this.logger = componentLog;
        this.sendingHost = str;
        this.sendingPort = num;
        this.events = new EventQueue<>(blockingQueue, componentLog);
    }

    @Override // org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher
    public void open(InetAddress inetAddress, int i, int i2) throws IOException {
        this.stopped = false;
        this.datagramChannel = DatagramChannel.open();
        this.datagramChannel.configureBlocking(false);
        if (i2 > 0) {
            this.datagramChannel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_RCVBUF, (SocketOption) Integer.valueOf(i2));
            int intValue = ((Integer) this.datagramChannel.getOption(StandardSocketOptions.SO_RCVBUF)).intValue();
            if (intValue < i2) {
                this.logger.warn("Attempted to set Socket Buffer Size to " + i2 + " bytes but could only set to " + intValue + "bytes. You may want to consider changing the Operating System's maximum receive buffer");
            }
        }
        this.datagramChannel.setOption((SocketOption<SocketOption>) StandardSocketOptions.SO_REUSEADDR, (SocketOption) true);
        this.datagramChannel.socket().bind(new InetSocketAddress(inetAddress, i));
        if (this.sendingHost != null && this.sendingPort != null) {
            this.datagramChannel.connect(new InetSocketAddress(this.sendingHost, this.sendingPort.intValue()));
        }
        this.selector = Selector.open();
        this.datagramChannel.register(this.selector, 1);
    }

    @Override // java.lang.Runnable
    public void run() {
        SocketAddress receive;
        ByteBuffer acquire = this.bufferSource.acquire();
        while (!this.stopped) {
            try {
                if (this.selector.select() > 0 && !this.stopped) {
                    Iterator<SelectionKey> it = this.selector.selectedKeys().iterator();
                    while (it.hasNext() && !this.stopped) {
                        SelectionKey next = it.next();
                        it.remove();
                        if (next.isValid()) {
                            DatagramChannel datagramChannel = (DatagramChannel) next.channel();
                            acquire.clear();
                            while (!this.stopped && (receive = datagramChannel.receive(acquire)) != null) {
                                String str = "";
                                if (receive instanceof InetSocketAddress) {
                                    str = ((InetSocketAddress) receive).getAddress().toString();
                                }
                                acquire.flip();
                                byte[] bArr = new byte[acquire.limit()];
                                acquire.get(bArr, 0, acquire.limit());
                                this.events.offer(this.eventFactory.create(bArr, EventFactoryUtil.createMapWithSender(str), null));
                                acquire.clear();
                            }
                        }
                    }
                }
            } catch (IOException e) {
                this.logger.error("Error reading from DatagramChannel", e);
            } catch (InterruptedException e2) {
                this.stopped = true;
                Thread.currentThread().interrupt();
            }
        }
        if (acquire != null) {
            this.bufferSource.release(acquire);
        }
    }

    @Override // org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher
    public int getPort() {
        if (this.datagramChannel == null) {
            return 0;
        }
        return this.datagramChannel.socket().getLocalPort();
    }

    @Override // org.apache.nifi.processor.util.listen.dispatcher.ChannelDispatcher
    public void close() {
        this.stopped = true;
        if (this.selector != null) {
            this.selector.wakeup();
        }
        IOUtils.closeQuietly(this.selector);
        IOUtils.closeQuietly(this.datagramChannel);
    }
}
