package org.springframework.integration.ip.tcp;

import java.io.IOException;
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.Expression;
import org.springframework.expression.spel.support.StandardEvaluationContext;
import org.springframework.integration.MessageTimeoutException;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.expression.ExpressionUtils;
import org.springframework.integration.expression.ValueExpression;
import org.springframework.integration.handler.AbstractReplyProducingMessageHandler;
import org.springframework.integration.ip.IpHeaders;
import org.springframework.integration.ip.tcp.connection.AbstractClientConnectionFactory;
import org.springframework.integration.ip.tcp.connection.AbstractConnectionFactory;
import org.springframework.integration.ip.tcp.connection.TcpConnection;
import org.springframework.integration.ip.tcp.connection.TcpConnectionFailedCorrelationEvent;
import org.springframework.integration.ip.tcp.connection.TcpConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpListener;
import org.springframework.integration.ip.tcp.connection.TcpNioConnectionSupport;
import org.springframework.integration.ip.tcp.connection.TcpSender;
import org.springframework.integration.support.management.ManageableLifecycle;
import org.springframework.integration.support.utils.IntegrationUtils;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandlingException;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.support.ErrorMessage;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-ip-6.4.1.jar:org/springframework/integration/ip/tcp/TcpOutboundGateway.class */
public class TcpOutboundGateway extends AbstractReplyProducingMessageHandler implements TcpSender, TcpListener, ManageableLifecycle {
    /**
     * Default timeout in milliseconds; used both for waiting for the shared
     * connection (request timeout) and for waiting for a reply (remote timeout).
     */
    private static final long DEFAULT_REMOTE_TIMEOUT = 10000;

    /** Default time, in seconds, to wait for a real reply after an error reply arrived first. */
    private static final int DEFAULT_SECOND_CHANCE_DELAY = 2;

    /** Factory supplying the client connections; set via {@code setConnectionFactory}. */
    private AbstractClientConnectionFactory connectionFactory;

    /** Cached {@code isSingleUse()} value of the connection factory. */
    private boolean isSingleUse;

    /** Whether an evaluation context was supplied explicitly (otherwise one is created in {@code doInit}). */
    private boolean evaluationContextSet;

    /** When true, the connection's output is shut down right after the request is sent. */
    private boolean closeStreamAfterSend;

    /** Name of the channel for unsolicited messages; resolved lazily, then cleared. */
    private String unsolicitedMessageChannelName;

    /** Channel receiving messages that cannot be correlated to a pending request. */
    private MessageChannel unsolicitedMessageChannel;

    /** Replies being awaited, keyed by connection id. */
    private final Map<String, AsyncReply> pendingReplies = new ConcurrentHashMap<>();

    /** Fair, single-permit semaphore serializing use of a shared (non single-use) connection. */
    private final Semaphore semaphore = new Semaphore(1, true);

    /** Expression evaluated against each request to obtain the reply timeout in milliseconds. */
    private Expression remoteTimeoutExpression = new ValueExpression<>(DEFAULT_REMOTE_TIMEOUT);

    /** Maximum time, in milliseconds, to wait for the semaphore guarding a shared connection. */
    private long requestTimeout = DEFAULT_REMOTE_TIMEOUT;

    /** Context used to evaluate {@link #remoteTimeoutExpression}. */
    private EvaluationContext evaluationContext = new StandardEvaluationContext();

    /** Time, in seconds, to wait for a second reply when the first one was an error. */
    private int secondChanceDelay = DEFAULT_SECOND_CHANCE_DELAY;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:BOOT-INF/lib/spring-integration-ip-6.4.1.jar:org/springframework/integration/ip/tcp/TcpOutboundGateway$AsyncReply.class */
    /**
     * Holder for the reply to one outstanding request on one connection.
     * <p>
     * In synchronous mode the requesting thread blocks in {@link #getReply()} until
     * {@link #setReply(Message)} is called or the remote timeout elapses. In asynchronous
     * mode the caller receives {@link #getFuture()} and a task is scheduled that completes
     * the future exceptionally when no reply arrives within the remote timeout.
     */
    public final class AsyncReply {
        private final long remoteTimeout;
        private final TcpConnection connection;
        private final boolean haveSemaphore;
        private final ScheduledFuture<?> noResponseFuture;
        private volatile Message<?> reply;
        // This is the very future that gets completed by the listener or the timeout task.
        // Dependent actions must be registered ON it; storing the result of thenApply(...)
        // instead would mean complete(...) bypasses the registered function entirely.
        private final CompletableFuture<Message<?>> future = new CompletableFuture<>();
        private final CountDownLatch latch = new CountDownLatch(1);
        private final CountDownLatch secondChanceLatch = new CountDownLatch(1);

