package com.kingdee.bos.qing.dpp.engine.flink.job.execution;

import com.kingdee.bos.qing.dpp.common.types.DataSinkType;
import com.kingdee.bos.qing.dpp.common.types.JobMonitorType;
import com.kingdee.bos.qing.dpp.common.types.TransformType;
import com.kingdee.bos.qing.dpp.engine.flink.job.QDppJobContext;
import com.kingdee.bos.qing.dpp.engine.flink.job.result.JobStatusResultMonitor;
import com.kingdee.bos.qing.dpp.engine.flink.job.result.JobStatusResultPushHandler;
import com.kingdee.bos.qing.dpp.engine.flink.transform.model.TransformVertex;
import com.kingdee.bos.qing.dpp.engine.flink.transform.sink.SinkTransformationProviderFactory;
import com.kingdee.bos.qing.dpp.engine.optimization.Optimizer;
import com.kingdee.bos.qing.dpp.engine.optimization.plan.OptimizeContext;
import com.kingdee.bos.qing.dpp.engine.optimization.util.GraphUtil;
import com.kingdee.bos.qing.dpp.exception.QDataTransformException;
import com.kingdee.bos.qing.dpp.exception.QDppException;
import com.kingdee.bos.qing.dpp.exception.TableBuildException;
import com.kingdee.bos.qing.dpp.job.model.QDppJobExecuteModel;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.model.QDppJobStatus;
import com.kingdee.bos.qing.dpp.model.transform.Transformation;
import com.kingdee.bos.qing.dpp.model.transform.settings.SinkSettings;
import com.kingdee.bos.qing.dpp.utils.DppGlobalScheduleExecutor;
import com.kingdee.bos.qing.util.StackTraceUtil;
import java.util.ArrayList;
import java.util.List;
import org.jgrapht.graph.DefaultEdge;
import org.jgrapht.graph.DirectedAcyclicGraph;
import org.jgrapht.traverse.TopologicalOrderIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/job/execution/TransformJob.class */
class TransformJob {
    private static final Logger log = LoggerFactory.getLogger(TransformJob.class);
    private QDppJobExecuteModel jobExecuteModel;

    public TransformJob(QDppJobExecuteModel qDppJobExecuteModel) {
        this.jobExecuteModel = null;
        this.jobExecuteModel = qDppJobExecuteModel;
    }

    public QDppJobResult executeInEmbed() throws QDppException {
        return execute(ExecuteMode.EMBED);
    }

    public QDppJobResult executeInCluster() throws QDppException {
        return execute(ExecuteMode.CLUSTER);
    }

    private QDppJobResult execute(ExecuteMode executeMode) throws QDppException {
        QDppJobResult execute;
        QDppJobContext createJobContext = createJobContext(executeMode);
        QDppJobResult qDppJobResult = null;
        try {
            try {
                initializeTransformVertex(createJobContext);
                DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph = createJobContext.getDirectedAcyclicGraph();
                log.debug("print dag original: " + GraphUtil.printDag(directedAcyclicGraph));
                new Optimizer().optimize(new OptimizeContext(createJobContext, null));
                log.debug("print dag after optimizer : " + GraphUtil.printDag(directedAcyclicGraph));
                reInitAfterOptimized(createJobContext, directedAcyclicGraph);
                doTransform(createJobContext, directedAcyclicGraph);
                List<TransformVertex> lastVertexList = createJobContext.getLastVertexList();
                if (lastVertexList.size() == 1) {
                    execute = createTableOperation(lastVertexList.get(0)).execute(createJobContext);
                } else {
                    ArrayList arrayList = new ArrayList(3);
                    for (TransformVertex transformVertex : lastVertexList) {
                        if (transformVertex.isSinkVertex()) {
                            arrayList.add(transformVertex);
                        }
                    }
                    execute = new MultiSinkOperation(arrayList).execute(createJobContext);
                }
                QDppJobResult qDppJobResult2 = execute;
                if (null != execute) {
                    if (execute.getError() != null) {
                        createJobContext.handleError(execute);
                    } else {
                        registerJobResultHandlerLater(createJobContext, execute);
                    }
                }
                return qDppJobResult2;
            } catch (Exception e) {
                log.error("execute job error,jobName:" + this.jobExecuteModel.getJobName(), e);
                QDppJobResult qDppJobResult3 = new QDppJobResult(this.jobExecuteModel.getJobName());
                qDppJobResult3.setError("job execute error,errMsg:" + StackTraceUtil.getStackTrace(e));
                qDppJobResult3.setJobStatus(QDppJobStatus.FAILED);
                if (null != qDppJobResult3) {
                    if (qDppJobResult3.getError() != null) {
                        createJobContext.handleError(qDppJobResult3);
                    } else {
                        registerJobResultHandlerLater(createJobContext, qDppJobResult3);
                    }
                }
                return qDppJobResult3;
            }
        } catch (Throwable th) {
            if (0 != 0) {
                if (qDppJobResult.getError() != null) {
                    createJobContext.handleError(null);
                } else {
                    registerJobResultHandlerLater(createJobContext, null);
                }
            }
            throw th;
        }
    }

