package org.springframework.integration.channel;

import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.springframework.integration.core.MessageSelector;
import org.springframework.integration.support.management.metrics.GaugeFacade;
import org.springframework.integration.support.management.metrics.MetricsCaptor;
import org.springframework.lang.Nullable;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;

/* loaded from: input_file:BOOT-INF/lib/spring-integration-core-6.3.2.jar:org/springframework/integration/channel/QueueChannel.class */
/**
 * A pollable channel that buffers messages in a {@link Queue}.
 *
 * <p>When the backing queue is a {@link BlockingQueue}, timed and indefinite waits are
 * delegated to the queue itself. For any other {@link Queue} implementation the channel
 * emulates waiting with {@link #queueSemaphore}: every send releases a permit and
 * receivers wait on the semaphore between polls.
 */
public class QueueChannel extends AbstractPollableChannel implements QueueChannelOperations {

    /**
     * Upper bound, in milliseconds, of a single semaphore wait while an indefinite receive
     * re-polls a non-blocking queue.
     */
    private static final long NON_BLOCKING_POLL_INTERVAL_MILLIS = 50L;

    /** The backing message store; never {@code null}. */
    private final Queue<Message<?>> queue;

    /**
     * Signals receivers waiting on a non-blocking queue; a permit is released on every
     * send attempt to such a queue. Not used when the queue is a {@link BlockingQueue}.
     */
    protected final Semaphore queueSemaphore = new Semaphore(0);

    /** Gauge reporting the queue size; {@code null} until a metrics captor is registered. */
    @Nullable
    private GaugeFacade sizeGauge;

    /** Gauge reporting the remaining capacity; {@code null} until a metrics captor is registered. */
    @Nullable
    private GaugeFacade remainingCapacityGauge;

    /**
     * Create a channel backed by the given queue.
     *
     * @param queue the queue that stores the messages; must not be {@code null}
     * @throws IllegalArgumentException if {@code queue} is {@code null}
     */
    public QueueChannel(Queue<Message<?>> queue) {
        Assert.notNull(queue, "'queue' must not be null");
        this.queue = queue;
    }

    /**
     * Create a channel backed by a {@link LinkedBlockingQueue} bounded to the given capacity.
     *
     * @param capacity the maximum number of queued messages; must be greater than zero
     * @throws IllegalArgumentException if {@code capacity} is not positive
     */
    public QueueChannel(int capacity) {
        Assert.isTrue(capacity > 0, "The capacity must be a positive integer. For a zero-capacity alternative, consider using a 'RendezvousChannel'.");
        this.queue = new LinkedBlockingQueue<>(capacity);
    }

    /**
     * Create a channel backed by a {@link LinkedBlockingQueue} constructed without an
     * explicit capacity.
     */
    public QueueChannel() {
        this(new LinkedBlockingQueue<>());
    }

    /**
     * Register the captor with the superclass and build two gauges bound to this channel:
     * one for the current queue size and one for the remaining capacity. Both are tagged
     * with the component name (or {@code "unknown"} when it is {@code null}) and the type
     * {@code "channel"}.
     *
     * @param metricsCaptor the captor used to build the gauges
     */
    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.support.management.IntegrationManagement
    public void registerMetricsCaptor(MetricsCaptor metricsCaptor) {
        super.registerMetricsCaptor(metricsCaptor);
        this.sizeGauge = metricsCaptor
                .gaugeBuilder("spring.integration.channel.queue.size", this, channel -> getQueueSize())
                .tag("name", getComponentName() == null ? "unknown" : getComponentName())
                .tag("type", "channel")
                .description("The size of the queue channel")
                .build();
        this.remainingCapacityGauge = metricsCaptor
                .gaugeBuilder("spring.integration.channel.queue.remaining.capacity", this, channel -> getRemainingCapacity())
                .tag("name", getComponentName() == null ? "unknown" : getComponentName())
                .tag("type", "channel")
                .description("The remaining capacity of the queue channel")
                .build();
    }