        /**
         * @param remoteTimeout  time in milliseconds to wait for the reply
         * @param connection     the connection the request is sent on
         * @param haveSemaphore  whether the gateway semaphore is held for this request
         * @param requestMessage the request; used as the failed message on timeout
         * @param async          whether the gateway operates asynchronously
         */
        AsyncReply(long remoteTimeout, TcpConnection connection, boolean haveSemaphore,
                Message<?> requestMessage, boolean async) {
            this.remoteTimeout = remoteTimeout;
            this.connection = connection;
            this.haveSemaphore = haveSemaphore;
            if (!async || remoteTimeout <= 0) {
                this.noResponseFuture = null;
            } else {
                this.noResponseFuture = TcpOutboundGateway.this.getTaskScheduler().schedule(() -> {
                    // Only clean up when this task is the one that actually completed the future.
                    if (this.future.completeExceptionally(new MessageTimeoutException(requestMessage, "Timed out waiting for response"))) {
                        TcpOutboundGateway.this.cleanUp(this.haveSemaphore, this.connection, this.connection.getConnectionId());
                    }
                }, Instant.now().plusMillis(remoteTimeout));
                // Cancel the timeout task as soon as the future completes normally.
                this.future.thenApply(this::cancelNoResponseFutureIfAny);
            }
        }

        /** Cancels the scheduled timeout task, if one exists, and passes the reply through. */
        private Message<?> cancelNoResponseFutureIfAny(Message<?> message) {
            if (this.noResponseFuture != null) {
                this.noResponseFuture.cancel(true);
            }
            return message;
        }

        TcpConnection getConnection() {
            return this.connection;
        }

        boolean isHaveSemaphore() {
            return this.haveSemaphore;
        }

        /**
         * Blocks until a reply is available or the remote timeout elapses.
         * <p>
         * If the reply is an {@link ErrorMessage}, waits once (up to the gateway's second
         * chance delay, in seconds) for it to be replaced by another reply; if it is still
         * an error afterwards, the error payload is thrown.
         *
         * @return the reply, or {@code null} if none arrived within the remote timeout
         * @throws MessagingException if the final reply is an {@link ErrorMessage}
         */
        Message<?> getReply() {
            try {
                if (!this.latch.await(this.remoteTimeout, TimeUnit.MILLISECONDS)) {
                    return null;
                }
            } catch (InterruptedException e) {
                // Restore the flag and fall through with whatever reply is present.
                Thread.currentThread().interrupt();
            }
            boolean firstAttempt = true;
            while (this.reply instanceof ErrorMessage) {
                if (firstAttempt) {
                    TcpOutboundGateway.this.logger.debug("second chance");
                    try {
                        this.secondChanceLatch.await(TcpOutboundGateway.this.secondChanceDelay, TimeUnit.SECONDS);
                    } catch (InterruptedException e2) {
                        Thread.currentThread().interrupt();
                        doThrowErrorMessagePayload();
                    }
                    firstAttempt = false;
                } else {
                    doThrowErrorMessagePayload();
                }
            }
            return this.reply;
        }

        CompletableFuture<Message<?>> getFuture() {
            return this.future;
        }

        /** Throws the error reply's payload, wrapping it unless it already is a MessagingException. */
        private void doThrowErrorMessagePayload() {
            Object payload = this.reply.getPayload();
            if (!(payload instanceof MessagingException)) {
                throw new MessagingException("Exception while awaiting reply", (Throwable) payload);
            }
            throw (MessagingException) payload;
        }

