package com.bes.mq.store.memory;

import com.bes.mq.broker.ConnectionContext;
import com.bes.mq.command.BESMQDestination;
import com.bes.mq.command.Message;
import com.bes.mq.command.MessageAck;
import com.bes.mq.command.MessageId;
import com.bes.mq.command.SubscriptionInfo;
import com.bes.mq.store.MessageRecoveryListener;
import com.bes.mq.store.TopicMessageStore;
import com.bes.mq.util.LRUCache;
import com.bes.mq.util.SubscriptionKey;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

/* loaded from: input_file:com/bes/mq/store/memory/MemoryTopicMessageStore.class */
/**
 * {@link TopicMessageStore} implementation that tracks subscriptions in two maps held by this
 * object: one from subscription key to {@link SubscriptionInfo}, and one from subscription key
 * to the {@link MemoryTopicSub} holding the messages recorded for that subscription.
 *
 * <p>Every method except {@link #getAllSubscriptions()} and
 * {@link #resetBatching(String, String)} is {@code synchronized} on this store.
 */
public class MemoryTopicMessageStore extends MemoryMessageStore implements TopicMessageStore {
    /** Subscription metadata, keyed by subscription. Supplied by the caller or created by default. */
    private final Map<SubscriptionKey, SubscriptionInfo> subscriberDatabase;
    /** Per-subscription message tracking, keyed by subscription. Always created by this class. */
    private final Map<SubscriptionKey, MemoryTopicSub> topicSubMap;

    /**
     * Creates a store for the given destination backed by a fresh {@link LRUCache} message table
     * and a fresh synchronized subscription map.
     *
     * @param bESMQDestination the destination handed to the superclass constructor
     */
    public MemoryTopicMessageStore(BESMQDestination bESMQDestination) {
        // NOTE(review): the LRUCache arguments are passed through unchanged; their meaning
        // (presumably capacity / max size / load factor / access order) should be confirmed
        // against LRUCache itself.
        this(bESMQDestination, new LRUCache(100, 100, 0.75f, false), makeSubscriptionInfoMap());
    }

    /**
     * Creates a store using caller-supplied maps.
     *
     * @param bESMQDestination the destination handed to the superclass constructor
     * @param map the message table handed to the superclass constructor
     * @param map2 the map used to hold subscription metadata; it is used as-is, not copied
     */
    public MemoryTopicMessageStore(BESMQDestination bESMQDestination, Map<MessageId, Message> map, Map<SubscriptionKey, SubscriptionInfo> map2) {
        super(bESMQDestination, map);
        this.subscriberDatabase = map2;
        this.topicSubMap = makeSubMap();
    }

