package kd.bos.mq.rabbit;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import com.rabbitmq.client.ShutdownSignalException;
import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.LockSupport;
import kd.bos.context.RequestContext;
import kd.bos.context.RequestContextCreator;
import kd.bos.db.DBRoute;
import kd.bos.exception.BosErrorCode;
import kd.bos.exception.KDException;
import kd.bos.instance.Instance;
import kd.bos.metric.Counter;
import kd.bos.metric.MetricSystem;
import kd.bos.mq.MessageConsumer;
import kd.bos.mq.delay.DelayControlManager;
import kd.bos.mq.delay.MetaTime;
import kd.bos.mq.delay.RabbitMQDelayManager;
import kd.bos.mq.dlx.DLXConfig;
import kd.bos.mq.dlx.DLXMesPubFactory;
import kd.bos.mq.dlx.DLXStrategy;
import kd.bos.mq.dlx.MessageRecord;
import kd.bos.mq.stat.ConsumerStats;
import kd.bos.mq.support.Consumer;
import kd.bos.mq.support.KdtxSupport;
import kd.bos.mq.support.Message;
import kd.bos.mq.support.MessageSerde;
import kd.bos.mq.support.QueueManager;
import kd.bos.mq.support.TimeCacheMap;
import kd.bos.mq.support.TranscationSupport;
import kd.bos.thread.SetThreadName;
import kd.bos.threads.ThreadPool;
import kd.bos.threads.ThreadPools;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;
import kd.bos.trace.util.TraceIdUtil;
import kd.bos.util.StringUtils;

/* loaded from: input_file:kd/bos/mq/rabbit/RabbitConsumer.class */
public class RabbitConsumer extends DefaultConsumer implements Consumer {
    // ---- Metrics (registered once per JVM through MetricSystem) ----
    private static final Counter noRequestContextCounter = MetricSystem.counter("kd.metrics.mq.consumer.noRequestContextCounter");
    private static final Counter decodeMessageErrorCounter = MetricSystem.counter("kd.metrics.mq.consumer.decodeMessageErrorCounter");
    private static final Counter handlerMessagerErrorCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerOnMessagerErrorCounter");
    private static final Counter handlerMessagerTranscationRollBackCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerTranscationRollBackCounter");
    private static final Counter handlerMessagerTranscationRepeateCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerTranscationRepeateCounter");
    private static final Counter handlerMessagerRepeateCounter = MetricSystem.counter("kd.metrics.mq.consumer.handlerMessagerRepeateCounter");
    // Incremented once per constructed RabbitConsumer.
    private static final Counter consumerounter = MetricSystem.counter("kd.metrics.mq.consumer.Counter");
    // Incremented/decremented around each MessageConsumer.onMessage call.
    private static final Counter activeCounter = MetricSystem.counter("kd.metrics.mq.consumer.activeConsumers");

    // Upper bound of the shared worker pool; read from -Dmq.consumer.maxpoolsize (default "32").
    private static final String maxPoolCount = System.getProperty("mq.consumer.maxpoolsize", "32");
    // Shared pool on which deliveries are processed unless Config.disableParallelConsume() is true.
    private static final ThreadPool pool = ThreadPools.newCachedThreadPool("RabbitMqAsyncConsumer", 5, Integer.parseInt(maxPoolCount));
    // In-flight delivery counters shared by all consumers of the same region + queue (key = region + queue).
    private static final Map<String, AtomicInteger> queueActiveCountMap = new ConcurrentHashMap<>(2);

    // This consumer's entry of queueActiveCountMap.
    private final AtomicInteger queueActiveThreadCount;
    // Channel the consumer is subscribed on; replaced by setChannel and after an abnormal shutdown.
    private Channel channel;
    private String queue;
    private boolean autoAck;
    // Passed to Channel.basicQos as the prefetch count.
    private int concurrency;
    private int maxQueueLength;
    // Application callback that receives the decoded message bodies.
    private MessageConsumer mc;
    private String region;
    private volatile boolean isStartedFlag;
    // Per-innerId redelivery counters used to discard messages that keep coming back.
    // NOTE(review): constructed with 300 - presumably an expiry in seconds; confirm against TimeCacheMap.
    private TimeCacheMap<AtomicInteger> messageRepeatTimes;
    private Date startAt;

