package kd.bos.algox.flink.enhance.krpc.impl.transport;

import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.ByteBufOutputStream;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import io.netty.handler.codec.LengthFieldPrepender;
import io.netty.handler.codec.MessageToByteEncoder;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.text.MessageFormat;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import kd.bos.algox.flink.enhance.krpc.Dispatcher;
import kd.bos.algox.flink.enhance.krpc.PortInfo;
import kd.bos.algox.flink.enhance.krpc.impl.DispatcherImpl;
import kd.bos.algox.flink.enhance.krpc.impl.RemoteMsg;
import kd.bos.algox.flink.enhance.krpc.impl.RemoteMsgPlus;
import kd.bos.algox.flink.enhance.krpc.impl.transport.Client;
import kd.bos.algox.flink.enhance.krpc.impl.transport.netty4.ByteBufAllocatorFactory;
import kd.bos.algox.flink.enhance.krpc.impl.transport.netty4.NioServerSocketChannelEnhance;
import kd.bos.algox.flink.enhance.krpc.impl.transport.netty4.NioSocketChannelEnhance;
import kd.bos.logging.Log;
import kd.bos.logging.LogFactory;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport.class */
/**
 * Holds the shared configuration of the Netty-based krpc transport and builds
 * {@link NettyClient} / {@link NettyServer} instances from it.
 *
 * <p>The nested classes contain the actual networking code; this outer class only stores
 * settings and copies them into each client or server at build time.
 */
public class NettyTransport {
    private static final Log log = LogFactory.getLog("NettyTransport");
    // Copied into every client/server built by this transport. Within this file the value is
    // only stored and printed by toString(); no codec reads it.
    private Compress compress;
    // Receives the decoded requests of servers created by buildServer.
    private Dispatcher dispatcher;
    // Defaults to false here; buildClient/buildServer always copy this value, which overrides
    // the nested classes' own default of true.
    private boolean tcpNoDelay;
    // Used as ChannelOption.CONNECT_TIMEOUT_MILLIS by NettyClient, i.e. milliseconds.
    private int connectTimeout = 5000;
    // Maximum frame length copied into clients and servers (default equals Integer.MAX_VALUE).
    private long maxFrameSize = 2147483647L;
    // Serializer shared by the packet encoder and decoder of every built endpoint.
    private Serialize serialize = Utils.newHessianSerialize();
    // Default event loop groups handed to servers: one boss thread, two worker threads.
    private EventLoopGroup bossGroup = new NioEventLoopGroup(1);
    private EventLoopGroup workerGroup = new NioEventLoopGroup(2);

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$IgnoredCall.class */
    /**
     * No-op {@link Client.Call} singleton. {@link NettyClient#tell} passes it to
     * {@code ask} so that a one-way message needs no caller-supplied callback.
     */
    private static class IgnoredCall implements Client.Call {
        /** The single shared instance; the class is stateless. */
        public static final IgnoredCall instance = new IgnoredCall();

        // Private so that only the shared instance above can exist.
        private IgnoredCall() {
        }

        /** Discards the response. */
        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client.Call
        public void onSuccess(Object obj) {
        }