    /**
     * Builds the default subscription-metadata map.
     *
     * @return a new, empty, synchronized {@link HashMap}
     */
    protected static Map<SubscriptionKey, SubscriptionInfo> makeSubscriptionInfoMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, SubscriptionInfo>());
    }

    /**
     * Builds the map that holds one {@link MemoryTopicSub} per subscription.
     *
     * @return a new, empty, synchronized {@link HashMap}
     */
    protected static Map<SubscriptionKey, MemoryTopicSub> makeSubMap() {
        return Collections.synchronizedMap(new HashMap<SubscriptionKey, MemoryTopicSub>());
    }

    /**
     * Looks up the per-subscription tracker for the given key parts.
     *
     * @return the registered {@link MemoryTopicSub}, or {@code null} if none is registered
     */
    private MemoryTopicSub findSub(String str, String str2) {
        return this.topicSubMap.get(new SubscriptionKey(str, str2));
    }

    /**
     * Stores the message through the superclass and then records it with every subscription
     * currently registered in this store.
     *
     * @param connectionContext passed through to the superclass
     * @param message the message to store
     * @throws IOException if the superclass fails to store the message
     */
    @Override
    public synchronized void addMessage(ConnectionContext connectionContext, Message message) throws IOException {
        super.addMessage(connectionContext, message);
        for (MemoryTopicSub sub : this.topicSubMap.values()) {
            sub.addMessage(message.getMessageId(), message);
        }
    }

    /**
     * Removes the message from the given subscription, if that subscription is registered.
     * Afterwards, if the message is still present in the message table and reports itself as
     * expired, it is removed from the message table as well.
     *
     * @param connectionContext not used by this implementation
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     * @param messageId id of the message being acknowledged
     * @param messageAck not used by this implementation
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public synchronized void acknowledge(ConnectionContext connectionContext, String str, String str2, MessageId messageId, MessageAck messageAck) throws IOException {
        MemoryTopicSub sub = findSub(str, str2);
        if (sub != null) {
            sub.removeMessage(messageId);
        }
        Message message = this.messageTable.get(messageId);
        if (message != null && message.isExpired()) {
            this.messageTable.remove(messageId);
        }
    }

    /**
     * Returns the metadata stored for a subscription.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     * @return the stored {@link SubscriptionInfo}, or {@code null} if the key is not present
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public synchronized SubscriptionInfo lookupSubscription(String str, String str2) throws IOException {
        return this.subscriberDatabase.get(new SubscriptionKey(str, str2));
    }

    /**
     * Registers a subscription with a fresh {@link MemoryTopicSub}, replacing any tracker already
     * registered under the same key, and stores its metadata.
     *
     * @param subscriptionInfo the subscription to register; also used to derive the key
     * @param z when {@code true}, every entry currently in the message table is added to the
     *     new subscription's tracker
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public synchronized void addSubsciption(SubscriptionInfo subscriptionInfo, boolean z) throws IOException {
        SubscriptionKey key = new SubscriptionKey(subscriptionInfo);
        MemoryTopicSub sub = new MemoryTopicSub();
        this.topicSubMap.put(key, sub);
        if (z) {
            for (Map.Entry<MessageId, Message> entry : this.messageTable.entrySet()) {
                sub.addMessage(entry.getKey(), entry.getValue());
            }
        }
        this.subscriberDatabase.put(key, subscriptionInfo);
    }

    /**
     * Removes both the metadata and the message tracker of a subscription. Does nothing if the
     * subscription is not registered.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     */
    @Override
    public synchronized void deleteSubscription(String str, String str2) {
        SubscriptionKey key = new SubscriptionKey(str, str2);
        this.subscriberDatabase.remove(key);
        this.topicSubMap.remove(key);
    }

    /**
     * Delegates recovery to the subscription's {@link MemoryTopicSub}; does nothing if the
     * subscription is not registered.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     * @param messageRecoveryListener listener handed to the subscription's tracker
     * @throws Exception whatever the tracker's recovery propagates
     */
    @Override
    public synchronized void recoverSubscription(String str, String str2, MessageRecoveryListener messageRecoveryListener) throws Exception {
        MemoryTopicSub sub = findSub(str, str2);
        if (sub != null) {
            sub.recoverSubscription(messageRecoveryListener);
        }
    }

    /**
     * Runs the superclass {@code delete()} and then clears both subscription maps.
     */
    @Override
    public synchronized void delete() {
        super.delete();
        this.subscriberDatabase.clear();
        this.topicSubMap.clear();
    }

    /**
     * Returns a snapshot array of the metadata of all stored subscriptions.
     *
     * <p>This method does not hold the store's monitor, so the array is deliberately sized by
     * {@code toArray} itself: pre-sizing it from a separate {@code size()} call would leave
     * trailing {@code null} slots if a subscription were deleted between the two calls.
     *
     * @return a new array containing the stored {@link SubscriptionInfo} values; never padded
     *     with {@code null}
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public SubscriptionInfo[] getAllSubscriptions() throws IOException {
        return this.subscriberDatabase.values().toArray(new SubscriptionInfo[0]);
    }

    /**
     * Reports the size of the subscription's {@link MemoryTopicSub}.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     * @return the tracker's {@code size()}, or {@code 0} if the subscription is not registered
     * @throws IOException declared by the interface; not thrown directly here
     */
    @Override
    public synchronized int getMessageCount(String str, String str2) throws IOException {
        MemoryTopicSub sub = findSub(str, str2);
        return sub != null ? sub.size() : 0;
    }

    /**
     * Delegates batched recovery to the subscription's {@link MemoryTopicSub}; does nothing if
     * the subscription is not registered.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     * @param i count handed to the tracker (presumably the maximum batch size; confirm against
     *     {@code MemoryTopicSub.recoverNextMessages})
     * @param messageRecoveryListener listener handed to the subscription's tracker
     * @throws Exception whatever the tracker's recovery propagates
     */
    @Override
    public synchronized void recoverNextMessages(String str, String str2, int i, MessageRecoveryListener messageRecoveryListener) throws Exception {
        MemoryTopicSub sub = findSub(str, str2);
        if (sub != null) {
            sub.recoverNextMessages(i, messageRecoveryListener);
        }
    }

    /**
     * Asks the subscription's {@link MemoryTopicSub} to reset its batching state; does nothing
     * if the subscription is not registered.
     *
     * <p>Unlike most methods of this class, this one does not take the store's monitor.
     *
     * @param str first part of the subscription key
     * @param str2 second part of the subscription key
     */
    @Override
    public void resetBatching(String str, String str2) {
        MemoryTopicSub sub = findSub(str, str2);
        if (sub != null) {
            sub.resetBatching();
        }
    }
}
