package org.apache.flink.runtime.zookeeper;

import com.google.common.collect.Sets;
import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.api.ACLBackgroundPathAndBytesable;
import org.apache.curator.framework.api.ACLPathAndBytesable;
import org.apache.curator.framework.api.BackgroundPathAndBytesable;
import org.apache.curator.framework.api.transaction.CuratorTransactionBridge;
import org.apache.curator.utils.ZKPaths;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.persistence.IntegerResourceVersion;
import org.apache.flink.runtime.persistence.PossibleInconsistentStateException;
import org.apache.flink.runtime.persistence.RetrievableStateStorageHelper;
import org.apache.flink.runtime.persistence.StateHandleStore;
import org.apache.flink.runtime.state.RetrievableStateHandle;
import org.apache.flink.runtime.util.StateHandleStoreUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.Preconditions;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStore.class */
public class ZooKeeperStateHandleStore<T extends Serializable> implements StateHandleStore<T, IntegerResourceVersion> {
    private static final Logger LOG = LoggerFactory.getLogger(ZooKeeperStateHandleStore.class);

    @VisibleForTesting
    static final Set<Class<? extends KeeperException>> PRE_COMMIT_EXCEPTIONS = Sets.newHashSet(new Class[]{KeeperException.NodeExistsException.class, KeeperException.BadArgumentsException.class, KeeperException.NoNodeException.class, KeeperException.NoAuthException.class, KeeperException.BadVersionException.class, KeeperException.AuthFailedException.class, KeeperException.InvalidACLException.class, KeeperException.SessionMovedException.class, KeeperException.NotReadOnlyException.class});
    private final CuratorFramework client;
    private final RetrievableStateStorageHelper<T> storage;
    private final String lockNode = UUID.randomUUID().toString();

    public ZooKeeperStateHandleStore(CuratorFramework curatorFramework, RetrievableStateStorageHelper<T> retrievableStateStorageHelper) {
        this.client = (CuratorFramework) Preconditions.checkNotNull(curatorFramework, "Curator client");
        this.storage = (RetrievableStateStorageHelper) Preconditions.checkNotNull(retrievableStateStorageHelper, "State storage");
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public RetrievableStateHandle<T> addAndLock(String str, T t) throws PossibleInconsistentStateException, Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Preconditions.checkNotNull(t, "State");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> store = this.storage.store(t);
        try {
            writeStoreHandleTransactionally(normalizePath, StateHandleStoreUtils.serializeOrDiscard(store));
            return store;
        } catch (Exception e) {
            if (indicatesPossiblyInconsistentState(e)) {
                throw new PossibleInconsistentStateException(e);
            }
            store.discardState();
            throw ((StateHandleStore.AlreadyExistException) ExceptionUtils.findThrowable(e, KeeperException.NodeExistsException.class).map(nodeExistsException -> {
                return new StateHandleStore.AlreadyExistException("ZooKeeper node " + normalizePath + " already exists.", nodeExistsException);
            }).orElseThrow(() -> {
                return e;
            }));
        }
    }

    @VisibleForTesting
    protected void writeStoreHandleTransactionally(String str, byte[] bArr) throws Exception {
        ((CuratorTransactionBridge) ((ACLPathAndBytesable) ((CuratorTransactionBridge) ((ACLPathAndBytesable) this.client.inTransaction().create().withMode(CreateMode.PERSISTENT)).forPath(str, bArr)).and().create().withMode(CreateMode.EPHEMERAL)).forPath(getLockPath(str))).and().commit();
    }

