package com.kingdee.bos.qing.dpp.engine.flink.transform.sink.qs;

import com.kingdee.bos.qing.common.rpc.common.QRpcInvocationHandler;
import com.kingdee.bos.qing.common.rpc.common.QRpcInvokeHelper;
import com.kingdee.bos.qing.common.rpc.common.QRpcRemoteCaller;
import com.kingdee.bos.qing.datasource.meta.MetaInfo;
import com.kingdee.bos.qing.datasource.meta.MetaInfoDecodeUtil;
import com.kingdee.bos.qing.dpp.common.file.FileUploadHelper;
import com.kingdee.bos.qing.dpp.common.interfaces.ISinkedQsFileReceiver;
import com.kingdee.bos.qing.dpp.common.options.QDppOptions;
import com.kingdee.bos.qing.dpp.common.qs.QDppQsFileWriter;
import com.kingdee.bos.qing.dpp.engine.flink.util.RemoteJobResultReceiverFactory;
import com.kingdee.bos.qing.dpp.engine.flink.util.RowDataToQDppRowDataConvertUtils;
import com.kingdee.bos.qing.dpp.job.exception.DataSinkException;
import com.kingdee.bos.qing.dpp.job.interfaces.IJobStatusResultReceiver;
import com.kingdee.bos.qing.dpp.model.file.UploadFile;
import com.kingdee.bos.qing.dpp.rpc.RemoteInvokerHelper;
import com.kingdee.bos.qing.dpp.utils.CloseUtils;
import com.kingdee.bos.qing.dpp.utils.StringUtils;
import com.kingdee.bos.qing.filesystem.manager.AbstractQingFile;
import com.kingdee.bos.qing.filesystem.manager.FileFactory;
import com.kingdee.bos.qing.filesystem.manager.api.IQingFile;
import com.kingdee.bos.qing.filesystem.manager.model.QingTempFileType;
import com.kingdee.bos.qing.filesystem.stream.QingInputStream;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.io.IOException;
import java.lang.reflect.Proxy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.execution.SuppressRestartsException;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/transform/sink/qs/QDppQsFileSinkFunction.class */
public class QDppQsFileSinkFunction extends RichSinkFunction<RowData> {
    private String qsFileName;
    private String fileSubFolder;
    private DataType producedDataType;
    private DataType[] colDataTypes;
    private QDppQsFileWriter qsWriter;
    private String jobHexId;
    private String metaFileName;
    private String uploadHost;
    private int port;
    private String jobName;
    private boolean needUploadQsFile;
    private boolean deleteTempSinkedQs;
    private AbstractQingFile sinkQingFile;
    private byte rpcVersion;
    private String sinkReceiverId;
    private boolean isSideOutput;
    private final Logger log = LoggerFactory.getLogger(getClass());
    private boolean finishWriter = false;

    public QDppQsFileSinkFunction(String str, String str2, DataType dataType, String str3, boolean z) {
        this.isSideOutput = false;
        this.fileSubFolder = str;
        this.producedDataType = dataType;
        this.metaFileName = str2;
        this.sinkReceiverId = str3;
        this.isSideOutput = z;
    }

    public void open(Configuration configuration) throws Exception {
        Configuration globalJobParameters = getRuntimeContext().getExecutionConfig().getGlobalJobParameters();
        this.jobName = globalJobParameters.getString(QDppOptions.JOB_GLOBAL_PARAMS_JOB_NAME.key(), (String) null);
        this.uploadHost = globalJobParameters.getString(QDppOptions.JOB_GLOBAL_PARAMS_FROM_HOST.key(), (String) null);
        this.port = globalJobParameters.getInteger(QDppOptions.JOB_GLOBAL_PARAMS_PORT.key(), 8564);
        this.needUploadQsFile = globalJobParameters.getBoolean(QDppOptions.JOB_GLOBAL_PARAMS_QS_UPLOAD_ENABLE.key(), true);
        this.deleteTempSinkedQs = globalJobParameters.getBoolean(QDppOptions.JOB_GLOBAL_PARAMS_QS_DELETE_ENABLE.key(), true);
        this.rpcVersion = Byte.parseByte(globalJobParameters.getString(QDppOptions.JOB_GLOBAL_PARAMS_RPC_VERSION.key(), String.valueOf(10)));
        checkRestartable();
        this.jobHexId = getRuntimeContext().getJobId().toHexString();
        this.log.info("open qdpp sink function, jobId:" + this.jobHexId);
        this.colDataTypes = (DataType[]) DataType.getFieldDataTypes(this.producedDataType).toArray(new DataType[0]);
        MetaInfo createMetaInfo = createMetaInfo();
        IQingFile newTempFile = FileFactory.newTempFile(QingTempFileType.getInstanceBySubFolder(this.fileSubFolder));
        this.qsFileName = newTempFile.getName();
        this.sinkQingFile = newTempFile.findFile(QingTempFileType.getInstanceBySubFolder(this.fileSubFolder), this.qsFileName);
        this.qsWriter = new QDppQsFileWriter(this.sinkQingFile);
        this.qsWriter.start(createMetaInfo);
    }

