package com.kingdee.bos.qing.modeler.datasync.domain.impl;

import com.kingdee.bos.qing.common.session.QingSessionUtil;
import com.kingdee.bos.qing.dpp.client.dataset.LocalSourceDataSet;
import com.kingdee.bos.qing.dpp.client.exception.DataSetReadException;
import com.kingdee.bos.qing.dpp.client.job.JobRuntimeCache;
import com.kingdee.bos.qing.dpp.common.datasync.DataSyncCtx;
import com.kingdee.bos.qing.dpp.common.datasync.DataSyncableFactory;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncWriter;
import com.kingdee.bos.qing.dpp.common.datasync.IDataSyncronizer;
import com.kingdee.bos.qing.dpp.common.datasync.model.DataSyncTaskId;
import com.kingdee.bos.qing.dpp.common.datasync.model.SrcDataSyncTaskId;
import com.kingdee.bos.qing.dpp.common.datasync.model.SyncedFieldMeta;
import com.kingdee.bos.qing.dpp.common.gpfdist.GpfDistHelper;
import com.kingdee.bos.qing.dpp.job.exception.DataSinkException;
import com.kingdee.bos.qing.dpp.model.filters.IRuntimeFilter;
import com.kingdee.bos.qing.dpp.model.schema.DppField;
import com.kingdee.bos.qing.dpp.model.transform.Transformation;
import com.kingdee.bos.qing.dpp.model.transform.settings.GPFDistSinkSettings;
import com.kingdee.bos.qing.dpp.model.transform.settings.SinkSettings;
import com.kingdee.bos.qing.dpp.model.transform.source.AbstractDppSource;
import com.kingdee.bos.qing.dpp.model.transform.source.DppJdbcSource;
import com.kingdee.bos.qing.dpp.model.transform.source.SideOutputDppSource;
import com.kingdee.bos.qing.modeler.datasync.model.LocalSinkResult;
import com.kingdee.bos.qing.modeler.deploy.common.Constant;
import com.kingdee.bos.qing.util.JsonUtil;
import com.kingdee.bos.qing.util.LogUtil;
import java.sql.Timestamp;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import kd.bos.context.RequestContext;

/* loaded from: input_file:com/kingdee/bos/qing/modeler/datasync/domain/impl/LocalSinkExecutorForIncrement.class */
/**
 * Reads rows from a {@link Transformation} through {@link LocalSourceDataSet} and pushes them
 * into an {@link IDataSyncWriter}, running one read pass per supplied {@link IRuntimeFilter}.
 *
 * <p>Every row handed to the writer is the source row extended by two trailing values: a batch
 * sequence number and the timestamp at which the row was written. The matching two field
 * definitions are appended to the writer's field metadata in {@link #initDataSyncWriter()}.
 */
public class LocalSinkExecutorForIncrement {
    /** Flag passed both to the sync context and to the produced {@link LocalSinkResult}. */
    private static final boolean PHYSICAL_TABLE_RESERVE = true;

    private final Transformation transformation;
    private IDataSyncWriter dataSyncWriter;
    private DataSyncTaskId dataSyncTaskId;
    private final List<IRuntimeFilter> filters;
    private final SinkSettings sinkSettings;
    /** Row meta of the source data set, captured in {@link #initDataSyncWriter()}. */
    private List<DppField> fieldList;

    /**
     * @param transformation transformation whose output is read and written to the sink
     * @param sinkSettings   settings describing the sink the writer targets
     * @param list           filters applied one after another; each one triggers a full read pass
     */
    public LocalSinkExecutorForIncrement(Transformation transformation, SinkSettings sinkSettings, List<IRuntimeFilter> list) {
        this.transformation = transformation;
        this.sinkSettings = sinkSettings;
        this.filters = list;
    }

    /**
     * Creates the writer, writes the rows selected by every filter and finishes the writer.
     *
     * @return the result describing the sync task, the sink source and the source fields
     * @throws Exception the failure raised while initializing or writing; when the writer's
     *                   error-path cleanup fails as well, that failure is attached as a
     *                   suppressed exception instead of replacing the original one
     */
    public LocalSinkResult execute() throws Exception {
        initDataSyncWriter();
        try {
            this.dataSyncWriter.begin();
            for (IRuntimeFilter filter : this.filters) {
                loadAndWriteData(filter);
            }
            this.dataSyncWriter.finish();
            return createLocalSinkResult();
        } catch (Exception e) {
            if (this.dataSyncWriter != null) {
                try {
                    this.dataSyncWriter.resetError(e);
                    this.dataSyncWriter.finishOnErr();
                } catch (Exception cleanupError) {
                    // Keep the root cause: a failing cleanup must not hide the original error
                    // nor skip the log statement below. addSuppressed rejects self-suppression,
                    // hence the identity check.
                    if (cleanupError != e) {
                        e.addSuppressed(cleanupError);
                    }
                }
            }
            LogUtil.error("materialized error: sink source data to greenplum error. ", e);
            throw e;
        }
    }

