package kd.bos.dts.init.async;

import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.LockSupport;
import kd.bos.dts.define.DestinationRuleConfig;
import kd.bos.dts.exception.ExceptionLogger;
import kd.bos.dts.init.AbstractRowGenerator;
import kd.bos.dts.init.QueryGenRow;
import kd.bos.dts.latch.BatchLatch;
import kd.bos.dts.latch.BatchLatchFactory;
import kd.bos.dts.log.DtsStatusReporterFactory;
import kd.bos.dts.log.DtsStatusType;
import kd.bos.dts.oplog.Oplog;
import kd.bos.dts.oplog.Status;
import kd.bos.dts.retry.Retry;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.orm.datasync.DestinationTransRule;
import kd.bos.orm.datasync.OperationType;

/* loaded from: input_file:kd/bos/dts/init/async/AsyncInitializeImportConsumer.class */
/**
 * Message consumer that handles one batch of an asynchronous initial import.
 *
 * <p>Each message is a {@link Map} keyed by the {@code AsyncInitializeImport} constants
 * ({@code BATCHID}, {@code ROWGENERATORS}, {@code DESTINATIONTYPE}, {@code ENTITYKEY},
 * {@code BATCHCOUNT}). The consumer waits until the {@link BatchLatch} of the entity key
 * knows the batch id as produced, skips batches the latch already reports as consumed,
 * and otherwise either retries (batch already pre-consumed) or applies the row generators.
 */
public class AsyncInitializeImportConsumer implements MessageConsumer {
    /** Pause between two look-ups of the produced batch id: 100,000,000 ns = 100 ms. */
    private static final long PRODUCE_WAIT_INTERVAL_NANOS = 100000000L;

    /** Maximum number of pauses while waiting for the produced batch id (about 20 s in total). */
    private static final int PRODUCE_WAIT_MAX_PAUSES = 200;

    /**
     * Processes one batch message.
     *
     * @param obj          message payload; cast to a {@link Map} keyed by the
     *                     {@code AsyncInitializeImport} constants
     * @param str          identifier handed back to {@code messageAcker} on ack/deny
     * @param z            not used by this consumer
     * @param messageAcker used to acknowledge the message on success and deny it on failure
     */
    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        Map<?, ?> payload = (Map<?, ?>) obj;
        String batchId = (String) payload.get(AsyncInitializeImport.BATCHID);
        // The payload is untyped, so the element type cannot be checked at this point.
        @SuppressWarnings("unchecked")
        List<AbstractRowGenerator> rowGenerators = (List<AbstractRowGenerator>) payload.get(AsyncInitializeImport.ROWGENERATORS);
        String entityNumber = getEntityNumber(rowGenerators);
        DestinationTransRule destinationTransRule = (DestinationTransRule) payload.get(AsyncInitializeImport.DESTINATIONTYPE);
        DestinationRuleConfig destinationRuleConfig = DestinationRuleConfig.get(entityNumber, destinationTransRule);
        String entityKey = (String) payload.get(AsyncInitializeImport.ENTITYKEY);
        int batchCount = ((Integer) payload.get(AsyncInitializeImport.BATCHCOUNT)).intValue();
        BatchLatch batchLatch = BatchLatchFactory.get(entityKey);

        // NOTE(review): both early returns below leave the message neither acked nor denied,
        // exactly as before; presumably the broker redelivers it - confirm against the MQ contract.
        if (!awaitProducedBatch(batchLatch, batchId)) {
            return;
        }
        if (batchLatch.existsConsumeBatchId(batchId)) {
            return;
        }