        /**
         * Stores the reply. The first reply releases the waiting thread; a later reply is
         * accepted only if the stored one is an {@link ErrorMessage} (the "second chance").
         *
         * @param message the reply received on the connection
         */
        public void setReply(Message<?> message) {
            if (this.reply == null) {
                this.reply = message;
                this.latch.countDown();
            } else if (this.reply instanceof ErrorMessage) {
                this.reply = message;
                this.secondChanceLatch.countDown();
            }
        }
    }

    /**
     * Sets the client connection factory and registers this gateway with it as both
     * listener and sender; also caches the factory's single-use setting.
     *
     * @param clientFactory the client connection factory to use
     */
    public void setConnectionFactory(AbstractClientConnectionFactory clientFactory) {
        this.connectionFactory = clientFactory;
        clientFactory.registerListener(this);
        clientFactory.registerSender(this);
        boolean singleUse = clientFactory.isSingleUse();
        this.isSingleUse = singleUse;
    }

    /**
     * Sets how long to wait for the semaphore that guards a shared connection.
     *
     * @param requestTimeoutMillis the timeout in milliseconds
     */
    public void setRequestTimeout(long requestTimeoutMillis) {
        this.requestTimeout = requestTimeoutMillis;
    }

    /**
     * Sets a fixed reply timeout by replacing the remote timeout expression with a
     * constant-valued one.
     *
     * @param remoteTimeoutMillis the timeout in milliseconds
     */
    public void setRemoteTimeout(long remoteTimeoutMillis) {
        Long boxedTimeout = remoteTimeoutMillis;
        this.remoteTimeoutExpression = new ValueExpression<>(boxedTimeout);
    }

    /**
     * Sets the expression evaluated against each request message to obtain the reply
     * timeout.
     *
     * @param timeoutExpression the expression; evaluated to a {@code Long}
     */
    public void setRemoteTimeoutExpression(Expression timeoutExpression) {
        this.remoteTimeoutExpression = timeoutExpression;
    }

    /**
     * Supplies the evaluation context used for the remote timeout expression, which
     * prevents {@code doInit()} from creating a default one.
     *
     * @param context the evaluation context; must not be {@code null}
     */
    public void setIntegrationEvaluationContext(EvaluationContext context) {
        Assert.notNull(context, "'evaluationContext' cannot be null");
        this.evaluationContextSet = true;
        this.evaluationContext = context;
    }

    /**
     * Sets the reply channel; delegates to {@code setOutputChannel}.
     *
     * @param replyChannel the channel to set as output channel
     */
    public void setReplyChannel(MessageChannel replyChannel) {
        setOutputChannel(replyChannel);
    }

    /**
     * Sets the reply channel by name; delegates to {@code setOutputChannelName}.
     *
     * @param replyChannelName the name to set as output channel name
     */
    public void setReplyChannelName(String replyChannelName) {
        setOutputChannelName(replyChannelName);
    }

    /**
     * Sets the name of the channel for messages that cannot be correlated to a pending
     * request; the name is resolved to a channel on first use.
     *
     * @param channelName the channel name
     */
    public void setUnsolicitedMessageChannelName(String channelName) {
        this.unsolicitedMessageChannelName = channelName;
    }

    /**
     * Sets the channel for messages that cannot be correlated to a pending request.
     *
     * @param channel the channel to send such messages to
     */
    public void setUnsolicitedMessageChannel(MessageChannel channel) {
        this.unsolicitedMessageChannel = channel;
    }

    /**
     * Sets whether the connection's output is shut down immediately after the request
     * is sent. {@code doInit()} requires a single-use connection factory when enabled.
     *
     * @param closeAfterSend true to shut down the output after sending
     */
    public void setCloseStreamAfterSend(boolean closeAfterSend) {
        this.closeStreamAfterSend = closeAfterSend;
    }

    /**
     * Sets how long a waiting thread gives an error reply the chance to be replaced by
     * another reply before the error is thrown.
     *
     * @param delaySeconds the delay in seconds
     */
    public void setSecondChanceDelay(int delaySeconds) {
        this.secondChanceDelay = delaySeconds;
    }