    /**
     * Builds the synchronizer and its writer, and registers the task in the runtime caches.
     *
     * <p>Side effects visible here: assigns {@code fieldList}, {@code dataSyncWriter} and
     * {@code dataSyncTaskId}; caches the sink source in {@link JobRuntimeCache} under the task id;
     * stores the JSON-encoded current {@link RequestContext} in the global Qing session under
     * {@code "REQUESTCONTEXT_KEY_" + taskId} with a 24 hour expiry.
     *
     * @throws DataSetReadException if the source data set cannot be opened to read its row meta
     * @throws DataSinkException    if no synchronizer could be created for the sync context
     */
    private void initDataSyncWriter() throws DataSetReadException, DataSinkException {
        AbstractDppSource source = this.transformation.getTransformSettings().buildSource(Constant.DESC);
        SideOutputDppSource outputSource = new SideOutputDppSource(source, this.sinkSettings);

        DataSyncCtx syncCtx = new DataSyncCtx();
        syncCtx.withOption("SinkSetting", this.sinkSettings)
                .withOption("PhysicalTablePreserveEnabled", PHYSICAL_TABLE_RESERVE)
                .withOption("OutputSource", outputSource);

        IDataSyncronizer synchronizer = DataSyncableFactory.createDataSyncable(syncCtx);
        if (null == synchronizer) {
            throw new DataSinkException("create data syncroinzer failed");
        }

        DppJdbcSource sinkSource = this.sinkSettings.buildSinkSource();
        // NOTE(review): the second argument is presumably a row limit; only the row meta is used here.
        this.fieldList = getDataSet(this.transformation, 1).getRowMeta();

        // Writer fields = source fields + batch sequence field + batch time field, in the same
        // order in which loadAndWriteData() lays out each row.
        // NOTE(review): kept as a raw list because the element type returned by the GpfDistHelper
        // factories is not visible from this file — confirm before parameterizing.
        ArrayList syncedFields = new ArrayList(this.fieldList);
        syncedFields.add(GpfDistHelper.createBatchSyncSeqField());
        syncedFields.add(GpfDistHelper.createBatchSyncTimeField());
        SyncedFieldMeta syncedFieldMeta = new SyncedFieldMeta(syncedFields, sinkSource.getDbType());

        String taskUuid = UUID.randomUUID().toString();
        this.dataSyncWriter = synchronizer.createWriter(taskUuid, syncedFieldMeta);
        this.dataSyncTaskId = new SrcDataSyncTaskId(taskUuid, source.getUniqueKey());
        JobRuntimeCache.cacheDppSource(this.dataSyncTaskId.getId(), this.sinkSettings.buildSinkSource());
        QingSessionUtil.getGlobalQingSessionImpl().set(
                "REQUESTCONTEXT_KEY_" + this.dataSyncTaskId.getId(),
                JsonUtil.encodeToString(RequestContext.get()),
                24,
                TimeUnit.HOURS);
    }

    /**
     * Applies the filter as push-down filter on the transformation, reads all resulting rows and
     * writes each non-null row to the writer with the batch sequence and write time appended.
     *
     * @param iRuntimeFilter filter installed on the transformation settings before reading
     * @throws Exception if reading, writing or closing the data set fails
     */
    private void loadAndWriteData(IRuntimeFilter iRuntimeFilter) throws Exception {
        this.transformation.getTransformSettings().setPushDownFilter(iRuntimeFilter);

        // Batch sequence defaults to 1; GPFDist sinks supply a new sequence for every pass.
        long batchSeq = 1;
        if (this.sinkSettings instanceof GPFDistSinkSettings) {
            batchSeq = this.sinkSettings.getNewBatchSeq();
        }

        // NOTE(review): -1 presumably means "no row limit" — confirm against LocalSourceDataSet.
        LocalSourceDataSet dataSet = getDataSet(this.transformation, -1);
        try {
            while (dataSet.hasNext()) {
                Object[] sourceValues = dataSet.nextRow().getAll();
                if (sourceValues == null) {
                    continue;
                }
                Object[] rowWithBatchInfo = new Object[sourceValues.length + 2];
                System.arraycopy(sourceValues, 0, rowWithBatchInfo, 0, sourceValues.length);
                rowWithBatchInfo[rowWithBatchInfo.length - 2] = Long.valueOf(batchSeq);
                rowWithBatchInfo[rowWithBatchInfo.length - 1] = new Timestamp(System.currentTimeMillis());
                this.dataSyncWriter.writeRowData(rowWithBatchInfo);
            }
        } finally {
            // Single close on every path; the data set is never closed twice.
            dataSet.close();
        }
    }

    /**
     * Opens a data set over the given transformation.
     *
     * @param transformation transformation to read from
     * @param i              value forwarded unchanged to the {@link LocalSourceDataSet} constructor
     * @throws DataSetReadException if the data set cannot be created
     */
    private LocalSourceDataSet getDataSet(Transformation transformation, int i) throws DataSetReadException {
        return new LocalSourceDataSet(transformation, i);
    }

    /** Builds the result from the task id, a freshly built sink source and the source fields. */
    private LocalSinkResult createLocalSinkResult() {
        return new LocalSinkResult(this.dataSyncTaskId, this.sinkSettings.buildSinkSource(), PHYSICAL_TABLE_RESERVE, this.fieldList);
    }
}
