package kd.bos.util.async;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import kd.bos.thread.ThreadEndClear;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:kd/bos/util/async/AsyncOutput.class */
/**
 * Buffers events per producing thread and hands them to a {@link Consumer} from a small set of
 * daemon worker threads.
 *
 * <p>Each producer thread gets its own {@link ConcurrentLinkedQueue} (keyed by thread id) and one
 * drain task. {@link #onEvent(Object)} appends to the caller's queue and offers the drain task to a
 * shared {@code SetQueue}; worker threads poll that queue and run whatever task they obtain.
 *
 * <p>Instances are cached by name: {@link #create(String, int, Consumer)} returns the existing
 * instance when the name is already registered, including one on which {@link #stop()} was called.
 *
 * <p>NOTE(review): {@code SetQueue} and {@code WrapperTask} are declared elsewhere. This class
 * presumably relies on {@code SetQueue} de-duplicating pending tasks and on {@code WrapperTask}
 * draining the per-thread queue and decrementing the shared counter — confirm against those types.
 *
 * @param <T> the event type
 */
public class AsyncOutput<T> {
    private static final Logger log = LoggerFactory.getLogger(AsyncOutput.class);
    private static final Map<String, AsyncOutput<?>> ayncTasks = new ConcurrentHashMap<>(2);

    /** Back-off applied by a worker after a task threw (100 ms). */
    private static final long WORKER_ERROR_BACKOFF_NANOS = 100000000L;
    /** Interval between capacity re-checks while a producer waits for room (50 ms). */
    private static final long FULL_QUEUE_WAIT_NANOS = 50000000L;

    private final Map<Long, Runnable> runnableMaps = new ConcurrentHashMap<>();
    private final SetQueue<Runnable> queue = new SetQueue<>();
    private final Map<Long, ConcurrentLinkedQueue<T>> spanMaps = new ConcurrentHashMap<>();
    private final AtomicInteger totalSpanCount = new AtomicInteger(0);
    private final AtomicLong discardTotalCount = new AtomicLong(0);
    private final Consumer<List<T>> consumer;

    // These are written by configuring/stopping threads and read by producers and workers,
    // so they must be volatile for the writes to become visible.
    private volatile int maxTotalQueueCount = Integer.MAX_VALUE;
    private volatile boolean isWaitWhenFull = false;
    private volatile boolean isStartedFlag = true;

    /**
     * Returns the instance registered under {@code str}, creating and starting it on first use.
     *
     * <p>When an instance with that name already exists, {@code i} and {@code consumer} are ignored
     * and the existing instance is returned; the caller is responsible for using a consistent
     * element type per name.
     *
     * @param str      registry key, also used as the worker thread name prefix
     * @param i        number of worker threads to start for a new instance
     * @param consumer receiver of the buffered events
     * @param <J>      the event type
     * @return the cached or newly created instance
     */
    @SuppressWarnings("unchecked") // the registry is heterogeneous; element type is fixed per name by the caller
    public static <J> AsyncOutput<J> create(String str, int i, Consumer<List<J>> consumer) {
        return (AsyncOutput<J>) ayncTasks.computeIfAbsent(str, name -> new AsyncOutput<J>(name, i, consumer));
    }

    private AsyncOutput(String name, int workerCount, Consumer<List<T>> consumer) {
        // Assign state before starting threads so nothing started below can observe a null consumer.
        this.consumer = consumer;
        initMonitor(workerCount, name);
    }

    /**
     * Chooses what {@link #onEvent(Object)} does when the buffered count exceeds the limit.
     *
     * @param z {@code true} to make producers wait for room, {@code false} to discard the event
     * @return this instance
     */
    public AsyncOutput<T> withWaitWhenFull(boolean z) {
        this.isWaitWhenFull = z;
        return this;
    }

    /**
     * Sets the limit on the total number of buffered events across all producer threads.
     *
     * @param i the limit; the default is {@link Integer#MAX_VALUE}
     * @return this instance
     */
    public AsyncOutput<T> withMaxTotalQueueCount(int i) {
        this.maxTotalQueueCount = i;
        return this;
    }

    /** @return {@code true} until {@link #stop()} has been called */
    public boolean isStarted() {
        return this.isStartedFlag;
    }

    /**
     * Asks the worker threads to exit; each one leaves its loop the next time it checks the flag.
     *
     * <p>The instance stays in the name registry. NOTE(review): a producer waiting in
     * {@link #onEvent(Object)} with wait-when-full enabled keeps waiting after a stop, since the
     * wait loop only watches the counter.
     */
    public void stop() {
        this.isStartedFlag = false;
    }

    /**
     * Starts the daemon worker threads and registers the thread-end cleanup listener.
     *
     * @param workerCount number of worker threads to start
     * @param name        thread name prefix, also used in log messages
     */
    private void initMonitor(int workerCount, String name) {
        for (int index = 0; index < workerCount; index++) {
            Thread worker = new Thread(() -> {
                while (this.isStartedFlag) {
                    try {
                        Runnable task = this.queue.poll();
                        if (task != null) {
                            task.run();
                        }
                    } catch (Exception e) {
                        // Back off briefly so a persistently failing task cannot spin the CPU.
                        LockSupport.parkNanos(WORKER_ERROR_BACKOFF_NANOS);
                        log.warn("{}: run task of trace_span exception ", name, e);
                    }
                }
            }, name + "-" + index);
            worker.setDaemon(true);
            worker.start();
        }
        // Drop per-thread state for every thread id that is absent from the set passed to the listener.
        // NOTE(review): events still buffered in a dropped queue are not subtracted from
        // totalSpanCount here — confirm whether WrapperTask accounts for that.
        ThreadEndClear.addListener(set -> {
            this.spanMaps.keySet().removeIf(threadId -> !set.contains(threadId));
            this.runnableMaps.keySet().removeIf(threadId -> !set.contains(threadId));
        });
    }

    /**
     * Buffers one event for the calling thread and schedules that thread's drain task.
     *
     * <p>When the total buffered count exceeds the configured limit, the call either waits
     * (re-checking every 50 ms) or discards the event and bumps the discard counter, depending on
     * {@link #withWaitWhenFull(boolean)}.
     *
     * @param t the event to buffer
     */
    public void onEvent(T t) {
        final long id = Thread.currentThread().getId();
        final Long key = Long.valueOf(id);
        ConcurrentLinkedQueue<T> pending = this.spanMaps.computeIfAbsent(key, k -> new ConcurrentLinkedQueue<>());
        // Looked up independently of the buffer so a missing task is re-created instead of
        // handing null to the work queue.
        Runnable drainTask = this.runnableMaps.computeIfAbsent(key,
                k -> new WrapperTask(id, this.spanMaps, this.totalSpanCount, this.consumer));

        if (this.isWaitWhenFull) {
            while (this.totalSpanCount.get() > this.maxTotalQueueCount) {
                LockSupport.parkNanos(FULL_QUEUE_WAIT_NANOS);
            }
        } else if (this.totalSpanCount.get() > this.maxTotalQueueCount) {
            this.discardTotalCount.incrementAndGet();
            return;
        }

        pending.add(t);
        this.totalSpanCount.incrementAndGet();
        // Offer the task only after the event is buffered: offering first lets a worker run the
        // task on an empty buffer and leave this event with nothing scheduled to drain it.
        this.queue.putIfAbsent(drainTask);
    }

    /** @return the number of events discarded because the buffer limit was exceeded */
    public long getDiscardCount() {
        return this.discardTotalCount.get();
    }
}
