package kd.bos.algox.flink.enhance.krpc.impl;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.netty.channel.nio.NioEventLoopGroup;
import java.io.Serializable;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.net.UnknownHostException;
import java.text.MessageFormat;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.regex.Pattern;
import kd.bos.algox.flink.enhance.krpc.Actor;
import kd.bos.algox.flink.enhance.krpc.ActorRef;
import kd.bos.algox.flink.enhance.krpc.Dispatcher;
import kd.bos.algox.flink.enhance.krpc.KFencedAkkaRpcActor;
import kd.bos.algox.flink.enhance.krpc.KRpcActorMissException;
import kd.bos.algox.flink.enhance.krpc.KRpcException;
import kd.bos.algox.flink.enhance.krpc.KScheduledExecutor;
import kd.bos.algox.flink.enhance.krpc.MsgPlus;
import kd.bos.algox.flink.enhance.krpc.ThreadPoolSupplier;
import kd.bos.algox.flink.enhance.krpc.impl.transport.Client;
import kd.bos.algox.flink.enhance.krpc.impl.transport.NettyTransport;
import kd.bos.algox.flink.enhance.krpc.impl.transport.Server;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import org.apache.commons.lang3.RandomUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;

/* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/DispatcherImpl.class */
public class DispatcherImpl implements Dispatcher {
    public static final Log log = LogFactory.getLog(DispatcherImpl.class);
    private final Cache<String, String> endpointMatchCache;
    private final ExecutorService executors;
    private final DispatcherConfig config;
    private Server server;
    private KScheduledExecutor scheduledExecutor;
    private final NioEventLoopGroup clientEventLoopGroup;
    private final ThreadPoolSupplier threadPoolSupplier;
    private final ConcurrentHashMap<String, Actor> dispatcher = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ActorRef> refs = new ConcurrentHashMap<>();
    private final ConcurrentHashMap<String, ClientReference> clients = new ConcurrentHashMap<>();
    private final CompletableFuture<Void> shutdownFuture = new CompletableFuture<>();
    private final AtomicBoolean shutdownTag = new AtomicBoolean(false);
    private final String identifier = String.valueOf(RandomUtils.nextInt(1, 999));

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/DispatcherImpl$ActorRefHeartbeat.class */
    public static class ActorRefHeartbeat implements Runnable {
        private final ActorRef actorRef;
        private final int delayMillis;
        private final DispatcherImpl dispatcher;
        static final /* synthetic */ boolean $assertionsDisabled;

        public ActorRefHeartbeat(ActorRef actorRef, int i, DispatcherImpl dispatcherImpl) {
            if (!$assertionsDisabled && i <= 0) {
                throw new AssertionError();
            }
            this.actorRef = actorRef;
            this.delayMillis = i;
            this.dispatcher = dispatcherImpl;
        }

        @Override // java.lang.Runnable
        public void run() {
            long currentTimeMillis = System.currentTimeMillis() - this.actorRef.getLastReadWriteTimestamp();
            if (currentTimeMillis < 0) {
                currentTimeMillis = 0;
            }
            if (currentTimeMillis <= this.delayMillis) {
                this.dispatcher.scheduledExecutor.schedule(new ActorRefHeartbeat(this.actorRef, this.delayMillis, this.dispatcher), this.delayMillis - currentTimeMillis, TimeUnit.MILLISECONDS);
            } else {
                this.actorRef.ask(new HeartbeatMessage()).whenComplete((obj, th) -> {
                    if (!(th instanceof KRpcActorMissException)) {
                        this.dispatcher.scheduledExecutor.schedule(new ActorRefHeartbeat(this.actorRef, this.delayMillis, this.dispatcher), this.delayMillis, TimeUnit.MILLISECONDS);
                    } else {
                        DispatcherImpl.log.debug("ActorRef:{} has receive KRpcActorMissException,will close ActorRef", this.actorRef.getClientEndpoint());
                        this.actorRef.close();
                    }
                });
            }
        }

