package com.kingdee.bos.qing.dpp.engine.flink.embed;

import com.kingdee.bos.qing.dpp.common.log.DppLogger;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.engine.flink.entrypoint.QDppDispatcherResourceMgrComponent;
import com.kingdee.bos.qing.dpp.engine.flink.job.execution.EmbedJobOperateServiceImpl;
import com.kingdee.bos.qing.dpp.engine.flink.util.SerializableThrowableUtils;
import com.kingdee.bos.qing.dpp.job.interfaces.IJobOperateService;
import com.kingdee.bos.qing.dpp.job.interfaces.IJobStatusResultReceiver;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.model.QDppJobStatus;
import com.kingdee.bos.qing.dpp.rpc.ServiceRefCenter;
import com.kingdee.bos.qing.dpp.rpc.model.ServiceRefInfo;
import com.kingdee.bos.qing.dpp.utils.DppGlobalScheduleExecutor;
import com.kingdee.bos.qing.util.LogUtil;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.util.SerializedThrowable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/embed/EmbedDppEngine.class */
public class EmbedDppEngine {
    private static EmbedDppEngine instance = null;
    private static AtomicBoolean started = new AtomicBoolean(false);
    private final Logger log = new DppLogger("QDpp-EmbedEngine:", LoggerFactory.getLogger(EmbedDppEngine.class));
    private Configuration configuration = null;
    private Map<String, EmbedJob> runningEmbedJobs = new HashMap(5);
    private Map<String, EmbedJob> finishJobs = new HashMap(5);
    private Map<String, String> jobIdToJobNames = new ConcurrentHashMap(10);

    /* renamed from: com.kingdee.bos.qing.dpp.engine.flink.embed.EmbedDppEngine$1, reason: invalid class name */
    /* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/embed/EmbedDppEngine$1.class */
    static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$api$common$JobStatus = new int[JobStatus.values().length];

