package kd.bos.schedule.message.mq;

import java.lang.reflect.InvocationTargetException;
import java.security.SecureRandom;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.dataentity.serialization.SerializationUtils;
import kd.bos.dataentity.utils.StringUtils;
import kd.bos.exception.ErrorCode;
import kd.bos.exception.KDException;
import kd.bos.instance.Instance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import kd.bos.mq.MessagePublisher;
import kd.bos.schedule.api.JobInfo;
import kd.bos.schedule.api.JobNotifyListener;
import kd.bos.schedule.api.MessageInfo;
import kd.bos.schedule.api.MessageSender;
import kd.bos.schedule.api.MessageType;
import kd.bos.schedule.api.ObjectFactory;
import kd.bos.schedule.api.ScheduleMsgInfo;
import kd.bos.schedule.api.TaskResult;

/* loaded from: input_file:kd/bos/schedule/message/mq/MQMessageSender.class */
public class MQMessageSender implements MessageSender {
    protected ObjectFactory objectFactory = null;
    private SecureRandom random = new SecureRandom();
    private Date lastUpdateTime;
    private static Log log = LogFactory.getLog(MQMessageSender.class);
    private static Map<MessageType, MessagePublisher> commonMsgPub = new ConcurrentHashMap();
    private static Map<String, List<MessagePublisher>> appSplitJobMsgPubs = new ConcurrentHashMap();
    private static Map<MessageType, List<MessagePublisher>> commonMsgPubs = new ConcurrentHashMap();

    public ObjectFactory getObjectFactory() {
        return this.objectFactory;
    }

    public void setObjectFactory(ObjectFactory objectFactory) {
        this.objectFactory = objectFactory;
    }

    private boolean cachePublisher() {
        return Boolean.getBoolean("schedule.deprecated.msgsender.cache.enable");
    }

    public void send(MessageInfo messageInfo) throws KDException {
        MessagePublisher messagePublisher = null;
        try {
            String jsonString = SerializationUtils.toJsonString(messageInfo);
            if (messageInfo.getMessageType() == MessageType.BIZJOB || messageInfo.getMessageType() == MessageType.REALTIMEJOB || messageInfo.getMessageType() == MessageType.WorkFlowJOB) {
                JobInfo fetchJobInfo = messageInfo.fetchJobInfo();
                Log log2 = log;
                Object[] objArr = new Object[5];
                objArr[0] = messageInfo;
                objArr[1] = fetchJobInfo == null ? "null" : fetchJobInfo.getAppId();
                objArr[2] = fetchJobInfo == null ? "null" : fetchJobInfo.getId();
                objArr[3] = fetchJobInfo == null ? "null" : fetchJobInfo.getNumber();
                objArr[4] = fetchJobInfo == null ? "null" : fetchJobInfo.getScheduleId();
                log2.info("后台事务-发送消息到MQ ：{},appId={},jobId={},jobNumber={},scheduleId={} ", objArr);
                messagePublisher = scheduleTargetMQ(messageInfo);
                messagePublisher.publish(jsonString);
            } else if (messageInfo.getMessageType() == MessageType.BIZ_TASK_FEEDBACK || messageInfo.getMessageType() == MessageType.WorkFlow_TASK_FEEDBACK) {
                messagePublisher = scheduleTargetMQ(messageInfo);
                messagePublisher.publish(jsonString);
                udpateStorage(messageInfo);
            }
            if (messagePublisher == null || cachePublisher()) {
                return;
            }
            messagePublisher.close();
        } catch (Throwable th) {
            if (0 != 0 && !cachePublisher()) {
                messagePublisher.close();
            }
            throw th;
        }
    }

    private void udpateStorage(MessageInfo messageInfo) {
        TaskResult fectchTaskResult = messageInfo.fectchTaskResult();
        try {
            if (TaskResult.ResultTypeEnum.STATUS == fectchTaskResult.getResultType()) {
                getObjectFactory().getTaskDao().updateStatus(messageInfo.getTaskId(), fectchTaskResult.getStatus());
                if ("COMPLETED".equals(fectchTaskResult.getStatus()) || "FAILED".equals(fectchTaskResult.getStatus()) || "TIMEOUT".equals(fectchTaskResult.getStatus())) {
                    sendNotify(messageInfo);
                }
                if ("BEGIN".equals(fectchTaskResult.getStatus())) {
                    getObjectFactory().getTaskDao().updateRunAt(messageInfo.getTaskId(), messageInfo.getTarget(), messageInfo.getInstanceId(), messageInfo.getTraceId());
                }
            } else if (TaskResult.ResultTypeEnum.PROGRESS == fectchTaskResult.getResultType()) {
                Date date = new Date();
                if (fectchTaskResult.getProgress() >= 100 || this.lastUpdateTime == null || date.getTime() - this.lastUpdateTime.getTime() >= 1000) {
                    this.lastUpdateTime = date;
                    getObjectFactory().getTaskDao().updateProgress(messageInfo.getTaskId(), fectchTaskResult.getProgress(), fectchTaskResult.getDesc(), fectchTaskResult.getCustomData());
                }
            } else if (TaskResult.ResultTypeEnum.CUSTOMDATA == fectchTaskResult.getResultType()) {
                getObjectFactory().getTaskDao().updateCustomData(messageInfo.getTaskId(), fectchTaskResult.getCustomData());
            }
        } catch (Exception e) {
            log.error("后台事务更新存储信息异常: " + String.format(ResManager.loadKDString("后台事务异常,AppName: %1$s,InstanceId: %2$s,taskId: %3$s ,errorInfo: %4$s", "MQMessageSender_0", "bos-schedule-message", new Object[0]), Instance.getAppName(), Instance.getInstanceId(), messageInfo.getTaskId(), e.getMessage()), e);
        }
    }