    /**
     * Callback that takes no arguments and returns nothing.
     *
     * <p>NOTE(review): {@code isTransactionSubmit} passes a lambda to
     * {@code RabbitAcker.setAckedCallBack}; presumably that parameter is of this type and the
     * callback runs once the message has been acknowledged - confirm against {@code RabbitAcker}.
     */
    @FunctionalInterface
    /* loaded from: input_file:kd/bos/mq/rabbit/RabbitConsumer$AckedCallBack.class */
    public interface AckedCallBack {
        /** Executes the callback. */
        void call();
    }

    public RabbitConsumer(String str, String str2, boolean z, int i, MessageConsumer messageConsumer, int i2, Channel channel) {
        super(channel);
        this.messageRepeatTimes = new TimeCacheMap<>(300);
        this.channel = channel;
        this.region = str;
        this.queue = str2;
        this.autoAck = z;
        this.concurrency = i;
        this.mc = messageConsumer;
        this.maxQueueLength = i2;
        this.queueActiveThreadCount = queueActiveCountMap.computeIfAbsent(str + str2, str3 -> {
            return new AtomicInteger(0);
        });
        consumerounter.inc();
    }

    /**
     * Tells whether consumption is currently forbidden for the given tenant.
     *
     * <p>Consumption is forbidden for every tenant while
     * {@code Instance.isPausedServiceByMonitor()} is true. Otherwise the tenant code is compared,
     * ignoring case, with the entries of the system property
     * {@code mq.consume.forbidden.tenantcodes} (separated by {@code ,} or {@code ;}).
     *
     * @param str tenant code of the message being processed; may be {@code null}, in which case
     *            it matches no entry of the list
     * @return {@code true} if the message must not be consumed right now
     */
    private static boolean isforbiddenTenantCode(String str) {
        if (Instance.isPausedServiceByMonitor()) {
            return true;
        }
        String property = System.getProperty("mq.consume.forbidden.tenantcodes");
        if (property == null || str == null) {
            return false;
        }
        // String.split never returns null and never yields null elements, so the entry is the
        // safe receiver for the comparison.
        for (String forbiddenCode : property.split(",|;")) {
            if (forbiddenCode.equalsIgnoreCase(str)) {
                return true;
            }
        }
        return false;
    }

    /** Returns the channel this consumer currently uses. */
    public Channel getChannel() {
        return channel;
    }

    /**
     * Replaces the channel used for acknowledgements and delayed re-publishing.
     * This only stores the reference; it does not subscribe on the new channel.
     */
    public void setChannel(Channel channel) {
        this.channel = channel;
    }

    /**
     * Stores a new concurrency value. The value is read when the consumer (re)subscribes and is
     * then applied through {@code basicQos}; calling this alone does not touch the channel.
     */
    @Override
    public void setConcurrency(int i) {
        concurrency = i;
    }

    /** Returns the region this consumer was created for. */
    @Override
    public String getRegion() {
        return region;
    }

    /** Returns the name of the queue this consumer reads from. */
    @Override
    public String getQueueName() {
        return queue;
    }

    /** Stores a new queue name; an existing subscription is not changed by this call. */
    public void setQueue(String str) {
        queue = str;
    }

    /** Returns whether the subscription uses automatic acknowledgement. */
    @Override
    public boolean isAutoAck() {
        return autoAck;
    }

    /** Returns the configured concurrency (used as the channel prefetch count). */
    @Override
    public int getConcurrency() {
        return concurrency;
    }

    /** Returns the maximum queue length used when the queue is declared. */
    @Override
    public int getMaxQueueLength() {
        return maxQueueLength;
    }

    /** Returns the application callback that receives the message bodies. */
    @Override
    public MessageConsumer getMessageConsumer() {
        return mc;
    }

    /**
     * Starts consuming: declares the queue if needed, applies the prefetch count and subscribes.
     *
     * <p>If deliveries for this region + queue are still being processed (shared in-flight
     * counter above zero) the consumer does not subscribe; it logs a warning and closes its
     * channel instead. The started flag and start time are set in either case.
     *
     * @throws KDException if setting the prefetch count or subscribing fails with an I/O error
     */
    @Override
    public void start() {
        this.isStartedFlag = true;
        this.startAt = new Date();

        if (this.queueActiveThreadCount.get() <= 0) {
            QueueManager.declareIfNeed(this.channel, this.region, this.queue, this.maxQueueLength);
            try {
                this.channel.basicQos(this.concurrency);
                this.channel.basicConsume(this.queue, this.autoAck, this);
            } catch (IOException ioFailure) {
                ExceptionLogger.log("Can't init consumer for queue " + this.queue, ioFailure);
                throw new KDException(ioFailure, BosErrorCode.mqException, new Object[]{"Can't init consumer for queue " + this.queue});
            }
            return;
        }

        // Earlier deliveries are still running: do not subscribe, release the channel.
        ExceptionLogger.warn("rabbitConsumer not start because of activeThread is " + this.queueActiveThreadCount.get() + ",need " + this.concurrency + "," + this.queue);
        try {
            this.channel.close();
        } catch (Exception closeFailure) {
            ExceptionLogger.log("Can't close channel for queue " + this.queue, closeFailure);
        }
    }

