package com.kingdee.bos.qing.dpp.engine.flink.transform.sink.gpfdist;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.kingdee.bos.qing.common.rpc.common.QRpcInvokeHelper;
import com.kingdee.bos.qing.common.rpc.common.QRpcRemoteCaller;
import com.kingdee.bos.qing.dpp.common.datasync.impl.GpfdistDataSyncWriter;
import com.kingdee.bos.qing.dpp.common.datasync.model.SyncedFieldMeta;
import com.kingdee.bos.qing.dpp.common.gpfdist.exception.GpfdistException;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.engine.flink.transform.udf.common.FuncFinishState;
import com.kingdee.bos.qing.dpp.engine.flink.util.RemoteJobResultReceiverFactory;
import com.kingdee.bos.qing.dpp.engine.flink.util.RowDataToQDppRowDataConvertUtils;
import com.kingdee.bos.qing.dpp.job.interfaces.IJobStatusResultReceiver;
import com.kingdee.bos.qing.dpp.job.model.QDppJobResult;
import com.kingdee.bos.qing.dpp.job.model.QDppJobStatus;
import com.kingdee.bos.qing.dpp.model.transform.settings.GPFDistSinkSettings;
import com.kingdee.bos.qing.dpp.utils.DppErrorUtil;
import com.kingdee.bos.qing.dpp.utils.StringUtils;
import java.sql.Timestamp;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/transform/sink/gpfdist/QDppGpfDistSinkFunction.class */
public class QDppGpfDistSinkFunction extends RichSinkFunction<RowData> {
    private DataType producedDataType;
    private DataType[] colDataTypes;
    private String jobHexId;
    private String jobName;
    private static final Logger log = LoggerFactory.getLogger(QDppGpfDistSinkFunction.class);
    private GpfdistDataSyncWriter dataSyncWriter;
    private FuncFinishState finishState = FuncFinishState.NOT_EXECUTE;
    private boolean inStreamMode = false;
    private String hostName;
    private int port;
    private GPFDistSinkSettings sinkSettings;
    private Exception dataLoadException;
    private Timestamp insertTime;
    private IJobStatusResultReceiver resultReceiver;

    public QDppGpfDistSinkFunction(GPFDistSinkSettings gPFDistSinkSettings, DataType dataType) {
        this.producedDataType = dataType;
        this.sinkSettings = gPFDistSinkSettings;
    }

    private void prepareRemoteJobResultReceiver() {
        this.resultReceiver = RemoteJobResultReceiverFactory.getOrCreateJobStatusReceiver(this.hostName, this.port, 10000L, (byte) 11);
    }

