package kd.bos.eye.api.mq.kafka.handler;

import java.lang.reflect.Field;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.eye.api.mq.MqMgr;
import kd.bos.eye.api.mq.kafka.vo.KafkaConsumerDataVO;
import kd.bos.eye.api.mq.kafka.vo.KafkaListDataVO;
import kd.bos.eye.api.mq.kafka.vo.KafkaStatusDataVO;
import kd.bos.eye.api.mq.rabbit.ConfigPicker;
import kd.bos.eye.api.mq.support.constants.MqEyeConstants;
import kd.bos.eye.api.mq.support.vo.MqConsumerVO;
import kd.bos.eye.api.mq.support.vo.MqEyeMeta;
import kd.bos.eye.api.mq.support.vo.MqFieldVO;
import kd.bos.eye.api.mq.support.vo.MqListVO;
import kd.bos.eye.api.mq.support.vo.MqStatusVO;
import kd.bos.kafka.KafkamqFactory;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.kafka.KafkaConfig;
import kd.bos.mq.kafka.KafkaMQTopicUtil;
import kd.bos.util.StringUtils;
import kd.bos.util.resource.Resources;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.admin.ConsumerGroupListing;
import org.apache.kafka.clients.admin.MemberDescription;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.clients.admin.TopicListing;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.TopicPartitionInfo;

/* loaded from: input_file:kd/bos/eye/api/mq/kafka/handler/KafkaMgrHandler.class */
/**
 * Kafka implementation of {@link MqMgr}: lists topics, reports per-partition log sizes and
 * reports consumer-group progress (broker offset, committed offset and lag) for a topic.
 *
 * <p>The {@link AdminClient} is not created here; it is read reflectively from the static
 * {@code ADMIN_CLIENT} field of {@link KafkaMQTopicUtil}. End offsets are read through a
 * short-lived {@link KafkaConsumer} that is opened and closed on every request.
 *
 * <p>The two lookup caches are plain {@link HashMap}s without synchronization.
 */
public class KafkaMgrHandler implements MqMgr {
    private static final Log LOGGER = LogFactory.getLog(KafkaMgrHandler.class);
    private static final String SYSTEM_GROUPID = "kafka-eye-system-group";
    /** Upper bound for a single end-offset lookup. */
    private static final Duration END_OFFSET_TIMEOUT = Duration.ofMillis(3000L);

    /** Lazily resolved admin client; stays {@code null} while the source field is {@code null}. */
    private AdminClient ADMIN_CLIENT;
    // NOTE(review): the prefix says "Rocketmq" although this is the Kafka handler. It is part of
    // the resource keys looked up at runtime, so it is left untouched - confirm before renaming.
    private final String resourceIdPrefix = "RocketmqMgrHandler_";
    private final String systemType = "bos-eye-api-enterprise";
    /** topic -> first consumer group found with a member assigned to that topic. */
    private final Map<String, String> TOPIC_GROUPID_MAP = new HashMap<>();
    /** topic -> number of partitions. */
    private final Map<String, Integer> TOPIC_PARTITIONS_MAP = new HashMap<>();

    /**
     * Describes this handler to the UI: MQ type, paging flags, the queue column and the
     * available operations.
     *
     * @return a freshly built meta object
     */
    @Override
    public MqEyeMeta getMqMeta() {
        MqEyeMeta meta = new MqEyeMeta();
        meta.setMqType("kafka");
        meta.setApiNewVersion(true);
        meta.setApiPage(false);
        meta.setQueueFields(Collections.singletonList(localizedField(MqEyeConstants.FIELDS_TOPIC, "主题", "3")));
        meta.setOperations(Arrays.asList(
                localizedField("status", "状态", "1"),
                localizedField(MqEyeConstants.OPERATIONS_CONSUMERS, "消费者", "2")));
        return meta;
    }

    /**
     * Returns the admin client, resolving it from {@code KafkaMQTopicUtil.ADMIN_CLIENT} on first
     * use. The reflective lookup is repeated on every call until a non-null client is found.
     *
     * @return the admin client, or {@code null} when the source field holds {@code null}
     * @throws KDException if the field cannot be read
     */
    private AdminClient getMQAdmin() {
        if (this.ADMIN_CLIENT != null) {
            return this.ADMIN_CLIENT;
        }
        try {
            Field clientField = KafkaMQTopicUtil.class.getDeclaredField("ADMIN_CLIENT");
            clientField.setAccessible(true);
            this.ADMIN_CLIENT = (AdminClient) clientField.get(null);
            return this.ADMIN_CLIENT;
        } catch (Exception e) {
            throw failure("getMQAdmin", e);
        }
    }

