package com.kingdee.bos.qing.dpp.engine.flink.transform.sink.socket;

import com.kingdee.bos.qing.dpp.engine.flink.job.QDppJobContext;
import com.kingdee.bos.qing.dpp.engine.flink.transform.TableCreateDescripter;
import com.kingdee.bos.qing.dpp.engine.flink.transform.model.TransformVertex;
import com.kingdee.bos.qing.dpp.engine.flink.transform.sink.AbstractSinkTableBuilder;
import com.kingdee.bos.qing.dpp.exception.TableBuildException;
import com.kingdee.bos.qing.dpp.model.transform.settings.QDppSocketSinkSettings;
import com.kingdee.bos.qing.dpp.model.transform.settings.SinkSettings;
import java.util.List;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.catalog.Column;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/kingdee/bos/qing/dpp/engine/flink/transform/sink/socket/SocketSinkTableBuilder.class */
/**
 * Sink table builder that registers a temporary table backed by the socket sink connector
 * ({@link QDppSocketDynamicSinkFactory#SINK_TYPE}) and returns that table's name.
 */
public class SocketSinkTableBuilder extends AbstractSinkTableBuilder {
    private static final Logger log = LoggerFactory.getLogger(SocketSinkTableBuilder.class);

    /**
     * Generates the CREATE statement for the socket sink table, executes it against the job's
     * table environment, and returns the name of the created table.
     *
     * <p>The sink table gets one physical column per column of {@code table}'s resolved schema,
     * using the same name and data type. Two connector options are set: {@code connector}
     * (the socket sink type) and {@code receiverRefId} (taken from the settings' sink ref id).
     *
     * @param qDppJobContext  job context supplying the table environment the DDL is executed on
     * @param transformVertex not read by this implementation
     * @param table           upstream table whose resolved schema defines the sink columns
     * @param sinkSettings    must be a {@link QDppSocketSinkSettings}; it is cast without a check,
     *                        so any other type fails with a {@link ClassCastException}
     * @return the temporary table name taken from the socket sink settings
     * @throws TableBuildException declared by the overridden method; nothing in this body throws
     *                             it explicitly
     */
    @Override // com.kingdee.bos.qing.dpp.engine.flink.transform.sink.AbstractSinkTableBuilder
    public String buildDynamicSinkTable(QDppJobContext qDppJobContext, TransformVertex transformVertex, Table table, SinkSettings sinkSettings) throws TableBuildException {
        QDppSocketSinkSettings socketSettings = (QDppSocketSinkSettings) sinkSettings;
        String sinkTableName = socketSettings.getTmpTableName();

        // Typed as List<Column> (not a raw List) so the column accessors below resolve.
        List<Column> schemaColumns = table.getResolvedSchema().getColumns();

        // NOTE(review): the meaning of the 'false' flag is defined by TableCreateDescripter — confirm there.
        TableCreateDescripter.Builder descriptorBuilder = TableCreateDescripter.tableDescripBuilder(false);
        descriptorBuilder.withTableName(sinkTableName).withPrimaryKey(null).withOption("connector", QDppSocketDynamicSinkFactory.SINK_TYPE).withOption("receiverRefId", socketSettings.getSinkRefId());

        // Re-declare every upstream column as a plain physical column of the sink table.
        for (Column schemaColumn : schemaColumns) {
            descriptorBuilder.withColumn(Column.physical(schemaColumn.getName(), schemaColumn.getDataType()));
        }

        String createSql = descriptorBuilder.build().generateCreateSql();
        log.info("socket sink table sql:{}", createSql);
        qDppJobContext.getTableEnv().executeSql(createSql);
        return sinkTableName;
    }
}
