package com.kingdee.bos.qing.dpp.engine.flink.job.dispather;

import com.kingdee.bos.qing.dpp.job.exception.JobExecuteException;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.webmonitor.ClusterOverview;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.util.concurrent.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/job/dispather/ClusterDispatcherGateway.class */
/**
 * {@link IQDppDispatcherGateway} implementation that forwards every request to the
 * {@link DispatcherGateway} currently exposed by a {@link LeaderGatewayRetriever}.
 *
 * <p>Each call first polls the retriever (blocking the calling thread, see
 * {@link #retryableGetDispatcherGateway(long)}) for at most the supplied timeout. When no gateway
 * becomes available in that window, the method falls back to a per-operation default:
 * <ul>
 *   <li>{@link #requestJobStatus} completes with {@code null};</li>
 *   <li>{@link #requestJobResult} completes with a result whose application status is
 *       {@link ApplicationStatus#UNKNOWN};</li>
 *   <li>{@link #cancelJob}, {@link #submitJob} and {@link #requestClusterOverview} complete
 *       exceptionally with a {@link JobExecuteException}.</li>
 * </ul>
 * No method of this class returns a {@code null} future.
 */
public class ClusterDispatcherGateway implements IQDppDispatcherGateway {
    private static final Logger log = LoggerFactory.getLogger(ClusterDispatcherGateway.class);

    /** Pause between two consecutive polls of the gateway retriever, in milliseconds. */
    private static final long RETRY_INTERVAL_MILLIS = 200L;

    /** Number of nanoseconds in one millisecond, used to convert {@link System#nanoTime()} deltas. */
    private static final long NANOS_PER_MILLI = 1_000_000L;

    private final LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever;

    /**
     * @param leaderGatewayRetriever retriever polled for the current dispatcher gateway
     */
    public ClusterDispatcherGateway(LeaderGatewayRetriever<DispatcherGateway> leaderGatewayRetriever) {
        this.dispatcherGatewayRetriever = leaderGatewayRetriever;
    }

    /**
     * Requests the status of a job.
     *
     * @param str  job id as a hex string
     * @param time timeout, used both for waiting on the gateway and for the RPC itself
     * @return the dispatcher's answer, or a future completed with {@code null} when no gateway
     *         became available within the timeout
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway
    public CompletableFuture<JobStatus> requestJobStatus(String str, Time time) {
        DispatcherGateway gateway = retryableGetDispatcherGateway(time.toMilliseconds());
        if (gateway == null) {
            log.warn("dispatcher gate way is not ready");
            return CompletableFuture.completedFuture(null);
        }
        return gateway.requestJobStatus(JobID.fromHexString(str), time);
    }

    /**
     * Requests the result of a job.
     *
     * @param str  job id as a hex string
     * @param time timeout, used both for waiting on the gateway and for the RPC itself
     * @return the dispatcher's answer, or a future completed with a placeholder result carrying
     *         {@link ApplicationStatus#UNKNOWN} when no gateway became available within the timeout
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway
    public CompletableFuture<JobResult> requestJobResult(String str, Time time) {
        DispatcherGateway gateway = retryableGetDispatcherGateway(time.toMilliseconds());
        if (gateway == null) {
            log.warn("dispatcher gate way is not ready");
            JobResult.Builder builder = new JobResult.Builder();
            builder.jobId(JobID.fromHexString(str)).applicationStatus(ApplicationStatus.UNKNOWN);
            return CompletableFuture.completedFuture(builder.build());
        }
        return gateway.requestJobResult(JobID.fromHexString(str), time);
    }

    /**
     * Cancels a job.
     *
     * @param str  job id as a hex string
     * @param time timeout, used both for waiting on the gateway and for the RPC itself
     * @return the dispatcher's acknowledgement, or a future completed exceptionally with a
     *         {@link JobExecuteException} when no gateway became available within the timeout
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway
    public CompletableFuture<Acknowledge> cancelJob(String str, Time time) {
        DispatcherGateway gateway = retryableGetDispatcherGateway(time.toMilliseconds());
        if (gateway == null) {
            log.warn("dispatcher gate way not ready");
            // Fail the future instead of returning null so callers can chain/await it safely,
            // mirroring what submitJob does in the same situation.
            return FutureUtils.completedExceptionally(
                    new JobExecuteException("dispatcher gate way not ready,cancel job failed"));
        }
        return gateway.cancelJob(JobID.fromHexString(str), time);
    }

    /**
     * Submits a job graph to the dispatcher.
     *
     * @param str      job identifier supplied by the caller; not used by this implementation
     * @param jobGraph job graph to submit
     * @param time     timeout, used both for waiting on the gateway and for the RPC itself
     * @return the dispatcher's acknowledgement, or a future completed exceptionally with a
     *         {@link JobExecuteException} when no gateway became available within the timeout
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway
    public CompletableFuture<Acknowledge> submitJob(String str, JobGraph jobGraph, Time time) {
        DispatcherGateway gateway = retryableGetDispatcherGateway(time.toMilliseconds());
        if (gateway == null) {
            return FutureUtils.completedExceptionally(
                    new JobExecuteException("dispatcher gate way not ready,submit job failed"));
        }
        return gateway.submitJob(jobGraph, time);
    }

    /**
     * Requests the cluster overview.
     *
     * @param time timeout, used both for waiting on the gateway and for the RPC itself
     * @return the dispatcher's answer, or a future completed exceptionally with a
     *         {@link JobExecuteException} when no gateway became available within the timeout
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.job.dispather.IQDppDispatcherGateway
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time time) {
        DispatcherGateway gateway = retryableGetDispatcherGateway(time.toMilliseconds());
        if (gateway == null) {
            log.warn("dispatcher gate way not ready");
            // Fail the future instead of returning null so callers can chain/await it safely.
            return FutureUtils.completedExceptionally(
                    new JobExecuteException("dispatcher gate way not ready,request cluster overview failed"));
        }
        return gateway.requestClusterOverview(time);
    }

    /**
     * Polls the retriever until it exposes a gateway, the timeout elapses, or the calling thread
     * is interrupted. The retriever is always polled at least once, even for a non-positive
     * timeout. This method blocks the calling thread while waiting.
     *
     * @param timeoutMillis maximum time to wait, in milliseconds
     * @return the gateway, or {@code null} when none became available
     */
    private DispatcherGateway retryableGetDispatcherGateway(long timeoutMillis) {
        final long startNanos = System.nanoTime();
        Optional<DispatcherGateway> gateway = this.dispatcherGatewayRetriever.getNow();
        while (!gateway.isPresent()) {
            // Measure real elapsed time; the subtraction of two nanoTime() readings is overflow-safe.
            long remainingMillis = timeoutMillis - (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
            if (remainingMillis <= 0) {
                break;
            }
            // Never sleep past the deadline.
            if (!sleepFor(Math.min(RETRY_INTERVAL_MILLIS, remainingMillis))) {
                // Interrupted: stop waiting so the caller can react to the interrupt promptly.
                break;
            }
            gateway = this.dispatcherGatewayRetriever.getNow();
        }
        return gateway.orElse(null);
    }

    /**
     * Sleeps for the given duration.
     *
     * @param millis duration in milliseconds
     * @return {@code true} if the sleep completed, {@code false} if the thread was interrupted
     *         (in which case the interrupt flag is restored)
     */
    private boolean sleepFor(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            log.warn("sleep interrupted when get dispatcher gateway");
            // Thread.sleep clears the flag when it throws; restore it for code further up the stack.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