    /**
     * Lists the topics whose name starts with the configured vhost, optionally narrowed to names
     * containing the {@code MqEyeConstants.FIELDS_NAME} parameter.
     *
     * <p>{@code total_count} is the number of all topics returned by the broker (before the vhost
     * filter). A missing, blank or non-positive {@code page_size} no longer fails the request:
     * the page size then falls back to the number of matching topics, i.e. a single page.
     *
     * @param map request parameters; reads {@code MqEyeConstants.FIELDS_NAME} and {@code page_size}
     * @return the topic list; an empty VO when no admin client is available
     * @throws KDException on any lookup failure, including a non-numeric {@code page_size}
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // the VO setter's element type is not visible here
    public MqListVO queueList(Map<String, String> map) {
        MqListVO result = new MqListVO();
        AdminClient admin = getMQAdmin();
        if (admin == null) {
            return result;
        }
        try {
            String nameFilter = map.get(MqEyeConstants.FIELDS_NAME);
            boolean filterByName = StringUtils.isNotEmpty(nameFilter);
            Collection<TopicListing> listings = admin.listTopics().listings().get();
            ArrayList items = new ArrayList(listings.size());
            result.setItems(items);
            // The vhost prefix does not change while iterating, so read it once.
            String vhost = new ConfigPicker().getVhost();
            for (TopicListing listing : listings) {
                String topic = listing.name();
                if (topic.startsWith(vhost) && (!filterByName || topic.contains(nameFilter))) {
                    items.add(new KafkaListDataVO(topic));
                }
            }
            int matched = items.size();
            int pageSize = resolvePageSize(map.get("page_size"), matched);
            result.setTotal_count(listings.size());
            result.setFiltered_count(matched);
            result.setPage_size(pageSize);
            // pageSize is only 0 when nothing matched and no usable page_size was supplied.
            result.setPage_count(pageSize > 0 ? (matched / pageSize) + (matched % pageSize == 0 ? 0 : 1) : 0);
            result.setItem_count(matched);
            return result;
        } catch (Exception e) {
            throw failure("queueList", e);
        }
    }

    /**
     * Reports every partition of the topic together with its end offset ("log size").
     * The partition count seen here also refreshes the partition-count cache.
     *
     * @param str topic name
     * @return the status VO; only the column definitions are filled when no admin client is available
     * @throws KDException on any lookup failure
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // the VO setters' element types are not visible here
    public MqStatusVO status(String str) {
        MqStatusVO result = new MqStatusVO(str);
        ArrayList fields = new ArrayList();
        fields.add(localizedField(MqEyeConstants.FIELDS_TOPIC, "主题", "3"));
        fields.add(localizedField(MqEyeConstants.FIELDS_PARTITION, "分区", "4"));
        fields.add(localizedField(MqEyeConstants.FIELDS_LOGSIZE, "消息总数", "5"));
        result.setStatusFields(fields);
        AdminClient admin = getMQAdmin();
        if (admin == null) {
            return result;
        }
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, byte[]> consumer = getKafkaConsumer()) {
            ArrayList items = new ArrayList();
            result.setItems(items);
            Map<String, KafkaFuture<TopicDescription>> descriptions =
                    admin.describeTopics(Collections.singletonList(str)).values();
            for (KafkaFuture<TopicDescription> description : descriptions.values()) {
                List<TopicPartitionInfo> partitions = description.get().partitions();
                // Freshly fetched, so overwrite any older cached count.
                this.TOPIC_PARTITIONS_MAP.put(str, partitions.size());
                for (TopicPartitionInfo partitionInfo : partitions) {
                    int partition = partitionInfo.partition();
                    KafkaStatusDataVO row = new KafkaStatusDataVO();
                    row.setTopic(str);
                    row.setPartition(partition);
                    row.setLogSize(endOffset(consumer, new TopicPartition(str, partition)));
                    items.add(row);
                }
            }
            return result;
        } catch (Exception e) {
            throw failure("status", e);
        }
    }

    /**
     * Reports, for every partition of the topic, the broker end offset, the offset committed by
     * the topic's consumer group and the difference between the two.
     *
     * <p>Partitions without a committed offset are reported with consumer offset {@code 0}, so
     * their lag equals the broker end offset. Rows are emitted in partition-index order.
     *
     * @param str topic name
     * @return the consumer VO; only the column definitions are filled when no admin client is available
     * @throws KDException on any lookup failure
     */
    @Override
    @SuppressWarnings({"rawtypes", "unchecked"}) // the VO setters' element types are not visible here
    public MqConsumerVO consumers(String str) {
        MqConsumerVO result = new MqConsumerVO(str);
        ArrayList fields = new ArrayList();
        fields.add(localizedField(MqEyeConstants.FIELDS_SUBSCRIPTIONGROUP, "订阅组", "6"));
        fields.add(localizedField(MqEyeConstants.FIELDS_PARTITION, "分区", "4"));
        fields.add(localizedField(MqEyeConstants.FIELDS_BROKEROFFSET, "代理者位点", "7"));
        fields.add(localizedField(MqEyeConstants.FIELDS_CONSUMEROFFSET, "消费者位点", "8"));
        fields.add(localizedField(MqEyeConstants.FIELDS_DIFFTOTAL, "差值", "9"));
        fields.add(localizedField(MqEyeConstants.FIELDS_GROUPACTIVETHREADS, "组内活跃线程数", "10"));
        result.setConsumersFields(fields);
        AdminClient admin = getMQAdmin();
        if (admin == null) {
            return result;
        }
        // try-with-resources closes the consumer on every exit path.
        try (KafkaConsumer<String, byte[]> consumer = getKafkaConsumer()) {
            ArrayList items = new ArrayList(5);
            result.setItems(items);
            String groupId = getGroupIdByTopic(str);
            Collection<MemberDescription> members = admin
                    .describeConsumerGroups(Collections.singletonList(groupId))
                    .describedGroups()
                    .get(groupId)
                    .get()
                    .members();
            int activeThreads = members.size();
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId).partitionsToOffsetAndMetadata().get();

            Map<Integer, KafkaConsumerDataVO> rowsByPartition = new HashMap<>(committed.size());
            for (Map.Entry<TopicPartition, OffsetAndMetadata> entry : committed.entrySet()) {
                TopicPartition committedPartition = entry.getKey();
                OffsetAndMetadata committedOffset = entry.getValue();
                // The admin API maps a partition to null when the group has no committed offset
                // for it; such partitions are handled by the fill loop below.
                if (committedOffset == null || !committedPartition.topic().equals(str)) {
                    continue;
                }
                int partition = committedPartition.partition();
                long brokerOffset = endOffset(consumer, new TopicPartition(str, partition));
                rowsByPartition.put(partition,
                        consumerRow(groupId, partition, brokerOffset, committedOffset.offset(), activeThreads));
            }

            int partitionCount = getPartitionsByTopic(str);
            for (int partition = 0; partition < partitionCount; partition++) {
                KafkaConsumerDataVO row = rowsByPartition.get(partition);
                if (row == null) {
                    long brokerOffset = endOffset(consumer, new TopicPartition(str, partition));
                    row = consumerRow(groupId, partition, brokerOffset, 0L, activeThreads);
                }
                items.add(row);
            }
            return result;
        } catch (Exception e) {
            throw failure("consumers", e);
        }
    }

    /**
     * Creates the consumer used for end-offset lookups, configured from the {@code mq.server}
     * Kafka settings with the group id forced to {@link #SYSTEM_GROUPID}. The caller must close it.
     */
    private KafkaConsumer<String, byte[]> getKafkaConsumer() {
        Properties consumerConfig = KafkaConfig.getConsumerConfig(KafkamqFactory.getKafkaInfo("mq.server"));
        consumerConfig.put("group.id", SYSTEM_GROUPID);
        return new KafkaConsumer<>(consumerConfig);
    }

    /**
     * Finds the first consumer group that has a member with a partition of the topic assigned.
     * A hit is cached; a miss is not, so it is looked up again on the next call.
     *
     * @param str topic name
     * @return the group id, or an empty string when no group currently consumes the topic
     * @throws KDException on any lookup failure
     */
    private String getGroupIdByTopic(String str) {
        String cachedGroupId = this.TOPIC_GROUPID_MAP.get(str);
        if (cachedGroupId != null) {
            return cachedGroupId;
        }
        try {
            for (ConsumerGroupListing listing : this.ADMIN_CLIENT.listConsumerGroups().all().get()) {
                String groupId = listing.groupId();
                ConsumerGroupDescription description = this.ADMIN_CLIENT
                        .describeConsumerGroups(Collections.singletonList(groupId))
                        .describedGroups()
                        .get(groupId)
                        .get();
                for (MemberDescription member : description.members()) {
                    for (TopicPartition assigned : member.assignment().topicPartitions()) {
                        if (assigned.topic().equals(str)) {
                            this.TOPIC_GROUPID_MAP.put(str, groupId);
                            return groupId;
                        }
                    }
                }
            }
            return "";
        } catch (Exception e) {
            throw failure("getGroupIdByTopic", e);
        }
    }

    /**
     * Returns the number of partitions of the topic, served from the cache when present.
     *
     * @param str topic name
     * @return the partition count; {@code 0} when the describe call yields no entry
     * @throws KDException on any lookup failure
     */
    private int getPartitionsByTopic(String str) {
        Integer cachedCount = this.TOPIC_PARTITIONS_MAP.get(str);
        if (cachedCount != null) {
            return cachedCount;
        }
        int partitionCount = 0;
        try {
            Map<String, KafkaFuture<TopicDescription>> descriptions =
                    this.ADMIN_CLIENT.describeTopics(Collections.singletonList(str)).values();
            for (KafkaFuture<TopicDescription> description : descriptions.values()) {
                partitionCount = description.get().partitions().size();
                this.TOPIC_PARTITIONS_MAP.putIfAbsent(str, partitionCount);
            }
            return partitionCount;
        } catch (Exception e) {
            throw failure("getPartitionsByTopic", e);
        }
    }

    /** Builds a column/operation descriptor whose label is resolved through {@link Resources}. */
    private MqFieldVO localizedField(String name, String defaultLabel, String resourceKeySuffix) {
        return new MqFieldVO(name,
                Resources.getString(defaultLabel, this.resourceIdPrefix + resourceKeySuffix, this.systemType, new Object[0]));
    }

    /** Builds one consumer row; the lag is the broker offset minus the consumer offset. */
    private static KafkaConsumerDataVO consumerRow(String groupId, int partition, long brokerOffset,
            long consumerOffset, int activeThreads) {
        KafkaConsumerDataVO row = new KafkaConsumerDataVO();
        row.setSubscriptionGroup(groupId);
        row.setPartition(partition);
        row.setBrokerOffset(brokerOffset);
        row.setConsumerOffset(consumerOffset);
        row.setDiffTotal(brokerOffset - consumerOffset);
        row.setGroupActiveThreads(activeThreads);
        return row;
    }

    /**
     * Reads the end offset of a single partition, waiting at most {@link #END_OFFSET_TIMEOUT}.
     *
     * @throws IllegalStateException if the broker response contains no offset for the partition
     */
    private static long endOffset(KafkaConsumer<String, byte[]> consumer, TopicPartition partition) {
        consumer.assign(Collections.singleton(partition));
        Long offset = consumer.endOffsets(Collections.singleton(partition), END_OFFSET_TIMEOUT).get(partition);
        if (offset == null) {
            throw new IllegalStateException("no end offset returned for " + partition);
        }
        return offset;
    }

    /**
     * Parses the requested page size.
     *
     * @param raw      the {@code page_size} request value, may be {@code null}
     * @param fallback value used when {@code raw} is missing, blank or not positive
     * @throws NumberFormatException if {@code raw} is non-blank but not an integer
     */
    private static int resolvePageSize(String raw, int fallback) {
        if (raw == null || raw.trim().isEmpty()) {
            return fallback;
        }
        int requested = Integer.parseInt(raw.trim());
        return requested > 0 ? requested : fallback;
    }

    /**
     * Logs the failure and converts it into the {@link KDException} callers expect. When the
     * cause is an {@link InterruptedException} the thread's interrupt flag is restored first,
     * because catching the exception cleared it.
     */
    private static KDException failure(String operation, Exception cause) {
        if (cause instanceof InterruptedException) {
            Thread.currentThread().interrupt();
        }
        String message = "error when KafkaMgrHandler " + operation;
        LOGGER.error(message + ":", cause);
        return new KDException(BosErrorCode.kafkaException, message, cause);
    }
}