        /** Discards the failure. */
        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client.Call
        public void onFail(Throwable th) {
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$NettyClient.class */
    public static class NettyClient implements Client {
        private final String host;
        private final int port;
        private final NioEventLoopGroup workerGroup;
        private Compress compress;
        private Serialize serialize;
        private int connectTimeout;
        private Channel channel;
        private final AtomicBoolean connected = new AtomicBoolean(false);
        private final ConcurrentHashMap<String, Client.Call> waitList = new ConcurrentHashMap<>();
        private long maxFrameLength = 2147483647L;
        private boolean tcpNoDelay = true;

        public NettyClient(String str, int i, NioEventLoopGroup nioEventLoopGroup) {
            this.host = str;
            this.port = i;
            this.workerGroup = nioEventLoopGroup;
        }

        public void setTcpNoDelay(boolean z) {
            this.tcpNoDelay = z;
        }

        public void setConnectTimeout(int i) {
            this.connectTimeout = i;
        }

        public void setCompress(Compress compress) {
            this.compress = compress;
        }

        public void setSerialize(Serialize serialize) {
            this.serialize = serialize;
        }

        public void setMaxFrameLength(long j) {
            this.maxFrameLength = j;
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client
        public void connect() throws InterruptedException {
            if (this.connected.compareAndSet(false, true)) {
                Objects.requireNonNull(this.workerGroup);
                Objects.requireNonNull(this.serialize);
                doConnect();
            }
        }

        private void doConnect() throws InterruptedException {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(this.workerGroup);
            bootstrap.channel(NioSocketChannelEnhance.class);
            bootstrap.option(ChannelOption.SO_KEEPALIVE, true).option(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.tcpNoDelay)).option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Integer.valueOf(this.connectTimeout)).option(ChannelOption.ALLOCATOR, ByteBufAllocatorFactory.INSTANCE);
            bootstrap.handler(new ChannelInitializer<SocketChannel>() { // from class: kd.bos.algox.flink.enhance.krpc.impl.transport.NettyTransport.NettyClient.1
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder((int) NettyClient.this.maxFrameLength, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{new LengthFieldPrepender(4)}).addLast(new ChannelHandler[]{new PacketDecoder(NettyClient.this.compress, NettyClient.this.serialize)}).addLast(new ChannelHandler[]{new PacketEncoder(NettyClient.this.compress, NettyClient.this.serialize)}).addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter() { // from class: kd.bos.algox.flink.enhance.krpc.impl.transport.NettyTransport.NettyClient.1.1
                        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                            if (!(obj instanceof ResponsePacket)) {
                                NettyTransport.log.error("NettyClient receive unknown packet: " + obj);
                                return;
                            }
                            ResponsePacket responsePacket = (ResponsePacket) obj;
                            Client.Call call = (Client.Call) NettyClient.this.waitList.get(responsePacket.getId());
                            if (call == null) {
                                DispatcherImpl.log.error("Receive unknown package: " + responsePacket.getId());
                                return;
                            }
                            try {
                                if (responsePacket.isOk()) {
                                    call.onSuccess(responsePacket.getResponse());
                                } else {
                                    call.onFail(responsePacket.getError());
                                }
                            } finally {
                                NettyClient.this.waitList.remove(responsePacket.getId());
                            }
                        }
                    }});
                }
            });
            this.channel = bootstrap.connect(this.host, this.port).sync().channel();
            NettyTransport.log.info("NettyClient connect success, desc: " + toString());
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client
        public void tell(RemoteMsg remoteMsg) {
            ask(remoteMsg, IgnoredCall.instance);
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client
        public void ask(RemoteMsg remoteMsg, Client.Call call) {
            RequestPacket newRequest = RequestPacket.newRequest(remoteMsg);
            this.waitList.put(newRequest.getId(), call);
            this.channel.writeAndFlush(newRequest);
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Client
        public void close() {
            this.channel.close();
        }

        public String toString() {
            Object[] objArr = new Object[5];
            objArr[0] = this.host;
            objArr[1] = Integer.valueOf(this.port);
            objArr[2] = this.connected.get() ? "connected" : "not_connect";
            objArr[3] = this.compress == null ? "null" : this.compress.getName();
            objArr[4] = this.serialize == null ? "null" : this.serialize.getName();
            return MessageFormat.format("NettyClient({0}:{1}):status={2},compress={3},serialize={4}", objArr);
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$NettyServer.class */
    public static class NettyServer implements Server {
        private final String bindAddress;
        private final PortInfo port;
        private Compress compress;
        private Serialize serialize;
        private Dispatcher dispatcher;
        private Channel serverChannel;
        private EventLoopGroup bossGroup;
        private EventLoopGroup workerGroup;
        private final AtomicBoolean started = new AtomicBoolean(false);
        private int bindPort = -1;
        private long maxFrameLength = 104857600;
        private boolean tcpNoDelay = true;

        public NettyServer(String str, PortInfo portInfo) {
            this.bindAddress = str;
            this.port = portInfo;
        }

        public void setTcpNoDelay(boolean z) {
            this.tcpNoDelay = z;
        }

        public void setDispatcher(Dispatcher dispatcher) {
            this.dispatcher = dispatcher;
        }

        public void setCompress(Compress compress) {
            this.compress = compress;
        }

        public void setSerialize(Serialize serialize) {
            this.serialize = serialize;
        }

        public void setBossGroup(EventLoopGroup eventLoopGroup) {
            this.bossGroup = eventLoopGroup;
        }

        public void setWorkerGroup(EventLoopGroup eventLoopGroup) {
            this.workerGroup = eventLoopGroup;
        }

        public void setMaxFrameLength(long j) {
            this.maxFrameLength = j;
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Server
        public void start() throws InterruptedException {
            if (this.started.compareAndSet(false, true)) {
                Objects.requireNonNull(this.serialize);
                doStart();
            }
        }

        private void doStart() throws InterruptedException {
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(this.bossGroup, this.workerGroup).channel(NioServerSocketChannelEnhance.class).childOption(ChannelOption.TCP_NODELAY, Boolean.valueOf(this.tcpNoDelay)).childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE).childOption(ChannelOption.ALLOCATOR, ByteBufAllocatorFactory.INSTANCE).childHandler(new ChannelInitializer<SocketChannel>() { // from class: kd.bos.algox.flink.enhance.krpc.impl.transport.NettyTransport.NettyServer.1
                public void initChannel(SocketChannel socketChannel) throws Exception {
                    socketChannel.pipeline().addLast(new ChannelHandler[]{new LengthFieldBasedFrameDecoder((int) NettyServer.this.maxFrameLength, 0, 4, 0, 4)}).addLast(new ChannelHandler[]{new LengthFieldPrepender(4)}).addLast(new ChannelHandler[]{new PacketDecoder(NettyServer.this.compress, NettyServer.this.serialize)}).addLast(new ChannelHandler[]{new PacketEncoder(NettyServer.this.compress, NettyServer.this.serialize)}).addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter() { // from class: kd.bos.algox.flink.enhance.krpc.impl.transport.NettyTransport.NettyServer.1.1
                        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
                            if (!(obj instanceof RequestPacket)) {
                                NettyTransport.log.error("NettyServer receive unknown packet: " + obj);
                                return;
                            }
                            RequestPacket requestPacket = (RequestPacket) obj;
                            NettyServer.this.dispatcher.postMessage(new RemoteMsgPlus(requestPacket.getId(), requestPacket.getRequestData(), new NettyServerSender(channelHandlerContext.channel())));
                        }
                    }});
                }
            }).option(ChannelOption.SO_BACKLOG, 128).childOption(ChannelOption.SO_KEEPALIVE, true);
            int start = this.port.getStart();
            int end = this.port.isRange() ? this.port.getEnd() : start;
            Exception exc = null;
            for (int i = start; i <= end; i++) {
                try {
                    this.serverChannel = serverBootstrap.bind(this.bindAddress, i).sync().channel();
                    this.bindPort = i;
                    exc = null;
                    break;
                } catch (Exception e) {
                    exc = e;
                }
            }
            if (exc != null) {
                String str = "NettyServer start fail on: " + this.bindAddress + ":" + (this.port.isRange() ? "[" + start + "-" + end + "]" : String.valueOf(start)) + ", " + exc.getMessage();
                NettyTransport.log.error(str, exc);
                throw new RuntimeException(str, exc);
            }
            NettyTransport.log.info("NettyServer start success, desc: " + toString());
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Server
        public void shutdown() {
            this.serverChannel.close();
            this.bossGroup.shutdownGracefully();
            this.workerGroup.shutdownGracefully();
        }

        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.Server
        public int getPort() {
            return this.bindPort;
        }

        public String toString() {
            Object[] objArr = new Object[5];
            objArr[0] = this.bindAddress;
            objArr[1] = Integer.valueOf(this.bindPort);
            objArr[2] = this.started.get() ? "started" : "not_start";
            objArr[3] = this.compress == null ? "null" : this.compress.getName();
            objArr[4] = this.serialize == null ? "null" : this.serialize.getName();
            return MessageFormat.format("NettyServer({0}:{1}):status={2},compress={3},serialize={4}", objArr);
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$NettyServerSender.class */
    /**
     * {@link ServerSender} that replies on the channel a request arrived on by writing
     * {@link ResponsePacket}s to it.
     */
    public static class NettyServerSender implements ServerSender {
        private final Channel channel;

        /**
         * @param channel channel the replies are written to
         */
        public NettyServerSender(Channel channel) {
            this.channel = channel;
        }

        /**
         * Writes a success response for the request identified by {@code str}.
         *
         * @param str request id the response belongs to
         * @param obj response payload
         */
        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.ServerSender
        public void send(String str, Object obj) {
            final Object reply = ResponsePacket.success(str, obj);
            this.channel.writeAndFlush(reply);
        }

        /**
         * Writes a failure response for the request identified by {@code str}.
         *
         * @param str request id the response belongs to
         * @param th  error reported to the caller
         */
        @Override // kd.bos.algox.flink.enhance.krpc.impl.transport.ServerSender
        public void sendFail(String str, Throwable th) {
            final Object reply = ResponsePacket.fail(str, th);
            this.channel.writeAndFlush(reply);
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$NettyThreadFactory.class */
    /**
     * {@link ThreadFactory} that names its threads {@code <name>-<n>}, where {@code n} is a
     * counter starting at 0 and incremented per created thread.
     */
    public static class NettyThreadFactory implements ThreadFactory {
        private final String name;
        private final AtomicInteger count = new AtomicInteger();

        /**
         * @param str prefix used for the names of all threads created by this factory
         */
        public NettyThreadFactory(String str) {
            this.name = str;
        }

        /**
         * Creates a new thread for {@code runnable} and assigns it the next sequential name.
         *
         * @param runnable task the thread will run
         * @return the new, not yet started thread
         */
        @Override // java.util.concurrent.ThreadFactory
        public Thread newThread(@NotNull Runnable runnable) {
            Thread thread = new Thread(runnable);
            // Plain concatenation instead of MessageFormat: the latter formats the counter as a
            // locale-dependent number and inserts grouping separators (e.g. "worker-1,000").
            thread.setName(this.name + "-" + this.count.getAndIncrement());
            return thread;
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$PacketDecoder.class */
    public static class PacketDecoder extends ByteToMessageDecoder {
        private final Compress compress;
        private final Serialize serialize;

        public PacketDecoder(Compress compress, Serialize serialize) {
            this.compress = compress;
            this.serialize = serialize;
        }

        protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
            list.add(this.serialize.deserialize(new BufferedInputStream(new ByteBufInputStream(byteBuf), 65536)));
        }
    }

    /* loaded from: input_file:kd/bos/algox/flink/enhance/krpc/impl/transport/NettyTransport$PacketEncoder.class */
    public static class PacketEncoder extends MessageToByteEncoder<Object> {
        private final Serialize serialize;

        public PacketEncoder(Compress compress, Serialize serialize) {
            super(false);
            this.serialize = serialize;
        }

        protected void encode(ChannelHandlerContext channelHandlerContext, Object obj, ByteBuf byteBuf) throws Exception {
            this.serialize.serialize(obj, new BufferedOutputStream(new ByteBufOutputStream(byteBuf), 65536));
        }
    }

    /**
     * Sets the maximum frame length, in bytes, copied into clients and servers built afterwards.
     *
     * <p>Values above {@code Integer.MAX_VALUE} are capped, because the frame decoders take an
     * {@code int} limit. Non-positive values are ignored and the current limit is kept.
     *
     * @param j requested maximum frame length in bytes
     */
    public void setMaxFrameSize(long j) {
        if (j <= 0) {
            return;
        }
        this.maxFrameSize = Math.min(j, (long) Integer.MAX_VALUE);
    }

    /**
     * Replaces the boss event loop group handed to servers built afterwards.
     * The previously held group is only dereferenced here, not shut down.
     */
    public void setBossGroup(EventLoopGroup eventLoopGroup) {
        this.bossGroup = eventLoopGroup;
    }

    /**
     * Replaces the worker event loop group handed to servers built afterwards.
     * The previously held group is only dereferenced here, not shut down.
     */
    public void setWorkerGroup(EventLoopGroup eventLoopGroup) {
        this.workerGroup = eventLoopGroup;
    }

    /**
     * Sets the connect timeout copied into clients built afterwards; the client applies it as
     * {@code CONNECT_TIMEOUT_MILLIS}, so the unit is milliseconds.
     */
    public void setConnectTimeout(int i) {
        this.connectTimeout = i;
    }

    /** Sets the compressor copied into clients and servers built afterwards. */
    public void setCompress(Compress compress) {
        this.compress = compress;
    }

    /**
     * Sets the serializer copied into clients and servers built afterwards.
     * Both reject a null serializer when they are connected or started.
     */
    public void setSerialize(Serialize serialize) {
        this.serialize = serialize;
    }

    /** Sets the dispatcher copied into servers built afterwards; it receives their incoming requests. */
    public void setDispatcher(Dispatcher dispatcher) {
        this.dispatcher = dispatcher;
    }

    /** Sets the {@code TCP_NODELAY} flag copied into clients and servers built afterwards. */
    public void setTcpNoDelay(boolean z) {
        this.tcpNoDelay = z;
    }

    /**
     * Creates a not-yet-connected {@link NettyClient} for the given endpoint and copies this
     * transport's current settings into it.
     *
     * @param str               remote host
     * @param i                 remote port
     * @param nioEventLoopGroup event loop group the client channel will run on
     * @return the configured client; the caller still has to call {@code connect()}
     */
    public Client buildClient(String str, int i, NioEventLoopGroup nioEventLoopGroup) {
        final NettyClient client = new NettyClient(str, i, nioEventLoopGroup);
        // Connection-level options.
        client.setTcpNoDelay(this.tcpNoDelay);
        client.setConnectTimeout(this.connectTimeout);
        client.setMaxFrameLength(this.maxFrameSize);
        // Codec configuration.
        client.setSerialize(this.serialize);
        client.setCompress(this.compress);
        return client;
    }

    /**
     * Creates a not-yet-started {@link NettyServer} for the given bind address and port(s) and
     * copies this transport's current settings, event loop groups and dispatcher into it.
     *
     * @param str      local address to bind to
     * @param portInfo single port or port range to try
     * @return the configured server; the caller still has to call {@code start()}
     */
    public Server buildServer(String str, PortInfo portInfo) {
        final NettyServer server = new NettyServer(str, portInfo);
        // Connection-level options.
        server.setTcpNoDelay(this.tcpNoDelay);
        server.setMaxFrameLength(this.maxFrameSize);
        // Codec configuration and request sink.
        server.setSerialize(this.serialize);
        server.setCompress(this.compress);
        server.setDispatcher(this.dispatcher);
        // Threads: the server uses (and on shutdown stops) the transport's groups.
        server.setWorkerGroup(this.workerGroup);
        server.setBossGroup(this.bossGroup);
        return server;
    }
}
