package org.springframework.integration.ip.udp;

import java.io.IOException;
import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.integration.ip.AbstractInternetProtocolReceivingChannelAdapter;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.MessagingException;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-ip-6.0.6.jar:org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.class */
public class UnicastReceivingChannelAdapter extends AbstractInternetProtocolReceivingChannelAdapter {
    private static final Pattern ADDRESS_PATTERN = Pattern.compile("([^:]*):([0-9]*)");
    private final DatagramPacketMessageMapper mapper;
    private DatagramSocket socket;
    private boolean socketExplicitlySet;
    private int soSendBufferSize;
    private SocketCustomizer socketCustomizer;

    public UnicastReceivingChannelAdapter(int i) {
        super(i);
        this.mapper = new DatagramPacketMessageMapper();
        this.soSendBufferSize = -1;
        this.socketCustomizer = datagramSocket -> {
        };
        this.mapper.setLengthCheck(false);
    }

    public UnicastReceivingChannelAdapter(int i, boolean z) {
        super(i);
        this.mapper = new DatagramPacketMessageMapper();
        this.soSendBufferSize = -1;
        this.socketCustomizer = datagramSocket -> {
        };
        this.mapper.setLengthCheck(z);
    }

    public void setLengthCheck(boolean z) {
        this.mapper.setLengthCheck(z);
    }

    public void setSocketCustomizer(SocketCustomizer socketCustomizer) {
        Assert.notNull(socketCustomizer, "'socketCustomizer' cannot be null");
        this.socketCustomizer = socketCustomizer;
    }

    @Override // org.springframework.scheduling.SchedulingAwareRunnable
    public boolean isLongLived() {
        return true;
    }

