package org.apache.flink.runtime.highavailability.zookeeper;

import java.io.IOException;
import java.nio.charset.Charset;
import java.util.Arrays;
import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.runtime.highavailability.RunningJobsRegistry;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/highavailability/zookeeper/ZooKeeperRunningJobsRegistry.class */
public class ZooKeeperRunningJobsRegistry implements RunningJobsRegistry {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperRunningJobsRegistry.class);
    private static final Charset ENCODING = Charset.forName("utf-8");
    private final CuratorFramework client;
    private final String runningJobPath;

    public ZooKeeperRunningJobsRegistry(CuratorFramework curatorFramework, Configuration configuration) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "client");
        this.runningJobPath = configuration.getString(HighAvailabilityOptions.ZOOKEEPER_RUNNING_JOB_REGISTRY_PATH);
    }

    @Override // org.apache.flink.runtime.highavailability.RunningJobsRegistry
    public void setJobRunning(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        try {
            writeEnumToZooKeeper(jobID, RunningJobsRegistry.JobSchedulingStatus.RUNNING);
        } catch (Exception e) {
            throw new IOException("Failed to set RUNNING state in ZooKeeper for job " + jobID, e);
        }
    }

    @Override // org.apache.flink.runtime.highavailability.RunningJobsRegistry
    public void setJobFinished(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        try {
            writeEnumToZooKeeper(jobID, RunningJobsRegistry.JobSchedulingStatus.DONE);
        } catch (Exception e) {
            throw new IOException("Failed to set DONE state in ZooKeeper for job " + jobID, e);
        }
    }

    @Override // org.apache.flink.runtime.highavailability.RunningJobsRegistry
    public RunningJobsRegistry.JobSchedulingStatus getJobSchedulingStatus(JobID jobID) throws IOException {
        byte[] bArr;
        Preconditions.checkNotNull(jobID);
        try {
            String createZkPath = createZkPath(jobID);
            if (((Stat) this.client.checkExists().forPath(createZkPath)) == null || (bArr = (byte[]) this.client.getData().forPath(createZkPath)) == null) {
                return RunningJobsRegistry.JobSchedulingStatus.PENDING;
            }
            try {
                return RunningJobsRegistry.JobSchedulingStatus.valueOf(new String(bArr, ENCODING));
            } catch (IllegalArgumentException e) {
                throw new IOException("Found corrupt data in ZooKeeper: " + Arrays.toString(bArr) + " is no valid job status");
            }
        } catch (Exception e2) {
            throw new IOException("Get finished state from zk fail for job " + jobID.toString(), e2);
        }
    }

    @Override // org.apache.flink.runtime.highavailability.RunningJobsRegistry
    public void clearJob(JobID jobID) throws IOException {
        Preconditions.checkNotNull(jobID);
        String createZkPath = createZkPath(jobID);
        try {
            if (((Stat) this.client.checkExists().forPath(createZkPath)) != null) {
                this.client.delete().forPath(createZkPath);
            }
        } catch (KeeperException.NoNodeException e) {
        } catch (Exception e2) {
            throw new IOException("Failed to clear job state from ZooKeeper for job " + jobID, e2);
        }
    }

    private String createZkPath(JobID jobID) {
        return this.runningJobPath + jobID.toString();
    }

    private void writeEnumToZooKeeper(JobID jobID, RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus) throws Exception {
        LOG.debug("Setting scheduling state for job {} to {}.", jobID, jobSchedulingStatus);
        String createZkPath = createZkPath(jobID);
        while (true) {
            try {
                break;
            } catch (KeeperException.NoNodeException | KeeperException.NodeExistsException e) {
                LOG.debug("Retrying failure to set job state from ZooKeeper for job {}", jobID, e);
            }
        }
        if (((Stat) this.client.checkExists().forPath(createZkPath)) != null) {
            this.client.setData().forPath(createZkPath, jobSchedulingStatus.name().getBytes(ENCODING));
        } else {
            this.client.create().creatingParentContainersIfNeeded().forPath(createZkPath, jobSchedulingStatus.name().getBytes(ENCODING));
        }
    }
}
