package com.kingdee.bos.qing.dpp.datasource.input;

import com.kingdee.bos.qing.common.context.QingContext;
import com.kingdee.bos.qing.dpp.common.annotations.SourceInput;
import com.kingdee.bos.qing.dpp.common.datasync.DataSyncCtx;
import com.kingdee.bos.qing.dpp.common.datasync.DataSyncableFactory;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncronizer;
import com.kingdee.bos.qing.dpp.common.datasync.model.SyncedFieldMeta;
import com.kingdee.bos.qing.dpp.common.types.ConnectorType;
import com.kingdee.bos.qing.dpp.common.types.DBType;
import com.kingdee.bos.qing.dpp.datasource.listeners.DataSourceSideOutputListenerAdapter;
import com.kingdee.bos.qing.dpp.datasource.listeners.DataSourceSideOutputListeners;
import com.kingdee.bos.qing.dpp.exception.QDppSideOutputAbandonException;
import com.kingdee.bos.qing.dpp.exception.QDppSourceException;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import com.kingdee.bos.qing.dpp.model.schema.SourceInputSchema;
import com.kingdee.bos.qing.dpp.model.transform.source.AbstractDppSource;
import com.kingdee.bos.qing.dpp.model.transform.source.SideOutputDppSource;
import com.kingdee.bos.qing.util.ThreadPoolManage;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@SourceInput(ConnectorType.SIDE_OUTPUT)
/* loaded from: input_file:com/kingdee/bos/qing/dpp/datasource/input/DppSideOutputSourceInput.class */
public class DppSideOutputSourceInput extends AbstractSourceDataInput<SideOutputDppSource> {
    private static final Logger log = LoggerFactory.getLogger(DppSideOutputSourceInput.class);
    private String jobName;
    private SideOutputDppSource sideOutputDppSource;
    private IDataSyncronizer dataSyncronizer;
    private AbstractSourceDataInput sourceDataInput = null;
    private LinkedBlockingQueue<BatchRows> rowsToSinkQueue = new LinkedBlockingQueue<>(2000);
    private volatile boolean sinking = false;
    private boolean sinkable = false;
    private boolean sinkThreadCreated = false;
    private AtomicReference<Exception> sinkErrRef = new AtomicReference<>(null);

    /* loaded from: input_file:com/kingdee/bos/qing/dpp/datasource/input/DppSideOutputSourceInput$BatchRows.class */
    private static class BatchRows {
        private List<Object[]> rows;

        public List<Object[]> getRows() {
            return this.rows;
        }

        public BatchRows(List<Object[]> list) {
            this.rows = list;
        }
    }

    /* loaded from: input_file:com/kingdee/bos/qing/dpp/datasource/input/DppSideOutputSourceInput$SideOutputWorker.class */
    private class SideOutputWorker extends DataSourceSideOutputListenerAdapter implements Callable {
        private List<DppField> fieldList;
        private volatile boolean abandon = false;
        private CountDownLatch latch = new CountDownLatch(1);

        public SideOutputWorker(List<DppField> list) {
            this.fieldList = list;
            DataSourceSideOutputListeners.regListener(DppSideOutputSourceInput.this.jobName, DppSideOutputSourceInput.this.sideOutputDppSource, this);
        }

        @Override // com.kingdee.bos.qing.dpp.datasource.listeners.DataSourceSideOutputListenerAdapter, com.kingdee.bos.qing.dpp.datasource.listeners.IDataSourceSideOutputListener
        public void onJobFailed(String str, SideOutputDppSource sideOutputDppSource) {
            this.abandon = true;
            DppSideOutputSourceInput.this.sinking = false;
        }

        @Override // java.util.concurrent.Callable
        public Object call() throws Exception {
            IDataSyncWriter iDataSyncWriter = null;
            try {
                try {
                    iDataSyncWriter = DppSideOutputSourceInput.this.initDataSyncWriter();
                    iDataSyncWriter.begin();
                    while (DppSideOutputSourceInput.this.sinking) {
                        BatchRows batchRows = (BatchRows) DppSideOutputSourceInput.this.rowsToSinkQueue.poll(1000L, TimeUnit.MILLISECONDS);
                        if (null != batchRows) {
                            Iterator<Object[]> it = batchRows.getRows().iterator();
                            while (it.hasNext()) {
                                iDataSyncWriter.writeRowData(it.next());
                            }
                        }
                    }
                    if (!this.abandon && DppSideOutputSourceInput.this.sinkErrRef.get() == null && DppSideOutputSourceInput.this.rowsToSinkQueue.size() > 0) {
                        Iterator it2 = DppSideOutputSourceInput.this.rowsToSinkQueue.iterator();
                        while (it2.hasNext()) {
                            Iterator<Object[]> it3 = ((BatchRows) it2.next()).getRows().iterator();
                            while (it3.hasNext()) {
                                iDataSyncWriter.writeRowData(it3.next());
                            }
                        }
                    }
                    try {
                        finishGpfdistTask(iDataSyncWriter);
                        this.latch.countDown();
                        return null;
                    } finally {
                    }
                } catch (Throwable th) {
                    try {
                        finishGpfdistTask(iDataSyncWriter);
                        this.latch.countDown();
                        throw th;
                    } finally {
                    }
                }
            } catch (Exception e) {
                DppSideOutputSourceInput.this.sinkErrRef.set(e);
                DppSideOutputSourceInput.this.sinkable = false;
                DppSideOutputSourceInput.log.error("sink source data to greenplum error", e);
                try {
                    finishGpfdistTask(iDataSyncWriter);
                    this.latch.countDown();
                    return null;
                } finally {
                    this.latch.countDown();
                }
            }
        }

