package kd.bos.xdb.task.service.enablemove;

import java.io.PrintWriter;
import java.io.StringWriter;
import java.sql.SQLException;
import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Future;
import kd.bos.bundle.BosRes;
import kd.bos.db.DB;
import kd.bos.db.DBRoute;
import kd.bos.db.sharding.ShardTaskRuntime;
import kd.bos.db.sharding.ShardingManager;
import kd.bos.xdb.XDBConfig;
import kd.bos.xdb.XDBManagerUtil;
import kd.bos.xdb.entity.ShardProgressEntity;
import kd.bos.xdb.entity.ShardTaskEntity;
import kd.bos.xdb.enums.ShardTaskNodeEnum;
import kd.bos.xdb.enums.ShardTaskStatusEnum;
import kd.bos.xdb.exception.ExceptionUtil;
import kd.bos.xdb.mq.ShardLogPublish;
import kd.bos.xdb.repository.ShardProgressRepository;
import kd.bos.xdb.repository.ShardTaskRepository;
import kd.bos.xdb.service.ActionUtil;
import kd.bos.xdb.service.ShardTaskConfig;
import kd.bos.xdb.service.action.parallel.ShardThreadPool;
import kd.bos.xdb.sharding.config.MainTableConfig;
import kd.bos.xdb.sharding.config.ShardingConfig;
import kd.bos.xdb.tablemanager.TableManager;
import kd.bos.xdb.tablemanager.TableName;
import kd.bos.xdb.task.config.Configuration;
import kd.bos.xdb.task.progress.SubProgress;
import kd.bos.xdb.task.service.ShardingTaskServiceAbst;
import kd.bos.xdb.task.service.enablemove.work.EnableMoveWork;
import kd.bos.xdb.task.service.enablemove.work.EnableMoveWorkRunner;
import kd.bos.xdb.util.Threads;

/* loaded from: input_file:kd/bos/xdb/task/service/enablemove/ShardingDataMoveService.class */
public final class ShardingDataMoveService extends ShardingTaskServiceAbst {
    public ShardingDataMoveService(ShardTaskEntity shardTaskEntity, Configuration configuration) {
        super(shardTaskEntity, configuration, ShardTaskNodeEnum.DATAMOVE);
    }