    private MessagePublisher scheduleTargetMQ(MessageInfo messageInfo) {
        MessagePublisher messagePublisher = null;
        List<MessagePublisher> list = null;
        MessageType messageType = messageInfo.getMessageType();
        if (messageType == MessageType.BIZJOB || messageType == MessageType.REALTIMEJOB || messageType == MessageType.WorkFlowJOB) {
            JobInfo fetchJobInfo = messageInfo.fetchJobInfo();
            ensureJobAppId(fetchJobInfo);
            if (Instance.isAppSplit()) {
                if (cachePublisher()) {
                    list = appSplitJobMsgPubs.get(fetchJobInfo.getAppId() + messageType.name());
                    if (list == null || list.size() < 1) {
                        list = MQHelper.getJobMessagePulisherList(messageInfo);
                        appSplitJobMsgPubs.put(fetchJobInfo.getAppId() + messageType.name(), list);
                    }
                } else {
                    list = MQHelper.getJobMessagePulisherList(messageInfo);
                }
            } else if (cachePublisher()) {
                list = commonMsgPubs.get(messageType);
                if (list == null || list.size() < 1) {
                    list = MQHelper.getJobMessagePulisherList(messageInfo);
                    commonMsgPubs.put(messageType, list);
                }
            } else {
                list = MQHelper.getJobMessagePulisherList(messageInfo);
            }
        } else if (messageType == MessageType.BIZ_TASK_FEEDBACK || messageType == MessageType.WorkFlow_TASK_FEEDBACK) {
            if (cachePublisher()) {
                messagePublisher = commonMsgPub.get(messageType);
                if (messagePublisher == null) {
                    messagePublisher = MQHelper.getTaskStausPulisher();
                    commonMsgPub.put(messageType, messagePublisher);
                }
            } else {
                messagePublisher = MQHelper.getTaskStausPulisher();
            }
        }
        if (list != null) {
            messagePublisher = getRandomPublisher(list);
        }
        if (messagePublisher == null) {
            throw new KDException(new ErrorCode("DISPATCH_JOB_FAILED", ResManager.loadKDString("该类型的消息无法找到对应的发布者:%s", "MQMessageSender_1", "bos-schedule-message", new Object[0])), new Object[]{messageInfo.toString()});
        }
        return messagePublisher;
    }

    private MessagePublisher getRandomPublisher(List<MessagePublisher> list) {
        int size = list.size();
        if (size == 1) {
            return list.get(0);
        }
        if (size < 1) {
            return null;
        }
        return list.get(this.random.nextInt(size));
    }

    private void ensureJobAppId(JobInfo jobInfo) {
        if (StringUtils.isBlank(jobInfo.getAppId())) {
            jobInfo.setAppId("bos");
        }
    }

    private void sendNotify(MessageInfo messageInfo) {
        JobInfo fetchJobInfo = messageInfo.fetchJobInfo();
        ScheduleMsgInfo scheduleMsgInfo = fetchJobInfo.getScheduleMsgInfo();
        boolean z = false;
        boolean z2 = false;
        if (fetchJobInfo != null) {
            z = fetchJobInfo.isFailNotify() || fetchJobInfo.isSuccessNotify() || fetchJobInfo.isOverTime();
        }
        if (scheduleMsgInfo != null) {
            z2 = scheduleMsgInfo.isFailNotify() || scheduleMsgInfo.isSuccessNotify() || scheduleMsgInfo.isTimeOut();
        }
        if (z || z2) {
            jobProcessNotify(messageInfo);
        }
    }

    private void jobProcessNotify(MessageInfo messageInfo) {
        try {
            Class<?> cls = Class.forName("kd.bos.schedule.notify.JobProcessorNotify");
            MessageInfo messageInfo2 = new MessageInfo();
            messageInfo2.setNotifyListener((JobNotifyListener) cls.getConstructor(new Class[0]).newInstance(new Object[0]));
            messageInfo2.send(messageInfo);
        } catch (ClassNotFoundException e) {
            log.error(e.getMessage());
        } catch (IllegalAccessException e2) {
            log.error(e2.getMessage());
        } catch (InstantiationException e3) {
            log.error(e3.getMessage());
        } catch (NoSuchMethodException e4) {
            log.error(e4.getMessage());
        } catch (InvocationTargetException e5) {
            log.error(e5.getMessage());
        }
    }
}
