package kd.bos.logging.logback.kafka;

import java.util.concurrent.ArrayBlockingQueue;
import kd.bos.metric.Gauge;
import kd.bos.metric.Meter;
import kd.bos.metric.MetricSystem;
import kd.bos.util.ConfigurationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kd/bos/logging/logback/kafka/KafkaBlockingQueue.class */
public class KafkaBlockingQueue {
    private static ArrayBlockingQueue<KafkaMessage<?>> queue;
    private static IKafkaSender<?> kafkaSender;
    private static final String config_prefix_key = "metric.log.kafka.prefix";
    private static Gauge<Integer> queueSize;
    private static Meter discardCount;
    private static int QUEUE_CAPACITY = 10000;
    private static boolean STOPFLAG = false;
    private static String prefix = "kd.metrics.log.kafka.";
    private static final Logger logger = LoggerFactory.getLogger(KafkaBlockingQueue.class);

    /* loaded from: input_file:kd/bos/logging/logback/kafka/KafkaBlockingQueue$Poll.class */
    static class Poll implements Runnable {
        Poll() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (!KafkaBlockingQueue.STOPFLAG) {
                try {
                    KafkaBlockingQueue.kafkaSender.send((KafkaMessage) KafkaBlockingQueue.queue.take());
                } catch (InterruptedException e) {
                    KafkaBlockingQueue.logger.error(e.getMessage(), e);
                } catch (Exception e2) {
                    KafkaBlockingQueue.logger.error(e2.getMessage(), e2);
                }
            }
        }
    }

    public static void init(IKafkaSender<?> iKafkaSender) {
        kafkaSender = iKafkaSender;
        STOPFLAG = false;
        Thread thread = new Thread(new Poll(), "log-kafka-pull");
        thread.setDaemon(true);
        thread.start();
        prefix = System.getProperty(config_prefix_key, prefix);
        queueSize = new Gauge<Integer>() { // from class: kd.bos.logging.logback.kafka.KafkaBlockingQueue.1
            /* renamed from: getValue, reason: merged with bridge method [inline-methods] */
            public Integer m9getValue() {
                return Integer.valueOf(KafkaBlockingQueue.queue.size());
            }
        };
        MetricSystem.registerGauge(name("queueSize"), queueSize);
        discardCount = MetricSystem.meter(name("discardCount"));
    }

    private static String name(String str) {
        return prefix + str;
    }

    public static void offer(KafkaMessage<?> kafkaMessage) {
        if (queue.offer(kafkaMessage)) {
            return;
        }
        discardCount.mark();
    }

    public static void stop() {
        STOPFLAG = true;
    }

    static {
        queue = null;
        queue = new ArrayBlockingQueue<>(ConfigurationUtil.getInteger("log.kafka.queue.capacity", Integer.valueOf(QUEUE_CAPACITY)).intValue());
    }
}