    /**
     * Add the message to the queue.
     *
     * <p>For a {@link BlockingQueue}: a positive timeout waits up to that many milliseconds
     * for space, zero offers without waiting, and a negative timeout blocks until the
     * message has been put. For any other queue the timeout is ignored, the message is
     * offered once, and a semaphore permit is released whether or not the offer succeeded
     * (including when it throws), so that waiting receivers re-poll.
     *
     * <p>Note: the decompiler recorded that this method's access modifier was changed from
     * {@code protected}; it is kept {@code public} here so existing callers of this file
     * are unaffected.
     *
     * @param message the message to enqueue; must not be {@code null}
     * @param timeout the send timeout in milliseconds
     * @return {@code true} if the message was enqueued; {@code false} if the queue rejected
     * it, the timeout elapsed, or the thread was interrupted while waiting (the interrupt
     * flag is restored in that case)
     * @throws IllegalArgumentException if {@code message} is {@code null}
     */
    @Override // org.springframework.integration.channel.AbstractMessageChannel
    public boolean doSend(Message<?> message, long timeout) {
        Assert.notNull(message, "'message' must not be null");
        try {
            if (this.queue instanceof BlockingQueue) {
                BlockingQueue<Message<?>> blockingQueue = (BlockingQueue<Message<?>>) this.queue;
                if (timeout > 0) {
                    return blockingQueue.offer(message, timeout, TimeUnit.MILLISECONDS);
                }
                if (timeout == 0) {
                    return blockingQueue.offer(message);
                }
                blockingQueue.put(message);
                return true;
            }
            try {
                return this.queue.offer(message);
            } finally {
                // Always wake a potential receiver, even if the offer failed or threw.
                this.queueSemaphore.release();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /**
     * Take the next message from the queue.
     *
     * <p>A positive timeout waits up to that many milliseconds, zero polls once without
     * waiting, and a negative timeout waits until a message is available. Waiting is
     * delegated to the queue when it is a {@link BlockingQueue}; otherwise the queue is
     * re-polled after bounded waits on {@link #queueSemaphore}.
     *
     * <p>Note: the decompiler recorded that this method's access modifier was changed from
     * {@code protected}; it is kept {@code public} here so existing callers of this file
     * are unaffected.
     *
     * @param timeout the receive timeout in milliseconds
     * @return the received message, or {@code null} if none became available in time or
     * the thread was interrupted while waiting (the interrupt flag is restored in that case)
     */
    @Override // org.springframework.integration.channel.AbstractPollableChannel
    @Nullable
    public Message<?> doReceive(long timeout) {
        try {
            if (timeout > 0) {
                if (this.queue instanceof BlockingQueue) {
                    return ((BlockingQueue<Message<?>>) this.queue).poll(timeout, TimeUnit.MILLISECONDS);
                }
                return pollNonBlockingQueue(timeout);
            }
            if (timeout == 0) {
                return this.queue.poll();
            }
            if (this.queue instanceof BlockingQueue) {
                return ((BlockingQueue<Message<?>>) this.queue).take();
            }
            Message<?> message = this.queue.poll();
            while (message == null) {
                // The acquire result is deliberately ignored: the queue is re-polled
                // whether a permit arrived or the interval simply elapsed.
                this.queueSemaphore.tryAcquire(NON_BLOCKING_POLL_INTERVAL_MILLIS, TimeUnit.MILLISECONDS);
                message = this.queue.poll();
            }
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    /**
     * Poll a non-blocking queue until a message arrives or the timeout elapses, waiting on
     * {@link #queueSemaphore} between polls.
     *
     * @param timeout the maximum time to wait, in milliseconds
     * @return the polled message, or {@code null} if the deadline passed first
     * @throws InterruptedException if interrupted while waiting on the semaphore
     */
    @Nullable
    private Message<?> pollNonBlockingQueue(long timeout) throws InterruptedException {
        Message<?> message = this.queue.poll();
        if (message == null) {
            long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeout);
            long deadline = System.nanoTime() + remainingNanos;
            while (message == null && remainingNanos > 0) {
                // The acquire result is deliberately ignored; the poll below decides the outcome.
                this.queueSemaphore.tryAcquire(remainingNanos, TimeUnit.NANOSECONDS);
                message = this.queue.poll();
                if (message == null) {
                    remainingNanos = deadline - System.nanoTime();
                }
            }
        }
        return message;
    }

    /**
     * Remove all messages from the queue.
     *
     * @return the removed messages, in the order the queue yielded them; never {@code null}
     */
    @Override // org.springframework.integration.channel.QueueChannelOperations
    public List<Message<?>> clear() {
        List<Message<?>> clearedMessages = new ArrayList<>();
        if (this.queue instanceof BlockingQueue) {
            ((BlockingQueue<Message<?>>) this.queue).drainTo(clearedMessages);
        } else {
            Message<?> message;
            while ((message = this.queue.poll()) != null) {
                clearedMessages.add(message);
            }
        }
        return clearedMessages;
    }

    /**
     * Remove every queued message that the selector does <em>not</em> accept. The selector
     * is applied to a snapshot of the queue taken via {@link Queue#toArray()}; a rejected
     * message is reported only if this call actually removed it from the queue.
     *
     * @param messageSelector the selector deciding which messages stay; when {@code null}
     * the whole queue is cleared as by {@link #clear()}
     * @return the messages removed by this call; never {@code null}
     */
    @Override // org.springframework.integration.channel.QueueChannelOperations
    public List<Message<?>> purge(@Nullable MessageSelector messageSelector) {
        if (messageSelector == null) {
            return clear();
        }
        List<Message<?>> purgedMessages = new ArrayList<>();
        for (Object element : this.queue.toArray()) {
            Message<?> message = (Message<?>) element;
            if (!messageSelector.accept(message) && this.queue.remove(message)) {
                purgedMessages.add(message);
            }
        }
        return purgedMessages;
    }

    /**
     * Return the number of messages currently held by the queue.
     *
     * @return the value of {@link Queue#size()} for the backing queue
     */
    @Override // org.springframework.integration.channel.QueueChannelOperations
    public int getQueueSize() {
        return this.queue.size();
    }

    /**
     * Return how many more messages the queue can accept.
     *
     * @return the queue's {@link BlockingQueue#remainingCapacity()} when it is a
     * {@link BlockingQueue}; otherwise {@link Integer#MAX_VALUE}
     */
    public int getRemainingCapacity() {
        if (this.queue instanceof BlockingQueue) {
            return ((BlockingQueue<Message<?>>) this.queue).remainingCapacity();
        }
        return Integer.MAX_VALUE;
    }

    /**
     * Run the superclass destruction logic and then remove the size and remaining-capacity
     * gauges, if they were registered.
     */
    @Override // org.springframework.integration.channel.AbstractMessageChannel, org.springframework.integration.support.management.IntegrationManagement, org.springframework.beans.factory.DisposableBean
    public void destroy() {
        super.destroy();
        if (this.sizeGauge != null) {
            this.sizeGauge.remove();
        }
        if (this.remainingCapacityGauge != null) {
            this.remainingCapacityGauge.remove();
        }
    }
}
