package com.kingdee.bos.qing.dpp.engine.flink.job.execution;

import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.engine.flink.entrypoint.QDppDispatcherResourceMgrComponent;
import com.kingdee.bos.qing.dpp.engine.flink.job.QDppJobContext;
import com.kingdee.bos.qing.dpp.engine.flink.util.SerializableThrowableUtils;
import com.kingdee.bos.qing.dpp.job.exception.JobExecuteException;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.model.QDppJobStatus;
import com.kingdee.bos.qing.util.StackTraceUtil;
import java.net.MalformedURLException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.operations.ModifyOperation;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/job/execution/TableOperation.class */
abstract class TableOperation {
    public abstract QDppJobResult execute(QDppJobContext qDppJobContext);

    /* JADX INFO: Access modifiers changed from: protected */
    public QDppJobResult submitJob(QDppJobContext qDppJobContext, List<ModifyOperation> list) throws MalformedURLException, ExecutionException, InterruptedException, JobExecuteException {
        String jobName = qDppJobContext.getJobExecuteModel().getJobName();
        StreamExecutionEnvironment streamEnv = qDppJobContext.getStreamEnv();
        streamEnv.configure(qDppJobContext.getTableEnv().getConfig().getConfiguration());
        streamEnv.getConfig().enableObjectReuse();
        qDppJobContext.getTableEnv().getPlanner().translate(list).forEach(transformation -> {
            streamEnv.addOperator(transformation);
        });
        StreamGraph streamGraph = streamEnv.getStreamGraph();
        streamGraph.setJobName(jobName);
        JobGraph jobGraph = PipelineExecutorUtils.getJobGraph(streamGraph, streamEnv.getConfiguration());
        CompletableFuture<Acknowledge> submitJob = QDppDispatcherResourceMgrComponent.get().getQDppDispatcherGateway().submitJob(jobName, jobGraph, Time.milliseconds(((Integer) QDppOptions.JOB_TASK_SUBMIT_TIMEOUT.getValue()).intValue()));
        if (null != submitJob) {
            String hexString = jobGraph.getJobID().toHexString();
            return (QDppJobResult) submitJob.handle((acknowledge, th) -> {
                QDppJobResult qDppJobResult = new QDppJobResult(jobName);
                if (null != th) {
                    qDppJobResult.setError(StackTraceUtil.getStackTrace(SerializableThrowableUtils.convertToQDppError(th)));
                    qDppJobResult.setJobStatus(QDppJobStatus.FAILED);
                } else {
                    qDppJobResult.setJobStatus(QDppJobStatus.SUBMITTED);
                }
                qDppJobResult.setJobHexId(hexString);
                return qDppJobResult;
            }).get();
        }
        QDppJobResult qDppJobResult = new QDppJobResult(jobName);
        qDppJobResult.setJobHexId(jobGraph.getJobID().toHexString());
        qDppJobResult.setError("leader gate way not ready,jobName:" + jobName);
        qDppJobResult.setJobStatus(QDppJobStatus.FAILED);
        return qDppJobResult;
    }
}
