package kd.bos.mq.dlx;

import com.alibaba.fastjson.JSONObject;
import java.util.ArrayList;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.db.RequestContextInfo;
import kd.bos.dc.utils.AccountUtils;
import kd.bos.elect.ElectFactory;
import kd.bos.elect.Elector;
import kd.bos.elect.ElectorListener;
import kd.bos.mq.rabbit.ExceptionLogger;
import kd.bos.mq.support.Message;
import kd.bos.mq.support.MessageSerde;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/mq/dlx/DLXService.class */
public class DLXService {
    private static final Timer messageScan = new Timer("mq-dlx-messageScanTask");
    private static AtomicBoolean isStarted = new AtomicBoolean();

    /* loaded from: input_file:kd/bos/mq/dlx/DLXService$ScanTask.class */
    private static class ScanTask extends TimerTask {
        private static final String SCAN_SQL = "select top 10 fid,fmessage,fstatus,fregion,fqueue from t_dlx_message where fstatus = 0 and fcount < " + DLXConfig.SEND_ERROR_COUNT;

        private ScanTask() {
        }

        @Override // java.util.TimerTask, java.lang.Runnable
        public void run() {
            ThreadPools.executeOnce("message_scan", () -> {
                AccountUtils.getAllAccountsOfCurrentEnv().forEach(account -> {
                    try {
                        AutoCloseable autoCloseable = new RequestContextInfo(account.getTenantId(), account.getAccountId()).setupThreadRequestContext();
                        Throwable th = null;
                        try {
                            ArrayList<JSONObject> arrayList = new ArrayList(10);
                            DB.query(DBRoute.basedata, SCAN_SQL, resultSet -> {
                                while (resultSet.next()) {
                                    JSONObject jSONObject = new JSONObject();
                                    jSONObject.put("region", resultSet.getString("fregion"));
                                    jSONObject.put("queue", resultSet.getString("fqueue"));
                                    jSONObject.put("fid", Integer.valueOf(resultSet.getInt("fid")));
                                    jSONObject.put("message", resultSet.getBytes("fmessage"));
                                    arrayList.add(jSONObject);
                                }
                                return null;
                            });
                            for (JSONObject jSONObject : arrayList) {
                                try {
                                    Message decode = MessageSerde.get().decode(jSONObject.getBytes("message"));
                                    DLXMesPubFactory.getDLXMessagePublisher().sendMessage(jSONObject.getString("region"), jSONObject.getString("queue"), decode, "", null);
                                } catch (Exception e) {
                                }
                            }
                            if (autoCloseable != null) {
                                if (0 != 0) {
                                    try {
                                        autoCloseable.close();
                                    } catch (Throwable th2) {
                                        th.addSuppressed(th2);
                                    }
                                } else {
                                    autoCloseable.close();
                                }
                            }
                        } finally {
                        }
                    } catch (Exception e2) {
                        ExceptionLogger.log("message scan task error", e2);
                    }
                });
            });
        }
    }

    public static void start() {
        final Elector elector = ElectFactory.getElector("dlx-message");
        elector.registerListener(new ElectorListener() { // from class: kd.bos.mq.dlx.DLXService.1
            public void notifyMaster() {
                if (elector.isMaster() && DLXService.isStarted.compareAndSet(false, true)) {
                    DLXService.messageScan.scheduleAtFixedRate(new ScanTask(), 500000L, 300000L);
                }
            }

            public void notifyLostMaster() {
                if (DLXService.isStarted.compareAndSet(true, false)) {
                    DLXService.messageScan.cancel();
                }
            }
        });
        elector.start();
    }
}