    private MetaInfo createMetaInfo() throws IOException {
        QingInputStream inputStream = FileFactory.newFileVisitor(QingTempFileType.DS_CACHE, this.metaFileName).getInputStream();
        ByteBuf buffer = Unpooled.buffer(4096);
        while (true) {
            try {
                int read = inputStream.read();
                if (read == -1) {
                    byte[] bArr = new byte[buffer.readableBytes()];
                    buffer.readBytes(bArr);
                    MetaInfo decode = MetaInfoDecodeUtil.decode(new String(bArr));
                    buffer.clear();
                    CloseUtils.close(inputStream);
                    return decode;
                }
                buffer.writeByte(read);
            } catch (Throwable th) {
                buffer.clear();
                CloseUtils.close(inputStream);
                throw th;
            }
        }
    }

    private void checkRestartable() {
        if (!this.isSideOutput && getRuntimeContext().getAttemptNumber() > 0) {
            notifyToRestart();
        }
    }

    private void notifyToRestart() {
        final IJobStatusResultReceiver orCreateJobStatusReceiver = RemoteJobResultReceiverFactory.getOrCreateJobStatusReceiver(this.uploadHost, this.port, 15000L, this.rpcVersion);
        try {
            if (!((Boolean) QRpcInvokeHelper.retryableCall(new QRpcRemoteCaller<Boolean>() { // from class: com.kingdee.bos.qing.dpp.engine.flink.transform.sink.qs.QDppQsFileSinkFunction.1
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Boolean m41call() throws Exception {
                    return Boolean.valueOf(orCreateJobStatusReceiver.tryRestartOnFail(QDppQsFileSinkFunction.this.jobName));
                }
            }, 3)).booleanValue()) {
                throw new SuppressRestartsException(new DataSinkException("qs file typed sink task can not be restart ,jobName:" + this.jobName));
            }
        } catch (Exception e) {
            throw new SuppressRestartsException(e);
        }
    }

    public void invoke(RowData rowData, SinkFunction.Context context) throws Exception {
        this.qsWriter.writeData(RowDataToQDppRowDataConvertUtils.readRowValue(rowData, this.colDataTypes));
    }

    public void finish() throws Exception {
        this.qsWriter.finishWriteData();
        this.qsWriter.close((Exception) null);
        this.finishWriter = true;
        if (this.needUploadQsFile) {
            UploadFile uploadFile = new UploadFile();
            uploadFile.setTargetHost(this.uploadHost);
            uploadFile.setPort(this.port);
            uploadFile.setFileName(this.qsFileName);
            uploadFile.setFileType(QingTempFileType.TEMP_QS);
            uploadFile.setSrcFileAbsolutePath(this.sinkQingFile.getLocalFullPath());
            uploadFile.setTargetRpcVersion(this.rpcVersion);
            FileUploadHelper.uploadFile(uploadFile);
        }
        notifyQsFileReady();
    }

    private void notifyQsFileReady() throws Exception {
        if (StringUtils.isNotBlank(this.sinkReceiverId)) {
            final ISinkedQsFileReceiver iSinkedQsFileReceiver = (ISinkedQsFileReceiver) Proxy.newProxyInstance(getClass().getClassLoader(), new Class[]{ISinkedQsFileReceiver.class}, new QRpcInvocationHandler(RemoteInvokerHelper.createInvokerProxy(this.rpcVersion, this.sinkReceiverId, this.uploadHost, this.port), true, 15000L));
            QRpcInvokeHelper.retryableCall(new QRpcRemoteCaller<Void>() { // from class: com.kingdee.bos.qing.dpp.engine.flink.transform.sink.qs.QDppQsFileSinkFunction.2
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Void m42call() throws Exception {
                    iSinkedQsFileReceiver.receive(QDppQsFileSinkFunction.this.qsFileName, QDppQsFileSinkFunction.this.fileSubFolder);
                    return null;
                }
            }, 3, 2000L);
        } else {
            if (this.isSideOutput) {
                return;
            }
            final IJobStatusResultReceiver orCreateJobStatusReceiver = RemoteJobResultReceiverFactory.getOrCreateJobStatusReceiver(this.uploadHost, this.port, 5000L, this.rpcVersion);
            QRpcInvokeHelper.retryableCall(new QRpcRemoteCaller<Void>() { // from class: com.kingdee.bos.qing.dpp.engine.flink.transform.sink.qs.QDppQsFileSinkFunction.3
                /* renamed from: call, reason: merged with bridge method [inline-methods] */
                public Void m43call() throws Exception {
                    orCreateJobStatusReceiver.notifyQsFileReady(QDppQsFileSinkFunction.this.jobName, QDppQsFileSinkFunction.this.qsFileName);
                    return null;
                }
            }, 3, 2000L);
        }
    }

    public void close() throws Exception {
        if (!this.finishWriter) {
            try {
                this.qsWriter.finishWriteData();
                this.qsWriter.close((Exception) null);
            } catch (Exception e) {
                this.deleteTempSinkedQs = true;
            }
        }
        if (this.deleteTempSinkedQs && !this.sinkQingFile.delete()) {
            this.log.warn("delete qs file failed,fileName:" + this.qsFileName);
        }
        super.close();
    }
}