    /* renamed from: replace, reason: avoid collision after fix types in other method */
    public void replace2(String str, IntegerResourceVersion integerResourceVersion, T t) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Preconditions.checkNotNull(t, "State");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> retrievableStateHandle = get(normalizePath, false);
        RetrievableStateHandle<T> store = this.storage.store(t);
        boolean z = false;
        boolean z2 = true;
        try {
            try {
                setStateHandle(normalizePath, StateHandleStoreUtils.serializeOrDiscard(store), integerResourceVersion.getValue());
                z = true;
                z2 = false;
                if (1 != 0) {
                    retrievableStateHandle.discardState();
                }
                if (0 != 0) {
                    store.discardState();
                }
            } catch (Exception e) {
                if (!indicatesPossiblyInconsistentState(e)) {
                    throw ((StateHandleStore.NotExistException) ExceptionUtils.findThrowable(e, KeeperException.NoNodeException.class).map(noNodeException -> {
                        return new StateHandleStore.NotExistException("ZooKeeper node " + normalizePath + " does not exist.", noNodeException);
                    }).orElseThrow(() -> {
                        return e;
                    }));
                }
                throw new PossibleInconsistentStateException(e);
            }
        } catch (Throwable th) {
            if (z) {
                retrievableStateHandle.discardState();
            }
            if (z2) {
                store.discardState();
            }
            throw th;
        }
    }

    @VisibleForTesting
    protected void setStateHandle(String str, byte[] bArr, int i) throws Exception {
        ((BackgroundPathAndBytesable) this.client.setData().withVersion(i)).forPath(str, bArr);
    }

    private boolean indicatesPossiblyInconsistentState(Exception exc) {
        return !PRE_COMMIT_EXCEPTIONS.contains(exc.getClass());
    }

    /* JADX WARN: Can't rename method to resolve collision */
    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public IntegerResourceVersion exists(String str) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        Stat stat = (Stat) this.client.checkExists().forPath(normalizePath(str));
        return stat != null ? IntegerResourceVersion.valueOf(stat.getVersion()) : IntegerResourceVersion.notExisting();
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public RetrievableStateHandle<T> getAndLock(String str) throws Exception {
        return get(str, true);
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public Collection<String> getAllHandles() throws Exception {
        while (((Stat) this.client.checkExists().forPath("/")) != null) {
            try {
                return (Collection) this.client.getChildren().forPath("/");
            } catch (KeeperException.NoNodeException e) {
            }
        }
        return Collections.emptyList();
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public List<Tuple2<RetrievableStateHandle<T>, String>> getAllAndLock() throws Exception {
        ArrayList arrayList = new ArrayList();
        boolean z = false;
        while (!z) {
            arrayList.clear();
            Stat stat = (Stat) this.client.checkExists().forPath("/");
            if (stat == null) {
                break;
            }
            int cversion = stat.getCversion();
            Iterator it = ((List) this.client.getChildren().forPath("/")).iterator();
            while (it.hasNext()) {
                String str = "/" + ((String) it.next());
                try {
                    arrayList.add(new Tuple2(getAndLock(str), str));
                } catch (IOException e) {
                    LOG.warn("Could not get all ZooKeeper children. Node {} contained corrupted data. Ignoring this node.", str, e);
                } catch (KeeperException.NoNodeException e2) {
                }
            }
            z = cversion == ((Stat) this.client.checkExists().forPath("/")).getCversion();
        }
        return arrayList;
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public boolean releaseAndTryRemove(String str) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        String normalizePath = normalizePath(str);
        RetrievableStateHandle<T> retrievableStateHandle = null;
        try {
            retrievableStateHandle = get(normalizePath, false);
        } catch (Exception e) {
            LOG.warn("Could not retrieve the state handle from node {}.", normalizePath, e);
        }
        release(str);
        try {
            this.client.delete().forPath(normalizePath);
            if (retrievableStateHandle == null) {
                return true;
            }
            retrievableStateHandle.discardState();
            return true;
        } catch (KeeperException.NotEmptyException e2) {
            LOG.debug("Could not delete znode {} because it is still locked.", normalizePath);
            return false;
        }
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public void releaseAndTryRemoveAll() throws Exception {
        Exception exc = null;
        Iterator<String> it = getAllHandles().iterator();
        while (it.hasNext()) {
            try {
                releaseAndTryRemove('/' + it.next());
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (exc != null) {
            throw new Exception("Could not properly release and try removing all state nodes.", exc);
        }
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public void release(String str) throws Exception {
        try {
            this.client.delete().forPath(getLockPath(normalizePath(str)));
        } catch (Exception e) {
            throw new Exception("Could not release the lock: " + getLockPath(str) + '.', e);
        } catch (KeeperException.NoNodeException e2) {
        }
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public void releaseAll() throws Exception {
        Exception exc = null;
        Iterator<String> it = getAllHandles().iterator();
        while (it.hasNext()) {
            try {
                release(it.next());
            } catch (Exception e) {
                exc = (Exception) ExceptionUtils.firstOrSuppressed(e, exc);
            }
        }
        if (exc != null) {
            throw new Exception("Could not properly release all state nodes.", exc);
        }
    }

    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public void clearEntries() throws Exception {
        String str = "/" + this.client.getNamespace();
        LOG.info("Removing {} from ZooKeeper", str);
        ZKPaths.deleteChildren(this.client.getZookeeperClient().getZooKeeper(), str, true);
    }

    public String toString() {
        return getClass().getSimpleName() + "{namespace='" + this.client.getNamespace() + "'}";
    }

    protected String getLockPath(String str) {
        return str + '/' + this.lockNode;
    }

    private RetrievableStateHandle<T> get(String str, boolean z) throws Exception {
        Preconditions.checkNotNull(str, "Path in ZooKeeper");
        String normalizePath = normalizePath(str);
        if (z) {
            try {
                ((ACLBackgroundPathAndBytesable) this.client.create().withMode(CreateMode.EPHEMERAL)).forPath(getLockPath(normalizePath));
            } catch (KeeperException.NodeExistsException e) {
            } catch (KeeperException.NoNodeException e2) {
                throw new StateHandleStore.NotExistException("ZooKeeper node " + normalizePath + " does not exist.", e2);
            }
        }
        boolean z2 = false;
        try {
            try {
                RetrievableStateHandle<T> retrievableStateHandle = (RetrievableStateHandle) StateHandleStoreUtils.deserialize((byte[]) this.client.getData().forPath(normalizePath));
                z2 = true;
                if (1 == 0 && z) {
                    release(normalizePath);
                }
                return retrievableStateHandle;
            } catch (Throwable th) {
                if (!z2 && z) {
                    release(normalizePath);
                }
                throw th;
            }
        } catch (KeeperException.NoNodeException e3) {
            throw new StateHandleStore.NotExistException("ZooKeeper node " + normalizePath + " does not exist.", e3);
        } catch (IOException | ClassNotFoundException e4) {
            throw new IOException("Failed to deserialize state handle from ZooKeeper data from " + normalizePath + '.', e4);
        }
    }

    private static String normalizePath(String str) {
        return str.startsWith("/") ? str : '/' + str;
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // org.apache.flink.runtime.persistence.StateHandleStore
    public /* bridge */ /* synthetic */ void replace(String str, IntegerResourceVersion integerResourceVersion, Serializable serializable) throws PossibleInconsistentStateException, Exception {
        replace2(str, integerResourceVersion, (IntegerResourceVersion) serializable);
    }
}
