package kd.bos.algo.dataset.store.mm.allocator;

import java.util.Iterator;
import java.util.function.Supplier;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.store.MultiValueMapStore;
import kd.bos.algo.dataset.store.mm.MMFactory;
import kd.bos.algo.dataset.store.mm.MMHashMapStore;
import kd.bos.algo.dataset.store.mm.MMMultiValueMapStore;
import kd.bos.algo.dataset.store.mm.MemUnit;
import kd.bos.algo.dataset.store.mm.MemoryAllocateException;
import kd.bos.algo.dataset.store.mm.QuoteCalculator;
import kd.bos.algo.dataset.store.mm.QuoteListener;
import kd.bos.algo.dataset.store.mm.SpillUnit;
import kd.bos.algo.dataset.store.mm.Spiller;
import kd.bos.algo.dataset.store.mm.StoreUnitAllocator;
import kd.bos.algo.dataset.store.mm.StoreUnitHolder;
import kd.bos.algo.dataset.store.mm.allocator.CommonLinkedQueue;
import kd.bos.util.ThreadLocals;
import org.apache.log4j.Logger;

/* loaded from: input_file:kd/bos/algo/dataset/store/mm/allocator/StoreUnitAllocatorByQueue.class */
public class StoreUnitAllocatorByQueue implements StoreUnitAllocator {
    private final SharedStoreUnitHolderLinkedQueue bigQueue;
    private final QuoteCalculator quoteCalculator;
    private final AtomicQuoteValue globalQuote = new AtomicQuoteValue();
    private final AtomicQuoteValue memoryOnlyQuote = new AtomicQuoteValue();
    private final AtomicQuoteValue bigQuote = new AtomicQuoteValue();
    private final CommonLinkedQueue<ThreadGod> threadGodQueue = new CommonLinkedQueue<>();
    private final Spiller syncSpiller = MMFactory.getSyncSpiller();
    private final Spiller asyncSpiller = MMFactory.getAsyncSpiller();
    private Logger logger = Logger.getLogger(getClass());
    private final ThreadLocal<ThreadGod> threadLocals = ThreadLocals.create(() -> {
        return createThreadGod();
    });
    private Object spillLock = new Object();
    private final SharedStoreUnitHolderLinkedQueue globalQueue = new SharedStoreUnitHolderLinkedQueue(this.globalQuote);
    private final SharedStoreUnitHolderLinkedQueue memoryOnlyQueue = new SharedStoreUnitHolderLinkedQueue(this.globalQuote, this.memoryOnlyQuote);

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/store/mm/allocator/StoreUnitAllocatorByQueue$MyQuoteListener.class */
    public class MyQuoteListener implements QuoteListener {
        private final SimpleStoreUnitHolderLinkedQueue threadHolderQueue;
        private final Supplier<AtomicQuoteValue>[] quoteValueSuppliers;

        public MyQuoteListener(SimpleStoreUnitHolderLinkedQueue simpleStoreUnitHolderLinkedQueue, Supplier<AtomicQuoteValue>... supplierArr) {
            this.threadHolderQueue = simpleStoreUnitHolderLinkedQueue;
            this.quoteValueSuppliers = supplierArr;
        }

        @Override // kd.bos.algo.dataset.store.mm.QuoteListener
        public void quoteInc(int i) {
            for (Supplier<AtomicQuoteValue> supplier : this.quoteValueSuppliers) {
                supplier.get().inc(i);
            }
            int quoteAvailable = this.threadHolderQueue.quoteAvailable();
            if (quoteAvailable >= i) {
                if (StoreUnitAllocatorByQueue.this.globalQuoteAvailable() < i) {
                    StoreUnitAllocatorByQueue.this.spillGlobal(StoreUnitAllocatorByQueue.this.asyncSpiller, i);
                }
            } else {
                int spillThreadGod = StoreUnitAllocatorByQueue.this.spillThreadGod(StoreUnitAllocatorByQueue.this.asyncSpiller, i, this.threadHolderQueue);
                if (spillThreadGod > 0) {
                    spillThreadGod = StoreUnitAllocatorByQueue.this.spillGlobal(StoreUnitAllocatorByQueue.this.asyncSpiller, i);
                }
                if (spillThreadGod > 0) {
                    StoreUnitAllocatorByQueue.this.logger.warn("MemoryAllocateException, threadQuoteAvailable:" + quoteAvailable + ",stats:" + StoreUnitAllocatorByQueue.this.stats());
                    throw new MemoryAllocateException("Can't allocate memory unit.");
                }
            }
        }