    private void registerJobResultHandlerLater(final QDppJobContext qDppJobContext, final QDppJobResult qDppJobResult) {
        JobMonitorType monitorType = qDppJobContext.getJobExecuteModel().getMonitorType();
        if (qDppJobContext.getExecuteMode() == ExecuteMode.CLUSTER && monitorType == JobMonitorType.ON_ENGINE && qDppJobResult.getJobStatus() == QDppJobStatus.SUBMITTED) {
            DppGlobalScheduleExecutor.submitLater(new Runnable() { // from class: com.kingdee.bos.qing.dpp.engine.flink.job.execution.TransformJob.1
                @Override // java.lang.Runnable
                public void run() {
                    JobStatusResultMonitor.getInstance().registerJobResultHandler(qDppJobResult.getJobHexId(), qDppJobResult.getJobName(), new JobStatusResultPushHandler(qDppJobContext.getJobExecuteModel()));
                }
            }, 500L);
        }
    }

    private void reInitAfterOptimized(QDppJobContext qDppJobContext, DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph) throws QDataTransformException {
        TopologicalOrderIterator topologicalOrderIterator = new TopologicalOrderIterator(directedAcyclicGraph);
        while (topologicalOrderIterator.hasNext()) {
            TransformVertex transformVertex = (TransformVertex) topologicalOrderIterator.next();
            if (!transformVertex.isInitialized()) {
                transformVertex.clear();
                transformVertex.initialize(qDppJobContext);
            }
        }
    }

    private TableOperation createTableOperation(TransformVertex transformVertex) throws TableBuildException {
        if (transformVertex.isSinkVertex()) {
            return new SinkOperation(transformVertex);
        }
        throw new TableBuildException("no sink vertex exist, data can not be sinked");
    }

    private void doTransform(QDppJobContext qDppJobContext, DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph) throws QDataTransformException {
        TopologicalOrderIterator topologicalOrderIterator = new TopologicalOrderIterator(directedAcyclicGraph);
        while (topologicalOrderIterator.hasNext()) {
            TransformVertex transformVertex = (TransformVertex) topologicalOrderIterator.next();
            transformVertex.transToTable(qDppJobContext);
            if (isLastVertex(directedAcyclicGraph, transformVertex)) {
                qDppJobContext.addLastVertex(transformVertex);
            }
        }
    }

    private TransformVertex addSinkVertexAndInit(QDppJobContext qDppJobContext, TransformVertex transformVertex, DataSinkType dataSinkType) throws QDataTransformException {
        Transformation prepareSink = SinkTransformationProviderFactory.create(dataSinkType).prepareSink(qDppJobContext, transformVertex);
        checkSinkTransformation(prepareSink);
        TransformVertex transformVertex2 = new TransformVertex();
        transformVertex2.setTransformation(prepareSink);
        qDppJobContext.getDirectedAcyclicGraph().addVertex(transformVertex2);
        qDppJobContext.getDirectedAcyclicGraph().addEdge(transformVertex, transformVertex2);
        return transformVertex2;
    }

    private void checkSinkTransformation(Transformation transformation) throws QDataTransformException {
        if (null == transformation) {
            throw new QDataTransformException("sink transformation to be prepared can not be null");
        }
        if (transformation.getTransformType() != TransformType.SINK_DATA) {
            throw new QDataTransformException("the prepared sink transformation type is not valid sink type");
        }
        if (!(transformation.getTransformSettings() instanceof SinkSettings)) {
            throw new QDataTransformException("the prepared sink transformation setting is not sink setting type");
        }
    }

    private void initializeTransformVertex(QDppJobContext qDppJobContext) throws QDataTransformException {
        DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph = qDppJobContext.getDirectedAcyclicGraph();
        TopologicalOrderIterator topologicalOrderIterator = new TopologicalOrderIterator(directedAcyclicGraph);
        while (topologicalOrderIterator.hasNext()) {
            TransformVertex transformVertex = (TransformVertex) topologicalOrderIterator.next();
            transformVertex.initialize(qDppJobContext);
            if (isLastVertex(directedAcyclicGraph, transformVertex) && !transformVertex.isSinkVertex()) {
                addSinkVertexAndInit(qDppJobContext, transformVertex, this.jobExecuteModel.getSinkType());
            }
        }
    }

    private boolean isLastVertex(DirectedAcyclicGraph<TransformVertex, DefaultEdge> directedAcyclicGraph, TransformVertex transformVertex) {
        return directedAcyclicGraph.outDegreeOf(transformVertex) == 0;
    }

    private QDppJobContext createJobContext(ExecuteMode executeMode) throws QDppException {
        QDppJobContext qDppJobContext = new QDppJobContext();
        qDppJobContext.setExecuteMode(executeMode);
        qDppJobContext.initialize(this.jobExecuteModel, false);
        return qDppJobContext;
    }
}