        static {
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FINISHED.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.FAILED.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$org$apache$flink$api$common$JobStatus[JobStatus.CANCELED.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    /* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/embed/EmbedDppEngine$JobResultMonitor.class */
    private class JobResultMonitor implements Runnable {
        private JobResultMonitor() {
        }

        @Override // java.lang.Runnable
        public void run() {
            HashSet hashSet = new HashSet(5);
            try {
                HashMap hashMap = new HashMap(5);
                synchronized (EmbedDppEngine.this.runningEmbedJobs) {
                    hashMap.putAll(EmbedDppEngine.this.runningEmbedJobs);
                }
                for (Map.Entry entry : hashMap.entrySet()) {
                    String str = (String) entry.getKey();
                    EmbedJob embedJob = (EmbedJob) entry.getValue();
                    String hexString = embedJob.getJobID().toHexString();
                    QDppJobResult qDppJobResult = new QDppJobResult(str);
                    qDppJobResult.setJobHexId(hexString);
                    embedJob.getJobStatus().whenComplete((jobStatus, th) -> {
                        if (null != th) {
                            if (!(th instanceof FlinkJobNotFoundException)) {
                                EmbedDppEngine.this.log.error("get job status failed,jobID:" + hexString, th);
                                return;
                            } else {
                                EmbedDppEngine.this.log.info("job status not found because of flink job not found exception,jobId:" + hexString);
                                jobStatus = JobStatus.FINISHED;
                            }
                        }
                        boolean z = false;
                        switch (AnonymousClass1.$SwitchMap$org$apache$flink$api$common$JobStatus[jobStatus.ordinal()]) {
                            case 1:
                                qDppJobResult.setJobStatus(QDppJobStatus.SUCCEED);
                                z = true;
                                break;
                            case 2:
                                qDppJobResult.setJobStatus(QDppJobStatus.FAILED);
                                z = true;
                                break;
                            case 3:
                                qDppJobResult.setJobStatus(QDppJobStatus.CANCELED);
                                z = true;
                                break;
                        }
                        if (z) {
                            hashSet.add(str);
                            embedJob.setEndedJobStatus(jobStatus);
                            embedJob.getJobResult().whenComplete((jobResult, th) -> {
                                if (null != th) {
                                    qDppJobResult.setError("unknown job error,failed to get job error info!");
                                    return;
                                }
                                embedJob.setFinalJobResult(jobResult);
                                if (jobResult.getSerializedThrowable().isPresent()) {
                                    qDppJobResult.setError(SerializableThrowableUtils.convertToQDppError(((SerializedThrowable) jobResult.getSerializedThrowable().get()).getCause()).getMessage());
                                }
                            }).join();
                            embedJob.setFinish();
                            ServiceRefInfo serviceRefInfo = ServiceRefCenter.getInstance().getServiceRefInfo(IJobStatusResultReceiver.class.getName());
                            if (null != serviceRefInfo) {
                                ((IJobStatusResultReceiver) serviceRefInfo.getRef()).receiveJobResult(qDppJobResult);
                            }
                        }
                    }).join();
                }
                synchronized (EmbedDppEngine.this.runningEmbedJobs) {
                    hashSet.forEach(str2 -> {
                        EmbedJob embedJob2 = (EmbedJob) EmbedDppEngine.this.runningEmbedJobs.remove(str2);
                        if (null != embedJob2) {
                            EmbedDppEngine.this.finishJobs.put(str2, embedJob2);
                        }
                    });
                    long currentTimeMillis = System.currentTimeMillis();
                    HashSet hashSet2 = new HashSet(3);
                    for (EmbedJob embedJob2 : EmbedDppEngine.this.finishJobs.values()) {
                        if (currentTimeMillis - embedJob2.getFinishTime() > 120000) {
                            hashSet2.add(embedJob2.getJobID().toHexString());
                        }
                    }
                    Iterator it = hashSet2.iterator();
                    while (it.hasNext()) {
                        EmbedDppEngine.this.finishJobs.remove((String) EmbedDppEngine.this.jobIdToJobNames.remove((String) it.next()));
                    }
                    EmbedDppEngine.this.runningEmbedJobs.notifyAll();
                }
            } catch (Exception e) {
                LogUtil.warn("job result monitor error:" + e.getLocalizedMessage());
            }
        }

        /* synthetic */ JobResultMonitor(EmbedDppEngine embedDppEngine, AnonymousClass1 anonymousClass1) {
            this();
        }
    }

    public void start() {
        if (!started.compareAndSet(false, true)) {
            throw new IllegalStateException("embed engine has been already started");
        }
        instance = this;
        registerJobService();
        initConfiguration();
        QDppDispatcherResourceMgrComponent.get().startEmbed();
        DppGlobalScheduleExecutor.scheduleAtFixRate(new JobResultMonitor(this, null), 1000L, 1000L, TimeUnit.MILLISECONDS);
    }

    public EmbedJob getEmbedJob(String str) {
        synchronized (this.runningEmbedJobs) {
            String str2 = this.jobIdToJobNames.get(str);
            if (null == str2) {
                return null;
            }
            EmbedJob embedJob = this.runningEmbedJobs.get(str2);
            if (null == embedJob) {
                embedJob = this.finishJobs.get(str2);
            }
            return embedJob;
        }
    }

    private void initConfiguration() {
        this.configuration = new Configuration();
        this.configuration.set(DeploymentOptions.ATTACHED, Boolean.TRUE);
        this.configuration.set(TaskManagerOptions.NUM_TASK_SLOTS, QDppOptions.ENGINE_LOCAL_EMBED_TASK_SLOT_SIZE.getValue());
        this.configuration.setInteger("local.number-taskmanager", ((Integer) QDppOptions.ENGINE_LOCAL_EMBED_TASK_MANAGER_SIZE.getValue()).intValue());
        this.configuration.set(RestOptions.BIND_ADDRESS, "localhost");
        this.configuration.setString(RestOptions.BIND_PORT, "0");
        this.configuration.set(RestOptions.ADDRESS, "localhost");
    }

    private void registerJobService() {
        EmbedJobOperateServiceImpl embedJobOperateServiceImpl = new EmbedJobOperateServiceImpl();
        ServiceRefInfo serviceRefInfo = new ServiceRefInfo(IJobOperateService.class.getName());
        serviceRefInfo.setPermanent(true);
        serviceRefInfo.setRef(embedJobOperateServiceImpl);
        ServiceRefCenter.getInstance().cacheRefInfo(serviceRefInfo);
    }

    public static EmbedDppEngine getInstance() {
        return instance;
    }

    public Configuration getConfiguration() {
        return this.configuration;
    }

    public void cacheEmbedJob(String str, EmbedJob embedJob) {
        synchronized (this.runningEmbedJobs) {
            this.runningEmbedJobs.put(str, embedJob);
            this.jobIdToJobNames.put(embedJob.getJobID().toHexString(), str);
        }
    }
}
