package kd.bos.mq.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.broadcast.Broadcast;
import kd.bos.mq.broadcast.BroadcastService;
import kd.bos.mq.broadcast.Configure;
import kd.bos.mq.config.ConfigKeys;
import kd.bos.mq.jms.JMSSessionFactory;
import kd.bos.mq.support.QueueManager;

/* loaded from: input_file:kd/bos/mq/rabbit/RabbitBroadcast.class */
public class RabbitBroadcast extends Broadcast {
    protected static final String EXCHANGE_REGION = "broadcast";
    protected static final String EXCHANGE_TYPE = "fanout";
    private static Channel consumerChannel;
    private Channel prodecerChannel;
    protected static final String EXCHANGE_NAME = "exchange_fanout_" + Instance.getClusterName() + System.getProperty(ConfigKeys.CONFIG_DEBUG_QUEUETAG);
    private static final Log logger = LogFactory.getLog(RabbitBroadcast.class);
    private static Map<String, Channel> appConsumersChannels = new HashMap();
    private static Map<String, Channel> appPordecersChannels = new HashMap();

    /* JADX INFO: Access modifiers changed from: private */
    public static synchronized void regist(String str) throws IOException {
        Channel channel = ChannelFactory.getChannel(EXCHANGE_REGION);
        String appExchangeName = getAppExchangeName(str);
        channel.exchangeDeclare(appExchangeName, EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, appExchangeName, "");
        channel.basicConsume(queue, false, new RabbitBroadcastConsumer(channel));
        appConsumersChannels.put(str, channel);
    }

    private static final String getAppExchangeName(String str) {
        return EXCHANGE_NAME + JMSSessionFactory.SPLIT + str;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static synchronized void regist() throws IOException {
        Channel channel = ChannelFactory.getChannel(EXCHANGE_REGION);
        channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
        String queue = channel.queueDeclare().getQueue();
        channel.queueBind(queue, EXCHANGE_NAME, "");
        channel.basicConsume(queue, false, new RabbitBroadcastConsumer(channel));
        consumerChannel = channel;
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void registerBroadcastConsumer() {
        String[] appIds;
        try {
            regist();
            if (Instance.isAppSplit() && (appIds = Instance.getAppIds()) != null) {
                Set<String> supportBroadcastAppids = Configure.getSupportBroadcastAppids();
                for (String str : appIds) {
                    if (supportBroadcastAppids.contains(str)) {
                        regist(str);
                    }
                }
            }
        } catch (Exception e) {
            throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"can't init mqchannel"});
        }
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void broadcastMessage(byte[] bArr) {
        if (broadcastDisable()) {
            return;
        }
        Channel channel = null;
        try {
            channel = getBoradcastProducerChannel();
            channel.basicPublish(EXCHANGE_NAME, "", (AMQP.BasicProperties) null, bArr);
        } catch (IOException e) {
            if (channel != null) {
                try {
                    channel.getConnection().abort();
                } catch (Exception e2) {
                    logger.error("broadcast Message exception", e);
                    throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"boradcast message error "});
                }
            }
            throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"boradcast message error "});
        }
    }

    private boolean broadcastDisable() {
        return Boolean.getBoolean("boradcast.disable");
    }

    private Channel getBoradcastProducerChannel() {
        if (!ChannelFactory.isChannelNeedReBuild(this.prodecerChannel)) {
            return this.prodecerChannel;
        }
        synchronized (BroadcastService.class) {
            if (ChannelFactory.isChannelNeedReBuild(this.prodecerChannel)) {
                try {
                    Channel channel = ChannelFactory.getChannel(EXCHANGE_REGION);
                    if (channel == null) {
                        throw new KDException(BosErrorCode.mqException, new Object[]{"get mqchannel  is null"});
                    }
                    channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
                    this.prodecerChannel = channel;
                } catch (IOException e) {
                    throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{"can't create channel"});
                }
            }
        }
        return this.prodecerChannel;
    }

    @Override // kd.bos.mq.broadcast.Broadcast
    public void broadcastMessage(String str, byte[] bArr) {
        if (broadcastDisable()) {
            return;
        }
        if (!Configure.getSupportBroadcastAppids().contains(str)) {
            throw new KDException(BosErrorCode.mqException, new Object[]{"app " + str + " not support broadcast"});
        }
        try {
            if (Instance.isAppSplit()) {
                getBoradcastProducerChannel(str).basicPublish(getAppExchangeName(str), "", (AMQP.BasicProperties) null, bArr);
            } else {
                getBoradcastProducerChannel().basicPublish(EXCHANGE_NAME, "", (AMQP.BasicProperties) null, bArr);
            }
        } catch (IOException e) {
            try {
                getBoradcastProducerChannel(str).getConnection().abort();
            } catch (Exception e2) {
                logger.error("broadcast Message exception", e2);
            }
            throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{str + " appboradcast message error "});
        }
    }

    private Channel getBoradcastProducerChannel(String str) {
        Channel channel = appPordecersChannels.get(str);
        if (!ChannelFactory.isChannelNeedReBuild(channel)) {
            return channel;
        }
        synchronized (BroadcastService.class) {
            if (ChannelFactory.isChannelNeedReBuild(channel)) {
                try {
                    Channel channel2 = ChannelFactory.getChannel(EXCHANGE_REGION);
                    if (channel2 == null) {
                        throw new KDException(BosErrorCode.mqException, new Object[]{str + " get app mqchannel  is null"});
                    }
                    channel2.exchangeDeclare(getAppExchangeName(str), EXCHANGE_TYPE);
                    channel = channel2;
                    appPordecersChannels.put(str, channel);
                } catch (IOException e) {
                    throw new KDException(e, BosErrorCode.rabbitmqException, new Object[]{str + " can't create channel"});
                }
            }
        }
        return channel;
    }

    static {
        if ("rabbitmq".equals(QueueManager.getMQType(EXCHANGE_REGION))) {
            new Timer("MQConsumerChannelMonitor").scheduleAtFixedRate(new TimerTask() { // from class: kd.bos.mq.rabbit.RabbitBroadcast.1
                @Override // java.util.TimerTask, java.lang.Runnable
                public void run() {
                    if (ChannelFactory.isChannelNeedReBuild(RabbitBroadcast.consumerChannel)) {
                        try {
                            RabbitBroadcast.logger.error("broadcast consumer mqchannel is closed,re registe ");
                            RabbitBroadcast.regist();
                        } catch (Exception e) {
                            RabbitBroadcast.logger.error("registe broadcast consumer error,can't init mqchannel", e);
                        }
                    }
                    if (Instance.isAppSplit()) {
                        RabbitBroadcast.appConsumersChannels.forEach((str, channel) -> {
                            if (ChannelFactory.isChannelNeedReBuild(channel)) {
                                try {
                                    RabbitBroadcast.regist(str);
                                } catch (Exception e2) {
                                    RabbitBroadcast.logger.error("registe app broadcast consumer error,can't init mqchannel,app is " + str, e2);
                                }
                            }
                        });
                    }
                }
            }, 300000L, 60000L);
        }
    }
}