    public void open(Configuration configuration) throws Exception {
        try {
            this.jobHexId = getRuntimeContext().getJobId().toHexString();
            this.colDataTypes = (DataType[]) DataType.getFieldDataTypes(this.producedDataType).toArray(new DataType[0]);
            log.info("open qdpp gpfdist sink function, jobId:" + this.jobHexId);
            Configuration globalJobParameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
            this.jobName = globalJobParameters.getString(QDppOptions.JOB_GLOBAL_PARAMS_JOB_NAME.key(), (String) null);
            this.inStreamMode = globalJobParameters.getBoolean(QDppOptions.JOB_GLOBAL_PARAMS_USE_STREAM_MODE.key(), false);
            this.hostName = globalJobParameters.getString(QDppOptions.JOB_GLOBAL_PARAMS_FROM_HOST.key(), (String) null);
            this.port = globalJobParameters.getInteger(QDppOptions.JOB_GLOBAL_PARAMS_PORT.key(), -1);
            String string = globalJobParameters.getString(GpfDistSinkTableBuilder.GLOBAL_PARAM_EXTERNAL_TABLE_FIELD_META_KEY, (String) null);
            if (StringUtils.isNullOrEmpty(string)) {
                throw new IllegalStateException("no external table field data types found,jobName:" + this.jobName);
            }
            int attemptNumber = getRuntimeContext().getAttemptNumber();
            if (attemptNumber > 0) {
                throw new SuppressRestartsException(new GpfdistException("gpfdist sync task can not be restart"));
            }
            prepareRemoteJobResultReceiver();
            ObjectMapper objectMapper = new ObjectMapper();
            objectMapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);
            this.dataSyncWriter = new GpfdistDataSyncWriter(this.jobName, (SyncedFieldMeta) objectMapper.readValue(string, SyncedFieldMeta.class), this.sinkSettings);
            this.insertTime = new Timestamp(System.currentTimeMillis());
            this.dataSyncWriter.setDataSyncListener(new GpfdistSyncListenerOnEngineSide(this.jobName, this.hostName, this.port, this.inStreamMode, attemptNumber, this.sinkSettings));
            this.dataSyncWriter.begin();
        } catch (Exception e) {
            this.dataLoadException = DppErrorUtil.getCauseError(e);
            if (null != this.dataSyncWriter) {
                this.dataSyncWriter.resetError(this.dataLoadException);
            }
            throw new SuppressRestartsException(this.dataLoadException);
        }
    }

    public void invoke(RowData rowData, SinkFunction.Context context) {
        try {
            Object[] readRowValue = RowDataToQDppRowDataConvertUtils.readRowValue(rowData, this.colDataTypes);
            if (this.sinkSettings.isAutoAddReservedField()) {
                Object[] objArr = new Object[readRowValue.length + 2];
                System.arraycopy(readRowValue, 0, objArr, 0, readRowValue.length);
                objArr[objArr.length - 2] = Long.valueOf(this.sinkSettings.getNewBatchSeq());
                objArr[objArr.length - 1] = this.insertTime;
                this.dataSyncWriter.writeRowData(objArr);
            } else {
                this.dataSyncWriter.writeRowData(readRowValue);
            }
        } catch (Exception e) {
            this.dataLoadException = DppErrorUtil.getCauseError(e);
            this.dataSyncWriter.resetError(this.dataLoadException);
            throw new SuppressRestartsException(this.dataLoadException);
        }
    }

    public void finish() {
        try {
            this.dataSyncWriter.finish();
            this.finishState = FuncFinishState.SUCCEED;
            notifyDataEnd();
        } catch (Exception e) {
            this.finishState = FuncFinishState.ERROR;
            throw new SuppressRestartsException(DppErrorUtil.getCauseError(e));
        }
    }

    private void notifyDataEnd() throws Exception {
        RuntimeContext runtimeContext = getRuntimeContext();
        final QDppJobResult qDppJobResult = new QDppJobResult(this.jobName);
        qDppJobResult.setJobHexId(runtimeContext.getJobId().toHexString());
        qDppJobResult.setJobStatus(QDppJobStatus.DATA_END);
        qDppJobResult.setSinkSource(this.sinkSettings.buildSinkSource());
        QRpcInvokeHelper.retryableCall(new QRpcRemoteCaller<Void>() { // from class: com.kingdee.bos.qing.dpp.engine.flink.transform.sink.gpfdist.QDppGpfDistSinkFunction.1
            /* renamed from: call, reason: merged with bridge method [inline-methods] */
            public Void m38call() throws Exception {
                QDppGpfDistSinkFunction.this.resultReceiver.receiveJobResult(qDppJobResult);
                return null;
            }
        }, 3);
    }

    public void close() throws Exception {
        switch (this.finishState) {
            case SUCCEED:
                return;
            case ERROR:
            case NOT_EXECUTE:
                finishOnErr();
                return;
            default:
                return;
        }
    }

    void finishOnErr() {
        try {
            if (null != this.dataSyncWriter) {
                log.warn("finish dataLoader when error happened,jobName:" + this.jobName);
                this.dataSyncWriter.finishOnErr();
            }
        } catch (Exception e) {
            log.error("finish gpfdist data loader error", e);
        }
    }
}
