package kd.bos.mq.support;

import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.io.StringReader;
import java.lang.reflect.InvocationTargetException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.LockSupport;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.extension.ExtensionFactory;
import kd.bos.instance.AppGroup;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.config.ConfigKeys;
import kd.bos.mq.config.ConsumerDef;
import kd.bos.mq.config.QueueDef;
import kd.bos.mq.rabbit.ChannelFactory;
import kd.bos.mq.rabbit.RabbitConsumer;
import kd.bos.mq.support.partition.BroadcastConsumer;
import kd.bos.mq.support.partition.BroadcastConsumerWrapper;
import kd.bos.mq.support.partition.ZKQueueManager;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.bos.util.ConfigurationUtil;
import kd.bos.util.JSONUtils;
import kd.bos.util.StringUtils;
import kd.bos.util.resource.Resources;

/* loaded from: input_file:kd/bos/mq/support/QueueManager.class */
public class QueueManager {
    public static final int DEFAULT_CONSUMER_CONCURRENCY = 1;
    public static final int DEFAULT_QUEUE_MAXlENGTH = 1000000;
    private static final Log logger = LogFactory.getLog(QueueManager.class);
    private static ConcurrentHashMap<String, QueueDef> queueDefs = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, String> queueGroup = new ConcurrentHashMap<>();
    private static ConcurrentHashMap<String, Boolean> queueDeclared = new ConcurrentHashMap<>();
    private static String queueTag = System.getProperty(ConfigKeys.CONFIG_DEBUG_QUEUETAG);
    private static Map<String, Object> consumers = new ConcurrentHashMap();
    private static Map<String, Consumer> deLayConsumers = new ConcurrentHashMap();
    private static Map<String, NoneInitDelayConsumer> noneInitDeLayConsumers = new ConcurrentHashMap();
    private static Map<String, String> mqTypeMap = new ConcurrentHashMap();
    private static Map<String, ThreadPool> poolMap = new ConcurrentHashMap();
    private static Map<String, String> queueNameMap = new ConcurrentHashMap(128);
    private static Map<String, String> queueAppMap = new ConcurrentHashMap(128);
    private static Set<String> log = new HashSet();

    public static String getRealQueueName(String str, String str2) {
        String standardQueueName = getStandardQueueName(str, str2);
        String str3 = queueGroup.get(str + str2);
        if (str3 == null) {
            return standardQueueName;
        }
        String standardQueueName2 = getStandardQueueName(str, getGroupQueueName(str2, str3));
        queueNameMap.putIfAbsent(standardQueueName2, standardQueueName);
        return standardQueueName2;
    }

