package kd.bos.ext.tmc.task.impl;

import java.util.ArrayList;
import java.util.Date;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import kd.bos.context.RequestContext;
import kd.bos.dataentity.OperateOption;
import kd.bos.dataentity.resource.ResManager;
import kd.bos.entity.operate.result.OperationResult;
import kd.bos.ext.tmc.dao.FormDesignDao;
import kd.bos.ext.tmc.enums.ScheduleExecuteStatus;
import kd.bos.ext.tmc.model.ScheduleExceOperInfo;
import kd.bos.ext.tmc.prop.BaseDataProp;
import kd.bos.ext.tmc.task.AbstractScheduleExecute;
import kd.bos.ext.tmc.task.ScheduleExecuteContext;
import kd.bos.ext.tmc.utils.ScheduleExecuteUtil;
import kd.bos.orm.query.QFilter;
import kd.bos.servicehelper.QueryServiceHelper;
import kd.bos.servicehelper.operation.OperationServiceHelper;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;

/* loaded from: input_file:kd/bos/ext/tmc/task/impl/ScheduleOperationExecute.class */
public class ScheduleOperationExecute extends AbstractScheduleExecute {
    private static final int THREAD_TOTAL = 20;
    private static final int THREAD_DATA_LIMIT = 1000;
    private static final int THREAD_DATA_LIMIT_TOTAL = 10000;
    private final Object syncObj;
    private int processThreadCount;
    private static final ThreadPool threadPoolIntellPlan = ThreadPools.newFixedThreadPool("tmc/fcs/scheduleOperation", 5);

    public ScheduleOperationExecute(ScheduleExecuteContext scheduleExecuteContext) {
        super(scheduleExecuteContext);
        this.syncObj = new Object();
        this.processThreadCount = 0;
    }

    @Override // kd.bos.ext.tmc.task.AbstractScheduleExecute
    protected void doExecute() {
        try {
            ScheduleExceOperInfo exceOperInfo = this.ctx.getSchemeExecInfo().getExceOperInfo();
            Long schemaId = exceOperInfo.getSchemaId();
            Date execstartdate = this.ctx.getExecstartdate();
            long sumLogId = this.ctx.getSumLogId();
            logger.info(schemaId + "开始获取目标数据...");
            List<QFilter> exceDataCollectionFilter = getExceDataCollectionFilter();
            logger.info("过滤条件:" + exceDataCollectionFilter);
            Set set = (Set) QueryServiceHelper.queryPrimaryKeys(exceOperInfo.getBussiness(), (QFilter[]) exceDataCollectionFilter.toArray(new QFilter[0]), BaseDataProp.ID, -1).stream().map(obj -> {
                return (Long) obj;
            }).collect(Collectors.toSet());
            int size = set.size();
            this.ctx.getSchemeExecInfo().appendRecordTC(size);
            try {
                logger.info(schemaId + "目标数据" + size);
                if (set.size() == 0) {
                    logger.info("任务调度主线程退出--批处理数据:0");
                    exceOperInfo.setExecDetails(String.format(ResManager.loadKDString("未获取到符合该方案【%1$s%2$s】操作的前置条件的数据。", "ScheduleOperationExecute_1", "bos-ext-tmc", new Object[0]), FormDesignDao.getDesignFormName(exceOperInfo.getBussiness()), exceOperInfo.getOperName()));
                    exceOperInfo.setExecuteStatus(ScheduleExecuteStatus.FAIL);
                    return;
                }
                if (size > THREAD_DATA_LIMIT_TOTAL) {
                    ArrayList arrayList = new ArrayList();
                    synchronized (this.syncObj) {
                        ArrayList arrayList2 = new ArrayList(set);
                        int ceil = (int) Math.ceil((size * 1.0d) / 1000.0d);
                        for (int i = 0; i < ceil; i++) {
                            if (this.processThreadCount >= THREAD_TOTAL) {
                                logger.info("任务调度主线程等待");
                                this.syncObj.wait();
                            }
                            List<Long> subList = THREAD_DATA_LIMIT * (i + 1) < size ? arrayList2.subList(THREAD_DATA_LIMIT * i, THREAD_DATA_LIMIT * (i + 1)) : arrayList2.subList(THREAD_DATA_LIMIT * i, size);
                            logger.info("1000一批任务调度主线程开始分发子线程");
                            threadCountAdd();
                            arrayList.add(mutiThreadExecOperation(schemaId, size, execstartdate, exceOperInfo, this, subList, getBatchSize(exceOperInfo), this.syncObj, Long.valueOf(sumLogId)));
                            if (ScheduleExecuteUtil.isStopExcute(this.ctx.getSchemeExecInfo())) {
                                break;
                            }
                        }
                    }
                    Iterator it = arrayList.iterator();
                    while (it.hasNext()) {
                        if (ScheduleExecuteStatus.FAIL == ((ScheduleExecuteStatus) ((Future) it.next()).get())) {
                            this.ctx.getSchemeExecInfo().getExceOperInfo().setExecuteStatus(ScheduleExecuteStatus.FAIL);
                        }
                    }
                } else {
                    ScheduleExecuteStatus exeOperation = exeOperation(new ArrayList(set), schemaId.longValue(), size, execstartdate, exceOperInfo, getBatchSize(exceOperInfo));
                    this.ctx.getSchemeExecInfo().getExceOperInfo().setExecuteStatus(exeOperation);
                    this.ctx.getSchemeExecInfo().setExecuteStatus(exeOperation);
                }
                logger.info("任务调度主线程运行完成");
            } catch (Exception e) {
                throw new Exception(e);
            }
        } catch (Exception e2) {
            logger.error(e2.getMessage(), e2);
        }
    }

