package kd.bos.algo.dataset.cache.kv;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import kd.bos.algo.AlgoException;
import kd.bos.algo.CacheHint;
import kd.bos.algo.Row;
import kd.bos.algo.RowMeta;
import kd.bos.algo.dataset.cache.DataSetCacheMeta;
import kd.bos.algo.dataset.cache.DataSetCacheSpi;
import kd.bos.algo.dataset.cache.SimpleMetaImpl;
import kd.bos.algo.exception.AlgoExceedAllowMaxRowsException;
import kd.bos.algo.serde.RowSerde;
import kd.bos.algo.storage.KVStorage;
import kd.bos.algo.storage.ZipKVReader;
import kd.bos.algo.storage.ZipKVWriter;
import kd.bos.metric.Meter;
import kd.bos.metric.MetricSystem;
import kd.bos.metric.Timer;
import kd.bos.trace.TraceSpan;
import kd.bos.trace.Tracer;

/* loaded from: input_file:kd/bos/algo/dataset/cache/kv/KVSpiImpl.class */
public class KVSpiImpl implements DataSetCacheSpi {
    private KVStorage storage;
    private RowSerde rowSerde;
    private Timer timerByName;
    private Meter meterByName;
    private Meter errorByName;
    private Appender appender;
    private MainThread mainThread = null;
    private Timer timer = MetricSystem.timer("kd.metrics.algo.cache.saveTimer");
    private Meter meter = MetricSystem.meter("kd.metrics.algo.cache.saveMeter");
    private Meter errorMeter = MetricSystem.meter("kd.metrics.algo.cache.saveMeter.error");
    private String compressor = System.getProperty("algo.storage.compressor");

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:kd/bos/algo/dataset/cache/kv/KVSpiImpl$Appender.class */
    public class Appender {
        private RowMeta rowMeta;
        private CacheHint hint;
        private String cacheId;
        private KVStorage.KVWriter writer;
        private int pageSize;
        private int rowCount;
        private int pageId;
        private Page page;

        Appender() {
        }

        static /* synthetic */ int access$608(Appender appender) {
            int i = appender.rowCount;
            appender.rowCount = i + 1;
            return i;
        }