    /**
     * Returns the component type identifier of this gateway.
     *
     * @return the constant {@code "ip:tcp-outbound-gateway"}
     */
    @Override // org.springframework.integration.handler.MessageHandlerSupport, org.springframework.integration.context.IntegrationObjectSupport, org.springframework.integration.support.context.NamedComponent
    public String getComponentType() {
        return "ip:tcp-outbound-gateway";
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Completes initialization: creates a default evaluation context when none was
     * supplied, and verifies that {@code closeStreamAfterSend} is only used together
     * with a single-use connection factory.
     *
     * @throws IllegalStateException if {@code closeStreamAfterSend} is set but the
     *         connection factory is not single-use
     */
    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    public void doInit() {
        super.doInit();
        if (!this.evaluationContextSet) {
            this.evaluationContext = ExpressionUtils.createStandardEvaluationContext(getBeanFactory());
        }
        boolean configurationValid = this.isSingleUse || !this.closeStreamAfterSend;
        Assert.state(configurationValid, "Single use connection needed with closeStreamAfterSend");
    }

    /**
     * Sends the request over a connection obtained from the connection factory and
     * returns the reply.
     * <p>
     * For a shared (non single-use) connection the gateway semaphore is acquired first.
     * In synchronous mode this method blocks for the reply and always cleans up (pending
     * reply removed, single-use connection closed, semaphore released) before returning
     * or throwing. In asynchronous mode a {@link CompletableFuture} is returned and
     * clean-up is left to the reply listener or the timeout task, except when this
     * method itself fails, in which case it cleans up before throwing.
     *
     * @param message the request message
     * @return the reply message, or a future of it in asynchronous mode
     * @throws MessageHandlingException if the thread is interrupted
     * @throws MessagingException if sending fails or no reply arrives in time
     */
    @Override // org.springframework.integration.handler.AbstractReplyProducingMessageHandler
    protected Object handleRequestMessage(Message<?> message) {
        Assert.notNull(this.connectionFactory,
                () -> getClass().getName() + " requires a client connection factory");
        boolean async = isAsync();
        // Track what has been acquired so far so that every exit path can release it.
        boolean haveSemaphore = false;
        TcpConnectionSupport connection = null;
        String connectionId = null;
        try {
            haveSemaphore = acquireSemaphoreIfNeeded(message);
            connection = this.connectionFactory.getConnection();
            checkAsync(connection, async);
            AsyncReply asyncReply = new AsyncReply(getRemoteTimeout(message), connection, haveSemaphore, message, async);
            final String pendingId = connection.getConnectionId();
            connectionId = pendingId;
            this.pendingReplies.put(pendingId, asyncReply);
            this.logger.debug(() -> "Added pending reply " + pendingId);
            connection.send(message);
            if (this.closeStreamAfterSend) {
                connection.shutdownOutput();
            }
            if (async) {
                return asyncReply.getFuture();
            }
            return getReply(message, connection, pendingId, asyncReply);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            if (async) {
                cleanUp(haveSemaphore, connection, connectionId);
            }
            throw new MessageHandlingException(message, "Interrupted in the [" + this + "]", ex);
        } catch (IOException | RuntimeException ex) {
            this.logger.error(ex, "Tcp Gateway exception");
            if (async) {
                // Nobody else will clean up a request that never got under way.
                cleanUp(haveSemaphore, connection, connectionId);
            }
            throw IntegrationUtils.wrapInHandlingExceptionIfNecessary(message, () -> "Failed to send or receive", ex);
        } finally {
            if (!async) {
                // Synchronous requests are finished here, whether they succeeded or failed.
                cleanUp(haveSemaphore, connection, connectionId);
            }
        }
    }

    /**
     * Disables asynchronous mode on this gateway, with a warning, when async was
     * requested but the connection is an NIO connection.
     *
     * @param candidate the connection about to be used
     * @param asyncRequested whether asynchronous mode is currently requested
     */
    private void checkAsync(TcpConnection candidate, boolean asyncRequested) {
        if (!asyncRequested) {
            return;
        }
        if (!(candidate instanceof TcpNioConnectionSupport)) {
            return;
        }
        setAsync(false);
        this.logger.warn("Async replies are not supported with NIO; see the reference manual");
    }

    /**
     * Acquires the gateway semaphore unless single-use connections are in effect.
     *
     * @param message the request, used as the failed message if acquisition times out
     * @return {@code true} if the semaphore was acquired, {@code false} if it is not needed
     * @throws InterruptedException if interrupted while waiting for the semaphore
     * @throws MessageTimeoutException if the semaphore is not obtained within the request timeout
     */
    private boolean acquireSemaphoreIfNeeded(Message<?> message) throws InterruptedException {
        if (!this.isSingleUse) {
            this.logger.debug("trying semaphore");
            boolean acquired = this.semaphore.tryAcquire(this.requestTimeout, TimeUnit.MILLISECONDS);
            if (acquired) {
                this.logger.debug("got semaphore");
                return true;
            }
            throw new MessageTimeoutException(message, "Timed out waiting for connection");
        }
        return false;
    }

    /**
     * Evaluates the remote timeout expression against the request.
     *
     * @param message the request message, used as the evaluation root
     * @return the evaluated timeout in milliseconds, or the default (with a warning
     *         logged) when the expression evaluates to {@code null}
     */
    private Long getRemoteTimeout(Message<?> message) {
        Long evaluated = this.remoteTimeoutExpression.getValue(this.evaluationContext, message, Long.class);
        if (evaluated != null) {
            return evaluated;
        }
        this.logger.warn(() -> "remoteTimeoutExpression evaluated to null; falling back to default for message " + message);
        return DEFAULT_REMOTE_TIMEOUT;
    }

    /**
     * Waits for the reply to a synchronous request. When none arrives, the connection
     * is force-closed and a timeout exception is thrown.
     *
     * @param message the request, used as the failed message on timeout
     * @param tcpConnection the connection the request was sent on
     * @param connectionId the connection id, for logging
     * @param pending the holder the reply is delivered to
     * @return the reply message; never {@code null}
     * @throws MessageTimeoutException if no reply is available
     */
    private Message<?> getReply(Message<?> message, TcpConnection tcpConnection, String connectionId, AsyncReply pending) {
        Message<?> response = pending.getReply();
        if (response == null) {
            this.logger.debug(() -> "Remote Timeout on " + connectionId);
            this.connectionFactory.forceClose(tcpConnection);
            String componentName = getComponentName();
            String suffix = "";
            if (componentName != null) {
                suffix = "; component: " + componentName;
            }
            throw new MessageTimeoutException(message, "Timed out waiting for response" + suffix);
        }
        this.logger.debug(() -> "Response " + response);
        return response;
    }

    /**
     * Releases what a request holds: removes its pending reply, closes the connection
     * when single-use connections are in effect, and releases the semaphore if held.
     *
     * @param haveSemaphore whether the semaphore is held and must be released
     * @param tcpConnection the connection used; only dereferenced when {@code connectionId} is non-null
     * @param connectionId the key of the pending reply, or {@code null} if none was registered
     */
    private void cleanUp(boolean haveSemaphore, TcpConnection tcpConnection, String connectionId) {
        boolean replyRegistered = connectionId != null;
        if (replyRegistered) {
            this.pendingReplies.remove(connectionId);
            this.logger.debug(() -> "Removed pending reply " + connectionId);
            if (this.isSingleUse) {
                tcpConnection.close();
            }
        }
        if (!haveSemaphore) {
            return;
        }
        this.semaphore.release();
        this.logger.debug("released semaphore");
    }

    /**
     * Receives a message from a connection and hands it to the request waiting on that
     * connection. Messages without a connection id, or for which no request is pending,
     * go to the unsolicited message channel when one is configured; otherwise an error
     * is logged and a correlation-failure event is published. Error messages without a
     * pending request are ignored.
     *
     * @param message the received message
     * @return always {@code false}
     */
    @Override // org.springframework.integration.ip.tcp.connection.TcpListener
    public boolean onMessage(Message<?> message) {
        String connectionId = message.getHeaders().get(IpHeaders.CONNECTION_ID, String.class);
        if (connectionId == null) {
            if (!unsolicitedSupported(message)) {
                this.logger.error("Cannot correlate response - no connection id");
                publishNoConnectionEvent(message, null, "Cannot correlate response - no connection id");
            }
            return false;
        }
        this.logger.trace(() -> "onMessage: " + connectionId + "(" + message + ")");
        AsyncReply pending = this.pendingReplies.get(connectionId);
        if (pending == null) {
            // Error messages with no waiting request are dropped silently.
            boolean handled = (message instanceof ErrorMessage) || unsolicitedSupported(message);
            if (!handled) {
                String description = "Cannot correlate response - no pending reply for " + connectionId;
                this.logger.error(description);
                publishNoConnectionEvent(message, connectionId, description);
            }
            return false;
        }
        if (isAsync()) {
            pending.getFuture().complete(message);
            cleanUp(pending.isHaveSemaphore(), pending.getConnection(), connectionId);
        } else {
            pending.setReply(message);
        }
        return false;
    }

    /**
     * Forwards a message that cannot be correlated to a pending request to the
     * unsolicited message channel, resolving the channel from its configured name on
     * first use.
     *
     * @param message the uncorrelated message
     * @return {@code true} if an unsolicited message channel is configured (a send
     *         failure is logged, and the message still counts as handled);
     *         {@code false} if there is no such channel
     */
    private boolean unsolicitedSupported(Message<?> message) {
        String channelName = this.unsolicitedMessageChannelName;
        if (channelName != null) {
            this.unsolicitedMessageChannel = getChannelResolver().resolveDestination(channelName);
            this.unsolicitedMessageChannelName = null;
        }
        if (this.unsolicitedMessageChannel == null) {
            return false;
        }
        try {
            // The destination is the channel itself; it must not be cast to the template type.
            this.messagingTemplate.send(this.unsolicitedMessageChannel, message);
        } catch (Exception e) {
            this.logger.error(e, "Failed to send unsolicited message " + message);
        }
        return true;
    }

    /**
     * Publishes a {@link TcpConnectionFailedCorrelationEvent} through the connection
     * factory's event publisher, if the factory has one.
     *
     * @param message the message that could not be correlated
     * @param connectionId the connection id, or {@code null} if the message carried none
     * @param description the failure description used for the wrapped exception
     */
    private void publishNoConnectionEvent(Message<?> message, String connectionId, String description) {
        ApplicationEventPublisher publisher = this.connectionFactory.getApplicationEventPublisher();
        if (publisher == null) {
            return;
        }
        MessagingException cause = new MessagingException(message, description);
        TcpConnectionFailedCorrelationEvent event = new TcpConnectionFailedCorrelationEvent(this, connectionId, cause);
        publisher.publishEvent(event);
    }

    /**
     * No-op: this gateway takes no action when a new connection is added.
     *
     * @param tcpConnection the new connection (unused)
     */
    @Override // org.springframework.integration.ip.tcp.connection.TcpSender
    public void addNewConnection(TcpConnection tcpConnection) {
    }

    /**
     * No-op: this gateway takes no action when a dead connection is removed.
     *
     * @param tcpConnection the removed connection (unused)
     */
    @Override // org.springframework.integration.ip.tcp.connection.TcpSender
    public void removeDeadConnection(TcpConnection tcpConnection) {
    }

    /**
     * Starts the underlying connection factory.
     * NOTE(review): dereferences the connection factory without a null check, so it
     * appears to require {@code setConnectionFactory} to have been called - confirm.
     */
    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void start() {
        this.connectionFactory.start();
    }

    /**
     * Stops the underlying connection factory.
     * NOTE(review): dereferences the connection factory without a null check, so it
     * appears to require {@code setConnectionFactory} to have been called - confirm.
     */
    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public void stop() {
        this.connectionFactory.stop();
    }

    /**
     * Reports the running state of the underlying connection factory.
     *
     * @return whatever the connection factory's {@code isRunning()} returns
     */
    @Override // org.springframework.integration.support.management.ManageableLifecycle, org.springframework.context.Lifecycle
    public boolean isRunning() {
        return this.connectionFactory.isRunning();
    }

    /**
     * Returns the connection factory used by this gateway.
     *
     * @return the configured client connection factory, or {@code null} if none was set
     */
    protected AbstractConnectionFactory getConnectionFactory() {
        return this.connectionFactory;
    }
}
