package kd.bos.algo.dataset.store.mm;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import kd.bos.algo.config.AlgoConfiguration;
import kd.bos.algo.util.concurrent.AlgoExecutors;

/* JADX INFO: Access modifiers changed from: package-private */
/* loaded from: input_file:kd/bos/algo/dataset/store/mm/SpillerAsync.class */
public class SpillerAsync extends Spiller {
    private volatile boolean started = false;
    private ExecutorService es;
    private LinkedBlockingQueue<StoreUnitHolder> queue;

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/store/mm/SpillerAsync$SpillRunnalbe.class */
    public class SpillRunnalbe implements Runnable {
        private SpillRunnalbe() {
        }

        @Override // java.lang.Runnable
        public void run() {
            while (true) {
                try {
                    ((StoreUnitHolder) SpillerAsync.this.queue.take()).transfer();
                } catch (InterruptedException e) {
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public SpillerAsync(int i) {
        this.queue = new LinkedBlockingQueue<>(i);
    }

    @Override // kd.bos.algo.dataset.store.mm.Spiller
    public boolean write(StoreUnitHolder storeUnitHolder) {
        if (!this.started) {
            start();
        }
        return this.queue.offer(storeUnitHolder);
    }

    private synchronized void start() {
        if (this.started) {
            return;
        }
        this.started = true;
        int i = AlgoConfiguration.MM_SPILL_ASYNC_THREADPOOL_SIZE.getInt();
        this.es = AlgoExecutors.newFixedThreadPool(i, "memory-spill");
        for (int i2 = 0; i2 < i; i2++) {
            this.es.submit(new SpillRunnalbe());
        }
    }
}
