package org.springframework.integration.util;

import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.locks.LockSupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.reactivestreams.Publisher;
import org.springframework.integration.StaticMessageHeaderAccessor;
import org.springframework.integration.acks.AckUtils;
import org.springframework.integration.core.MessageSource;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;
import reactor.util.retry.Retry;
import reactor.util.retry.RetrySpec;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-6.0.6.jar:org/springframework/integration/util/IntegrationReactiveUtils.class */
/**
 * Static helpers that adapt Spring Integration message sources and message channels
 * to Project Reactor {@link Flux} streams.
 *
 * <p>This class is not instantiable.
 */
public final class IntegrationReactiveUtils {
    /**
     * Reactor context key under which a {@link Duration} may be stored to control how long
     * {@link #messageSourceToFlux(MessageSource)} waits before polling again after an empty
     * receive. When the key is absent, {@link #DEFAULT_DELAY_WHEN_EMPTY} is used.
     */
    public static final String DELAY_WHEN_EMPTY_KEY = "DELAY_WHEN_EMPTY_KEY";
    private static final Log LOGGER = LogFactory.getLog(IntegrationReactiveUtils.class);
    /**
     * Default delay (one second) applied before re-polling a source that returned no message.
     */
    public static final Duration DEFAULT_DELAY_WHEN_EMPTY = Duration.ofSeconds(1);

    private IntegrationReactiveUtils() {
    }

    /**
     * Wraps a {@link MessageSource} into a {@link Flux} that polls the source on demand.
     *
     * <p>Each downstream request triggers one {@link MessageSource#receive()} call, subscribed
     * on {@link Schedulers#boundedElastic()}. A received message is auto-acknowledged; when a
     * {@link MessagingException} carrying a failed message is raised, that message is
     * auto-nacked and the error is logged. An empty receive is retried after the delay found
     * in the Reactor context under {@link #DELAY_WHEN_EMPTY_KEY} (falling back to
     * {@link #DEFAULT_DELAY_WHEN_EMPTY}). The polling repeats indefinitely and is re-subscribed
     * indefinitely after a {@link MessagingException}; any other error terminates the flux.
     *
     * @param messageSource the source to poll
     * @param <T> the payload type of the produced messages
     * @return a flux of messages received from the source
     */
    public static <T> Flux<Message<T>> messageSourceToFlux(MessageSource<T> messageSource) {
        return Mono.<Message<T>>create(monoSink ->
                        // receive() may return null, which completes the Mono empty
                        // and is handled by repeatWhenEmpty() below.
                        monoSink.onRequest(requested -> monoSink.success(messageSource.receive())))
                .doOnSuccess(message -> {
                    if (message != null) {
                        AckUtils.autoAck(StaticMessageHeaderAccessor.getAcknowledgmentCallback(message));
                    }
                })
                .doOnError(MessagingException.class, messagingException -> {
                    Message<?> failedMessage = messagingException.getFailedMessage();
                    if (failedMessage != null) {
                        AckUtils.autoNack(StaticMessageHeaderAccessor.getAcknowledgmentCallback(failedMessage));
                    }
                    LOGGER.error("Error from Flux for : " + messageSource, messagingException);
                })
                .subscribeOn(Schedulers.boundedElastic())
                .repeatWhenEmpty(repeat ->
                        repeat.flatMap(increment ->
                                Mono.deferContextual(contextView ->
                                        Mono.delay(contextView.getOrDefault(DELAY_WHEN_EMPTY_KEY,
                                                DEFAULT_DELAY_WHEN_EMPTY)))))
                .repeat()
                // Only messaging failures are retried; anything else propagates downstream.
                .retryWhen(Retry.indefinitely().filter(MessagingException.class::isInstance));
    }

    /**
     * Adapts a {@link MessageChannel} to a {@link Flux} of its messages.
     *
     * <p>The adaptation depends on the channel's runtime type, checked in this order:
     * a {@link Publisher} is wrapped directly; a {@link SubscribableChannel} is bridged through
     * a message handler subscribed to it; a {@link PollableChannel} is polled with a zero
     * receive timeout via {@link #messageSourceToFlux(MessageSource)}.
     *
     * @param messageChannel the channel to adapt
     * @param <T> the payload type expected by the caller (not verified at runtime)
     * @return a flux of messages from the channel
     * @throws IllegalArgumentException if the channel is none of the supported types
     */
    @SuppressWarnings("unchecked") // payload type T is the caller's assertion; erased at runtime
    public static <T> Flux<Message<T>> messageChannelToFlux(MessageChannel messageChannel) {
        if (messageChannel instanceof Publisher) {
            return Flux.from((Publisher<Message<T>>) messageChannel);
        }
        if (messageChannel instanceof SubscribableChannel) {
            return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
        }
        if (messageChannel instanceof PollableChannel) {
            PollableChannel pollableChannel = (PollableChannel) messageChannel;
            return messageSourceToFlux(() -> (Message<T>) pollableChannel.receive(0L));
        }
        throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, "
                + "SubscribableChannel or PollableChannel, not: " + messageChannel);
    }

    /**
     * Bridges a {@link SubscribableChannel} to a {@link Flux} through a unicast sink.
     *
     * <p>For every subscription to the returned flux a new sink is created and a handler is
     * subscribed to the channel; the handler is unsubscribed when the flux is cancelled.
     * The handler retries the emit after a short park while the sink reports a
     * non-serialized access or an overflow, and throws {@link IllegalStateException} when the
     * sink has no subscriber or is already cancelled or terminated.
     */
    private static <T> Flux<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel subscribableChannel) {
        return Flux.defer(() -> {
            Sinks.Many<Message<T>> sink = Sinks.many().unicast().onBackpressureError();
            @SuppressWarnings("unchecked") // payload type T is the caller's assertion
            MessageHandler messageHandler = message -> {
                while (true) {
                    switch (sink.tryEmitNext((Message<T>) message)) {
                        case FAIL_NON_SERIALIZED:
                        case FAIL_OVERFLOW:
                            // Transient condition: back off briefly and try the emit again.
                            LockSupport.parkNanos(1000L);
                            break;
                        case FAIL_ZERO_SUBSCRIBER:
                            throw new IllegalStateException("The [" + sink + "] doesn't have subscribers to accept messages");
                        case FAIL_TERMINATED:
                        case FAIL_CANCELLED:
                            throw new IllegalStateException("Cannot emit messages into the cancelled or terminated sink for message channel: " + subscribableChannel);
                        default:
                            // Emitted successfully.
                            return;
                    }
                }
            };
            subscribableChannel.subscribe(messageHandler);
            return sink.asFlux().doOnCancel(() -> subscribableChannel.unsubscribe(messageHandler));
        });
    }
}