    @Override // kd.bos.xdb.task.service.ShardingTaskServiceAbst
    public boolean doSharding() throws Exception {
        TableManager tableManager = XDBConfig.getTableManager();
        DBRoute route = this.configuration.getRoute();
        for (ShardingConfig shardingConfig : this.configuration.getShardingConfigs()) {
            if (shardingConfig != this.configuration.getMainShardingConfig()) {
                shardingConfig.getShardingStrategy().ensureTableInited();
            }
        }
        boolean isIndexPK = this.configuration.getMainShardingConfig().isIndexPK();
        TableName of = TableName.of(this.configuration.getMainTable());
        if (isIndexPK && !tableManager.existTable(of.getPKTable())) {
            tableManager.createPKTable(of.getPKTable(), XDBManagerUtil.getPkTypeEnum(this.taskEntity.getEntitynumber()), this.configuration.getMainShardingConfig().getOptions().getDataRowsRange(), this.configuration.getMainShardingConfig().getOptions().getIndexDefines());
        }
        if (ShardTaskConfig.isEnableMovingParallel()) {
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "DataMove_1", "并行迁移数据", new Object[0]));
        } else {
            this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "DataMove_2", "串行迁移数据", new Object[0]));
        }
        this.mainProgress.setTotalRecord(this.taskEntity.getTotalRecord());
        this.mainProgress.store(true);
        List<ShardProgressEntity> loadUnexecutedProgressList = ShardProgressRepository.get().loadUnexecutedProgressList(this.taskEntity.getId(), null);
        boolean z = false;
        if (loadUnexecutedProgressList.isEmpty()) {
            if (ShardProgressRepository.get().countProgressUnclosed(this.taskEntity.getId()) > 0) {
                throw ExceptionUtil.wrap(BosRes.get("bos-xdb-manager", "DataMove_3", "存在未完成的并行迁移任务", new Object[0]));
            }
            XDBManagerUtil.logInfo(MessageFormat.format("ShardTaskMovingHandler datamove doDataMoving end,entitynumber:{0}, taskId:{1}, progressCount:{2}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId()), Integer.valueOf(loadUnexecutedProgressList.size())));
        } else if (!ShardTaskConfig.isEnableMovingParallel()) {
            Iterator<ShardProgressEntity> it = loadUnexecutedProgressList.iterator();
            while (true) {
                if (!it.hasNext()) {
                    break;
                }
                ShardProgressEntity next = it.next();
                if (this.shardTaskRuntime.isTaskPaused(this.configuration.getMainTable())) {
                    z = true;
                    break;
                }
                SubProgress of2 = SubProgress.of(next.getId(), this.taskEntity.getEntitynumber(), next.getProgresssign());
                of2.setMovingTable(next.getShardTable());
                of2.setParentSp(this.mainProgress);
                z = new EnableMoveWork(next, this.taskEntity, this.configuration, of2).doWork();
                if (z) {
                    break;
                }
                this.mainProgress.setExecSql(null);
                this.mainProgress.store(false);
            }
        } else {
            ShardThreadPool shardThreadPool = new ShardThreadPool();
            Throwable th = null;
            try {
                try {
                    shardThreadPool.setTable(this.taskEntity.getEntitynumber());
                    shardThreadPool.setName("XDB-MoveParallelThread-");
                    shardThreadPool.start();
                    ArrayList<Future> arrayList = new ArrayList(loadUnexecutedProgressList.size());
                    for (ShardProgressEntity shardProgressEntity : loadUnexecutedProgressList) {
                        SubProgress of3 = SubProgress.of(shardProgressEntity.getId(), this.taskEntity.getEntitynumber(), shardProgressEntity.getProgresssign());
                        of3.setMovingTable(shardProgressEntity.getShardTable());
                        of3.setParentSp(this.mainProgress);
                        arrayList.add(shardThreadPool.submit(Threads.wrapCallable(new EnableMoveWorkRunner(new EnableMoveWork(shardProgressEntity, this.taskEntity, this.configuration, of3), this.configuration.getRoute(), shardProgressEntity))));
                    }
                    boolean z2 = false;
                    String str = "";
                    for (Future future : arrayList) {
                        if (z2) {
                            try {
                                future.cancel(true);
                            } catch (Throwable th2) {
                                StringWriter stringWriter = new StringWriter();
                                th2.printStackTrace(new PrintWriter(stringWriter));
                                String format = MessageFormat.format("ShardTaskMovingHandler ShardingDataMoveService future.get error,entitynumber:{0}, taskId:{1}, isException:{2}, errorinfo:{3}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId()), Boolean.valueOf(z2), stringWriter.toString());
                                XDBManagerUtil.logError(format, th2);
                                ShardTaskRuntime.get().setTaskInterruptedCurrentNode(true, this.configuration.getMainTable());
                                ShardLogPublish.get().publishOperationLog(this.taskEntity.getId(), this.taskEntity.getEntitynumber(), format, getProgressType());
                                if (!z2) {
                                    str = stringWriter.toString();
                                    z2 = true;
                                }
                            }
                        } else if (z) {
                            future.get();
                        } else {
                            z = ((Boolean) future.get()).booleanValue();
                        }
                    }
                    if (z2) {
                        shardThreadPool.shutdown();
                        while (!shardThreadPool.isTerminated()) {
                            Thread.sleep(200L);
                        }
                        throw ExceptionUtil.wrap(str);
                    }
                    if (shardThreadPool != null) {
                        if (0 != 0) {
                            try {
                                shardThreadPool.close();
                            } catch (Throwable th3) {
                                th.addSuppressed(th3);
                            }
                        } else {
                            shardThreadPool.close();
                        }
                    }
                } finally {
                }
            } catch (Throwable th4) {
                if (shardThreadPool != null) {
                    if (th != null) {
                        try {
                            shardThreadPool.close();
                        } catch (Throwable th5) {
                            th.addSuppressed(th5);
                        }
                    } else {
                        shardThreadPool.close();
                    }
                }
                throw th4;
            }
        }
        long countProgressUnclosed = ShardProgressRepository.get().countProgressUnclosed(this.taskEntity.getId());
        if (z) {
            if (countProgressUnclosed != 0) {
                ShardTaskRepository.get().setTaskSuspended(this.taskEntity.getId());
                XDBManagerUtil.logInfo(MessageFormat.format("ArchiveTaskHandler ShardingDataMoveService doSharding paused,entitynumber:{0}, taskId:{1}", this.taskEntity.getEntitynumber(), Long.valueOf(this.taskEntity.getId())));
                ShardLogPublish.get().publishOperationLog(this.taskEntity.getId(), this.taskEntity.getEntitynumber(), BosRes.get("bos-xdb-manager", "ShardActionDisableDataMove_0", "任务暂停", new Object[0]), getProgressType());
                return true;
            }
            ShardTaskRepository.get().setNextTaskstatus(this.taskEntity.getId(), ShardTaskStatusEnum.PAUSE, ShardTaskStatusEnum.EXECUTING);
            ShardingManager.get().notifyLimitTaskPaused(false, this.configuration.getMainTable());
        } else if (ShardTaskRuntime.get().isTaskPaused(this.configuration.getMainTable())) {
            ShardTaskRepository.get().setNextTaskstatus(this.taskEntity.getId(), ShardTaskStatusEnum.PAUSE, ShardTaskStatusEnum.EXECUTING);
            ShardingManager.get().notifyLimitTaskPaused(false, this.configuration.getMainTable());
        }
        if (countProgressUnclosed > 0) {
            throw new RuntimeException(BosRes.get("bos-xdb-manager", "DataMove_3", "存在未完成的并行迁移任务", new Object[0]));
        }
        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardActionEnableDataMove_7", "删除表头中间表", new Object[0]));
        this.mainProgress.store(true);
        String[] movingTable = getMovingTable();
        for (String str2 : movingTable) {
            ActionUtil.dropTable(route, str2);
            tableManager.removeCahce(str2);
        }
        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardActionEnableDataMove_8", "删除子表中间表", new Object[0]));
        this.mainProgress.store(true);
        Iterator it2 = this.configuration.getMultimap().asMap().entrySet().iterator();
        while (it2.hasNext()) {
            ShardingConfig shardingConfig2 = (ShardingConfig) ((Map.Entry) it2.next()).getKey();
            if (!(shardingConfig2 instanceof MainTableConfig)) {
                for (String str3 : movingTable) {
                    ActionUtil.dropTable(route, TableName.of(shardingConfig2.getTable()).getMovingTable(TableName.of(str3).getMovingIndex()));
                    tableManager.removeCahce(str3);
                }
            }
        }
        if (ShardTaskConfig.isEnableMovingBackup()) {
            for (int movingShardingIndex = ((int) this.mainProgress.getMovingShardingIndex()) + 1; movingShardingIndex < this.configuration.getShardingConfigs().size(); movingShardingIndex++) {
                String originalName = TableName.of(this.configuration.getShardingConfigs().get(movingShardingIndex).getTable()).getOriginalName();
                if (DB.exitsTable(route, originalName)) {
                    if (DB.exitsTable(route, originalName + "$bak")) {
                        ActionUtil.dropTable(route, originalName + "$bak");
                    }
                    ActionUtil.backupTable(route, originalName);
                }
                this.mainProgress.setMovingShardingIndex(movingShardingIndex);
                this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "ShardActionDisableDataMove_1", "备份", new Object[0]) + originalName);
                this.mainProgress.store(true);
            }
        }
        this.mainProgress.setProgressDesc_1(BosRes.get("bos-xdb-manager", "DataMove_4", "数据迁移完成", new Object[0]));
        this.mainProgress.store(true);
        return false;
    }

    private String[] getMovingTable() throws SQLException {
        return getSortTables(XDBConfig.getTableManager().getMovingTable(this.configuration.getMainTable()), "$m");
    }

    private String[] getSortTables(String[] strArr, String str) {
        String upperCase = str.toUpperCase();
        if (strArr.length <= 1) {
            return strArr;
        }
        HashMap hashMap = new HashMap(strArr.length);
        String[] strArr2 = new String[strArr.length];
        long[] jArr = new long[strArr.length];
        int lastIndexOf = strArr[0].toUpperCase().lastIndexOf(upperCase);
        for (int i = 0; i < strArr.length; i++) {
            String str2 = strArr[i];
            long parseLong = Long.parseLong(str2.substring(lastIndexOf + upperCase.length()));
            jArr[i] = parseLong;
            hashMap.put(Long.valueOf(parseLong), str2);
        }
        Arrays.sort(jArr);
        for (int i2 = 0; i2 < jArr.length; i2++) {
            strArr2[i2] = (String) hashMap.get(Long.valueOf(jArr[i2]));
        }
        return strArr2;
    }
}