        static /* synthetic */ int access$704(Appender appender) {
            int i = appender.pageId + 1;
            appender.pageId = i;
            return i;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/cache/kv/KVSpiImpl$MainThread.class */
    public class MainThread extends Thread {
        private LinkedBlockingQueue<Page> queue = new LinkedBlockingQueue<>(4);
        private boolean started = false;
        private boolean end = false;
        private CountDownLatch latch = new CountDownLatch(1);
        private long cost = 0;
        private KVStorage.KVWriter writer;
        private String key1;
        private RowMeta rowMeta;
        private CacheHint hint;
        private AlgoException error;

        public MainThread(KVStorage.KVWriter kVWriter, String str, RowMeta rowMeta, CacheHint cacheHint) {
            this.writer = kVWriter;
            this.key1 = str;
            this.rowMeta = rowMeta;
            this.hint = cacheHint;
        }

        void end() {
            this.end = true;
        }

        void addPage(Page page) {
            if (this.error != null) {
                this.latch.countDown();
                throw this.error;
            }
            try {
                this.queue.put(page);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            if (this.started) {
                return;
            }
            start();
            this.started = true;
        }

        @Override // java.lang.Thread, java.lang.Runnable
        public void run() {
            Page page = null;
            while (true) {
                try {
                    try {
                        page = this.queue.poll(10L, TimeUnit.MILLISECONDS);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    if (page == null) {
                        if (!this.end) {
                            continue;
                        } else if (this.error != null || this.queue.isEmpty()) {
                            break;
                        }
                    } else {
                        if (this.error != null) {
                            break;
                        }
                        long currentTimeMillis = System.currentTimeMillis();
                        processOne(page);
                        this.cost += System.currentTimeMillis() - currentTimeMillis;
                    }
                } finally {
                    this.latch.countDown();
                }
            }
        }

        private void processOne(Page page) {
            try {
                KVSpiImpl.this.writePage(this.writer, this.rowMeta, page, this.hint);
            } catch (AlgoException e) {
                this.error = e;
                this.end = true;
            } catch (Exception e2) {
                this.error = new AlgoException(e2);
                this.end = true;
            }
        }

        public void cancelOnException(AlgoException algoException) {
            this.error = algoException;
            this.end = true;
            waitDone();
            if (this.writer.hasFlushed()) {
                KVSpiImpl.this.delete(this.key1, false);
            }
        }

        public void waitDone() {
            try {
                this.latch.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:kd/bos/algo/dataset/cache/kv/KVSpiImpl$Page.class */
    public class Page {
        private int pageId;
        private int rowCount;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputStream dout = new DataOutputStream(this.baos);

        Page(int i) {
            this.pageId = i;
        }

        public void addRow(RowMeta rowMeta, Row row) {
            KVSpiImpl.this.writeRow(rowMeta, row, this.dout);
            this.rowCount++;
        }

        public byte[] toByteArray() throws IOException {
            this.dout.flush();
            return this.baos.toByteArray();
        }
    }

    public KVSpiImpl(KVStorage kVStorage) {
        this.storage = kVStorage;
        this.timerByName = MetricSystem.timer("kd.metrics.algo.cache.saveTimer." + kVStorage.getName());
        this.meterByName = MetricSystem.meter("kd.metrics.algo.cache.saveMeter." + kVStorage.getName());
        this.errorByName = MetricSystem.meter("kd.metrics.algo.cache.saveMeter.error." + kVStorage.getName());
    }

    private String getCacheId(CacheHint cacheHint) {
        return cacheHint.getCacheId() != null ? cacheHint.getCacheId() : generateId();
    }

    private String generateId() {
        return "datasetcache-" + UUID.randomUUID().toString().replace("-", "");
    }

    public String getPageKey(int i) {
        return "page" + i;
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void open(RowMeta rowMeta, CacheHint cacheHint) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        this.appender = new Appender();
        this.appender.rowMeta = rowMeta;
        this.appender.hint = cacheHint;
        this.appender.cacheId = getCacheId(cacheHint);
        this.appender.pageSize = cacheHint.getPageSize();
        try {
            this.appender.writer = createWriter(this.appender.cacheId, cacheHint.getTimeout());
            this.appender.page = new Page(0);
        } catch (IOException e) {
            this.errorMeter.mark();
            this.errorByName.mark();
            throw new AlgoException("can't write page: " + e.getMessage(), e);
        }
    }

    private boolean isExceedAllowMaxRows(int i, CacheHint cacheHint) {
        if (i <= cacheHint.getAllowMaxRows()) {
            return false;
        }
        if (cacheHint.isThrowExceptionWhenExceedAllowMaxRows()) {
            throw new AlgoExceedAllowMaxRowsException("DataSetCache exceed allow max rows:[rowCount:" + i + ",allowMaxRows:" + cacheHint.getAllowMaxRows() + "]");
        }
        return true;
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void append(Iterator<Row> it) {
        while (it.hasNext()) {
            append(it.next());
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void append(Row row) {
        try {
            if (isExceedAllowMaxRows(this.appender.rowCount, this.appender.hint)) {
                return;
            }
            this.appender.page.addRow(this.appender.rowMeta, row);
            Appender.access$608(this.appender);
            if (this.appender.rowCount % this.appender.pageSize == 0) {
                if (this.mainThread == null) {
                    this.mainThread = new MainThread(this.appender.writer, this.appender.cacheId, this.appender.rowMeta, this.appender.hint);
                }
                this.mainThread.addPage(this.appender.page);
                this.appender.page = new Page(Appender.access$704(this.appender));
            }
        } catch (Exception e) {
            AlgoException algoException = e instanceof AlgoException ? (AlgoException) e : new AlgoException("Save error", e);
            if (this.mainThread != null) {
                this.mainThread.cancelOnException(algoException);
            } else {
                delete(this.appender.cacheId, false);
            }
            throw algoException;
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta finish() {
        if (this.appender.page.rowCount > 0) {
            if (this.mainThread != null) {
                this.mainThread.addPage(this.appender.page);
            } else {
                try {
                    writePage(this.appender.writer, this.appender.rowMeta, this.appender.page, this.appender.hint);
                } catch (Exception e) {
                    this.errorMeter.mark();
                    this.errorByName.mark();
                    AlgoException algoException = e instanceof AlgoException ? (AlgoException) e : new AlgoException("Save error", e);
                    delete(this.appender.cacheId, false);
                    throw algoException;
                }
            }
        }
        try {
            if (this.mainThread != null) {
                this.mainThread.end();
                this.mainThread.waitDone();
                if (this.mainThread.error != null) {
                    throw this.mainThread.error;
                }
            }
            SimpleMetaImpl simpleMetaImpl = new SimpleMetaImpl(this.appender.cacheId, this.appender.rowMeta, this.appender.rowCount, this.appender.hint.getPageSize(), this.appender.hint.getStorageType());
            simpleMetaImpl.setCompressor(this.compressor);
            writeMeta(this.appender.writer, simpleMetaImpl);
            writeTimeout(this.appender.writer, this.appender.hint);
            this.appender.writer.flush();
            return simpleMetaImpl;
        } catch (Exception e2) {
            this.errorMeter.mark();
            this.errorByName.mark();
            AlgoException algoException2 = e2 instanceof AlgoException ? (AlgoException) e2 : new AlgoException("Save error", e2);
            delete(this.appender.cacheId, false);
            throw algoException2;
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta save(RowMeta rowMeta, Iterator<Row> it, CacheHint cacheHint) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        Timer.Context time = this.timer.time();
        Timer.Context time2 = this.timerByName.time();
        String cacheId = getCacheId(cacheHint);
        try {
            try {
                KVStorage.KVWriter createWriter = createWriter(cacheId, cacheHint.getTimeout());
                int writeRows = writeRows(Tracer.getCurrentSpan(), createWriter, cacheId, rowMeta, it, cacheHint);
                this.meter.mark(writeRows);
                this.meterByName.mark(writeRows);
                SimpleMetaImpl simpleMetaImpl = new SimpleMetaImpl(cacheId, rowMeta, writeRows, cacheHint.getPageSize(), cacheHint.getStorageType());
                simpleMetaImpl.setCompressor(this.compressor);
                writeMeta(createWriter, simpleMetaImpl);
                writeTimeout(createWriter, cacheHint);
                createWriter.flush();
                time.stop();
                time2.stop();
                return simpleMetaImpl;
            } catch (Exception e) {
                this.errorMeter.mark();
                this.errorByName.mark();
                AlgoException algoException = e instanceof AlgoException ? (AlgoException) e : new AlgoException("Save error", e);
                if (this.mainThread != null) {
                    this.mainThread.cancelOnException(algoException);
                } else {
                    delete(cacheId, false);
                }
                throw algoException;
            }
        } catch (Throwable th) {
            time.stop();
            time2.stop();
            throw th;
        }
    }

    private void writeTimeout(KVStorage.KVWriter kVWriter, CacheHint cacheHint) throws IOException {
        long timeout = cacheHint.getTimeout();
        long currentTimeMillis = System.currentTimeMillis();
        kVWriter.putMeta("timeout", ("" + timeout).getBytes(StandardCharsets.UTF_8));
        kVWriter.putMeta("createTime", ("" + currentTimeMillis).getBytes(StandardCharsets.UTF_8));
    }

    private void writeMeta(KVStorage.KVWriter kVWriter, SimpleMetaImpl simpleMetaImpl) throws IOException {
        ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
        new ObjectOutputStream(byteArrayOutputStream).writeObject(simpleMetaImpl);
        byteArrayOutputStream.flush();
        kVWriter.putMeta("meta", byteArrayOutputStream.toByteArray());
    }

    private int writeRows(TraceSpan traceSpan, KVStorage.KVWriter kVWriter, String str, RowMeta rowMeta, Iterator<Row> it, CacheHint cacheHint) throws IOException {
        System.currentTimeMillis();
        int i = 0;
        int pageSize = cacheHint.getPageSize();
        int i2 = 0;
        Page page = new Page(0);
        while (it.hasNext()) {
            Row next = it.next();
            if (!isExceedAllowMaxRows(i, cacheHint)) {
                page.addRow(rowMeta, next);
                i++;
                if (i % pageSize == 0) {
                    if (this.mainThread == null) {
                        this.mainThread = new MainThread(kVWriter, str, rowMeta, cacheHint);
                    }
                    this.mainThread.addPage(page);
                    i2++;
                    page = new Page(i2);
                }
            }
        }
        if (i % pageSize > 0 && this.mainThread != null) {
            this.mainThread.addPage(page);
        }
        if (this.mainThread != null) {
            this.mainThread.end();
            this.mainThread.waitDone();
            if (this.mainThread.error != null) {
                throw this.mainThread.error;
            }
        } else if (i % pageSize > 0) {
            writePage(kVWriter, rowMeta, page, cacheHint);
        }
        return i;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writePage(KVStorage.KVWriter kVWriter, RowMeta rowMeta, Page page, CacheHint cacheHint) throws IOException {
        kVWriter.put(getPageKey(page.pageId), page.toByteArray());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void writeRow(RowMeta rowMeta, Row row, DataOutputStream dataOutputStream) {
        this.rowSerde.write(rowMeta, row, dataOutputStream);
    }

    private Row readRow(RowMeta rowMeta, DataInputStream dataInputStream) {
        if (this.rowSerde == null) {
            this.rowSerde = RowSerde.Factory.get(rowMeta);
        }
        return this.rowSerde.read(rowMeta, dataInputStream);
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public DataSetCacheMeta getMeta(String str) {
        byte[] meta;
        if (str == null) {
            throw new NullPointerException("id is null");
        }
        try {
            KVStorage.KVReader createReader = createReader(str, null);
            if (createReader == null || (meta = createReader.getMeta("meta")) == null) {
                return null;
            }
            return (DataSetCacheMeta) new ObjectInputStream(new ByteArrayInputStream(meta)).readObject();
        } catch (Exception e) {
            throw new AlgoException("can't getMeta: " + e.getMessage(), e);
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void delete(DataSetCacheMeta dataSetCacheMeta) {
        delete(dataSetCacheMeta.getId());
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public void delete(String str) {
        delete(str, true);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void delete(String str, boolean z) {
        try {
            this.storage.delete(str);
        } catch (Exception e) {
            if (z) {
                throw new AlgoException(e, "error delete: " + e.getMessage(), new Object[0]);
            }
        }
    }

    @Override // kd.bos.algo.dataset.cache.DataSetCacheSpi
    public List<Row> getList(DataSetCacheMeta dataSetCacheMeta, int i, int i2) {
        TraceSpan create = Tracer.create("DataSet", "Cache.KVSpi.getList");
        Throwable th = null;
        try {
            String id = dataSetCacheMeta.getId();
            create.addTag("id", id);
            create.addTag("begin", String.valueOf(i));
            create.addTag("length", String.valueOf(i2));
            try {
                KVStorage.KVReader createReader = createReader(id, dataSetCacheMeta.getCompressor());
                RowMeta rowMeta = dataSetCacheMeta.getRowMeta();
                int rowCount = dataSetCacheMeta.getRowCount();
                if (rowCount == 0) {
                    List<Row> emptyList = Collections.emptyList();
                    if (create != null) {
                        if (0 != 0) {
                            try {
                                create.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            create.close();
                        }
                    }
                    return emptyList;
                }
                if (rowCount < i + i2) {
                    i2 = rowCount - i;
                    if (i2 <= 0) {
                        List<Row> emptyList2 = Collections.emptyList();
                        if (create != null) {
                            if (0 != 0) {
                                try {
                                    create.close();
                                } catch (Throwable th3) {
                                    th.addSuppressed(th3);
                                }
                            } else {
                                create.close();
                            }
                        }
                        return emptyList2;
                    }
                }
                int pageSize = dataSetCacheMeta.getPageSize();
                int i3 = i / pageSize;
                int i4 = i % pageSize;
                byte[] bArr = createReader.get(getPageKey(i3));
                if (bArr == null) {
                    throw AlgoException.create("kv cache not exists for id:%s,pageId:%d", id, Integer.valueOf(i3));
                }
                ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(bArr);
                DataInputStream dataInputStream = new DataInputStream(byteArrayInputStream);
                ArrayList arrayList = new ArrayList();
                int i5 = 0;
                int i6 = 0;
                while (i6 < i2 && i + i6 < rowCount) {
                    Row readRow = readRow(rowMeta, dataInputStream);
                    if (i5 < i4) {
                        i5++;
                    } else {
                        arrayList.add(readRow);
                        i5++;
                        i6++;
                        if (i5 % pageSize == 0 && i + i6 < rowCount) {
                            i4 = 0;
                            byteArrayInputStream.close();
                            i3++;
                            byteArrayInputStream = new ByteArrayInputStream(createReader.get(getPageKey(i3)));
                            dataInputStream = new DataInputStream(byteArrayInputStream);
                        }
                    }
                }
                return arrayList;
            } catch (IOException e) {
                throw new AlgoException("error when get dataset cache iterator.", e);
            }
        } finally {
            if (create != null) {
                if (0 != 0) {
                    try {
                        create.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    create.close();
                }
            }
        }
    }

    private KVStorage.KVWriter createWriter(String str, long j) throws IOException {
        KVStorage.KVWriter create = this.storage.create(str, j);
        if ("zip".equals(this.compressor)) {
            create = new ZipKVWriter(create);
        }
        return create;
    }

    private KVStorage.KVReader createReader(String str, String str2) throws IOException {
        KVStorage.KVReader open = this.storage.open(str);
        if ("zip".equals(str2)) {
            open = new ZipKVReader(open);
        }
        return open;
    }
}
