package kd.bos.algox.flink.enhance.krpc.impl;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import kd.bos.algox.flink.enhance.krpc.Actor;
import kd.bos.algox.flink.enhance.krpc.Dispatcher;
import kd.bos.algox.flink.enhance.krpc.KFencedAkkaRpcActor;
import kd.bos.algox.flink.enhance.krpc.MailBox;
import kd.bos.algox.flink.enhance.krpc.MsgPlus;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.ControlMessages;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcInvalidStateException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;
import org.apache.flink.util.Preconditions;

/* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl.class */
public class ActorImpl<T extends RpcEndpoint & RpcGateway> implements Actor {
    // Name of the endpoint; used in log and exception messages and in toString().
    private final String endpointName;
    // Dispatcher used by handleRunAsync to re-post delayed RunAsync messages.
    private final Dispatcher dispatcher;
    // The endpoint whose lifecycle hooks and RPC methods this actor invokes.
    protected final T rpcEndpoint;
    // Outcome that postStop() reports through terminateFuture. The constructor initialises it to a
    // "has not been properly stopped" failure; stop(...) and a failing control message overwrite it.
    private volatile RpcEndpointTerminationResult rpcEndpointTerminationResult;
    // Entered/exited around endpoint lifecycle callbacks and regular message handling.
    private final MainThreadValidatorUtil mainThreadValidator;
    // Completed (normally or exceptionally) by postStop(); exposed via getActorTerminateFuture().
    private final CompletableFuture<Void> terminateFuture = new CompletableFuture<>();
    // Collects the AutoCloseables bound via addReleasableResource(...); closed in postStop().
    private final ReleasableResourceHolder resourceHolder = new ReleasableResourceHolder();
    // Mailbox that postMessage(), process() and isInProcess() delegate to.
    private final MailBox mailBox = new MailBoxImpl(this);
    // Flipped exactly once by stop(...), so that postStop() runs at most once through that path.
    private final AtomicBoolean rpcEndpointStopped = new AtomicBoolean(false);
    // Current lifecycle state; a new actor starts out STOPPED.
    // NOTE(review): this field is not volatile although it is written both by control-message
    // handling and by postStop(); if those can run on different threads, visibility is not
    // guaranteed - confirm the threading model.
    private State state = StoppedState.STOPPED;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: kd.bos.algox.flink.enhance.krpc.impl.ActorImpl$1, reason: invalid class name */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$1.class */
    // Compiler-synthesized lookup table for a switch over ControlMessages, as reconstructed by the
    // decompiler. It maps an enum ordinal to a dense case label: START -> 1, STOP -> 2,
    // TERMINATE -> 3. Slots that are never assigned keep the int[] default of 0.
    public static /* synthetic */ class AnonymousClass1 {
        // Indexed by ControlMessages.ordinal(); sized to the number of constants present at class-init time.
        static final /* synthetic */ int[] $SwitchMap$org$apache$flink$runtime$rpc$akka$ControlMessages = new int[ControlMessages.values().length];

        static {
            // Each constant is registered in its own try block so that one failing lookup does not
            // prevent the remaining entries from being filled in.
            try {
                $SwitchMap$org$apache$flink$runtime$rpc$akka$ControlMessages[ControlMessages.START.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
                // Ignored on purpose: the slot simply stays 0.
                // NOTE(review): presumably guards against an enum constant missing at runtime - standard javac pattern, confirm.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rpc$akka$ControlMessages[ControlMessages.STOP.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
                // Ignored on purpose: the slot simply stays 0.
            }
            try {
                $SwitchMap$org$apache$flink$runtime$rpc$akka$ControlMessages[ControlMessages.TERMINATE.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
                // Ignored on purpose: the slot simply stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$RpcEndpointTerminationResult.class */
    /**
     * Outcome of stopping the endpoint: either success (no cause) or a failure carrying the
     * {@link Throwable} that caused it.
     */
    public static final class RpcEndpointTerminationResult {
        /** Shared instance returned for every successful termination. */
        private static final RpcEndpointTerminationResult SUCCESS = new RpcEndpointTerminationResult(null);

        /** Cause of the failure; {@code null} means the termination succeeded. */
        private final Throwable failureCause;

        private RpcEndpointTerminationResult(Throwable th) {
            failureCause = th;
        }

        /** @return {@code true} when no failure cause has been recorded */
        public boolean isSuccess() {
            return failureCause == null;
        }

        /**
         * @return the recorded failure cause
         * @throws IllegalStateException via {@code Preconditions.checkState} when this result is a success
         */
        public Throwable getFailureCause() {
            Preconditions.checkState(!isSuccess());
            return failureCause;
        }

        private static RpcEndpointTerminationResult success() {
            return SUCCESS;
        }

        /** Creates a new result wrapping the given cause. */
        public static RpcEndpointTerminationResult failure(Throwable th) {
            return new RpcEndpointTerminationResult(th);
        }

        /** Maps a possibly-{@code null} cause to the shared success instance or to a new failure. */
        public static RpcEndpointTerminationResult of(Throwable th) {
            if (th != null) {
                return failure(th);
            }
            return success();
        }

        // Synthetic accessor kept for callers outside this nested class that reach success() through it.
        static RpcEndpointTerminationResult access$500() {
            return success();
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$StartedState.class */
    /** State of an actor whose endpoint has been started and is processing messages. */
    public enum StartedState implements State {
        STARTED;

        /** Starting an already started actor is a no-op. */
        @Override
        public State start(ActorImpl actorImpl) {
            return STARTED;
        }

        /** Moves back to the stopped state without invoking any endpoint callback. */
        @Override
        public State stop() {
            return StoppedState.STOPPED;
        }

        /**
         * Invokes the endpoint's stop hook and, once the returned future completes, stops the actor
         * with the matching termination result.
         *
         * @param actorImpl the actor being terminated
         * @return {@link TerminatingState#TERMINATING}
         */
        @Override
        public State terminate(ActorImpl actorImpl) {
            CompletableFuture<Void> terminationFuture;
            actorImpl.mainThreadValidator.enterMainThread();
            try {
                terminationFuture = actorImpl.rpcEndpoint.internalCallOnStop();
            } catch (Throwable th) {
                // A throwing stop hook is turned into a failed future so that it follows the same
                // completion path as an asynchronous failure.
                terminationFuture = FutureUtils.completedExceptionally(new AkkaRpcException(String.format("Failure while stopping RpcEndpoint %s.", actorImpl.endpointName), th));
            } finally {
                // Single exit point: the main-thread marker is released exactly once, whether or
                // not the stop hook threw.
                actorImpl.mainThreadValidator.exitMainThread();
            }
            // Typed as CompletableFuture<Void> so the failure parameter is a Throwable.
            terminationFuture.whenComplete((ignored, failure) -> actorImpl.stop(RpcEndpointTerminationResult.of(failure)));
            return TerminatingState.TERMINATING;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$State.class */
    /**
     * Lifecycle state of the actor. Every transition defaults to "not allowed" (throws
     * {@link AkkaRpcInvalidStateException}) except {@link #finishTermination()}, which always leads
     * to {@link TerminatedState#TERMINATED}.
     */
    public interface State {
        /** Transition on a START control message; rejected unless overridden. */
        default State start(ActorImpl actorImpl) {
            final String message = invalidStateTransitionMessage(StartedState.STARTED);
            throw new AkkaRpcInvalidStateException(message);
        }

        /** Transition on a STOP control message; rejected unless overridden. */
        default State stop() {
            final String message = invalidStateTransitionMessage(StoppedState.STOPPED);
            throw new AkkaRpcInvalidStateException(message);
        }

        /** Transition on a TERMINATE control message; rejected unless overridden. */
        default State terminate(ActorImpl actorImpl) {
            final String message = invalidStateTransitionMessage(TerminatingState.TERMINATING);
            throw new AkkaRpcInvalidStateException(message);
        }

        /** Final transition, taken once the actor has stopped. */
        default State finishTermination() {
            return TerminatedState.TERMINATED;
        }

        /** Whether regular (non-control) messages are processed in this state. */
        default boolean isRunning() {
            return false;
        }

        /** Builds the error text for a rejected transition from this state to {@code state}. */
        default String invalidStateTransitionMessage(State state) {
            final String template = "AkkaRpcActor is currently in state %s and cannot go into state %s.";
            return String.format(template, this, state);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$StoppedState.class */
    /** State of an actor whose endpoint has not been started (or has been stopped again). */
    public enum StoppedState implements State {
        STOPPED;

        /**
         * Runs the endpoint's start hook inside the main-thread marker. A throwing hook stops the
         * actor with a failure result.
         *
         * @return {@link StartedState#STARTED} in every case
         */
        @Override
        public State start(ActorImpl actorImpl) {
            actorImpl.mainThreadValidator.enterMainThread();
            try {
                actorImpl.rpcEndpoint.internalCallOnStart();
            } catch (Throwable startFailure) {
                final String message = String.format("Could not start RpcEndpoint %s.", actorImpl.endpointName);
                final AkkaRpcException wrapped = new AkkaRpcException(message, startFailure);
                actorImpl.stop(RpcEndpointTerminationResult.failure(wrapped));
            } finally {
                actorImpl.mainThreadValidator.exitMainThread();
            }
            // NOTE(review): STARTED is returned even when the start hook threw and the actor was
            // stopped above - confirm this is the intended follow-up state.
            return StartedState.STARTED;
        }

        /** Stopping an already stopped actor is a no-op. */
        @Override
        public State stop() {
            return this;
        }

        /** Nothing was started, so the actor is stopped right away with a success result. */
        @Override
        public State terminate(ActorImpl actorImpl) {
            actorImpl.stop(RpcEndpointTerminationResult.success());
            return TerminatingState.TERMINATING;
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$TerminatedState.class */
    // Terminal state, reached through State.finishTermination(). It overrides nothing, so the State
    // defaults apply: start/stop/terminate throw AkkaRpcInvalidStateException and isRunning() is false.
    public enum TerminatedState implements State {
        TERMINATED
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/ActorImpl$TerminatingState.class */
    /**
     * State entered once termination has been requested. It still reports itself as running, and
     * start/stop keep the rejecting defaults of {@link State}.
     */
    public enum TerminatingState implements State {
        TERMINATING;

        /** A repeated terminate request is ignored; the state does not change. */
        @Override
        public State terminate(ActorImpl actorImpl) {
            return this;
        }

        @Override
        public boolean isRunning() {
            return true;
        }
    }

    public ActorImpl(String str, Dispatcher dispatcher, T t) {
        this.endpointName = str;
        this.dispatcher = dispatcher;
        this.rpcEndpoint = t;
        this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(new AkkaRpcException(String.format("RpcEndpoint %s has not been properly stopped.", t.getEndpointId())));
        this.mainThreadValidator = new MainThreadValidatorUtil(t);
    }

    /** Hands the message over to this actor's mailbox. */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public void postMessage(MsgPlus msgPlus) {
        mailBox.postMessage(msgPlus);
    }

    /** @return whatever the mailbox reports for {@code isInProcess()} */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public boolean isInProcess() {
        return mailBox.isInProcess();
    }

    /** Delegates to the mailbox's {@code process()}. */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public void process() {
        mailBox.process();
    }

    /** Binds a resource to this actor; the holder it is bound to is closed in {@code postStop()}. */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public void addReleasableResource(AutoCloseable autoCloseable) {
        resourceHolder.bind(autoCloseable);
    }

    /**
     * Applies a lifecycle control message to the state machine.
     *
     * <p>Any exception thrown by a transition is recorded as the termination result and rethrown.
     *
     * @param controlMessages the control message to apply
     * @param msgPlus the envelope, used to answer an unknown control message
     */
    private void handleControlMessage(ControlMessages controlMessages, MsgPlus msgPlus) {
        try {
            // Switch on the enum itself instead of going through the decompiler-generated
            // ordinal lookup table; the dispatch is the same.
            // NOTE(review): when a transition calls stop(...) synchronously, postStop() has already
            // set this.state via finishTermination() by the time the returned state is assigned
            // below, so the assignment overwrites it - confirm this is intended.
            switch (controlMessages) {
                case START:
                    this.state = this.state.start(this);
                    break;
                case STOP:
                    this.state = this.state.stop();
                    break;
                case TERMINATE:
                    this.state = this.state.terminate(this);
                    break;
                default:
                    handleUnknownControlMessage(controlMessages, msgPlus);
                    break;
            }
        } catch (Exception e) {
            this.rpcEndpointTerminationResult = RpcEndpointTerminationResult.failure(e);
            throw e;
        }
    }

    /** Logs a warning for an unrecognised control message and answers the sender with the same text. */
    private void handleUnknownControlMessage(ControlMessages controlMessages, MsgPlus msgPlus) {
        final String warning = String.format("Received unknown control message %s. Dropping this message!", controlMessages);
        DispatcherImpl.log.warn(warning);
        final AkkaUnknownMessageException failure = new AkkaUnknownMessageException(warning);
        msgPlus.responseException(failure);
    }

    /** @return the future that {@code postStop()} completes once the actor has stopped */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public CompletableFuture<Void> getActorTerminateFuture() {
        return terminateFuture;
    }

    /**
     * Entry point for a single message taken from the mailbox.
     *
     * <p>Poison pills, control messages and heartbeats are handled regardless of the lifecycle
     * state. Everything else is processed only while the state reports {@code isRunning()};
     * otherwise it is answered with an {@link AkkaRpcException}.
     */
    @Override // kd.bos.algox.flink.enhance.krpc.Actor
    public void handleMsgPlus(MsgPlus msgPlus) {
        if (msgPlus instanceof PoisonPill) {
            // NOTE(review): the state returned by terminate(...) is not stored here, unlike the
            // TERMINATE control-message path - confirm that this is deliberate.
            this.state.terminate(this);
            return;
        }

        final Object payload = msgPlus.getMsg();
        if (payload instanceof ControlMessages) {
            handleControlMessage((ControlMessages) payload, msgPlus);
            return;
        }
        if (payload instanceof HeartbeatMessage) {
            msgPlus.responseSuccess(new HeartbeatMessage());
            return;
        }

        if (this.state.isRunning()) {
            this.mainThreadValidator.enterMainThread();
            try {
                handleMsgPlus0(msgPlus);
            } finally {
                this.mainThreadValidator.exitMainThread();
            }
            return;
        }

        DispatcherImpl.log.info("The rpc endpoint {} has not been started yet. Discarding message {} until processing is started.", this.endpointName, payload.getClass().getName());
        msgPlus.responseException(new AkkaRpcException(String.format("Discard message, because the rpc endpoint %s has not been started yet.", this.endpointName)));
    }

    /* JADX INFO: Access modifiers changed from: protected */
    /**
     * Dispatches a regular message by payload type: {@link RunAsync}, {@link CallAsync} or
     * {@link RpcInvocation}. Any other payload is logged and answered with an
     * {@link AkkaUnknownMessageException}.
     */
    public void handleMsgPlus0(MsgPlus msgPlus) {
        final Object msg = msgPlus.getMsg();
        if (msg instanceof RunAsync) {
            handleRunAsync((RunAsync) msg, msgPlus);
        } else if (msg instanceof CallAsync) {
            handleCallAsync((CallAsync) msg, msgPlus);
        } else if (msg instanceof RpcInvocation) {
            handleRpcInvocation((RpcInvocation) msg, msgPlus);
        } else {
            final Class<?> msgType = msg.getClass();
            DispatcherImpl.log.warn("Received message of unknown type {} with value {}. Dropping this message!", msgType.getName(), msg);
            msgPlus.responseException(new AkkaUnknownMessageException("Received unknown message " + msg + " of type " + msgType.getSimpleName() + '.'));
        }
    }

    /**
     * Resolves the target method on the endpoint by name and parameter types, invokes it
     * reflectively and reports the outcome through {@code msgPlus}.
     *
     * <p>Void methods produce no success response. A returned {@link CompletableFuture} is
     * answered when it completes; any other return value is answered immediately.
     */
    private void handleRpcInvocation(RpcInvocation rpcInvocation, MsgPlus msgPlus) {
        final Method rpcMethod;
        try {
            rpcMethod = this.rpcEndpoint.getClass().getMethod(rpcInvocation.getMethodName(), rpcInvocation.getParameterTypes());
        } catch (IOException e) {
            DispatcherImpl.log.error("Could not deserialize rpc invocation message.", e);
            msgPlus.responseException(new RpcConnectionException("Could not deserialize rpc invocation message.", e));
            return;
        } catch (ClassNotFoundException e) {
            DispatcherImpl.log.error("Could not load method arguments.", e);
            msgPlus.responseException(new RpcConnectionException("Could not load method arguments.", e));
            return;
        } catch (NoSuchMethodException e) {
            DispatcherImpl.log.error("Could not find rpc method for rpc invocation.", e);
            msgPlus.responseException(new RpcConnectionException("Could not find rpc method for rpc invocation.", e));
            return;
        }

        try {
            rpcMethod.setAccessible(true);

            if (rpcMethod.getReturnType().equals(Void.TYPE)) {
                // Nothing to send back for a void method.
                rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
                return;
            }

            final Object result;
            try {
                result = rpcMethod.invoke(this.rpcEndpoint, rpcInvocation.getArgs());
            } catch (InvocationTargetException e) {
                // The endpoint method itself threw: report the underlying exception, not the wrapper.
                DispatcherImpl.log.debug("Reporting back error thrown in remote procedure {}", rpcMethod, e);
                msgPlus.responseException(e.getTargetException());
                return;
            }

            if (result instanceof CompletableFuture) {
                ((CompletableFuture<?>) result).whenComplete((value, failure) -> {
                    if (failure == null) {
                        msgPlus.responseSuccess(value);
                    } else {
                        msgPlus.responseException(failure);
                    }
                });
            } else {
                msgPlus.responseSuccess(result);
            }
        } catch (Throwable th) {
            DispatcherImpl.log.error("Error while executing remote procedure call {}.", rpcMethod, th);
            msgPlus.responseException(th);
        }
    }

    /** Runs the callable carried by the message and answers with its result or with what it threw. */
    private void handleCallAsync(CallAsync callAsync, MsgPlus msgPlus) {
        try {
            final Object result = callAsync.getCallable().call();
            msgPlus.responseSuccess(result);
        } catch (Throwable failure) {
            msgPlus.responseException(failure);
        }
    }

    /**
     * Executes a {@link RunAsync} message.
     *
     * <p>If the message carries no deadline ({@code timeNanos == 0}) or the deadline has passed,
     * the runnable runs immediately on the calling thread. Otherwise a new message with the same
     * runnable and deadline is scheduled to be posted back to the dispatcher after the remaining
     * delay.
     *
     * @throws RuntimeException wrapping whatever the runnable threw when it is run immediately
     */
    private void handleRunAsync(RunAsync runAsync, MsgPlus msgPlus) {
        final long timeNanos = runAsync.getTimeNanos();
        if (timeNanos == 0 || timeNanos - System.nanoTime() <= 0) {
            try {
                runAsync.getRunnable().run();
                return;
            } catch (Throwable th) {
                DispatcherImpl.log.error("Caught exception while executing runnable in main thread.", th);
                throw new RuntimeException(th);
            }
        }

        // The payload is either a plain RunAsync or that RunAsync wrapped in an UnfencedMessage,
        // so the local must be typed as their common supertype; a RunAsync is not an
        // UnfencedMessage.
        // NOTE(review): assumes MsgPlus.fencedMsg accepts Object - confirm against MsgPlus.
        Object delayedMessage = new RunAsync(runAsync.getRunnable(), timeNanos);
        // Only the exact KFencedAkkaRpcActor runtime class gets the unfenced wrapper.
        if (getClass().equals(KFencedAkkaRpcActor.class)) {
            delayedMessage = new UnfencedMessage<>(delayedMessage);
        }
        final MsgPlus fencedMsg = msgPlus.fencedMsg(delayedMessage);
        this.dispatcher.getScheduledExecutor().schedule(() -> {
            this.dispatcher.postMessage(fencedMsg);
        }, timeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    /**
     * Returns the given message unchanged.
     *
     * <p>NOTE(review): protected and not called anywhere in this class - presumably an extension
     * hook for subclasses that need to wrap self-addressed messages; confirm against subclasses.
     */
    protected Object envelopeSelfMessage(Object obj) {
        return obj;
    }

    /**
     * Finalises the actor: closes the bound resources, completes the termination future according
     * to the recorded termination result, and moves the state machine to its terminal state.
     */
    public void postStop() {
        this.resourceHolder.close();

        // Read the volatile result once so that the success check, the logged cause and the cause
        // used to fail the future all come from the same value even if another thread overwrites
        // the field in the meantime.
        final RpcEndpointTerminationResult result = this.rpcEndpointTerminationResult;
        if (result.isSuccess()) {
            DispatcherImpl.log.debug("The RpcEndpoint {} terminated successfully.", this.rpcEndpoint.getEndpointId());
            this.terminateFuture.complete(null);
        } else {
            final Throwable cause = result.getFailureCause();
            DispatcherImpl.log.info("The RpcEndpoint {} failed.", this.rpcEndpoint.getEndpointId(), cause);
            this.terminateFuture.completeExceptionally(cause);
        }
        this.state = this.state.finishTermination();
    }

    /* JADX INFO: Access modifiers changed from: private */
    /**
     * Records the termination result and runs {@code postStop()}. Only the first call has any
     * effect; later calls return immediately.
     */
    public void stop(RpcEndpointTerminationResult rpcEndpointTerminationResult) {
        final boolean firstCall = this.rpcEndpointStopped.compareAndSet(false, true);
        if (!firstCall) {
            return;
        }
        this.rpcEndpointTerminationResult = rpcEndpointTerminationResult;
        postStop();
    }

    /** @return a short description containing the endpoint name */
    @Override
    public String toString() {
        return "ActorImpl{endpointName='" + this.endpointName + "'}";
    }
}