        static {
            $assertionsDisabled = !DispatcherImpl.class.desiredAssertionStatus();
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/DispatcherImpl$ActorRefReleasableResource.class */
    public static class ActorRefReleasableResource implements ReleasableResource {
        private final DispatcherImpl dispatcher;
        private final String refId;
        private final String virtualActorEndpoint;

        public ActorRefReleasableResource(DispatcherImpl dispatcherImpl, String str, String str2) {
            this.dispatcher = dispatcherImpl;
            this.refId = str;
            this.virtualActorEndpoint = str2;
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.ReleasableResource, java.lang.AutoCloseable
        public void close() {
            runIgnoredException(() -> {
            });
            runIgnoredException(() -> {
                this.dispatcher.postMessage(new PoisonPill(this.virtualActorEndpoint));
            });
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/DispatcherImpl$ActorReleasableResource.class */
    public static class ActorReleasableResource implements ReleasableResource {
        private final DispatcherImpl dispatcher;
        private final String endpoint;

        public ActorReleasableResource(DispatcherImpl dispatcherImpl, String str) {
            this.dispatcher = dispatcherImpl;
            this.endpoint = str;
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.ReleasableResource, java.lang.AutoCloseable
        public void close() {
            runIgnoredException(() -> {
            });
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/DispatcherImpl$ClientReference.class */
    public static class ClientReference implements AutoCloseable {
        private final AtomicInteger refCount = new AtomicInteger();
        private final ReleasableResourceHolder releasableResourceHolder = new ReleasableResourceHolder();
        private final Client client;

        public ClientReference(String str, Client client, DispatcherImpl dispatcherImpl) {
            this.client = client;
            this.releasableResourceHolder.bind(() -> {
            });
            ReleasableResourceHolder releasableResourceHolder = this.releasableResourceHolder;
            client.getClass();
            releasableResourceHolder.bind(client::close);
        }

        public void addReference() {
            this.refCount.getAndIncrement();
        }

        public Client getClient() {
            return this.client;
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            if (this.refCount.decrementAndGet() == 0) {
                this.releasableResourceHolder.close();
            }
        }
    }

    public DispatcherImpl(DispatcherConfig dispatcherConfig, ThreadPoolSupplier threadPoolSupplier) {
        this.config = dispatcherConfig;
        this.threadPoolSupplier = threadPoolSupplier;
        this.executors = threadPoolSupplier.getDispatcherExecutorService();
        this.clientEventLoopGroup = threadPoolSupplier.getClientEventLoopGroup();
        CacheBuilder newBuilder = CacheBuilder.newBuilder();
        newBuilder.expireAfterAccess(30L, TimeUnit.MINUTES);
        newBuilder.maximumSize(512L);
        this.endpointMatchCache = newBuilder.build();
    }

    public void setScheduledExecutor(KScheduledExecutor kScheduledExecutor) {
        this.scheduledExecutor = kScheduledExecutor;
    }

    private String tryMatchEndpoint(String str) {
        Enumeration<String> keys = this.dispatcher.keys();
        while (keys.hasMoreElements()) {
            String nextElement = keys.nextElement();
            if (Pattern.compile(str).matcher(nextElement).find()) {
                return nextElement;
            }
        }
        return null;
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public void postMessage(MsgPlus msgPlus) {
        if (this.shutdownTag.get()) {
            log.warn("Dispatcher:{} has been shutdown, will discard message:{}", this.identifier, msgPlus);
            return;
        }
        Actor actor = this.dispatcher.get(msgPlus.getEndpoint());
        if (actor == null) {
            String str = (String) this.endpointMatchCache.getIfPresent(msgPlus.getEndpoint());
            if (str == null) {
                str = tryMatchEndpoint(msgPlus.getEndpoint());
                if (str == null) {
                    msgPlus.responseException(new KRpcActorMissException("Dispatch error: {0} not match actor", msgPlus.getEndpoint()));
                    return;
                }
                this.endpointMatchCache.put(msgPlus.getEndpoint(), str);
            }
            actor = this.dispatcher.get(str);
        }
        if (actor == null) {
            log.error("Dispatch error: {} not match actor", msgPlus.getEndpoint());
            msgPlus.responseException(new KRpcActorMissException("Dispatch error: {0} not match actor", msgPlus.getEndpoint()));
            return;
        }
        log.debug("Dispatch message: {} -> {}", msgPlus.getEndpoint(), actor.toString());
        actor.postMessage(msgPlus);
        if (actor.isInProcess()) {
            return;
        }
        ExecutorService executorService = this.executors;
        Actor actor2 = actor;
        actor2.getClass();
        executorService.submit(actor2::process);
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public void start() throws InterruptedException {
        NettyTransport nettyTransport = new NettyTransport();
        nettyTransport.setDispatcher(this);
        nettyTransport.setBossGroup(this.threadPoolSupplier.getServerBossLoopGroup());
        nettyTransport.setWorkerGroup(this.threadPoolSupplier.getServerWorkerLoopGroup());
        nettyTransport.setTcpNoDelay(this.config.isTcpNoDelay());
        nettyTransport.setMaxFrameSize(this.config.getMaxFrameSize());
        nettyTransport.setConnectTimeout(this.config.getConnectTimeout());
        this.server = nettyTransport.buildServer(this.config.getBindAddress(), this.config.getPort());
        this.server.start();
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public <T extends RpcEndpoint & RpcGateway> Actor createActor(String str, T t) {
        return this.dispatcher.computeIfAbsent(str, str2 -> {
            if (t instanceof FencedRpcEndpoint) {
                return createFenceActor(str, t);
            }
            ActorImpl actorImpl = new ActorImpl(str, this, t);
            actorImpl.addReleasableResource(new ActorReleasableResource(this, str));
            return actorImpl;
        });
    }

    private <F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> Actor createFenceActor(String str, Object obj) {
        KFencedAkkaRpcActor kFencedAkkaRpcActor = new KFencedAkkaRpcActor(str, this, (FencedRpcEndpoint) obj);
        kFencedAkkaRpcActor.addReleasableResource(new ActorReleasableResource(this, str));
        return kFencedAkkaRpcActor;
    }

    public static boolean isLocalAddress(String str) {
        try {
            InetAddress byName = InetAddress.getByName(str);
            if (byName.isAnyLocalAddress() || byName.isLoopbackAddress()) {
                return true;
            }
            return NetworkInterface.getByInetAddress(byName) != null;
        } catch (SocketException | UnknownHostException e) {
            throw new RuntimeException(e.getMessage(), e);
        }
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public ActorRef createActorRef(String str, String str2, int i) {
        String format = MessageFormat.format("ref://{0}:{1}:/{2}", str2, Integer.valueOf(i), str);
        return this.refs.computeIfAbsent(format, str3 -> {
            ActorRef remoteActorRef;
            if (isLocalAddress(str2) && getPort() == i) {
                remoteActorRef = new LocalActorRef(str, this);
            } else {
                ClientReference computeIfAbsent = this.clients.computeIfAbsent(MessageFormat.format("{0}:{1}", str2, Integer.valueOf(i)), str3 -> {
                    NettyTransport nettyTransport = new NettyTransport();
                    nettyTransport.setDispatcher(this);
                    nettyTransport.setTcpNoDelay(this.config.isTcpNoDelay());
                    nettyTransport.setMaxFrameSize(this.config.getMaxFrameSize());
                    nettyTransport.setConnectTimeout(this.config.getConnectTimeout());
                    Client buildClient = nettyTransport.buildClient(str2, i, this.clientEventLoopGroup);
                    try {
                        buildClient.connect();
                        return new ClientReference(str3, buildClient, this);
                    } catch (Exception e) {
                        throw new KRpcException(e, "Connect to endpoint:{0} on {1}:{2} fail, msg:{3}", str, str2, Integer.valueOf(i), e.getMessage());
                    }
                });
                computeIfAbsent.addReference();
                remoteActorRef = new RemoteActorRef(computeIfAbsent, str, this);
            }
            VirtualActorImpl virtualActorImpl = new VirtualActorImpl(remoteActorRef.getClientEndpoint());
            this.dispatcher.put(remoteActorRef.getClientEndpoint(), virtualActorImpl);
            virtualActorImpl.addReleasableResource(new ActorReleasableResource(this, remoteActorRef.getClientEndpoint()));
            remoteActorRef.addReleasableResource(new ActorRefReleasableResource(this, format, remoteActorRef.getClientEndpoint()));
            getScheduledExecutor().schedule(new ActorRefHeartbeat(remoteActorRef, this.config.getActorRefCheckTimeoutMillis(), this), this.config.getActorRefCheckTimeoutMillis(), TimeUnit.MILLISECONDS);
            return remoteActorRef;
        });
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public ExecutorService getExecutorService() {
        return this.executors;
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public ScheduledExecutor getScheduledExecutor() {
        return this.scheduledExecutor;
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public CompletableFuture<Void> getShutdownFuture() {
        return this.shutdownFuture;
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public void shutdownAsync() {
        new Thread(() -> {
            if (this.shutdownTag.compareAndSet(false, true)) {
                try {
                    if (this.server != null) {
                        this.server.shutdown();
                    }
                    this.dispatcher.clear();
                    this.refs.clear();
                    this.endpointMatchCache.invalidateAll();
                    this.executors.shutdown();
                    this.clientEventLoopGroup.shutdownGracefully();
                } finally {
                    this.shutdownFuture.complete(null);
                }
            }
        }, "DispatchShutdown-" + this.identifier).start();
    }

    @Override // kd.bos.algox.flink.enhance.krpc.Dispatcher
    public int getPort() {
        return this.server.getPort();
    }
}
