package kd.bos.dts;

import java.util.Map;
import kd.bos.dts.consume.Consumer;
import kd.bos.dts.consume.ConsumerFactory;
import kd.bos.dts.define.DestinationRuleConfig;
import kd.bos.dts.exception.ExceptionLogger;
import kd.bos.dts.ksql.DataSqlSyncValue;
import kd.bos.dts.log.DtsStatusReporterFactory;
import kd.bos.dts.log.DtsStatusType;
import kd.bos.dts.oplog.Oplog;
import kd.bos.mq.MessageAcker;
import kd.bos.mq.MessageConsumer;
import kd.bos.orm.datasync.DataSyncValue;
import kd.bos.orm.datasync.DestinationTransRule;

/* loaded from: input_file:kd/bos/dts/SyncDataConsumer.class */
public class SyncDataConsumer implements MessageConsumer {
    private Consumer consumer = ConsumerFactory.getConsumer();

    public void onMessage(Object obj, String str, boolean z, MessageAcker messageAcker) {
        Map<String, Object> map = (Map) obj;
        String str2 = (String) map.get("optype");
        DestinationTransRule destinationTransRule = (DestinationTransRule) map.get("destination");
        DataSyncValue dataSyncValue = (DataSyncValue) map.get("datasyncvalue");
        DestinationRuleConfig destinationRuleConfig = DestinationRuleConfig.get(dataSyncValue.getEntityNumber(), destinationTransRule);
        try {
            this.consumer.consum(map);
            DtsStatusReporterFactory.get().realtimeReport(dataSyncValue.getGid(), destinationRuleConfig, (dataSyncValue instanceof DataSqlSyncValue ? "Sql " : Constant.EMPTY_STRING) + str2, dataSyncValue.getCount(), DtsStatusType.DATA_SYNCED, System.currentTimeMillis() - dataSyncValue.getTimestap());
            String entityNumber = dataSyncValue.getEntityNumber();
            String businessType = destinationTransRule.getBusinessType();
            StringBuilder sb = new StringBuilder("kd.bos.dts.DtsMsgConsumer#SyncDataConsumer:");
            sb.append(",businessType#").append(businessType);
            sb.append(",entityNumber#").append(entityNumber);
            sb.append(",type#").append(str2);
            DtsUtils.logInfo(sb.toString());
        } catch (Exception e) {
            Oplog.get().error(destinationTransRule.getType().getName() + "-" + destinationTransRule.getRegion(), str2, dataSyncValue.getEntityNumber(), 0, ExceptionLogger.getStack(e));
            DtsStatusReporterFactory.get().realtimeExceptionReport(e, dataSyncValue.getGid(), destinationRuleConfig, str2, dataSyncValue.getCount(), DtsStatusType.DATA_SYNCED_EXCEPTION, System.currentTimeMillis() - dataSyncValue.getTimestap());
            if (!z) {
                messageAcker.deny(str);
            } else {
                messageAcker.discard(str);
                ExceptionLogger.error(getClass(), "dtsmessage consumer error", e);
            }
        }
    }
}
