package kd.bos.algox.flink.enhance.krpc;

import java.io.Serializable;
import java.util.Objects;
import kd.bos.algox.flink.enhance.krpc.impl.ActorImpl;
import kd.bos.algox.flink.enhance.krpc.impl.DispatcherImpl;
import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.LocalFencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;

/* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/KFencedAkkaRpcActor.class */
/**
 * Actor that checks fencing tokens on incoming messages before handing their payload to
 * {@link ActorImpl#handleMsgPlus0(MsgPlus)}.
 *
 * <p>Message handling:
 * <ul>
 *   <li>{@link FencedMessage}: the payload is forwarded only when the message's fencing token
 *       equals the endpoint's current fencing token; otherwise a {@link FencingTokenException}
 *       is reported through {@code MsgPlus#responseException}.</li>
 *   <li>{@link UnfencedMessage}: the payload is forwarded without any token comparison.</li>
 *   <li>Anything else (including {@code null}): an {@link AkkaUnknownMessageException} is
 *       reported through {@code MsgPlus#responseException}.</li>
 * </ul>
 *
 * @param <F> type of the fencing token
 * @param <T> type of the fenced RPC endpoint served by this actor
 */
public class KFencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends ActorImpl<T> {

    /**
     * Creates the actor; all arguments are passed unchanged to the {@link ActorImpl} constructor.
     *
     * @param str        forwarded to the superclass constructor
     * @param dispatcher forwarded to the superclass constructor
     * @param t          the fenced RPC endpoint, forwarded to the superclass constructor
     */
    public KFencedAkkaRpcActor(String str, Dispatcher dispatcher, T t) {
        super(str, dispatcher, t);
    }

    /**
     * Unwraps fenced/unfenced envelopes and delegates the payload to the superclass, rejecting
     * messages whose fencing token is missing or does not match, as well as messages that are
     * neither {@link FencedMessage} nor {@link UnfencedMessage}.
     *
     * @param msgPlus the incoming message wrapper
     */
    @Override // kd.bos.algox.flink.enhance.krpc.impl.ActorImpl
    protected void handleMsgPlus0(MsgPlus msgPlus) {
        Object msg = msgPlus.getMsg();

        if (msg instanceof FencedMessage) {
            F expectedToken = this.rpcEndpoint.getFencingToken();
            if (expectedToken == null) {
                if (DispatcherImpl.log.isDebugEnabled()) {
                    DispatcherImpl.log.debug("Fencing token not set: Ignoring message {} because the fencing token is null.", msg);
                }
                msgPlus.responseException(new FencingTokenException(String.format("Fencing token not set: Ignoring message %s sent to %s because the fencing token is null.", msg, this.rpcEndpoint.getAddress())));
                return;
            }

            FencedMessage<?, ?> fencedMessage = (FencedMessage<?, ?>) msg;
            Serializable actualToken = fencedMessage.getFencingToken();
            if (Objects.equals(expectedToken, actualToken)) {
                super.handleMsgPlus0(msgPlus.fencedMsg(fencedMessage.getPayload()));
                return;
            }

            if (DispatcherImpl.log.isDebugEnabled()) {
                DispatcherImpl.log.debug("Fencing token mismatch: Ignoring message {} because the fencing token {} did not match the expected fencing token {}.", new Object[]{msg, actualToken, expectedToken});
            }
            msgPlus.responseException(new FencingTokenException("Fencing token mismatch: Ignoring message " + msg + " because the fencing token " + actualToken + " did not match the expected fencing token " + expectedToken + '.'));
        } else if (msg instanceof UnfencedMessage) {
            super.handleMsgPlus0(msgPlus.fencedMsg(((UnfencedMessage<?>) msg).getPayload()));
        } else {
            if (DispatcherImpl.log.isDebugEnabled()) {
                DispatcherImpl.log.debug("Unknown message type: Ignoring message {} because it is neither of type {} nor {}.", new Object[]{msg, FencedMessage.class.getSimpleName(), UnfencedMessage.class.getSimpleName()});
            }
            // A null message fails both instanceof checks and ends up here; resolve the type
            // name defensively so the rejection is reported instead of a NullPointerException
            // being thrown from msg.getClass().
            String msgTypeName = msg == null ? "null" : msg.getClass().getSimpleName();
            msgPlus.responseException(new AkkaUnknownMessageException("Unknown message type: Ignoring message " + msg + " of type " + msgTypeName + " because it is neither of type " + FencedMessage.class.getSimpleName() + " nor " + UnfencedMessage.class.getSimpleName() + '.'));
        }
    }

    /**
     * Wraps a self-addressed message in a {@link LocalFencedMessage} carrying the endpoint's
     * current fencing token.
     *
     * @param obj the message to wrap
     * @return a {@link LocalFencedMessage} holding the current fencing token and {@code obj}
     */
    @Override // kd.bos.algox.flink.enhance.krpc.impl.ActorImpl
    protected Object envelopeSelfMessage(Object obj) {
        return new LocalFencedMessage<>(this.rpcEndpoint.getFencingToken(), obj);
    }
}