        @Override // kd.bos.algo.dataset.store.mm.QuoteListener
        public void quoteDec(int i) {
            for (Supplier<AtomicQuoteValue> supplier : this.quoteValueSuppliers) {
                supplier.get().dec(i);
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algo/dataset/store/mm/allocator/StoreUnitAllocatorByQueue$ThreadGod.class */
    public class ThreadGod implements AutoCloseable {
        private final CommonLinkedQueue.Node node;
        private final SimpleStoreUnitHolderLinkedQueue holderQueue;

        public ThreadGod() {
            this.node = StoreUnitAllocatorByQueue.this.threadGodQueue.add(this);
            this.holderQueue = new SimpleStoreUnitHolderLinkedQueue(StoreUnitAllocatorByQueue.this.quoteCalculator.getThreadMaxQuote());
        }

        @Override // java.lang.AutoCloseable
        public void close() {
            this.node.released();
            this.holderQueue.release();
        }
    }

    public StoreUnitAllocatorByQueue() {
        this.memoryOnlyQueue.addAdditionalQuoteHolder(() -> {
            return this.threadLocals.get().holderQueue.getCurrentQuote();
        });
        this.bigQueue = new SharedStoreUnitHolderLinkedQueue(this.bigQuote);
        this.quoteCalculator = MMFactory.getQuoteCalculator();
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public StoreUnitHolder allocateTransferable(RowMeta rowMeta, boolean z) {
        ThreadGod threadGod = this.threadLocals.get();
        if (threadGod.holderQueue.quoteAvailable() < 1) {
            spillThreadGod(this.asyncSpiller, 1, threadGod.holderQueue);
            return allocateSpillOnly(rowMeta);
        }
        if (globalQuoteAvailable() < 1) {
            spillGlobal(this.asyncSpiller, 1);
        }
        StoreUnitHolder storeUnitHolder = new StoreUnitHolder(new MemUnit(rowMeta, this.quoteCalculator.calcBlockRows(rowMeta), this.quoteCalculator.calcOneQuoteRows(rowMeta), new MyQuoteListener(threadGod.holderQueue, () -> {
            return threadGod.holderQueue.getCurrentQuote();
        }, () -> {
            return this.globalQuote;
        })));
        if (!z) {
            threadGod.holderQueue.add(storeUnitHolder);
        }
        this.globalQueue.add(storeUnitHolder);
        return storeUnitHolder;
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public StoreUnitHolder allocateMemoryOnly(RowMeta rowMeta) {
        ThreadGod threadGod = this.threadLocals.get();
        if (threadGod.holderQueue.quoteAvailable() < 1) {
            spillThreadGod(this.asyncSpiller, 1, threadGod.holderQueue);
        } else if (globalQuoteAvailable() < 1) {
            spillGlobal(this.asyncSpiller, 1);
        }
        StoreUnitHolder storeUnitHolder = new StoreUnitHolder(new MemUnit(rowMeta, this.quoteCalculator.calcOneQuoteRows(rowMeta), new MyQuoteListener(threadGod.holderQueue, () -> {
            return threadGod.holderQueue.getCurrentQuote();
        }, () -> {
            return this.globalQuote;
        }, () -> {
            return this.memoryOnlyQuote;
        })));
        this.memoryOnlyQueue.add(storeUnitHolder);
        return storeUnitHolder;
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public <K, V> MMHashMapStore<K, V> allocateHashMapStore(int i) {
        ThreadGod threadGod = this.threadLocals.get();
        return new MMHashMapStore<>(i, new MyQuoteListener(threadGod.holderQueue, () -> {
            return threadGod.holderQueue.getCurrentQuote();
        }, () -> {
            return this.globalQuote;
        }));
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public <K, V> MultiValueMapStore<K, V> allocateMultiValueMapStore(int i) {
        ThreadGod threadGod = this.threadLocals.get();
        return new MMMultiValueMapStore(i, new MyQuoteListener(threadGod.holderQueue, () -> {
            return threadGod.holderQueue.getCurrentQuote();
        }, () -> {
            return this.globalQuote;
        }));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int globalQuoteAvailable() {
        return this.quoteCalculator.globalAvailable(this.globalQuote);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int spillGlobal(Spiller spiller, int i) {
        int i2;
        synchronized (this.spillLock) {
            while (i > 0) {
                StoreUnitHolder popAddFinished = this.globalQueue.popAddFinished();
                if (popAddFinished == null) {
                    break;
                }
                if (popAddFinished.isMemory()) {
                    i -= popAddFinished.getQuote();
                    if (!spiller.write(popAddFinished)) {
                        this.syncSpiller.write(popAddFinished);
                    }
                }
            }
            i2 = i;
        }
        return i2;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int spillThreadGod(Spiller spiller, int i, SimpleStoreUnitHolderLinkedQueue simpleStoreUnitHolderLinkedQueue) {
        int i2;
        synchronized (this.spillLock) {
            while (i > 0) {
                StoreUnitHolder popAddFinished = simpleStoreUnitHolderLinkedQueue.popAddFinished();
                if (popAddFinished == null) {
                    break;
                }
                if (popAddFinished.isMemory()) {
                    i -= popAddFinished.getQuote();
                    if (!spiller.write(popAddFinished)) {
                        this.syncSpiller.write(popAddFinished);
                    }
                }
            }
            i2 = i;
        }
        return i2;
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public StoreUnitHolder allocateSpillOnly(RowMeta rowMeta) {
        return new StoreUnitHolder(new SpillUnit(rowMeta));
    }

    private StoreUnitHolder allocateSpill(RowMeta rowMeta, int i) {
        return new StoreUnitHolder(new SpillUnit(rowMeta));
    }

    private ThreadGod createThreadGod() {
        return new ThreadGod();
    }

    @Override // kd.bos.algo.dataset.store.mm.StoreUnitAllocator
    public String stats() {
        StringBuilder sb = new StringBuilder();
        try {
            sb.append("globalMaxQuote:").append(this.quoteCalculator.getGlobalMaxQuote()).append(",globalQuote:").append(this.globalQuote.getValue()).append(",memoryOnlyQuote:").append(this.memoryOnlyQuote.getValue()).append(",bigQuote:").append(this.bigQuote.getValue()).append(",globalQueue:").append(this.globalQueue.size()).append(",memoryOnlyQueue:").append(this.memoryOnlyQueue.size()).append(",bigQueue:").append(this.bigQueue.size()).append(",threadGods:").append(this.threadGodQueue.size()).append(",threadQuotes:");
            int i = 0;
            Iterator<ThreadGod> it = this.threadGodQueue.iterator();
            while (it.hasNext()) {
                ThreadGod next = it.next();
                if (next != null && next.holderQueue != null && next.holderQueue.getCurrentQuote() != null) {
                    int value = next.holderQueue.getCurrentQuote().getValue();
                    sb.append(value).append(",");
                    i += value;
                }
            }
            sb.append("total:").append(i);
        } catch (Exception e) {
            this.logger.warn("get stats info error.", e);
        }
        return sb.toString();
    }
}