    @Override // org.springframework.integration.ip.AbstractInternetProtocolReceivingChannelAdapter
    public int getPort() {
        return this.socket == null ? super.getPort() : this.socket.getLocalPort();
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint, org.springframework.integration.context.IntegrationObjectSupport
    public void onInit() {
        super.onInit();
        this.mapper.setBeanFactory(getBeanFactory());
    }

    @Override // java.lang.Runnable
    public void run() {
        getSocket();
        ApplicationEventPublisher applicationEventPublisher = getApplicationEventPublisher();
        if (applicationEventPublisher != null) {
            applicationEventPublisher.publishEvent((ApplicationEvent) new UdpServerListeningEvent(this, getPort()));
        }
        this.logger.debug(() -> {
            return "UDP Receiver running on port: " + getPort();
        });
        setListening(true);
        while (isActive()) {
            try {
                asyncSendMessage(receive());
            } catch (SocketException e) {
                stop();
            } catch (SocketTimeoutException e2) {
            } catch (Exception e3) {
                if (!(e3 instanceof MessagingException)) {
                    throw new MessagingException("failed to receive DatagramPacket", e3);
                }
                throw ((MessagingException) e3);
            }
        }
        setListening(false);
    }

    protected void sendAck(Message<byte[]> message) {
        MessageHeaders headers = message.getHeaders();
        Object obj = headers.get(IpHeaders.ACK_ID);
        if (obj == null) {
            this.logger.error(() -> {
                return "No ip_ackId header; cannot send ack";
            });
            return;
        }
        byte[] bytes = obj.toString().getBytes();
        String trim = ((String) headers.get(IpHeaders.ACK_ADDRESS, String.class)).trim();
        Matcher matcher = ADDRESS_PATTERN.matcher(trim);
        if (!matcher.matches()) {
            throw new MessagingException((Message<?>) message, "Ack requested but could not decode acknowledgment address: " + trim);
        }
        InetSocketAddress inetSocketAddress = new InetSocketAddress(matcher.group(1), Integer.parseInt(matcher.group(2)));
        this.logger.debug(() -> {
            return "Sending ack for " + obj + " to " + trim;
        });
        try {
            DatagramPacket datagramPacket = new DatagramPacket(bytes, bytes.length, inetSocketAddress);
            DatagramSocket datagramSocket = new DatagramSocket();
            if (this.soSendBufferSize > 0) {
                datagramSocket.setSendBufferSize(this.soSendBufferSize);
            }
            this.socketCustomizer.configure(datagramSocket);
            datagramSocket.send(datagramPacket);
            datagramSocket.close();
        } catch (IOException e) {
            throw new MessagingException(message, "Failed to send acknowledgment to: " + trim, e);
        }
    }

    protected boolean asyncSendMessage(DatagramPacket datagramPacket) {
        Executor taskExecutor = getTaskExecutor();
        if (taskExecutor == null) {
            return true;
        }
        try {
            taskExecutor.execute(() -> {
                doSend(datagramPacket);
            });
            return true;
        } catch (RejectedExecutionException e) {
            this.logger.debug("Adapter stopped, sending on main thread");
            doSend(datagramPacket);
            return true;
        }
    }

    protected void doSend(DatagramPacket datagramPacket) {
        Message<byte[]> message = null;
        try {
            message = this.mapper.toMessage(datagramPacket);
            this.logger.debug(() -> {
                return "Received: " + message;
            });
        } catch (Exception e) {
            this.logger.error(e, "Failed to map packet to message ");
        }
        if (message != null) {
            if (message.getHeaders().containsKey(IpHeaders.ACK_ADDRESS)) {
                sendAck(message);
            }
            try {
                sendMessage(message);
            } catch (Exception e2) {
                this.logger.error(e2, "Failed to send message " + message);
            }
        }
    }

    protected DatagramPacket receive() throws IOException {
        byte[] bArr = new byte[getReceiveBufferSize()];
        DatagramPacket datagramPacket = new DatagramPacket(bArr, bArr.length);
        getSocket().receive(datagramPacket);
        return datagramPacket;
    }

    public void setSocket(DatagramSocket datagramSocket) {
        this.socket = datagramSocket;
        this.socketExplicitlySet = true;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Nullable
    public DatagramSocket getTheSocket() {
        return this.socket;
    }

    public synchronized DatagramSocket getSocket() {
        DatagramSocket datagramSocket;
        if (this.socket == null) {
            try {
                String localAddress = getLocalAddress();
                int port = super.getPort();
                if (localAddress == null) {
                    datagramSocket = port == 0 ? new DatagramSocket() : new DatagramSocket(port);
                } else {
                    datagramSocket = new DatagramSocket(new InetSocketAddress(InetAddress.getByName(localAddress), port));
                }
                setSocketAttributes(datagramSocket);
                this.socket = datagramSocket;
            } catch (IOException e) {
                throw new MessagingException("failed to create DatagramSocket", e);
            }
        }
        return this.socket;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public void setSocketAttributes(DatagramSocket datagramSocket) throws SocketException {
        datagramSocket.setSoTimeout(getSoTimeout());
        int soReceiveBufferSize = getSoReceiveBufferSize();
        if (soReceiveBufferSize > 0) {
            datagramSocket.setReceiveBufferSize(soReceiveBufferSize);
        }
        this.socketCustomizer.configure(datagramSocket);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.springframework.integration.ip.AbstractInternetProtocolReceivingChannelAdapter, org.springframework.integration.endpoint.MessageProducerSupport, org.springframework.integration.endpoint.AbstractEndpoint
    public void doStop() {
        super.doStop();
        try {
            DatagramSocket datagramSocket = this.socket;
            if (!this.socketExplicitlySet) {
                this.socket = null;
            }
            datagramSocket.close();
        } catch (Exception e) {
        }
    }

    @Override // org.springframework.integration.ip.CommonSocketOptions
    public void setSoSendBufferSize(int i) {
        this.soSendBufferSize = i;
    }

    public void setLookupHost(boolean z) {
        this.mapper.setLookupHost(z);
    }

    @Override // org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "ip:udp-inbound-channel-adapter";
    }
}
