package kd.bos.mq.kafka;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.kafka.KafkaInfo;
import kd.bos.kafka.KafkamqFactory;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.support.QueueManager;

/* loaded from: input_file:kd/bos/mq/kafka/ConsumerFactory.class */
public class ConsumerFactory {
    private static final Log LOGGER = LogFactory.getLog(ConsumerFactory.class);
    private static Map<String, Thread> APPID_POLLTHREAD_MAP = new HashMap(64);
    private static Map<String, Thread> APPID_COMMITTHREAD_MAP = new HashMap(64);
    private static Map<String, org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>> APPID_CONSUMER_MAP = new HashMap(64);

    public static org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> getConsumer(String str, String str2) {
        try {
            KafkaInfo kafkaInfo = KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(str));
            String appid = QueueManager.getQueueDefWithRealQueueName(str, str2).getAppid();
            if (appid == null) {
                appid = KafkaConstants.DEFAULT_APPID;
            }
            org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> kafkaConsumer = APPID_CONSUMER_MAP.get(appid);
            if (kafkaConsumer == null) {
                Properties consumerConfig = KafkaConfig.getConsumerConfig(kafkaInfo);
                consumerConfig.put("group.id", KafkaTopicManager.getGroupId(kafkaInfo.getVhost(), str, str2));
                kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(consumerConfig);
                APPID_CONSUMER_MAP.put(appid, kafkaConsumer);
            }
            return kafkaConsumer;
        } catch (KDException e) {
            LOGGER.error("ConsumerFactory getConsumer failed:", e);
            throw new KDException(e, BosErrorCode.kafkaException, new Object[]{"ConsumerFactory getConsumer failed" + e.getMessage()});
        }
    }

    public static org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> getBroadcastConsumer(String str, String str2, String str3) {
        try {
            KafkaInfo kafkaInfo = KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(str));
            String topicAndGroupName = ProducerFactory.getTopicAndGroupName(kafkaInfo.getVhost(), str2);
            Properties consumerConfig = KafkaConfig.getConsumerConfig(kafkaInfo);
            consumerConfig.put("group.id", str3);
            org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> kafkaConsumer = new org.apache.kafka.clients.consumer.KafkaConsumer<>(consumerConfig);
            kafkaConsumer.subscribe(Collections.singleton(topicAndGroupName));
            KafkaMQTopicUtil.createTopic(kafkaInfo, topicAndGroupName, Integer.parseInt(System.getProperty(KafkaConstants.MQ_KAFKA_TOPIC_PARTITIONS, "4")));
            return kafkaConsumer;
        } catch (KDException e) {
            LOGGER.error("ConsumerFactory getBroadcastConsumer failed:", e);
            throw new KDException(e, BosErrorCode.kafkaException, new Object[]{"ConsumerFactory getBroadcastConsumer failed" + e.getMessage()});
        }
    }

    public static void putAppIdPollThread(String str, Thread thread) {
        APPID_POLLTHREAD_MAP.putIfAbsent(str, thread);
    }

    public static Thread getAppIdPollThread(String str) {
        return APPID_POLLTHREAD_MAP.get(str);
    }

    public static void putAppIdSubmitThread(String str, Thread thread) {
        APPID_COMMITTHREAD_MAP.putIfAbsent(str, thread);
    }

    public static Thread getAppIdSubmitThread(String str) {
        return APPID_COMMITTHREAD_MAP.get(str);
    }
}