        void finishGpfdistTask(IDataSyncWriter iDataSyncWriter) {
            if (this.abandon) {
                finishOnErr(iDataSyncWriter);
                DataSourceSideOutputListeners.notifyError(DppSideOutputSourceInput.this.jobName, DppSideOutputSourceInput.this.sideOutputDppSource, this.fieldList, new QDppSideOutputAbandonException("current side output task is abandon due to current job running error"));
            } else {
                if (DppSideOutputSourceInput.this.sinkErrRef.get() != null) {
                    finishOnErr(iDataSyncWriter);
                    DataSourceSideOutputListeners.notifyError(DppSideOutputSourceInput.this.jobName, DppSideOutputSourceInput.this.sideOutputDppSource, this.fieldList, (Exception) DppSideOutputSourceInput.this.sinkErrRef.get());
                    return;
                }
                finishDataSyncWriter(iDataSyncWriter);
                if (DppSideOutputSourceInput.this.sinkErrRef.get() == null) {
                    DataSourceSideOutputListeners.notifyFinish(DppSideOutputSourceInput.this.jobName, DppSideOutputSourceInput.this.sideOutputDppSource, this.fieldList);
                } else {
                    finishOnErr(iDataSyncWriter);
                    DataSourceSideOutputListeners.notifyError(DppSideOutputSourceInput.this.jobName, DppSideOutputSourceInput.this.sideOutputDppSource, this.fieldList, (Exception) DppSideOutputSourceInput.this.sinkErrRef.get());
                }
            }
        }

        void finishDataSyncWriter(IDataSyncWriter iDataSyncWriter) {
            if (null != iDataSyncWriter) {
                try {
                    iDataSyncWriter.finish();
                } catch (Exception e) {
                    DppSideOutputSourceInput.log.error("finish csv data loader failed", e);
                    DppSideOutputSourceInput.this.sinkErrRef.set(e);
                }
            }
        }

        void finishOnErr(IDataSyncWriter iDataSyncWriter) {
            if (null != iDataSyncWriter) {
                iDataSyncWriter.finishOnErr();
            }
        }
    }

    @Override // com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput
    public void open(SideOutputDppSource sideOutputDppSource, QueryOption queryOption) throws QDppSourceException {
        this.jobName = queryOption.getJobName();
        this.sideOutputDppSource = sideOutputDppSource;
        createDataSyncronizer();
        if (queryOption.getAttemptNumber() > 0) {
            throw new QDppSourceException("data sync task can not be restart");
        }
        AbstractDppSource originSource = sideOutputDppSource.getOriginSource();
        this.sourceDataInput = SourceInputFactory.newLocalSourceInput(originSource.getConnectType());
        this.sourceDataInput.open(originSource, queryOption);
        this.sinkable = this.dataSyncronizer != null && this.dataSyncronizer.isSyncable();
    }

    private void createDataSyncronizer() {
        DataSyncCtx dataSyncCtx = new DataSyncCtx();
        dataSyncCtx.withOption(DataSyncCtx.SINK_SETTING_KEY, this.sideOutputDppSource.getSinkSettings()).withOption(DataSyncCtx.OUTPUT_SOURCE, this.sideOutputDppSource);
        this.dataSyncronizer = DataSyncableFactory.createDataSyncable(dataSyncCtx);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput
    public SourceInputSchema createSchema() throws QDppSourceException {
        return this.sourceDataInput.createSchema();
    }

    @Override // com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput
    public List<Object[]> nextRows(Integer num) throws QDppSourceException {
        if (!this.sinkThreadCreated && this.sinkable) {
            List<DppField> fields = this.inputSchema.getFields();
            this.sinking = true;
            ThreadPoolManage.excuteThreadWithContext(ThreadPoolManage.QingThreadPoolName.QING_LONG_TIME_TASK_HANDLER, new SideOutputWorker(fields), (QingContext) null);
            this.sinkThreadCreated = true;
        }
        List<Object[]> nextRows = this.sourceDataInput.nextRows(num);
        if (!this.sinkable) {
            return nextRows;
        }
        if (nextRows != null) {
            try {
                this.rowsToSinkQueue.put(new BatchRows(nextRows));
            } catch (InterruptedException e) {
                this.sinkErrRef.compareAndSet(null, e);
                this.sinking = false;
            }
        }
        return nextRows;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public IDataSyncWriter initDataSyncWriter() throws QDppSourceException {
        return this.dataSyncronizer.createWriter(this.jobName, new SyncedFieldMeta(getInputSchema().getFields(), DBType.GREENPLUM));
    }

    @Override // com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput
    protected AbstractDppSource getHandledDppSource() {
        return this.sideOutputDppSource;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // com.kingdee.bos.qing.dpp.datasource.input.AbstractSourceDataInput
    public void internalClose() {
        this.sinking = false;
        this.sourceDataInput.internalClose();
    }
}
