package com.bes.mq.broker.region;

import com.bes.mq.BESMQMessageAudit;
import com.bes.mq.broker.Broker;
import com.bes.mq.broker.ConnectionContext;
import com.bes.mq.broker.region.cursors.FilePendingMessageCursor;
import com.bes.mq.broker.region.cursors.PendingMessageCursor;
import com.bes.mq.broker.region.cursors.VMPendingMessageCursor;
import com.bes.mq.broker.region.policy.MessageEvictionStrategy;
import com.bes.mq.broker.region.policy.OldestMessageEvictionStrategy;
import com.bes.mq.command.ConsumerControl;
import com.bes.mq.command.ConsumerInfo;
import com.bes.mq.command.Message;
import com.bes.mq.command.MessageAck;
import com.bes.mq.command.MessageDispatch;
import com.bes.mq.command.MessagePull;
import com.bes.mq.command.Response;
import com.bes.mq.org.slf4j.Logger;
import com.bes.mq.org.slf4j.LoggerFactory;
import com.bes.mq.transaction.Synchronization;
import com.bes.mq.transport.TransmitCallback;
import com.bes.mq.usage.SystemUsage;
import com.bes.mq.usage.Usage;
import com.bes.mq.usage.UsageListener;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicLong;
import javax.jms.JMSException;

/* loaded from: input_file:com/bes/mq/broker/region/TopicSubscription.class */
public class TopicSubscription extends AbstractSubscription implements UsageListener {
    private static final Logger LOG = LoggerFactory.getLogger(TopicSubscription.class);
    private static final AtomicLong CURSOR_NAME_COUNTER = new AtomicLong(0);
    private int fullWarnedFrequency;
    private AtomicLong fullWarnedCount;
    protected PendingMessageCursor matched;
    protected final SystemUsage usageManager;
    protected AtomicLong dispatchedCounter;
    boolean singleDestination;
    Destination destination;
    private int maximumPendingMessages;
    private MessageEvictionStrategy messageEvictionStrategy;
    private int discarded;
    private final Object matchedListMutex;
    private final AtomicLong enqueueCounter;
    private final AtomicLong dequeueCounter;
    private int memoryUsageHighWaterMark;
    protected int maxProducersToAudit;
    protected int maxAuditDepth;
    protected boolean enableAudit;
    protected BESMQMessageAudit audit;
    protected volatile boolean active;
    protected volatile boolean dispatchProcessPending;

    public TopicSubscription(Broker broker, ConnectionContext connectionContext, ConsumerInfo consumerInfo, SystemUsage systemUsage) throws Exception {
        super(broker, connectionContext, consumerInfo);
        this.fullWarnedFrequency = 60;
        this.fullWarnedCount = new AtomicLong();
        this.dispatchedCounter = new AtomicLong();
        this.singleDestination = true;
        this.maximumPendingMessages = -1;
        this.messageEvictionStrategy = new OldestMessageEvictionStrategy();
        this.matchedListMutex = new Object();
        this.enqueueCounter = new AtomicLong(0L);
        this.dequeueCounter = new AtomicLong(0L);
        this.memoryUsageHighWaterMark = 95;
        this.maxProducersToAudit = 1024;
        this.maxAuditDepth = 1000;
        this.enableAudit = false;
        this.active = false;
        this.dispatchProcessPending = false;
        this.usageManager = systemUsage;
        String str = "TopicSubscription:" + CURSOR_NAME_COUNTER.getAndIncrement() + "[" + consumerInfo.getConsumerId().toString() + "]";
        if (consumerInfo.getDestination().isTemporary() || broker == null || broker.getTempDataStore() == null) {
            this.matched = new VMPendingMessageCursor(false);
        } else {
            this.matched = new FilePendingMessageCursor(broker, str, false);
        }
    }