        try {
            if (batchLatch.existsPreConsumeBatchId(batchId)) {
                // The batch was already pre-counted: retry only the first query-based generator.
                for (AbstractRowGenerator generator : rowGenerators) {
                    if (generator instanceof QueryGenRow) {
                        QueryGenRow queryGenRow = (QueryGenRow) generator;
                        Retry.get().retry(destinationTransRule, queryGenRow.getEntityNumber(), queryGenRow.getPKList());
                        break;
                    }
                }
            } else {
                batchLatch.preCountConsume(batchId, batchCount);
                AsyncInitializeImport.get().apply(rowGenerators, destinationTransRule, entityKey, batchCount);
            }
            callend(messageAcker, str, batchLatch, batchId, batchCount, entityKey, destinationTransRule, destinationRuleConfig);
        } catch (Exception e) {
            // NOTE(review): callend acks before it writes its logs, so a failure in that logging
            // reaches this deny after the message has already been acked - confirm this is harmless.
            messageAcker.deny(str);
            Oplog.get().error(destinationTransRule.getType().getName() + "-" + destinationTransRule.getRegion(), "async init import", entityKey, batchCount, ExceptionLogger.getStack(e));
            DtsStatusReporterFactory.get().confInitCountReportError(e, destinationRuleConfig, batchCount, "async init import");
        }
    }

    /**
     * Waits until the latch reports the batch id as produced.
     *
     * <p>The latch is checked, then re-checked after each of at most
     * {@link #PRODUCE_WAIT_MAX_PAUSES} pauses of {@link #PRODUCE_WAIT_INTERVAL_NANOS}.
     * The check after the final pause is deliberate: without it a batch id that shows up
     * during the last pause would be missed.
     *
     * @return {@code true} once the batch id is known as produced, {@code false} on timeout
     */
    private static boolean awaitProducedBatch(BatchLatch batchLatch, String batchId) {
        for (int pauses = 0; pauses < PRODUCE_WAIT_MAX_PAUSES; pauses++) {
            if (batchLatch.existsPorduceBatchId(batchId)) {
                return true;
            }
            LockSupport.parkNanos(PRODUCE_WAIT_INTERVAL_NANOS);
        }
        return batchLatch.existsPorduceBatchId(batchId);
    }

    /**
     * Returns the entity number of the first {@link QueryGenRow} in the list.
     *
     * <p>Elements that are {@code null} or not a {@link QueryGenRow} are skipped instead of
     * being cast blindly, matching the {@code instanceof} guard used when retrying a batch.
     *
     * @param list row generators taken from the message payload
     * @return the entity number, or {@code null} when the list holds no {@link QueryGenRow}
     */
    private String getEntityNumber(List<AbstractRowGenerator> list) {
        for (AbstractRowGenerator generator : list) {
            if (generator instanceof QueryGenRow) {
                return ((QueryGenRow) generator).getEntityNumber();
            }
        }
        return null;
    }

    /**
     * Finishes a processed batch: counts it as consumed, acks the message, clears the batch
     * from the latch (and the whole latch once it reports finished) and writes the import
     * log and status report entries.
     *
     * @param messageAcker          acker used to acknowledge the message
     * @param str                   identifier passed to {@code messageAcker.ack}
     * @param batchLatch            latch of the entity key
     * @param str2                  batch id
     * @param i                     batch count taken from the message
     * @param str3                  entity key used in the log entries
     * @param destinationTransRule  rule whose type name and region form the log key
     * @param destinationRuleConfig configuration passed to the status reporter
     */
    private void callend(MessageAcker messageAcker, String str, BatchLatch batchLatch, String str2, int i, String str3, DestinationTransRule destinationTransRule, DestinationRuleConfig destinationRuleConfig) {
        long consumed = batchLatch.countConsume(str2, i);
        messageAcker.ack(str);
        batchLatch.clear(str2);
        // Read the finished flag once so the clear and the final log entries agree.
        boolean finished = batchLatch.isFinish();
        if (finished) {
            batchLatch.clear();
        }
        String operation = OperationType.INSERT.getName();
        String ruleKey = destinationTransRule.getType().getName() + "-" + destinationTransRule.getRegion();
        Oplog.get().recordInitImport(ruleKey, operation, str3, (int) consumed, Status.have_write_initdata + ";by asycn importer");
        DtsStatusReporterFactory.get().confInitReport(destinationRuleConfig, operation, DtsStatusType.CONFIG_DOINIT_COUNT_ASYNC);
        if (finished) {
            Oplog.get().recordInitImport(ruleKey, operation, str3, (int) consumed, Status.end_write_task + ";by asycn importer");
            DtsStatusReporterFactory.get().confInitReport(destinationRuleConfig, operation, DtsStatusType.CONFIG_FINISH_INIT_ASYNC);
        }
    }
}
