package com.kingdee.bos.qing.dpp.engine.flink.embed;

import com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.minicluster.MiniCluster;
import org.apache.flink.util.concurrent.FutureUtils;
import org.apache.flink.util.function.FunctionUtils;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/embed/EmbedDispatcherGateway.class */
/**
 * {@link IQDppDispatcherGateway} implementation backed by the in-process Flink {@link MiniCluster}.
 *
 * <p>Jobs submitted through this gateway are wrapped in an {@link EmbedJob} and cached in
 * {@link EmbedDppEngine} under the caller-supplied id; the query and cancel methods look the job
 * up by that id and delegate to it. None of the methods use the {@link Time} argument.
 */
public class EmbedDispatcherGateway implements IQDppDispatcherGateway {

    /**
     * Returns the status of the embed job cached under the given id.
     *
     * @param str id the job was cached under by {@link #submitJob}
     * @param time not used by this implementation
     * @return the job's status future, or an already completed future holding
     *     {@link JobStatus#FINISHED} when no job is cached under the id
     */
    @Override
    public CompletableFuture<JobStatus> requestJobStatus(String str, Time time) {
        EmbedJob embedJob = EmbedDppEngine.getInstance().getEmbedJob(str);
        if (embedJob == null) {
            // An id with no cached job is reported as finished.
            return CompletableFuture.completedFuture(JobStatus.FINISHED);
        }
        return embedJob.getJobStatus();
    }

    /**
     * Returns the result of the embed job cached under the given id.
     *
     * @param str id the job was cached under; when no job is cached it is parsed with
     *     {@link JobID#fromHexString(String)} to build the placeholder result
     * @param time not used by this implementation
     * @return the job's result future, or an already completed future holding a result with
     *     {@link ApplicationStatus#SUCCEEDED} when no job is cached under the id
     */
    @Override
    public CompletableFuture<JobResult> requestJobResult(String str, Time time) {
        EmbedJob embedJob = EmbedDppEngine.getInstance().getEmbedJob(str);
        if (embedJob == null) {
            // An id with no cached job is reported as a successful result carrying that id.
            JobResult succeeded = new JobResult.Builder()
                    .jobId(JobID.fromHexString(str))
                    .applicationStatus(ApplicationStatus.SUCCEEDED)
                    .build();
            return CompletableFuture.completedFuture(succeeded);
        }
        return embedJob.getJobResult();
    }

    /**
     * Cancels the embed job cached under the given id.
     *
     * @param str id the job was cached under by {@link #submitJob}
     * @param time not used by this implementation
     * @return the job's cancellation future, or an already completed acknowledgement when no job
     *     is cached under the id
     */
    @Override
    public CompletableFuture<Acknowledge> cancelJob(String str, Time time) {
        EmbedJob embedJob = EmbedDppEngine.getInstance().getEmbedJob(str);
        if (embedJob == null) {
            // Nothing to cancel; acknowledge immediately.
            return CompletableFuture.completedFuture(Acknowledge.get());
        }
        return embedJob.cancel();
    }

    /**
     * Submits the job graph to the shared mini cluster, blocks until job initialization has
     * finished, and caches the resulting {@link EmbedJob} under the given id.
     *
     * <p>This method blocks the calling thread; the returned future is always already completed.
     *
     * @param str id under which the created {@link EmbedJob} is cached
     * @param jobGraph the job graph to submit
     * @param time not used by this implementation
     * @return a completed acknowledgement on success; otherwise a future completed exceptionally
     *     with whatever was thrown during submission or initialization (failures of the
     *     submission pipeline surface as the exception thrown by {@link CompletableFuture#get()}),
     *     or with the job's own submit error if it reports one
     */
    @Override
    public CompletableFuture<Acknowledge> submitJob(String str, JobGraph jobGraph, Time time) {
        try {
            final MiniCluster miniCluster = MiniClusterInstance.getInstance().getMiniCluster();
            // A failed stage yields no EmbedJob to record the error on, so pipeline failures
            // are reported through the exception thrown by get() below.
            final EmbedJob embedJob = miniCluster.submitJob(jobGraph)
                    .thenApplyAsync(FunctionUtils.uncheckedFunction(submissionResult -> {
                        // Wait off the caller thread until the job has left initialization.
                        ClientUtils.waitUntilJobInitializationFinished(
                                () -> miniCluster.getJobStatus(submissionResult.getJobID()).get(),
                                () -> miniCluster.requestJobResult(submissionResult.getJobID()).get(),
                                (ClassLoader) null);
                        return submissionResult;
                    }))
                    .thenApply(submissionResult -> new EmbedJob(miniCluster, submissionResult.getJobID()))
                    .get();

            Throwable submitError = embedJob.getSubmitError();
            if (submitError != null) {
                return FutureUtils.completedExceptionally(submitError);
            }
            // Only successfully initialized jobs become visible to the query/cancel methods.
            EmbedDppEngine.getInstance().cacheEmbedJob(str, embedJob);
            return CompletableFuture.completedFuture(Acknowledge.get());
        } catch (InterruptedException interrupted) {
            // Restore the interrupt flag so callers further up the stack can observe it.
            Thread.currentThread().interrupt();
            return FutureUtils.completedExceptionally(interrupted);
        } catch (Throwable failure) {
            return FutureUtils.completedExceptionally(failure);
        }
    }

    /**
     * Cluster overview is not provided by the embedded gateway.
     *
     * @param time not used by this implementation
     * @return always {@code null} (not a future); callers must null-check before chaining.
     *     NOTE(review): kept as {@code null} to preserve existing behavior; confirm whether
     *     callers would be better served by a failed future.
     */
    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        return null;
    }
}