    public static String getGroupQueueName(String str, String str2) {
        return str + "." + str2;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static String getStandardQueueName(String str, String str2) {
        return str + "." + (queueTag == null ? str2 : str2 + "." + queueTag);
    }

    public static String getRealQueueNameWithAppid(String str, String str2, String str3) {
        return str3 + "." + getRealQueueName(str, str2);
    }

    public static boolean queueHasDefine(String str) {
        return queueDefs.containsKey(str);
    }

    public static void add(String str, QueueDef queueDef) {
        add(str, queueDef, true);
    }

    public static void add(String str, QueueDef queueDef, boolean z) {
        queueGroup.computeIfAbsent(str + queueDef.getName(), str2 -> {
            if (queueDef.getAppid() == null || !AppGroup.isCurrentGrayNode()) {
                return null;
            }
            return AppGroup.getCurrentGrayAppGroup(queueDef.getAppid());
        });
        String realQueueName = getRealQueueName(str, queueDef.getName());
        queueDefs.put(realQueueName, queueDef);
        if (z) {
            initQueueConsumers(str, realQueueName, queueDef);
        }
    }

    public static void addWithAppidTags(String str, QueueDef queueDef) {
        String[] appIds = Instance.getAppIds();
        if (appIds == null) {
            throw new KDException(BosErrorCode.configNotFound, new Object[]{String.format(Resources.getString("没有配置appIds,%s", "QueueManager_0", "bos-mq", new Object[0]), new Object[0]), System.getProperty("appName")});
        }
        for (String str2 : appIds) {
            QueueDef cloneQueueDef = cloneQueueDef(queueDef);
            if (StringUtils.isEmpty(cloneQueueDef.getAppid())) {
                cloneQueueDef.setAppid(str2);
            }
            String realQueueNameWithAppid = getRealQueueNameWithAppid(str, cloneQueueDef.getName(), str2);
            queueDefs.put(realQueueNameWithAppid, cloneQueueDef);
            initQueueConsumers(str, realQueueNameWithAppid, cloneQueueDef);
        }
    }

    private static QueueDef cloneQueueDef(QueueDef queueDef) {
        QueueDef queueDef2 = new QueueDef();
        queueDef2.setName(queueDef.getName());
        queueDef2.setTransactional(queueDef.isTransactional());
        queueDef2.setDuration(queueDef.isDuration());
        queueDef2.setMaxQueueLength(queueDef.getMaxQueueLength());
        queueDef2.setAppid(queueDef.getAppid());
        queueDef2.setLazyInit(queueDef.isLazyInit());
        queueDef2.setSequential(queueDef.isSequential());
        queueDef2.setConsumers(queueDef.getConsumers());
        queueDef2.setPartition(queueDef.isPartition());
        return queueDef2;
    }

    public static QueueDef getQueueDefWithQueueDefNameAndAppid(String str, String str2, String str3) {
        return queueDefs.get(getRealQueueNameWithAppid(str, str2, str3));
    }

    public static QueueDef get(String str, String str2) {
        return queueDefs.get(getRealQueueName(str, str2));
    }

    public static QueueDef getQueueDefWithRealQueueName(String str, String str2) {
        return queueDefs.get(str2);
    }

    public static void declareIfNeed(Channel channel, String str, String str2, int i) {
        HashMap hashMap = new HashMap();
        if (i <= 0 || i > 1000000) {
            i = 1000000;
        }
        hashMap.put("x-max-length", Integer.valueOf(i));
        if (Boolean.TRUE.equals(queueDeclared.get(str2))) {
            return;
        }
        QueueDeclare.get(getMQType(str)).queueDeclare(str, str2, queueDefs.get(str2).isDuration(), hashMap);
        queueDeclared.putIfAbsent(str2, true);
    }

    public static void broadcastConsumerIfNeed(String str, QueueDef queueDef, String str2, String str3) {
        String property = System.getProperty("mq.queue.consumer.broadcast.type", "http");
        if (ZKQueueManager.existMqConsumer(str2)) {
            property = "mq";
            if (BroadcastConsumerWrapper.validBroadcast(str2) && ZKQueueManager.existQueueMeta(str, queueDef.getName(), str3)) {
                if (log.add(str2)) {
                    logger.info("queue:[" + str2 + "] all node consumer started");
                    return;
                }
                return;
            } else if (BroadcastConsumerWrapper.needBroadcastSync(str2)) {
                property = "http";
            }
        }
        ((BroadcastConsumer) ExtensionFactory.getExtensionFacotry(BroadcastConsumer.class).getExtension(property)).broadRegisterConsumer(str, queueDef.getName(), str2, str3);
    }

    private static void initQueueConsumers(String str, String str2, QueueDef queueDef) {
        if (ConfigForInitConsumer.isConsumerRegionEnable(str)) {
            List<ConsumerDef> consumers2 = queueDef.getConsumers();
            if (consumers2 == null || consumers2.isEmpty()) {
                logger.error("mq.usage config consumers is empty for " + str + "/" + queueDef.getName());
                return;
            }
            ConsumerDef consumerDef = consumers2.get(0);
            if (ConfigForInitConsumer.isNotInitConsumer()) {
                return;
            }
            if (ConfigForInitConsumer.needWaitingInitConsumer()) {
                poolMap.computeIfAbsent(QueueManager.class.getName(), str3 -> {
                    return ThreadPools.newFixedThreadPool("Mq-InitConsumer", 1);
                }).execute(() -> {
                    while (ConfigForInitConsumer.needWaitingInitConsumer()) {
                        LockSupport.parkNanos(1000000000L);
                    }
                    try {
                        initConsumer(str, str2, queueDef, consumerDef);
                    } catch (Exception e) {
                        logger.warn("Not deploy mq for  queue " + str + "/" + str2 + " ," + e.getMessage(), e);
                    }
                });
            } else {
                initConsumer(str, str2, queueDef, consumerDef);
            }
            for (int i = 1; i < consumers2.size(); i++) {
                logger.warn("mq.usage consumer " + consumers2.get(i).getClassName() + " ignored. only one cunsumer allowed.");
            }
        }
    }

    private static void initConsumer(String str, String str2, QueueDef queueDef, ConsumerDef consumerDef) {
        String className = consumerDef.getClassName();
        String str3 = str + str2;
        if (consumers.containsKey(str3)) {
            return;
        }
        synchronized (QueueManager.class) {
            if (consumers.containsKey(str3)) {
                return;
            }
            try {
                String appid = queueDef.getAppid();
                if (appid != null) {
                    queueAppMap.putIfAbsent(str2, appid);
                }
                String mQType = getMQType(str);
                MessageConsumer messageConsumer = (MessageConsumer) Class.forName(className).newInstance();
                if (Instance.isLightWeightDeploy() || "fakemq".equals(mQType)) {
                    consumers.put(str3, Class.forName("kd.bos.fake.mq.RabbitConsumerFake").getConstructor(String.class, String.class, Boolean.TYPE, Integer.TYPE, MessageConsumer.class, Integer.TYPE).newInstance(str, str2, Boolean.valueOf(consumerDef.isAutoAck()), Integer.valueOf(getConcurrency(consumerDef)), messageConsumer, Integer.valueOf(queueDef.getMaxQueueLength())));
                } else {
                    Consumer consumer = QueueDeclare.get(mQType).getConsumer(str, str2, queueDef, consumerDef, messageConsumer);
                    consumers.put(str3, consumer);
                    if (queueDef.isLazyInit() || queueDef.isSequential()) {
                        String realQueueName = getRealQueueName(str, queueDef.getName());
                        deLayConsumers.put(realQueueName, consumer);
                        NoneInitDelayConsumer noneInitDelayConsumer = noneInitDeLayConsumers.get(realQueueName);
                        if (noneInitDelayConsumer != null && noneInitDelayConsumer.isStarted()) {
                            consumer.start();
                        }
                    } else {
                        consumer.start();
                    }
                }
                try {
                    ZKQueueManager.registDynamicMqConsumer(queueDef.isPartition(), str2);
                } catch (Exception e) {
                    logger.warn("registryMqConsumer fail for queue {}", e, str2);
                }
            } catch (ClassNotFoundException | Error | IllegalAccessException | IllegalArgumentException | InstantiationException | NoSuchMethodException | SecurityException | InvocationTargetException e2) {
                throw new KDException(e2, BosErrorCode.mqException, new Object[]{"Can't init consumer by class " + className});
            }
        }
    }

    public static Map<String, Object> getConsumers() {
        return Collections.unmodifiableMap(consumers);
    }

    public static Map<String, String> getQueueNameMap() {
        return Collections.unmodifiableMap(queueNameMap);
    }

    public static Map<String, QueueDef> getAllQueueDefs() {
        return Collections.unmodifiableMap(queueDefs);
    }

    public static Map<String, String> getQueueAppMap() {
        return Collections.unmodifiableMap(queueAppMap);
    }

    public static Consumer getLazyInitConsumer(String str, String str2) {
        String realQueueName = getRealQueueName(str, str2);
        return ConfigForInitConsumer.isConsumerEnable() ? deLayConsumers.get(realQueueName) : noneInitDeLayConsumers.computeIfAbsent(realQueueName, str3 -> {
            return new NoneInitDelayConsumer(realQueueName);
        });
    }

    public static int getConcurrency(ConsumerDef consumerDef) {
        int concurrency = consumerDef.getConcurrency();
        if (concurrency == -1) {
            concurrency = ConfigurationUtil.getInteger(ConfigKeys.MQ_CONSUMER_CONCURRENCY_KEY, 1).intValue();
        }
        return concurrency;
    }

    public static void resetConsumer(String str) {
        updateConsumer(str, -1);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public static void updateConsumer(String str, Integer num) {
        Object obj = consumers.get(str);
        if (obj instanceof Consumer) {
            Consumer consumer = (Consumer) obj;
            if (num.intValue() > 0) {
                consumer.setConcurrency(num.intValue());
            }
            if (consumer.isStarted()) {
                try {
                    consumer.$$stop();
                    if (obj instanceof RabbitConsumer) {
                        RabbitConsumer rabbitConsumer = (RabbitConsumer) consumer;
                        rabbitConsumer.setChannel(ChannelFactory.getChannel(rabbitConsumer.getRegion()));
                    }
                } finally {
                    consumer.start();
                }
            }
        }
    }

    public static void registryConsumerByAppId(String str, String str2, String str3) {
        if (ConsumerSupport.isNeedDeploy(str3)) {
            String realQueueName = getRealQueueName(str, str2);
            QueueDef queueDef = queueDefs.get(realQueueName);
            if (queueDef == null) {
                throw new KDException(BosErrorCode.mqException, new Object[]{"dynamicRegistryConsumer fail: queue " + realQueueName + " has not define or do not support dynamic"});
            }
            String str4 = StringUtils.isNotEmpty(str3) ? str2 + "." + str3 : str2;
            String realQueueName2 = getRealQueueName(str, str4);
            Object obj = consumers.get(str + realQueueName2);
            if (!(obj instanceof Consumer) || !((Consumer) obj).isStarted()) {
                add(str, redefineQueueDef(queueDef, str3, str4));
            } else if (ZKQueueManager.registryConsumerFailCache.contains(realQueueName2) || !ZKQueueManager.existMqConsumer(realQueueName2, true)) {
                ZKQueueManager.registDynamicMqConsumer(true, realQueueName2);
            }
        }
    }

    public static QueueDef redefineQueueDef(QueueDef queueDef, String str, String str2) {
        try {
            QueueDef queueDef2 = (QueueDef) JSONUtils.cast(JSONUtils.toString(queueDef, false), QueueDef.class);
            queueDef2.setAppid(str);
            queueDef2.setName(str2);
            queueDef2.setPartition(true);
            return queueDef2;
        } catch (IOException e) {
            throw new KDException(e, BosErrorCode.mqException, new Object[]{"copy queueDef fail" + e.getMessage()});
        }
    }

    public static void putConsumerForGray(String str, Consumer consumer) {
        consumers.putIfAbsent(str, consumer);
    }

    public static void putQueueDefForGray(String str, QueueDef queueDef) {
        queueDefs.putIfAbsent(str, queueDef);
    }

    public static String getMQType(String str) {
        String str2 = mqTypeMap.get(str);
        if (str2 != null) {
            return str2;
        }
        String property = System.getProperty("mq.server." + str);
        try {
            if (property != null) {
                Properties properties = new Properties();
                properties.load(new StringReader(property));
                str2 = (String) Objects.requireNonNull(properties.getProperty("type"), "type can't be empty.");
            } else {
                String str3 = mqTypeMap.get(ConfigKeys.MQ_SERVER_KEY);
                if (str3 != null) {
                    return str3;
                }
                String property2 = System.getProperty(ConfigKeys.MQ_SERVER_KEY);
                if (property2 == null) {
                    throw new KDException(BosErrorCode.mqServerConfiguration, new Object[]{"mq server not config for " + property2});
                }
                Properties properties2 = new Properties();
                properties2.load(new StringReader(property2));
                str2 = (String) Objects.requireNonNull(properties2.getProperty("type"), "QueueManager.getMQType() type can't be empty.");
                str = ConfigKeys.MQ_SERVER_KEY;
            }
        } catch (IOException e) {
            logger.error("get mq type exception", e);
        }
        if (Arrays.asList(ConfigKeys.JMS_PROVIDERS).contains(str2)) {
            str2 = ConfigKeys.JMS;
        }
        mqTypeMap.put(str, str2);
        if (str2 != null) {
            return str2.trim();
        }
        return null;
    }

    public static Map<String, String> getMqTypeMap() {
        return Collections.unmodifiableMap(mqTypeMap);
    }
}