    public void init() throws Exception {
        this.matched.setSystemUsage(this.usageManager);
        this.matched.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
        this.matched.start();
        if (this.enableAudit) {
            this.audit = new BESMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
        this.usageManager.getMemoryUsage().addUsageListener(this);
        this.active = true;
    }

    /* JADX WARN: Code restructure failed: missing block: B:42:0x01f0, code lost:
    
        if (r5.maximumPendingMessages <= 0) goto L87;
     */
    /* JADX WARN: Code restructure failed: missing block: B:43:0x01f3, code lost:
    
        r11 = r5.messageEvictionStrategy.getEvictExpiredMessagesHighWatermark();
     */
    /* JADX WARN: Code restructure failed: missing block: B:44:0x0202, code lost:
    
        if (r5.maximumPendingMessages <= 0) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:46:0x020b, code lost:
    
        if (r5.maximumPendingMessages >= r11) goto L58;
     */
    /* JADX WARN: Code restructure failed: missing block: B:47:0x020e, code lost:
    
        r11 = r5.maximumPendingMessages;
     */
    /* JADX WARN: Code restructure failed: missing block: B:49:0x021d, code lost:
    
        if (r5.matched.isEmpty() != false) goto L113;
     */
    /* JADX WARN: Code restructure failed: missing block: B:51:0x022b, code lost:
    
        if (r5.matched.size() <= r11) goto L113;
     */
    /* JADX WARN: Code restructure failed: missing block: B:52:0x022e, code lost:
    
        removeExpiredMessages();
     */
    /* JADX WARN: Code restructure failed: missing block: B:55:0x023b, code lost:
    
        if (r5.matched.isEmpty() != false) goto L112;
     */
    /* JADX WARN: Code restructure failed: missing block: B:57:0x024b, code lost:
    
        if (r5.matched.size() <= r5.maximumPendingMessages) goto L111;
     */
    /* JADX WARN: Code restructure failed: missing block: B:58:0x024e, code lost:
    
        r0 = java.lang.Math.max(1000, r5.matched.size() - r5.maximumPendingMessages);
        r0 = r5.matched;
     */
    /* JADX WARN: Code restructure failed: missing block: B:59:0x0275, code lost:
    
        monitor-enter(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:61:0x0276, code lost:
    
        r0 = r5.matched.pageInList(r0);
        r0 = r5.messageEvictionStrategy.evictMessages(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:62:0x0292, code lost:
    
        monitor-exit(r0);
     */
    /* JADX WARN: Code restructure failed: missing block: B:64:0x029e, code lost:
    
        r15 = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:65:0x02a3, code lost:
    
        if (r0 == null) goto L83;
     */
    /* JADX WARN: Code restructure failed: missing block: B:66:0x02a6, code lost:
    
        r15 = r0.length;
        r16 = 0;
     */
    /* JADX WARN: Code restructure failed: missing block: B:68:0x02b2, code lost:
    
        if (r16 >= r15) goto L114;
     */
    /* JADX WARN: Code restructure failed: missing block: B:69:0x02b5, code lost:
    
        discard(r0[r16]);
        r16 = r16 + 1;
     */
    /* JADX WARN: Code restructure failed: missing block: B:72:0x02ca, code lost:
    
        if (r15 != 0) goto L86;
     */
    /* JADX WARN: Code restructure failed: missing block: B:75:0x02cd, code lost:
    
        com.bes.mq.broker.region.TopicSubscription.LOG.warn("No messages to evict returned for " + r5.destination + " from eviction strategy: " + r5.messageEvictionStrategy + " out of " + r0.size() + " candidates");
     */
    /* JADX WARN: Code restructure failed: missing block: B:84:0x030f, code lost:
    
        dispatchPending();
     */
    @Override // com.bes.mq.broker.region.Subscription
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void add(com.bes.mq.broker.region.MessageReference r6) throws java.lang.Exception {
        /*
            Method dump skipped, instructions count: 808
            To view this dump add '--comments-level debug' option
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bes.mq.broker.region.TopicSubscription.add(com.bes.mq.broker.region.MessageReference):void");
    }

    private boolean isDuplicate(MessageReference messageReference) {
        boolean z = false;
        if (this.enableAudit && this.audit != null) {
            z = this.audit.isDuplicate(messageReference);
            if (LOG.isDebugEnabled() && z) {
                LOG.debug(this + ", ignoring duplicate add: " + messageReference.getMessageId());
            }
        }
        return z;
    }

    /* JADX WARN: Code restructure failed: missing block: B:9:0x002c, code lost:
    
        r5.matched.remove();
        r5.dispatchedCounter.incrementAndGet();
        r0.decrementReferenceCount();
        r0.getRegionDestination().getDestinationStatistics().getExpired().increment();
        r5.broker.messageExpired(getContext(), r0, r5);
     */
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    protected void removeExpiredMessages() throws java.io.IOException {
        /*
            r5 = this;
            r0 = r5
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L70
            r0.reset()     // Catch: java.lang.Throwable -> L70
        L9:
            r0 = r5
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L70
            boolean r0 = r0.hasNext()     // Catch: java.lang.Throwable -> L70
            if (r0 == 0) goto L6a
            r0 = r5
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L70
            com.bes.mq.broker.region.MessageReference r0 = r0.next()     // Catch: java.lang.Throwable -> L70
            r6 = r0
            r0 = r5
            com.bes.mq.broker.Broker r0 = r0.broker     // Catch: java.lang.Throwable -> L70
            r1 = r6
            boolean r0 = r0.isExpired(r1)     // Catch: java.lang.Throwable -> L70
            if (r0 == 0) goto L67
            r0 = r5
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L70
            r0.remove()     // Catch: java.lang.Throwable -> L70
            r0 = r5
            java.util.concurrent.atomic.AtomicLong r0 = r0.dispatchedCounter     // Catch: java.lang.Throwable -> L70
            long r0 = r0.incrementAndGet()     // Catch: java.lang.Throwable -> L70
            r0 = r6
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L70
            r0 = r6
            com.bes.mq.broker.region.Destination r0 = r0.getRegionDestination()     // Catch: java.lang.Throwable -> L70
            com.bes.mq.broker.region.DestinationStatistics r0 = r0.getDestinationStatistics()     // Catch: java.lang.Throwable -> L70
            com.bes.mq.management.CountStatisticImpl r0 = r0.getExpired()     // Catch: java.lang.Throwable -> L70
            r0.increment()     // Catch: java.lang.Throwable -> L70
            r0 = r5
            com.bes.mq.broker.Broker r0 = r0.broker     // Catch: java.lang.Throwable -> L70
            r1 = r5
            com.bes.mq.broker.ConnectionContext r1 = r1.getContext()     // Catch: java.lang.Throwable -> L70
            r2 = r6
            r3 = r5
            r0.messageExpired(r1, r2, r3)     // Catch: java.lang.Throwable -> L70
            goto L6a
        L67:
            goto L9
        L6a:
            r0 = jsr -> L76
        L6d:
            goto L82
        L70:
            r7 = move-exception
            r0 = jsr -> L76
        L74:
            r1 = r7
            throw r1
        L76:
            r8 = r0
            r0 = r5
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched
            r0.release()
            ret r8
        L82:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bes.mq.broker.region.TopicSubscription.removeExpiredMessages():void");
    }

    /* JADX WARN: Code restructure failed: missing block: B:11:0x003d, code lost:
    
        r3.matched.remove();
        r3.dispatchedCounter.incrementAndGet();
        r0.decrementReferenceCount();
     */
    @Override // com.bes.mq.broker.region.Subscription
    /*
        Code decompiled incorrectly, please refer to instructions dump.
        To view partially-correct add '--show-bad-code' argument
    */
    public void processMessageDispatchNotification(com.bes.mq.command.MessageDispatchNotification r4) {
        /*
            r3 = this;
            r0 = r3
            java.lang.Object r0 = r0.matchedListMutex
            r1 = r0
            r5 = r1
            monitor-enter(r0)
            r0 = r3
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r0.reset()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
        L10:
            r0 = r3
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            boolean r0 = r0.hasNext()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            if (r0 == 0) goto L5b
            r0 = r3
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            com.bes.mq.broker.region.MessageReference r0 = r0.next()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r6 = r0
            r0 = r6
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r0 = r6
            com.bes.mq.command.MessageId r0 = r0.getMessageId()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r1 = r4
            com.bes.mq.command.MessageId r1 = r1.getMessageId()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            boolean r0 = r0.equals(r1)     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            if (r0 == 0) goto L58
            r0 = r3
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r0.remove()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r0 = r3
            java.util.concurrent.atomic.AtomicLong r0 = r0.dispatchedCounter     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            long r0 = r0.incrementAndGet()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            r0 = r6
            int r0 = r0.decrementReferenceCount()     // Catch: java.lang.Throwable -> L61 java.lang.Throwable -> L7b
            goto L5b
        L58:
            goto L10
        L5b:
            r0 = jsr -> L69
        L5e:
            goto L76
        L61:
            r7 = move-exception
            r0 = jsr -> L69
        L66:
            r1 = r7
            throw r1     // Catch: java.lang.Throwable -> L7b
        L69:
            r8 = r0
            r0 = r3
            com.bes.mq.broker.region.cursors.PendingMessageCursor r0 = r0.matched     // Catch: java.lang.Throwable -> L7b
            r0.release()     // Catch: java.lang.Throwable -> L7b
            ret r8     // Catch: java.lang.Throwable -> L7b
        L76:
            r1 = r5
            monitor-exit(r1)     // Catch: java.lang.Throwable -> L7b
            goto L82
        L7b:
            r9 = move-exception
            r0 = r5
            monitor-exit(r0)     // Catch: java.lang.Throwable -> L7b
            r0 = r9
            throw r0
        L82:
            return
        */
        throw new UnsupportedOperationException("Method not decompiled: com.bes.mq.broker.region.TopicSubscription.processMessageDispatchNotification(com.bes.mq.command.MessageDispatchNotification):void");
    }

    @Override // com.bes.mq.broker.region.Subscription
    public synchronized void acknowledge(ConnectionContext connectionContext, final MessageAck messageAck) throws Exception {
        if (messageAck.isStandardAck() || messageAck.isPoisonAck() || messageAck.isIndividualAck()) {
            if (connectionContext.isInTransaction()) {
                connectionContext.getTransaction().addSynchronization(new Synchronization() { // from class: com.bes.mq.broker.region.TopicSubscription.1
                    @Override // com.bes.mq.transaction.Synchronization
                    public void afterCommit() throws Exception {
                        synchronized (TopicSubscription.this) {
                            if (TopicSubscription.this.singleDestination && TopicSubscription.this.destination != null) {
                                TopicSubscription.this.destination.getDestinationStatistics().getDequeues().add(messageAck.getMessageCount());
                            }
                        }
                        TopicSubscription.this.dequeueCounter.addAndGet(messageAck.getMessageCount());
                        TopicSubscription.this.dispatchPending();
                    }
                });
            } else {
                if (this.singleDestination && this.destination != null) {
                    this.destination.getDestinationStatistics().getDequeues().add(messageAck.getMessageCount());
                    this.destination.getDestinationStatistics().getInflight().subtract(messageAck.getMessageCount());
                }
                this.dequeueCounter.addAndGet(messageAck.getMessageCount());
            }
            dispatchPending();
            return;
        }
        if (!messageAck.isDeliveredAck()) {
            if (!messageAck.isRedeliveredAck()) {
                throw new JMSException("Invalid acknowledgment: " + messageAck);
            }
            return;
        }
        if (this.destination != null && !messageAck.isInTransaction()) {
            this.destination.getDestinationStatistics().getDequeues().add(messageAck.getMessageCount());
            this.destination.getDestinationStatistics().getInflight().subtract(messageAck.getMessageCount());
        }
        this.dequeueCounter.addAndGet(messageAck.getMessageCount());
        dispatchPending();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public Response pullMessage(ConnectionContext connectionContext, MessagePull messagePull) throws Exception {
        return null;
    }

    @Override // com.bes.mq.broker.region.Subscription
    public int getPendingQueueSize() {
        return matched();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public int getDispatchedQueueSize() {
        return (int) (this.dispatchedCounter.get() - (this.dequeueCounter.get() - discarded()));
    }

    public int getMaximumPendingMessages() {
        return this.maximumPendingMessages;
    }

    @Override // com.bes.mq.broker.region.Subscription
    public long getDispatchedCounter() {
        return this.dispatchedCounter.get();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public long getEnqueueCounter() {
        return this.enqueueCounter.get();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public long getDequeueCounter() {
        return this.dequeueCounter.get();
    }

    public int discarded() {
        int i;
        synchronized (this.matchedListMutex) {
            i = this.discarded;
        }
        return i;
    }

    public int matched() {
        int size;
        synchronized (this.matchedListMutex) {
            size = this.matched.size();
        }
        return size;
    }

    public void setMaximumPendingMessages(int i) {
        this.maximumPendingMessages = i;
    }

    public MessageEvictionStrategy getMessageEvictionStrategy() {
        return this.messageEvictionStrategy;
    }

    public void setMessageEvictionStrategy(MessageEvictionStrategy messageEvictionStrategy) {
        this.messageEvictionStrategy = messageEvictionStrategy;
    }

    public int getMaxProducersToAudit() {
        return this.maxProducersToAudit;
    }

    public synchronized void setMaxProducersToAudit(int i) {
        this.maxProducersToAudit = i;
        if (this.audit != null) {
            this.audit.setMaximumNumberOfProducersToTrack(i);
        }
    }

    public int getMaxAuditDepth() {
        return this.maxAuditDepth;
    }

    public synchronized void setMaxAuditDepth(int i) {
        this.maxAuditDepth = i;
        if (this.audit != null) {
            this.audit.setAuditDepth(i);
        }
    }

    public boolean isEnableAudit() {
        return this.enableAudit;
    }

    public synchronized void setEnableAudit(boolean z) {
        this.enableAudit = z;
        if (z && this.audit == null) {
            this.audit = new BESMQMessageAudit(this.maxAuditDepth, this.maxProducersToAudit);
        }
    }

    @Override // com.bes.mq.broker.region.Subscription
    public boolean isFull() {
        return getDispatchedQueueSize() >= this.info.getPrefetchSize();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public int getInFlightSize() {
        return getDispatchedQueueSize();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public boolean isLowWaterMark() {
        return ((double) getDispatchedQueueSize()) <= ((double) this.info.getPrefetchSize()) * 0.4d;
    }

    @Override // com.bes.mq.broker.region.Subscription
    public boolean isHighWaterMark() {
        return ((double) getDispatchedQueueSize()) >= ((double) this.info.getPrefetchSize()) * 0.9d;
    }

    public void setMemoryUsageHighWaterMark(int i) {
        this.memoryUsageHighWaterMark = i;
    }

    public int getMemoryUsageHighWaterMark() {
        return this.memoryUsageHighWaterMark;
    }

    public SystemUsage getUsageManager() {
        return this.usageManager;
    }

    public PendingMessageCursor getMatched() {
        return this.matched;
    }

    public void setMatched(PendingMessageCursor pendingMessageCursor) {
        this.matched = pendingMessageCursor;
    }

    @Override // com.bes.mq.broker.region.Subscription
    public void updateConsumerPrefetch(int i) {
        if (this.context == null || this.context.getConnection() == null || !this.context.getConnection().isManageable()) {
            return;
        }
        ConsumerControl consumerControl = new ConsumerControl();
        consumerControl.setConsumerId(this.info.getConsumerId());
        consumerControl.setPrefetch(i);
        this.context.getConnection().dispatchAsync(consumerControl);
    }

    @Override // com.bes.mq.broker.region.Subscription
    public void dispatchPending() throws IOException {
        this.dispatchProcessPending = false;
        boolean z = false;
        synchronized (this.matchedListMutex) {
            if (!this.matched.isEmpty() && !isFull()) {
                try {
                    this.matched.reset();
                    while (this.matched.hasNext() && !isFull()) {
                        MessageReference next = this.matched.next();
                        this.matched.remove();
                        if (next.isExpired()) {
                            discard(next);
                        } else {
                            dispatch(next);
                            z = true;
                        }
                    }
                    if (!z && getDispatchedQueueSize() == 0 && this.matched.size() > 0) {
                        this.dispatchProcessPending = true;
                    }
                } finally {
                    this.matched.release();
                }
            }
        }
    }

    private void dispatch(final MessageReference messageReference) throws IOException {
        MessageDispatch messageDispatch = new MessageDispatch();
        messageDispatch.setMessage((Message) messageReference);
        messageDispatch.setConsumerId(this.info.getConsumerId());
        messageDispatch.setDestination(messageReference.getRegionDestination().getBESMQDestination());
        this.dispatchedCounter.incrementAndGet();
        if (this.singleDestination) {
            if (this.destination == null) {
                this.destination = messageReference.getRegionDestination();
            } else if (this.destination != messageReference.getRegionDestination()) {
                this.singleDestination = false;
            }
        }
        if (this.info.isDispatchAsync()) {
            if (messageReference != null) {
                messageDispatch.setTransmitCallback(new TransmitCallback() { // from class: com.bes.mq.broker.region.TopicSubscription.2
                    @Override // com.bes.mq.transport.TransmitCallback
                    public void onSuccess() {
                        Destination regionDestination = messageReference.getRegionDestination();
                        regionDestination.getDestinationStatistics().getDispatched().increment();
                        regionDestination.getDestinationStatistics().getInflight().increment();
                        messageReference.decrementReferenceCount();
                    }

                    @Override // com.bes.mq.transport.TransmitCallback
                    public void onFailure() {
                        Destination regionDestination = messageReference.getRegionDestination();
                        regionDestination.getDestinationStatistics().getDispatched().increment();
                        regionDestination.getDestinationStatistics().getInflight().increment();
                        messageReference.decrementReferenceCount();
                    }
                });
            }
            this.context.getConnection().dispatchAsync(messageDispatch);
        } else {
            this.context.getConnection().dispatchSync(messageDispatch);
            messageReference.getRegionDestination().getDestinationStatistics().getDispatched().increment();
            messageReference.getRegionDestination().getDestinationStatistics().getInflight().increment();
            messageReference.decrementReferenceCount();
        }
    }

    private void discard(MessageReference messageReference) {
        messageReference.decrementReferenceCount();
        this.matched.remove(messageReference);
        this.discarded++;
        if (this.destination != null) {
            this.destination.getDestinationStatistics().getDequeues().increment();
        }
        this.dequeueCounter.incrementAndGet();
        if (LOG.isDebugEnabled()) {
            LOG.debug(this + ", discarding message " + messageReference);
        }
        Destination regionDestination = messageReference.getRegionDestination();
        if (regionDestination != null) {
            regionDestination.messageDiscarded(getContext(), this, messageReference);
        }
        this.broker.getRoot().sendToDeadLetterQueue(getContext(), messageReference, this, new Throwable("Messsage is discarded by TopicSubscription [" + this.info.getConsumerId() + "]"));
    }

    public String toString() {
        return "TopicSubscription: consumer=" + this.info.getConsumerId() + ", destinations=" + this.destinations.size() + ", dispatched=" + getDispatchedQueueSize() + ", delivered=" + getDequeueCounter() + ", matched=" + matched() + ", discarded=" + discarded();
    }

    @Override // com.bes.mq.broker.region.Subscription
    public void destroy() {
        this.active = false;
        this.usageManager.getMemoryUsage().removeUsageListener(this);
        synchronized (this.matchedListMutex) {
            try {
                this.matched.destroy();
            } catch (Exception e) {
                LOG.warn("Failed to destroy cursor", (Throwable) e);
            }
        }
        setSlowConsumer(false);
    }

    @Override // com.bes.mq.broker.region.AbstractSubscription, com.bes.mq.broker.region.Subscription
    public int getPrefetchSize() {
        return this.info.getPrefetchSize();
    }

    @Override // com.bes.mq.broker.region.AbstractSubscription
    public void setPrefetchSize(int i) {
        this.info.setPrefetchSize(i);
        try {
            dispatchPending();
        } catch (Exception e) {
            LOG.trace("Caught exception on dispatch after prefetch size change.");
        }
    }

    public void setFullWarnedFrequency(int i) {
        this.fullWarnedFrequency = i;
    }

    public int getFullWarnedFrequency() {
        return this.fullWarnedFrequency;
    }

    @Override // com.bes.mq.usage.UsageListener
    public void onUsageChanged(Usage usage, int i, int i2) {
        if (this.dispatchProcessPending && this.active && i2 < getCursorMemoryHighWaterMark()) {
            try {
                dispatchPending();
            } catch (IOException e) {
                LOG.warn("Error occurred while dispatching pending messages of " + this + ", caused by: ", (Throwable) e);
            }
        }
    }
}