    @Override // kd.bos.ext.tmc.task.AbstractScheduleExecute
    protected OperationResult invokeOperation(Object[] objArr, OperateOption operateOption) {
        ScheduleExceOperInfo exceOperInfo = this.ctx.getSchemeExecInfo().getExceOperInfo();
        return OperationServiceHelper.executeOperate(exceOperInfo.getOper(), exceOperInfo.getBussiness(), objArr, operateOption);
    }

    private Future<ScheduleExecuteStatus> mutiThreadExecOperation(Long l, int i, Date date, ScheduleExceOperInfo scheduleExceOperInfo, ScheduleOperationExecute scheduleOperationExecute, List<Long> list, int i2, Object obj, Long l2) {
        return threadPoolIntellPlan.submit(new Callable(l, i, date, scheduleExceOperInfo, list, i2, scheduleOperationExecute, obj, l2) { // from class: kd.bos.ext.tmc.task.impl.ScheduleOperationExecute.1OperationRunnable
            private Long scheduleSchemaId;
            private int count;
            private Date execstartdate;
            private ScheduleExceOperInfo exceOperInfo;
            private List<Long> newList;
            private int pointsDataLimit;
            private ScheduleOperationExecute execPlan;
            private final Object syncObj;
            private Long sumLogId;

            {
                this.scheduleSchemaId = l;
                this.count = i;
                this.execstartdate = date;
                this.exceOperInfo = scheduleExceOperInfo;
                this.newList = list;
                this.pointsDataLimit = i2;
                this.execPlan = scheduleOperationExecute;
                this.syncObj = obj;
                this.sumLogId = l2;
            }

            @Override // java.util.concurrent.Callable
            public ScheduleExecuteStatus call() {
                ScheduleExecuteStatus scheduleExecuteStatus = ScheduleExecuteStatus.SUCCESS;
                try {
                    try {
                        RequestContext.get().setTraceId(ScheduleOperationExecute.this.ctx.getTraceId());
                        ScheduleOperationExecute.logger.info("任务调度进入单个线程调度:{};traceId:{}", this.scheduleSchemaId, ScheduleOperationExecute.this.ctx.getTraceId());
                        long currentTimeMillis = System.currentTimeMillis();
                        scheduleExecuteStatus = ScheduleOperationExecute.this.exeOperation(this.newList, this.scheduleSchemaId.longValue(), this.count, this.execstartdate, this.exceOperInfo, this.pointsDataLimit);
                        ScheduleOperationExecute.logger.info("任务调度进行的线程数:{}", Integer.valueOf(this.execPlan.processThreadCount));
                        ScheduleOperationExecute.logger.info("任务调度完成单个线程调度: {};耗时:{}ms", this.scheduleSchemaId, Long.valueOf(System.currentTimeMillis() - currentTimeMillis));
                        this.execPlan.threadCountSub();
                        if (this.execPlan.processThreadCount == 10) {
                            synchronized (this.syncObj) {
                                ScheduleOperationExecute.logger.info("任务调度主线程唤醒");
                                this.syncObj.notifyAll();
                            }
                        }
                    } catch (Exception e) {
                        ScheduleOperationExecute.logger.error("任务调度单个线程发生报错:" + e);
                        this.execPlan.threadCountSub();
                        if (this.execPlan.processThreadCount == 10) {
                            synchronized (this.syncObj) {
                                ScheduleOperationExecute.logger.info("任务调度主线程唤醒");
                                this.syncObj.notifyAll();
                            }
                        }
                    }
                    return scheduleExecuteStatus;
                } catch (Throwable th) {
                    this.execPlan.threadCountSub();
                    if (this.execPlan.processThreadCount == 10) {
                        synchronized (this.syncObj) {
                            ScheduleOperationExecute.logger.info("任务调度主线程唤醒");
                            this.syncObj.notifyAll();
                        }
                    }
                    throw th;
                }
            }
        });
    }

    protected ScheduleExecuteStatus exeOperation(List<Long> list, long j, int i, Date date, ScheduleExceOperInfo scheduleExceOperInfo, int i2) {
        ScheduleExecuteStatus scheduleExecuteStatus = ScheduleExecuteStatus.SUCCESS;
        StringBuffer stringBuffer = new StringBuffer();
        List<Long> list2 = null;
        logger.info("批量执行：scheduleSchemaId：{}, batchSize: {}", Long.valueOf(j), Integer.valueOf(i2));
        HashSet hashSet = new HashSet(3);
        for (int i3 = 0; i3 < list.size(); i3++) {
            list2 = (List) Optional.ofNullable(list2).orElseGet(ArrayList::new);
            list2.add(list.get(i3));
            if (list2.size() == i2 || i3 == list.size() - 1) {
                scheduleExecuteStatus = startExecOperation(j, i, date, stringBuffer, list2, false);
                list2 = null;
                hashSet.add(scheduleExecuteStatus);
            }
        }
        if (hashSet.contains(ScheduleExecuteStatus.STOP)) {
            scheduleExecuteStatus = ScheduleExecuteStatus.STOP;
        } else if (hashSet.contains(ScheduleExecuteStatus.FAIL)) {
            scheduleExecuteStatus = ScheduleExecuteStatus.FAIL;
        }
        scheduleExceOperInfo.setExecDetails(stringBuffer.toString());
        return scheduleExecuteStatus;
    }

    public synchronized void threadCountAdd() {
        this.processThreadCount++;
    }

    public synchronized void threadCountSub() {
        this.processThreadCount--;
    }

    private int getBatchSize(ScheduleExceOperInfo scheduleExceOperInfo) {
        if (scheduleExceOperInfo.isSingle()) {
            return 1;
        }
        return scheduleExceOperInfo.getEachbatchsize();
    }
}