    /**
     * Stops the consumer: clears the started flag and closes the channel.
     * A failure while closing is logged and not propagated.
     */
    @Override
    public void $$stop() {
        this.isStartedFlag = false;
        try {
            this.channel.close();
        } catch (Exception closeFailure) {
            ExceptionLogger.log("error when stop mqchannel" + this.queue, closeFailure);
        }
    }

    /** Returns the started flag: set by {@code start()}, cleared by {@code $$stop()}. */
    @Override
    public boolean isStarted() {
        return isStartedFlag;
    }

    /**
     * Returns the time of the last {@code start()} call or successful re-subscription,
     * or {@code null} if neither has happened yet.
     */
    @Override
    public Date getStartAt() {
        return startAt;
    }

    /**
     * Entry point called by the RabbitMQ client for each delivered message.
     *
     * <p>When {@code Config.disableParallelConsume()} is true the message is processed on the
     * calling thread. Otherwise processing is handed to the shared worker pool, and the
     * per-queue in-flight counter is held for the duration of the processing.
     *
     * @param str             consumer tag of the subscription
     * @param envelope        delivery metadata (delivery tag, redelivery flag)
     * @param basicProperties AMQP properties of the message
     * @param bArr            serialized message
     */
    @Override
    public void handleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) throws IOException {
        if (Config.disableParallelConsume()) {
            innerHandleDelivery(str, envelope, basicProperties, bArr);
            return;
        }
        pool.execute(() -> {
            this.queueActiveThreadCount.incrementAndGet();
            try {
                innerHandleDelivery(str, envelope, basicProperties, bArr);
            } finally {
                // Always release the in-flight slot, whatever the outcome of the processing.
                this.queueActiveThreadCount.decrementAndGet();
            }
        });
    }

    /**
     * Decodes one delivery and routes it to the appropriate handling path.
     *
     * <ol>
     *   <li>A message whose start-deliver time is still in the future is re-published as a
     *       delayed message ({@code renewDelay}).</li>
     *   <li>A message carrying a KDTX id goes to {@code _handleDtxDelivery}.</li>
     *   <li>A message seen more than 100 times (counted per inner id) is discarded.</li>
     *   <li>Everything else goes to {@code _handleDelivery}. If that throws and the acker has not
     *       settled the message yet, the message is denied after a back-off, or discarded once
     *       more than 30 tries have been recorded.</li>
     * </ol>
     *
     * <p>The repeat counter of the message is dropped whenever processing ends without a deny.
     * Any {@code Exception} escaping the steps above causes the message to be discarded.
     *
     * @param str             consumer tag
     * @param envelope        delivery metadata
     * @param basicProperties AMQP properties (not used here)
     * @param bArr            serialized message
     */
    private void innerHandleDelivery(String str, Envelope envelope, AMQP.BasicProperties basicProperties, byte[] bArr) {
        RabbitAcker acker = new RabbitAcker(this.channel, this.autoAck);
        String messageId = String.valueOf(envelope.getDeliveryTag());
        try {
            Message message = toMessage(bArr);
            if (renewDelay(messageId, message, acker)) {
                return;
            }
            if (StringUtils.isNotEmpty(message.getKdtxId())) {
                _handleDtxDelivery(envelope, message, acker);
                return;
            }

            long innerId = message.getInnerId();
            if (innerId != 0) {
                AtomicInteger repeats = this.messageRepeatTimes.get(Long.valueOf(innerId));
                if (repeats == null) {
                    repeats = new AtomicInteger(0);
                    this.messageRepeatTimes.put(Long.valueOf(innerId), repeats);
                }
                if (repeats.getAndIncrement() > 100) {
                    acker.discard(messageId);
                    this.messageRepeatTimes.remove(Long.valueOf(innerId));
                    handlerMessagerRepeateCounter.inc();
                    ExceptionLogger.log("MQError:Repeat send message too many times, auto discard, messageId=" + messageId + ",queue=" + this.queue);
                    return;
                }
            }

            try {
                _handleDelivery(str, envelope, messageId, message, acker);
            } catch (Error | Exception failure) {
                if (acker.hasDone()) {
                    // The message is already settled; nothing left to do but report.
                    ExceptionLogger.log("MQError:handleDelivery uncatched exception, messageId=" + messageId, failure);
                } else {
                    int tryTimes = TranscationSupport.instance().tryTimes(String.valueOf(message.getInnerId()));
                    if (tryTimes > 30) {
                        acker.discard(messageId);
                        ExceptionLogger.log("MQError:handleDelivery uncatched exception,try times >30, auto discard, messageId=" + messageId, failure);
                    } else {
                        applyWait(tryTimes);
                        acker.deny(messageId);
                        ExceptionLogger.log("MQError:handleDelivery uncatched exception , auto deny, messageId=" + messageId, failure);
                    }
                }
            } finally {
                // A denied message comes back, so its repeat counter must survive; in every
                // other case the counter is no longer needed.
                if (!acker.isDenied()) {
                    this.messageRepeatTimes.remove(Long.valueOf(innerId));
                }
            }
        } catch (Exception e) {
            acker.discard(messageId);
            decodeMessageErrorCounter.inc();
            ExceptionLogger.log("MQError:toMessage exception, auto discard, messageId=" + messageId + ",queue:" + this.queue, e);
        }
    }

    /**
     * Re-publishes a message whose start-deliver time has not been reached yet.
     *
     * @param str         delivery tag of the message, as a string
     * @param message     decoded message
     * @param rabbitAcker acker used to settle the current delivery
     * @return {@code false} if the message is due (or has no start-deliver time) and must be
     *         processed now; {@code true} if this method took care of the delivery, either by
     *         re-publishing it as a delayed message and discarding it, or by denying it after a
     *         publish failure
     */
    private boolean renewDelay(String str, Message message, RabbitAcker rabbitAcker) {
        // Whole seconds left until the requested delivery time.
        int remainingSeconds = (int) ((message.getStartDeliverTime() - System.currentTimeMillis()) / 1000);
        boolean stillDelayed = message.getStartDeliverTime() > 0 && remainingSeconds > 0;
        if (!stillDelayed) {
            return false;
        }
        try {
            byte[] payload = MessageSerde.get().encode(message);
            RabbitMQDelayManager.publishDelayMessageConfirmModel(payload, this.channel, this.queue, DelayControlManager.selectMaxMetaTime(remainingSeconds));
            rabbitAcker.discard(str);
        } catch (Exception publishFailure) {
            rabbitAcker.deny(str);
            ExceptionLogger.log("renew delay for queue " + this.queue + " error ", publishFailure);
        }
        return true;
    }

    /**
     * Handles a delivery whose message carries a KDTX id (see the dispatch in
     * {@code innerHandleDelivery}).
     *
     * <p>Every exit path repeats the same epilogue: it records "execute finished" through
     * {@code KdtxSupport.recordLog} and, when the acker reports the message as acked or
     * discarded, calls {@code KdtxSupport.endConsumer} (preceded by
     * {@code KdtxSupport.compensateSuccess} when the manual flag {@code z} is set).
     *
     * <p>NOTE(review): the literal conditions {@code 1 != 0} / {@code 0 != 0} below look like
     * decompiler constant-folding of local flags rather than hand-written code - confirm against
     * the upstream source before changing any of them.
     *
     * @param envelope    delivery metadata (delivery tag, redelivery flag)
     * @param message     decoded message carrying the KDTX id and sequence
     * @param rabbitAcker acker used to settle the delivery
     */
    private void _handleDtxDelivery(Envelope envelope, Message message, RabbitAcker rabbitAcker) {
        String valueOf = String.valueOf(envelope.getDeliveryTag());
        // z: result of KdtxSupport.isManual for a message already marked as received.
        boolean z = false;
        DBRoute of = DBRoute.of(message.getRouteKey());
        try {
            try {
                // No request context: discard, still mark the message as received.
                if (!createTraceAndRequestContext(message)) {
                    rabbitAcker.discard(valueOf);
                    KdtxSupport.received(of, message.getKdtxId(), message.getSeq());
                    ExceptionLogger.log("MQError:message has`t requestContext, auto discard, messageId=" + valueOf);
                    noRequestContextCounter.inc();
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                        if (0 != 0) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                        return;
                    }
                    return;
                }
                boolean isReceived = KdtxSupport.isReceived(of, message.getKdtxId(), message.getSeq());
                if (!isReceived) {
                    // First time this message is seen: persist the received state, requeue on failure.
                    try {
                        KdtxSupport.received(of, message.getKdtxId(), message.getSeq());
                    } catch (Exception e) {
                        rabbitAcker.deny(valueOf);
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "MQError:message update receive statue error, will requeue", true);
                        // NOTE(review): this path increments noRequestContextCounter although a request
                        // context exists here - possibly the wrong counter; confirm.
                        noRequestContextCounter.inc();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                            if (0 != 0) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                            return;
                        }
                        return;
                    }
                } else if (!envelope.isRedeliver()) {
                    // Already received and not a broker redelivery: only processed again when manual.
                    boolean isManual = KdtxSupport.isManual(message.getKdtxId(), message.getSeq());
                    z = isManual;
                    if (!isManual) {
                        rabbitAcker.discard(valueOf);
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                            if (z) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                            return;
                        }
                        return;
                    }
                } else if (KdtxSupport.mqSecondCompensate(message.getKdtxId(), message.getSeq())) {
                    // Already received and redelivered by the broker: discard when mqSecondCompensate is true.
                    rabbitAcker.discard(valueOf);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                        if (0 != 0) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                        return;
                    }
                    return;
                }
                // The KDTX id doubles as the transaction tag: check the producer transaction state.
                if (message.getKdtxId().equals(message.getTranscationTag())) {
                    if (TranscationSupport.instance().isRollBack(message.getTranscationTag())) {
                        // NOTE(review): the result of isDiscarded() is ignored, so on this path the
                        // message is neither acked, discarded nor denied. Possibly
                        // discard(valueOf) was intended - confirm against the upstream source.
                        rabbitAcker.isDiscarded();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                            if (z) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                            return;
                        }
                        return;
                    }
                    if (!TranscationSupport.instance().existXid(message.getRouteKey(), message.getTranscationTag())) {
                        // NOTE(review): same ignored isDiscarded() call as in the rollback branch above.
                        rabbitAcker.isDiscarded();
                        KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                        // The "&& 0 != 0" makes the discarded half always false: only isAcked() counts here.
                        if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 0 != 0)) {
                            if (z) {
                                KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                            }
                            KdtxSupport.endConsumer(of, message.getKdtxId());
                            return;
                        }
                        return;
                    }
                }
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), String.format("message[isReceived:%s,isManual:%s] start execute", Boolean.valueOf(isReceived), Boolean.valueOf(z)), false);
                // Tenant is forbidden (or the service is paused): wait one second, then requeue.
                if (isforbiddenTenantCode(RequestContext.get().getTenantCode())) {
                    LockSupport.parkNanos(1000000000L);
                    rabbitAcker.deny(valueOf);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                        if (z) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                        return;
                    }
                    return;
                }
                // Hand the message to the DLX publisher instead of consuming it locally.
                if (DLXConfig.isSendDLX(message.getRequestContext())) {
                    message.setDLXMessage(true);
                    DLXMesPubFactory.getDLXMessagePublisher().sendMessage(this.region, this.queue, message, valueOf, rabbitAcker);
                    KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                    if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                        if (z) {
                            KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                        }
                        KdtxSupport.endConsumer(of, message.getKdtxId());
                        return;
                    }
                    return;
                }
                // Normal path: run the application callback inside a trace span.
                doTraceAndConsume(envelope, valueOf, message, rabbitAcker);
                if (message.isDLXMessage() && DLXConfig.getDLXStrategy() == DLXStrategy.DEFAULT) {
                    MessageRecord.update(2, message.getInnerId());
                }
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                    if (z) {
                        KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                    }
                    KdtxSupport.endConsumer(of, message.getKdtxId());
                }
            } catch (Throwable th) {
                // Epilogue for the failure path, then rethrow. For an Error or Exception the outer
                // catch below runs the same epilogue a second time, so "execute finished" is
                // recorded twice (and endConsumer may be called twice) on that path.
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
                if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                    if (0 != 0) {
                        KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                    }
                    KdtxSupport.endConsumer(of, message.getKdtxId());
                }
                throw th;
            }
        } catch (Error | Exception e2) {
            // Unsettled message: short back-off, then requeue.
            if (!rabbitAcker.hasDone()) {
                applyWait(1);
                rabbitAcker.deny(valueOf);
                KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "common exception auto deny,errorMsg:" + e2.getMessage(), true);
            }
            ExceptionLogger.log("MQError:handleDelivery uncatched exception , auto deny, messageId=" + valueOf, e2);
            KdtxSupport.recordLog(message.getKdtxId(), message.getSeq(), "execute finished", false);
            if (rabbitAcker.isAcked() || (rabbitAcker.isDiscarded() && 1 != 0)) {
                if (0 != 0) {
                    KdtxSupport.compensateSuccess(message.getKdtxId(), message.getSeq());
                }
                KdtxSupport.endConsumer(of, message.getKdtxId());
            }
        }
    }

    /**
     * Processes a message that is not part of a KDTX transaction.
     *
     * <p>In order: a message without request context is discarded; a message of a forbidden
     * tenant (or while the service is paused) is denied after a one-second pause; a message
     * flagged by {@code DLXConfig.isSendDLX} is handed to the DLX publisher; otherwise the
     * message is consumed, provided {@code isTransactionSubmit} allows it.
     *
     * @param str         consumer tag (not used here)
     * @param envelope    delivery metadata
     * @param str2        delivery tag as a string, used as message id
     * @param message     decoded message
     * @param rabbitAcker acker used to settle the delivery
     */
    public void _handleDelivery(String str, Envelope envelope, String str2, Message message, RabbitAcker rabbitAcker) {
        boolean contextRestored = createTraceAndRequestContext(message);
        if (!contextRestored) {
            rabbitAcker.discard(str2);
            ExceptionLogger.log("MQError:message has`t requestContext, auto discard, messageId=" + str2);
            noRequestContextCounter.inc();
            return;
        }

        String tenantCode = RequestContext.get().getTenantCode();
        if (isforbiddenTenantCode(tenantCode)) {
            // Slow the redelivery loop down before putting the message back.
            LockSupport.parkNanos(1000000000L);
            rabbitAcker.deny(str2);
            return;
        }

        if (DLXConfig.isSendDLX(message.getRequestContext())) {
            message.setDLXMessage(true);
            DLXMesPubFactory.getDLXMessagePublisher().sendMessage(this.region, this.queue, message, str2, rabbitAcker);
            return;
        }

        if (!isTransactionSubmit(str2, message, rabbitAcker)) {
            return;
        }
        doTraceAndConsume(envelope, str2, message, rabbitAcker);
        if (message.isDLXMessage() && DLXConfig.getDLXStrategy() == DLXStrategy.DEFAULT) {
            MessageRecord.update(2, message.getInnerId());
        }
    }

    /**
     * Decides whether a message tied to a producer transaction may be consumed now.
     *
     * <p>Messages without a transaction tag, and all messages of auto-ack consumers, are always
     * consumable. Otherwise:
     * <ul>
     *   <li>transaction rolled back: the message is discarded;</li>
     *   <li>transaction id not (yet) recorded: the message is handed to {@code waitDeal};</li>
     *   <li>transaction already recorded as consumed for the route key: the message is discarded
     *       as a duplicate;</li>
     *   <li>otherwise an acked-callback is registered that records the consumption and deletes
     *       the transaction id, and the message may be consumed.</li>
     * </ul>
     *
     * <p>NOTE(review): the previous version wrapped this logic in a {@code while} loop whose body
     * always returned, together with an attempt counter, an {@code applyWait(0)} call and a
     * timeout probe that never influenced the result. That shape suggests a lost retry loop
     * polling for the transaction id; confirm against the upstream source whether polling was
     * intended. This version keeps the single-pass outcome and drops the dead code.
     *
     * @param str         delivery tag as a string, used as message id
     * @param message     decoded message
     * @param rabbitAcker acker used to settle the delivery
     * @return {@code true} if the caller should consume the message; {@code false} if this
     *         method already settled it or delegated settlement to {@code waitDeal}
     */
    private boolean isTransactionSubmit(String str, Message message, RabbitAcker rabbitAcker) {
        String transcationTag = message.getTranscationTag();
        if (transcationTag == null || this.autoAck) {
            return true;
        }

        if (TranscationSupport.instance().isRollBack(transcationTag)) {
            rabbitAcker.discard(str);
            handlerMessagerTranscationRollBackCounter.inc();
            return false;
        }

        if (!TranscationSupport.instance().existXid(message.getRouteKey(), transcationTag)) {
            waitDeal(str, message, rabbitAcker, transcationTag);
            return false;
        }

        String producerRouteKey = message.getRouteKey();
        String configuredRouteKey = this.mc.getRouteKey();
        // The consumer's own route key wins; fall back to the one carried by the message.
        String consumedRouteKey = configuredRouteKey != null ? configuredRouteKey : producerRouteKey;

        if (TranscationSupport.instance().existConsumedId(consumedRouteKey, transcationTag)) {
            rabbitAcker.discard(str);
            handlerMessagerTranscationRepeateCounter.inc();
            return false;
        }

        rabbitAcker.setAckedCallBack(() -> {
            try {
                TranscationSupport.instance().insertConsumed(consumedRouteKey, transcationTag);
                TranscationSupport.instance().deleteXid(producerRouteKey, transcationTag);
            } catch (Exception e) {
                ExceptionLogger.log("mq TranscationSupport error of acktion" + str, e);
            }
        });
        return true;
    }

    /**
     * Picks the retry strategy for a message whose transaction id is not recorded yet:
     * a retry count of {@code -1} selects the try-count strategy, any other value the
     * delayed re-publish strategy.
     */
    private void waitDeal(String str, Message message, RabbitAcker rabbitAcker, String str2) {
        boolean countBased = message.getRetryTimes() == -1;
        if (countBased) {
            timesStrategy(str, message, rabbitAcker, str2);
        } else {
            delayStrategy(str, message, rabbitAcker);
        }
    }

    /**
     * Retries a message by re-publishing it as a delayed message with an increased retry count.
     *
     * <p>Once the retry count has reached 14 the message is discarded. Otherwise the count is
     * incremented, the message is re-encoded and published with the delay level
     * {@code retryTimes + 3}; the current delivery is then discarded, or denied if publishing
     * fails.
     *
     * <p>NOTE(review): when {@code MetaTime.genInstanceByLevel} returns {@code null} the delivery
     * is left unsettled (no ack, discard or deny) - confirm whether that can happen for the
     * levels used here.
     *
     * @param str         delivery tag as a string, used as message id
     * @param message     decoded message; its retry count is modified
     * @param rabbitAcker acker used to settle the delivery
     */
    private void delayStrategy(String str, Message message, RabbitAcker rabbitAcker) {
        int previousRetries = message.getRetryTimes();
        if (previousRetries >= 14) {
            rabbitAcker.discard(str);
            return;
        }

        message.setRetryTimes(previousRetries + 1);
        byte[] payload = MessageSerde.get().encode(message);
        MetaTime delay = MetaTime.genInstanceByLevel(previousRetries + 3);
        if (delay == null) {
            return;
        }
        try {
            RabbitMQDelayManager.publishDelayMessageConfirmModel(payload, this.channel, this.queue, delay);
            rabbitAcker.discard(str);
        } catch (Exception publishFailure) {
            rabbitAcker.deny(str);
            ExceptionLogger.log("publishDelayMessageConfirmModel for queue " + this.queue + " error ", publishFailure);
        }
    }

    /**
     * Retries a message by denying it (so that it is delivered again) after a back-off that
     * depends on the number of tries recorded for the transaction tag. After more than 30 tries,
     * or once the message has timed out, the message is discarded instead.
     *
     * @param str         delivery tag as a string, used as message id
     * @param message     decoded message
     * @param rabbitAcker acker used to settle the delivery
     * @param str2        transaction tag whose tries are counted
     */
    private void timesStrategy(String str, Message message, RabbitAcker rabbitAcker, String str2) {
        int tryTimes = TranscationSupport.instance().tryTimes(str2);
        boolean giveUp = tryTimes > 30 || isMessageTimeout(message);
        if (!giveUp) {
            applyWait(tryTimes);
            rabbitAcker.deny(str);
            return;
        }
        rabbitAcker.discard(str);
        handlerMessagerTranscationRollBackCounter.inc();
    }

    /**
     * Runs the application callback for one message inside an {@code MQConsumer/onMessage}
     * trace span.
     *
     * <p>If the callback returns without having settled the message, the message is acked. If
     * the callback throws an {@code Exception}, the failure is logged and, unless the message is
     * already settled, the message is discarded. Errors propagate to the caller.
     *
     * <p>The active-consumer metrics are incremented before the callback and decremented exactly
     * once afterwards, whatever the outcome; the span is always closed.
     *
     * @param envelope    delivery metadata
     * @param str         delivery tag as a string, used as message id
     * @param message     decoded message whose body is handed to the callback
     * @param rabbitAcker acker used to settle the delivery
     */
    private void doTraceAndConsume(Envelope envelope, String str, Message message, RabbitAcker rabbitAcker) {
        try (TraceSpan span = Tracer.create("MQConsumer", "onMessage", true)) {
            span.addTag("messageId", str);
            span.addTag("resent", Boolean.toString(envelope.isRedeliver()));
            span.addTag("queue", this.queue);
            span.addTag("region", this.region);
            try {
                activeCounter.inc();
                ConsumerStats.incrementActiveCount(this.queue, str);
                this.mc.onMessage(message.getBody(), str, envelope.isRedeliver(), rabbitAcker);
                if (!rabbitAcker.hasDone()) {
                    rabbitAcker.ack(str);
                }
            } catch (Exception e) {
                if (rabbitAcker.hasDone()) {
                    ExceptionLogger.log("MQError:onMessage uncatched exception, messageId=" + str, e);
                } else {
                    handlerMessagerErrorCounter.inc();
                    ExceptionLogger.log("MQError:onMessage uncatched exception, auto discard, messageId=" + str, e);
                    rabbitAcker.discard(str);
                }
            } finally {
                // Single decrement per delivery, on success and on failure alike.
                activeCounter.dec();
                ConsumerStats.decrementActiveCount(this.queue, str);
            }
        }
    }

    /**
     * Tells whether the message is older than the transaction wait time, which is read in
     * seconds from the system property {@code mq.trasncation.message.waittime} (default 300).
     */
    private boolean isMessageTimeout(Message message) {
        long waitMillis = ((long) Integer.parseInt(System.getProperty("mq.trasncation.message.waittime", "300"))) * 1000;
        return System.currentTimeMillis() - message.getMessageTime() > waitMillis;
    }

    /**
     * Restores the request context carried by the message for the current thread and assigns it
     * a freshly generated trace id.
     *
     * @return {@code false} if the message carries no request context, {@code true} otherwise
     */
    private boolean createTraceAndRequestContext(Message message) {
        RequestContext carried = message.getRequestContext();
        if (carried != null) {
            String traceId = TraceIdUtil.createTraceIdString();
            SetThreadName.setTraceId(traceId);
            carried.setTraceId(traceId);
            RequestContextCreator.restoreForMQ(carried);
            return true;
        }
        return false;
    }

    /**
     * Blocks the current thread for a back-off period derived from the attempt number.
     *
     * <ul>
     *   <li>{@code i < 1}: no wait;</li>
     *   <li>{@code i == 1}: 100 milliseconds;</li>
     *   <li>{@code i > 1}: {@code i} seconds.</li>
     * </ul>
     *
     * <p>The duration is computed in {@code long} arithmetic. With {@code int} arithmetic the
     * product overflows from {@code i = 3} on, yielding negative values (for which
     * {@code parkNanos} returns immediately) or arbitrary shorter waits.
     *
     * @param i attempt number
     */
    private void applyWait(int i) {
        if (i < 1) {
            return;
        }
        long nanos = (i == 1) ? 100000000L : 1000000000L * i;
        LockSupport.parkNanos(nanos);
    }

    /**
     * Reacts to the shutdown of the channel or connection this consumer is subscribed on.
     *
     * <p>A shutdown with reply code 200 is ignored. For any other reply code the consumer obtains
     * a new channel for its region and re-subscribes, unless deliveries for this region + queue
     * are still being processed, in which case only a warning is logged.
     *
     * @param str                     consumer tag
     * @param shutdownSignalException the shutdown signal
     * @throws KDException if re-subscribing fails; the failure is the cause and the shutdown
     *                     signal is attached to it as a suppressed exception
     */
    @Override
    public void handleShutdownSignal(String str, ShutdownSignalException shutdownSignalException) {
        super.handleShutdownSignal(str, shutdownSignalException);
        if (shutdownSignalException.getReason().getReplyCode() == 200) {
            return;
        }
        if (this.queueActiveThreadCount.get() > 0) {
            ExceptionLogger.warn("rabbitConsumer not restart because of activeThread is " + this.queueActiveThreadCount.get() + ",need " + this.concurrency + "," + this.queue);
            return;
        }
        try {
            this.channel = ChannelFactory.getChannel(this.region);
            this.channel.basicQos(this.concurrency);
            this.channel.basicConsume(this.queue, this.autoAck, this);
            this.startAt = new Date();
            ExceptionLogger.log("reCreateConsumerChannelForQueue:" + this.queue, shutdownSignalException);
        } catch (Exception e) {
            ExceptionLogger.log("Can't init consumer for queue " + this.queue, e);
            // Report the actual re-subscribe failure as the cause, and keep the triggering
            // shutdown signal reachable from it.
            if (e != shutdownSignalException) {
                e.addSuppressed(shutdownSignalException);
            }
            throw new KDException(e, BosErrorCode.mqException, new Object[]{"Can't init consumer for queue " + this.queue});
        }
    }

    /** Decodes a serialized message with the shared {@code MessageSerde}. */
    private Message toMessage(byte[] bArr) {
        Message decoded = MessageSerde.get().decode(bArr);
        return decoded;
    }
}
