package kd.bos.mq.kafka;

import java.time.Duration;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.instance.Instance;
import kd.bos.kafka.KafkaInfo;
import kd.bos.kafka.KafkamqFactory;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.broadcast.Broadcast;
import kd.bos.mq.broadcast.Configure;
import kd.bos.mq.broadcast.MessageReceive;
import kd.bos.mq.config.ConfigKeys;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.ProducerRecord;

/* loaded from: input_file:kd/bos/mq/kafka/KafkaBroadcast.class */
public class KafkaBroadcast extends Broadcast {
    private static final String BROADCAST_REGION = "broadcast";
    private static final String BROADCAST_QUEUE = "broadcast_queue_" + Instance.getClusterName() + System.getProperty(ConfigKeys.CONFIG_DEBUG_QUEUETAG, "");
    private static final Log logger = LogFactory.getLog(KafkaBroadcast.class);
    private static Map<String, org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]>> MQConsumerMap = new ConcurrentHashMap(8);

    private static void regist() {
        if (MQConsumerMap.get(BROADCAST_QUEUE) != null) {
            return;
        }
        new Thread(() -> {
            org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> broadcastConsumer = ConsumerFactory.getBroadcastConsumer(BROADCAST_REGION, BROADCAST_QUEUE, Instance.getInstanceId());
            MQConsumerMap.put(BROADCAST_QUEUE, broadcastConsumer);
            while (true) {
                ConsumerRecords poll = broadcastConsumer.poll(Duration.ofMillis(100L));
                if (!poll.isEmpty()) {
                    try {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            MessageReceive.instance().onMessage((byte[]) ((ConsumerRecord) it.next()).value());
                        }
                        broadcastConsumer.commitSync();
                    } catch (Throwable th) {
                        broadcastConsumer.commitSync();
                        throw th;
                    }
                }
            }
        }, "kafkaBroadcastConsumer-" + getTopicName(KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(BROADCAST_REGION)).getVhost(), BROADCAST_QUEUE)).start();
    }

    private static void regist(String str) {
        if (MQConsumerMap.get(str) != null) {
            return;
        }
        new Thread(() -> {
            org.apache.kafka.clients.consumer.KafkaConsumer<String, byte[]> broadcastConsumer = ConsumerFactory.getBroadcastConsumer(getAppRegion(str), getBroadcastQueue(str), Instance.getInstanceId() + "_" + str);
            MQConsumerMap.put(str, broadcastConsumer);
            while (true) {
                ConsumerRecords poll = broadcastConsumer.poll(Duration.ofMillis(100L));
                if (!poll.isEmpty()) {
                    try {
                        Iterator it = poll.iterator();
                        while (it.hasNext()) {
                            MessageReceive.instance().onMessage((byte[]) ((ConsumerRecord) it.next()).value());
                        }
                        broadcastConsumer.commitSync();
                    } catch (Throwable th) {
                        broadcastConsumer.commitSync();
                        throw th;
                    }
                }
            }
        }, "kafkaBroadcastConsumer-" + str + "-" + getTopicName(KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(getAppRegion(str))).getVhost(), getBroadcastQueue(str))).start();
    }

    private static String getAppRegion(String str) {
        return "broadcast_" + str;
    }

    private static String getBroadcastQueue(String str) {
        return BROADCAST_QUEUE + '_' + str;
    }

    private static String getTopicName(String str, String str2) {
        return (str + '_' + str2).replace('.', '-');
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void registerBroadcastConsumer() {
        String[] appIds;
        regist();
        try {
            if (Instance.isAppSplit() && (appIds = Instance.getAppIds()) != null) {
                Set<String> supportBroadcastAppids = Configure.getSupportBroadcastAppids();
                for (String str : appIds) {
                    if (supportBroadcastAppids.contains(str)) {
                        regist(str);
                    }
                }
            }
        } catch (Exception e) {
            throw new KDException(e, BosErrorCode.kafkaException, new Object[]{"can't init KafkaBroadcastConsumer"});
        }
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void broadcastMessage(byte[] bArr) {
        if (broadcastDisable()) {
            return;
        }
        try {
            KafkaInfo kafkaInfo = KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(BROADCAST_REGION));
            ProducerFactory.getProducer(BROADCAST_REGION, BROADCAST_QUEUE, kafkaInfo).send(new ProducerRecord(getTopicName(kafkaInfo.getVhost(), BROADCAST_QUEUE), bArr));
        } catch (Exception e) {
            logger.error(e);
            throw new KDException(e, BosErrorCode.kafkaException, new Object[]{"boradcastMessage can't send" + e.getMessage()});
        }
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void broadcastMessage(String str, byte[] bArr) {
        if (broadcastDisable()) {
            return;
        }
        if (!Configure.getSupportBroadcastAppids().contains(str)) {
            throw new KDException(BosErrorCode.mqException, new Object[]{"app" + str + " not support broadcast"});
        }
        try {
            if (Instance.isAppSplit()) {
                String appRegion = getAppRegion(str);
                KafkaInfo kafkaInfo = KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(appRegion));
                ProducerFactory.getProducer(appRegion, getBroadcastQueue(str), kafkaInfo).send(new ProducerRecord(getTopicName(kafkaInfo.getVhost(), getBroadcastQueue(str)), bArr));
            } else {
                KafkaInfo kafkaInfo2 = KafkamqFactory.getKafkaInfo(ProducerFactory.getRegionServerKey(BROADCAST_REGION));
                ProducerFactory.getProducer(BROADCAST_REGION, BROADCAST_QUEUE, kafkaInfo2).send(new ProducerRecord(getTopicName(kafkaInfo2.getVhost(), BROADCAST_QUEUE), bArr));
            }
        } catch (Exception e) {
            logger.error(e);
            throw new KDException(e, BosErrorCode.kafkaException, new Object[]{"boradcastMessage can't send,appid" + str + ",errorMessage" + e.getMessage()});
        }
    }

    private boolean broadcastDisable() {
        return Boolean.getBoolean("boradcast.disable");
    }
}
