package com.bes.mq.file;

import com.bes.mq.BESMQConnection;
import com.bes.mq.BESMQMessageConsumer;
import com.bes.mq.command.BESMQDestination;
import com.bes.mq.command.BESMQFileMessage;
import com.bes.mq.command.FileChunk;
import com.bes.mq.command.FileRequest;
import com.bes.mq.command.FileRequestAck;
import com.bes.mq.command.MessageId;
import com.bes.mq.command.Response;
import com.bes.mq.org.slf4j.Logger;
import com.bes.mq.org.slf4j.LoggerFactory;
import com.bes.mq.transport.TransportListenerAdapter;
import com.bes.mq.transport.stomp.Stomp;
import java.io.Closeable;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.io.UnsupportedEncodingException;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.channels.OverlappingFileLockException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;
import javax.jms.JMSException;

/* loaded from: input_file:com/bes/mq/file/FileRequestor.class */
/**
 * Requests the content of one file message (identified by {@code uid}) over a
 * {@link BESMQConnection} and writes the received chunks into the receive directory.
 *
 * <p>Lifecycle as implemented here:
 * <ol>
 *   <li>{@link #processFileMessage} builds a {@link FileRequest} (optionally resuming from a
 *       position saved in a meta file) and sends it asynchronously.</li>
 *   <li>{@link #processFileRequestAck} records the total size and opens the data file.</li>
 *   <li>{@link #processFileChunk} appends each chunk and, once the last byte is written,
 *       marks the transfer complete and stops the requestor.</li>
 *   <li>{@link #waitForComplete} blocks a consumer until the requestor has completed or
 *       stopped, then moves the data file to its final name.</li>
 * </ol>
 *
 * <p>Meta file layout (written by {@code writeFileMeta}, read by {@code readFileMeta}):
 * one marker byte ({@code 1}), an {@code int} uid length, the uid bytes in UTF-8, a
 * {@code long} total size and a {@code long} current position.
 *
 * <p>The instance registers itself as a transport listener in the constructor and removes
 * itself in {@link #stop()}.
 */
public class FileRequestor extends TransportListenerAdapter {
    private static final Logger LOG = LoggerFactory.getLogger(FileRequestor.class);

    /** Result code of a {@link FileRequestAck} for which the transfer proceeds. */
    private static final int RESULT_SUCCESS = 0;
    /** Result code treated as "file not found" (see the warning logged in processFileChunk). */
    private static final int RESULT_FILE_NOT_FOUND = 1;

    private final BESMQMessageConsumer consumer;
    private final BESMQConnection connection;
    /** Whether resume meta data is read and written; taken from the file transfer policy. */
    private final boolean retransferMode;
    private FileChannel metaChannel;
    private FileLock metaFileLock;
    private final String uid;
    private final byte[] uidBytes;
    /** Offset in the data file at which the next chunk is expected. */
    private volatile long currentPos;
    /** Total file size as reported by the ack (or loaded from the meta file). */
    private long size;
    private MessageId messageId;
    private String filePath;
    private File metaFile = null;
    private RandomAccessFile metaRaf = null;
    private File file = null;
    private RandomAccessFile fileRaf = null;
    private volatile boolean stopped = false;
    private volatile boolean complete = false;
    /** Final path of the received file, set once by the first successful waitForComplete. */
    private volatile String destPath = null;
    private long startRequestTime = 0;
    private long lastFlushTime = 0;
    private final AtomicReference<Exception> lastException = new AtomicReference<>();
    private final AtomicReference<FileRequest> fileRequestRef = new AtomicReference<>();
    private final AtomicReference<FileRequestAck> fileRequestAckRef = new AtomicReference<>();
    private final Map<BESMQMessageConsumer, BESMQFileMessage> relatedConsumersAndMsgs = new ConcurrentHashMap<>();
    /** Position and wall-clock time of the last throttled progress notification. */
    private long lastCur = 0;
    private long lastTime = System.currentTimeMillis();

    /**
     * Creates a requestor for the file identified by {@code str}, makes sure the meta and
     * receive directories exist, and registers this instance as a transport listener.
     *
     * @param bESMQConnection connection used to send requests and to read the transfer policy
     * @param bESMQMessageConsumer consumer whose dispatch mode is copied into the request
     * @param str unique id of the file; also used as the temporary data file name
     */
    public FileRequestor(BESMQConnection bESMQConnection, BESMQMessageConsumer bESMQMessageConsumer, String str) {
        this.connection = bESMQConnection;
        this.consumer = bESMQMessageConsumer;
        this.uid = str;
        // Always UTF-8: readFileMeta decodes the saved uid as UTF-8, so a platform-charset
        // fallback could make the two sides disagree.
        this.uidBytes = str.getBytes(StandardCharsets.UTF_8);
        this.retransferMode = bESMQConnection.getFileTransferPolicy().isResumeMode();
        ensureDirectory(new File(bESMQConnection.getFileTransferPolicy().getMetaDir()));
        ensureDirectory(new File(bESMQConnection.getFileTransferPolicy().getRecvFilesDir()));
        this.connection.addTransportListener(this);
    }

    /**
     * Associates a consumer and its file message with this requestor. Only consumers
     * registered here are accepted by {@link #waitForComplete} and receive progress callbacks.
     */
    public void attatched(BESMQMessageConsumer bESMQMessageConsumer, BESMQFileMessage bESMQFileMessage) {
        this.relatedConsumersAndMsgs.put(bESMQMessageConsumer, bESMQFileMessage);
    }

    /**
     * Starts (or resumes) the request for the file described by the message.
     *
     * @return always {@code null}
     * @throws JMSException if the meta or data file cannot be prepared or the request cannot be sent
     */
    public Response processFileMessage(BESMQFileMessage bESMQFileMessage) throws JMSException {
        this.messageId = bESMQFileMessage.getMessageId();
        this.filePath = bESMQFileMessage.getFilePath();
        if (this.complete) {
            LOG.info("Request file " + this.filePath + " has finished(transportResumed)");
            return null;
        }
        this.startRequestTime = System.currentTimeMillis();
        FileRequest request = readFileMeta(bESMQFileMessage);
        // readFileMeta may discover from the saved meta data that everything was already received.
        if (this.complete) {
            LOG.info("Request file " + this.filePath + " has finished");
            return null;
        }
        this.connection.asyncSendPacket(request);
        this.fileRequestRef.set(request);
        updateProgress(this.size, this.currentPos, true);
        LOG.info("Start to request file " + this.filePath + ",  pos " + this.currentPos);
        return null;
    }

    /**
     * Handles the acknowledgement of a file request: records the size and opens the data
     * file, or finishes immediately when nothing remains to be transferred or the ack
     * carries a non-success result code.
     *
     * @return always {@code null}
     */
    public Response processFileRequestAck(FileRequestAck fileRequestAck) throws JMSException {
        this.fileRequestAckRef.set(fileRequestAck);
        if (fileRequestAck.getResultCode() != RESULT_SUCCESS) {
            onComplete();
            return null;
        }
        try {
            this.size = fileRequestAck.getSize();
            LOG.info("Receive request ack for file " + this.filePath + ", size " + this.size);
            updateProgress(this.size, this.currentPos, true);
            if (this.size == this.currentPos) {
                LOG.info("Request file " + this.filePath + " has completed");
                onComplete();
            } else if (this.fileRaf == null) {
                this.fileRaf = new RandomAccessFile(this.file, "rw");
            }
        } catch (IOException e) {
            LOG.error("Failed to process request ack for file " + this.filePath, e);
            // Without an open data file no chunk can be written; fail now with the real
            // cause instead of surfacing a NullPointerException on the first chunk.
            this.lastException.set(e);
            stop();
        }
        return null;
    }

    /**
     * Writes one chunk at the current position. A chunk whose position does not match, or
     * any I/O failure, is recorded as the last exception and stops the requestor.
     *
     * @return always {@code null}
     */
    public Response processFileChunk(FileChunk fileChunk) throws JMSException {
        if (this.stopped) {
            LOG.warn("FileMessageRequestor has stopped, but received FileChunk, uid " + fileChunk.getUid());
            return null;
        }
        FileRequestAck ack = this.fileRequestAckRef.get();
        if (ack == null) {
            LOG.warn("FileRequestAck hasn't received, but received FileChunk, uid " + fileChunk.getUid());
            return null;
        }
        if (ack.getResultCode() == RESULT_FILE_NOT_FOUND) {
            LOG.warn("FileRequestAck marked with FILE_NOT_FOUND, but received FileChunk, uid " + fileChunk.getUid());
            return null;
        }
        try {
            if (fileChunk.getPosition() != this.currentPos) {
                throw new JMSException("Invliad file chunk for FileMessage,uid " + this.uid);
            }
            byte[] content = fileChunk.getContent();
            this.fileRaf.seek(this.currentPos);
            this.fileRaf.write(content);
            this.currentPos += content.length;
            updateProgress(this.size, this.currentPos, false);
            writeFileMeta(false);
            if (this.currentPos == this.size) {
                LOG.info("Finished to request file " + this.filePath + ", cost " + (System.currentTimeMillis() - this.startRequestTime) + "ms");
                closeQuietly(this.fileRaf, "data file " + this.filePath);
                this.fileRaf = null;
                onComplete();
            }
        } catch (Exception e) {
            // Broad on purpose: a missing data file handle (NullPointerException) must also
            // end the transfer and be reported to waiters.
            LOG.error("Failed to process file chunk for file " + this.filePath, e);
            this.lastException.set(e);
            stop();
        }
        return null;
    }

    public boolean isStopped() {
        return this.stopped;
    }

    /**
     * Stops the requestor: flushes the resume meta data, releases the meta file lock, closes
     * all file handles, sends a final progress notification, unregisters the transport
     * listener and wakes up every thread blocked in {@link #waitForComplete}. Calls after the
     * first one return immediately.
     */
    public void stop() {
        if (this.stopped) {
            return;
        }
        this.stopped = true;
        writeFileMeta(true);
        if (this.metaFileLock != null) {
            try {
                this.metaFileLock.release();
            } catch (IOException e) {
                LOG.warn("Failed to release meta file lock for " + this.uid, e);
            }
            this.metaFileLock = null;
        }
        closeQuietly(this.metaChannel, "meta channel for " + this.uid);
        this.metaChannel = null;
        closeQuietly(this.metaRaf, "meta file for " + this.uid);
        this.metaRaf = null;
        closeQuietly(this.fileRaf, "data file for " + this.uid);
        this.fileRaf = null;
        updateProgress(this.size, this.currentPos, true);
        this.connection.removeTransportListener(this);
        synchronized (this) {
            notifyAll();
        }
    }

    /** Marks the transfer as complete and then stops the requestor. */
    private void onComplete() throws JMSException {
        this.complete = true;
        stop();
    }

    /**
     * Prepares the data file and (in resume mode) the locked meta file, determines the start
     * position and builds the {@link FileRequest} for it.
     *
     * <p>May set {@code complete} when the saved meta data shows that the whole file was
     * already received and the data file on disk is at least that long.
     *
     * @throws JMSException if the meta file cannot be created, opened or locked, or the data
     *         file cannot be created
     */
    private FileRequest readFileMeta(BESMQFileMessage bESMQFileMessage) throws JMSException {
        File metaDir = ensureDirectory(new File(this.connection.getFileTransferPolicy().getMetaDir()));
        File recvDir = ensureDirectory(new File(this.connection.getFileTransferPolicy().getRecvFilesDir()));
        this.currentPos = 0L;
        this.file = new File(recvDir, this.uid);
        this.metaFile = new File(metaDir, getMetaFileName());
        if (this.retransferMode) {
            openMetaFile(bESMQFileMessage);
            lockMetaFile();
        }
        if (!this.file.exists()) {
            try {
                this.file.createNewFile();
            } catch (IOException e) {
                JMSException failure = new JMSException("Can't create file " + this.file.getAbsolutePath());
                failure.initCause(e);
                throw failure;
            }
        }
        long onDisk = this.file.length();
        if (onDisk < this.currentPos) {
            // The saved position points past the data actually on disk (file missing or
            // truncated). Restart from zero and drop a "complete" verdict derived from the
            // meta data, otherwise an empty or partial file would be reported as finished.
            LOG.warn("Invalid file len, saved " + this.currentPos + " got " + onDisk + ", will request file from start position");
            this.currentPos = 0L;
            this.complete = false;
        }
        FileRequest fileRequest = new FileRequest();
        fileRequest.setResponseRequired(true);
        fileRequest.setUid(this.uid);
        fileRequest.setPosition(this.currentPos);
        fileRequest.setChunkSize(this.connection.getFileTransferPolicy().getChunkSize());
        fileRequest.setDispatchAsync(this.consumer.getConsumerInfo().isDispatchAsync());
        fileRequest.setResumeMode(this.retransferMode);
        fileRequest.setDeliveryMode(bESMQFileMessage.getJMSDeliveryMode());
        fileRequest.setDestination((BESMQDestination) bESMQFileMessage.getJMSDestination());
        return fileRequest;
    }

    /**
     * Opens the meta file, creating it when absent; for an existing file the saved resume
     * position is loaded. Unreadable content is logged and ignored (transfer starts at 0).
     */
    private void openMetaFile(BESMQFileMessage bESMQFileMessage) throws JMSException {
        if (!this.metaFile.exists()) {
            try {
                this.metaFile.createNewFile();
                this.metaRaf = new RandomAccessFile(this.metaFile, "rw");
            } catch (IOException e) {
                JMSException failure = new JMSException("Create meta file failed: " + this.metaFile.getAbsolutePath());
                failure.initCause(e);
                throw failure;
            }
            return;
        }
        try {
            this.metaRaf = new RandomAccessFile(this.metaFile, "rw");
        } catch (IOException e) {
            // Without a handle the file can neither be locked nor updated later on.
            JMSException failure = new JMSException("Can't open meta file: " + this.metaFile.getAbsolutePath());
            failure.initCause(e);
            throw failure;
        }
        try {
            loadResumePosition(bESMQFileMessage);
        } catch (IOException e) {
            LOG.warn("Read file meta failed, will request file from start position. file: " + bESMQFileMessage.getFilePath(), e);
        }
    }

    /**
     * Reads uid, size and position from the open meta file and adopts them only when the uid
     * matches and the numbers are consistent; otherwise the position stays at 0.
     */
    private void loadResumePosition(BESMQFileMessage bESMQFileMessage) throws IOException {
        long metaLength = this.metaFile.length();
        if (metaLength <= 0) {
            return;
        }
        this.metaRaf.readByte(); // marker byte written by writeFileMeta; its value is not checked
        int uidLength = this.metaRaf.readInt();
        // The upper bound keeps a corrupt length field from triggering a huge allocation.
        if (uidLength <= 0 || uidLength > metaLength) {
            LOG.warn("Invalid file uid len" + uidLength + ", will request file from start position. file: " + bESMQFileMessage.getFilePath());
            return;
        }
        byte[] savedUidBytes = new byte[uidLength];
        // readFully: a plain read() may legally return fewer bytes than requested.
        this.metaRaf.readFully(savedUidBytes);
        String savedUid = new String(savedUidBytes, StandardCharsets.UTF_8);
        if (!savedUid.equals(this.uid)) {
            LOG.warn("Invalid file uid, expected " + this.uid + ", saved " + savedUid + ", will request file from start position. file: " + bESMQFileMessage.getFilePath());
            return;
        }
        long savedSize = this.metaRaf.readLong();
        long savedPos = this.metaRaf.readLong();
        if (savedSize < 0 || savedPos < 0 || savedPos > savedSize) {
            LOG.warn("Invalid resume meta, size " + savedSize + " pos " + savedPos + ", will request file from start position. file: " + bESMQFileMessage.getFilePath());
            return;
        }
        this.size = savedSize;
        this.currentPos = savedPos;
        this.complete = savedSize == savedPos;
        LOG.info("Found resume meta for FileMessage, uid: " + this.uid + ", size: " + this.size + " pos: " + this.currentPos + " file: " + bESMQFileMessage.getFilePath());
    }

    /**
     * Takes an exclusive lock on the meta file.
     *
     * @throws JMSException if the lock is held elsewhere or cannot be acquired
     */
    private void lockMetaFile() throws JMSException {
        Exception cause = null;
        try {
            this.metaChannel = this.metaRaf.getChannel();
            this.metaFileLock = this.metaChannel.tryLock();
        } catch (IOException | OverlappingFileLockException e) {
            // tryLock returns null when another program holds the lock but throws
            // OverlappingFileLockException when this JVM already holds it.
            cause = e;
        }
        if (this.metaFileLock == null) {
            JMSException failure = new JMSException("Can't hold the meta file lock, may be transfer is in progress: " + this.metaFile.getAbsolutePath());
            if (cause != null) {
                failure.initCause(cause);
            }
            throw failure;
        }
    }

    /**
     * Persists uid, size and current position to the meta file (resume mode only).
     *
     * @param z {@code true} to write unconditionally; otherwise the write is skipped unless
     *          the policy's meta flush period has elapsed since the previous write
     */
    private void writeFileMeta(boolean z) {
        RandomAccessFile meta = this.metaRaf;
        // meta is null when stop() runs before the meta file was ever opened.
        if (!this.retransferMode || meta == null) {
            return;
        }
        long now = System.currentTimeMillis();
        boolean due = z
                || this.lastFlushTime == 0
                || now - this.lastFlushTime > this.connection.getFileTransferPolicy().getMetaFlushPeriodInMills();
        if (!due) {
            return;
        }
        this.lastFlushTime = now;
        try {
            meta.seek(0L);
            meta.writeByte(1);
            meta.writeInt(this.uidBytes.length);
            meta.write(this.uidBytes);
            meta.writeLong(this.size);
            meta.writeLong(this.currentPos);
        } catch (IOException e) {
            LOG.error("Failed to write meta file " + this.uid, e);
        }
    }

    private String getMetaFileName() throws JMSException {
        return this.uid + ".meta";
    }

    /**
     * Blocks until the transfer has completed or the requestor has stopped, then finalises
     * the message for the given consumer.
     *
     * <p>On completion the data file is moved to its destination name (once, for the first
     * caller), the message's file path is updated, or the message is flagged as
     * "file not found" when the ack said so. The meta file is removed and the consumer is
     * detached; when no consumer remains the requestor is removed from the connection.
     *
     * @param bESMQMessageConsumer a consumer previously registered via {@link #attatched}
     * @param bESMQFileMessage the consumer's message to update
     * @param j timeout in milliseconds; negative waits indefinitely, {@code 0} does not wait
     * @param runnable invoked (if non-null) when the timeout elapses without completion
     * @throws JMSException if the wait is interrupted, or the requestor stopped (or an
     *         indefinite wait ended) without completing; the last recorded exception, if any,
     *         is attached as cause
     */
    public void waitForComplete(BESMQMessageConsumer bESMQMessageConsumer, BESMQFileMessage bESMQFileMessage, long j, Runnable runnable) throws JMSException {
        if (!this.relatedConsumersAndMsgs.containsKey(bESMQMessageConsumer)) {
            LOG.error("FileMessageRequestor not contains the consumer: " + bESMQMessageConsumer);
            return;
        }
        awaitCompletion(j);
        if (!this.complete) {
            if (this.stopped || j < 0) {
                JMSException failure = new JMSException("Failed to request file " + this.filePath);
                Exception cause = this.lastException.get();
                if (cause != null) {
                    failure.initCause(cause);
                }
                throw failure;
            }
            if (runnable != null) {
                runnable.run();
            }
            return;
        }
        boolean readOnlyBody = bESMQFileMessage.isReadOnlyBody();
        boolean readOnlyProperties = bESMQFileMessage.isReadOnlyProperties();
        try {
            // Temporarily writable so the path / not-found flag can be set; restored in finally.
            bESMQFileMessage.setReadOnlyBody(false);
            bESMQFileMessage.setReadOnlyProperties(false);
            FileRequestAck ack = this.fileRequestAckRef.get();
            if (ack != null && ack.getResultCode() == RESULT_FILE_NOT_FOUND) {
                bESMQFileMessage.setBooleanProperty(FileTransferConstants.FILE_NOT_FOUND, true);
            } else {
                // Several consumers may wait on the same requestor; only one of them may
                // perform the rename, the others reuse its result.
                synchronized (this) {
                    if (this.destPath == null) {
                        this.destPath = moveToDestination(bESMQFileMessage);
                    }
                }
                bESMQFileMessage.setFilePath(this.destPath);
            }
            if (this.metaFile != null && (this.currentPos == this.size || this.metaFile.length() == 0)) {
                this.metaFile.delete();
            }
            this.relatedConsumersAndMsgs.remove(bESMQMessageConsumer);
            if (this.relatedConsumersAndMsgs.isEmpty()) {
                this.connection.removeFileMessageRequestor(this.uid);
            }
        } finally {
            bESMQFileMessage.setReadOnlyBody(readOnlyBody);
            bESMQFileMessage.setReadOnlyProperties(readOnlyProperties);
        }
    }

    /**
     * Waits on this object's monitor until the transfer is complete, the requestor is
     * stopped, or the timeout elapses. Loops to tolerate spurious wakeups.
     *
     * @param timeoutMillis negative waits indefinitely, {@code 0} returns immediately
     */
    private void awaitCompletion(long timeoutMillis) throws JMSException {
        if (timeoutMillis == 0 || this.complete || this.stopped) {
            return;
        }
        final long startNanos = System.nanoTime();
        synchronized (this) {
            try {
                long remaining = timeoutMillis;
                while (!this.complete && !this.stopped) {
                    if (timeoutMillis < 0) {
                        wait();
                        continue;
                    }
                    if (remaining <= 0) {
                        break;
                    }
                    wait(remaining);
                    remaining = timeoutMillis - (System.nanoTime() - startNanos) / 1000000L;
                }
            } catch (InterruptedException e) {
                // Keep the interrupt visible to the caller's thread.
                Thread.currentThread().interrupt();
                JMSException failure = new JMSException("FileMessageRequestor waitForCompele Interrupted");
                failure.initCause(e);
                throw failure;
            }
        }
    }

    /**
     * Renames the temporary data file to its destination inside the receive directory and
     * returns the resulting absolute path. The name comes from the message's rename property
     * when set, otherwise from its file name. If the rename fails the temporary file's path
     * is returned.
     */
    private String moveToDestination(BESMQFileMessage bESMQFileMessage) throws JMSException {
        String fileName = bESMQFileMessage.getFileName();
        String renameTo = bESMQFileMessage.getStringProperty(FileTransferConstants.RENMAE_TO);
        if (renameTo != null && renameTo.length() > 0) {
            fileName = renameTo;
        }
        // Literal replacement: the separator is plain text, not a regular expression.
        fileName = fileName.replace(Stomp.Headers.SEPERATOR, "_");
        // NOTE(review): fileName originates from the message and is not checked for ".."
        // segments, so it may resolve outside the receive directory - confirm whether the
        // sender is trusted or add a containment check.
        File target = new File(this.connection.getFileTransferPolicy().getRecvFilesDir(), fileName);
        File parent = target.getParentFile();
        if (parent != null) {
            parent.mkdirs();
        }
        boolean overwrite = this.connection.getFileTransferPolicy().isOverwrite();
        if (target.exists() && !overwrite) {
            // Keep the existing file and store the new one under a uid-suffixed name.
            target = new File(target.getPath() + BESMQDestination.PATH_SEPERATOR + this.uid);
        }
        boolean renamed = this.file.renameTo(target);
        if (!renamed && target.exists() && overwrite) {
            LOG.warn("Failed to rename file " + this.file.getAbsolutePath() + " to " + target.getAbsolutePath() + ", will try to delete dest file firstly");
            if (target.delete()) {
                renamed = this.file.renameTo(target);
            } else {
                LOG.warn("Failed to delete dest file " + target.getAbsolutePath());
            }
        }
        if (renamed) {
            return target.getAbsolutePath();
        }
        LOG.warn("Failed to rename file " + this.file.getAbsolutePath() + " to " + target.getAbsolutePath());
        return this.file.getAbsolutePath();
    }

    /**
     * Notifies the transfer listener of every attached consumer.
     *
     * @param j total size
     * @param j2 current position
     * @param z {@code true} to notify unconditionally; otherwise a notification is sent only
     *          after at least one more percent of progress and at most once per second
     */
    private void updateProgress(long j, long j2, boolean z) {
        if (!z) {
            if (((j2 - this.lastCur) * 100) / Math.max(j, 1L) < 1) {
                return;
            }
            this.lastCur = j2;
            long now = System.currentTimeMillis();
            if (now - this.lastTime < 1000) {
                return;
            }
            this.lastTime = now;
        }
        // Whole-number percentage (integer division) carried as a double.
        double percent = (j2 * 100) / Math.max(j, 1L);
        for (BESMQMessageConsumer attached : this.relatedConsumersAndMsgs.keySet()) {
            FileTransferListener listener = attached.getFileTransferListener();
            if (listener != null) {
                listener.onTransfer(this.messageId, this.filePath, j, j2, percent);
            }
        }
    }

    /** Creates the directory (including parents) when it does not exist yet; returns it. */
    private static File ensureDirectory(File dir) {
        if (!dir.exists() && !dir.mkdirs() && !dir.isDirectory()) {
            LOG.warn("Failed to create directory " + dir.getAbsolutePath());
        }
        return dir;
    }

    /** Closes the resource if non-null; a failure is logged and otherwise ignored. */
    private static void closeQuietly(Closeable resource, String description) {
        if (resource == null) {
            return;
        }
        try {
            resource.close();
        } catch (IOException | RuntimeException e) {
            LOG.warn("Failed to close " + description, e);
        }
    }

    @Override // com.bes.mq.transport.TransportListenerAdapter, com.bes.mq.transport.TransportListener
    public void transportInterupted() {
        LOG.info("The connection transport interupted");
    }

    /**
     * Re-sends the previous {@link FileRequest}, updated to the current position, after the
     * transport has been re-established. Nothing is sent when the transfer has already
     * completed or the requestor has stopped.
     */
    @Override // com.bes.mq.transport.TransportListenerAdapter, com.bes.mq.transport.TransportListener
    public void transportResumed() {
        if (this.complete) {
            LOG.warn("The connection transport resumed, request file " + this.filePath + " has completed");
            return;
        }
        if (this.stopped) {
            // File handles are closed and incoming chunks are discarded once stopped.
            return;
        }
        FileRequest fileRequest = this.fileRequestRef.get();
        if (fileRequest == null) {
            LOG.warn("The connection transport resumed, but can't get the previous FileRequest for file " + this.filePath);
            return;
        }
        fileRequest.setPosition(this.currentPos);
        try {
            LOG.info("The connection transport resumed, resend the FileRequest for file " + this.filePath + ", pos " + fileRequest.getPosition());
            this.connection.asyncSendPacket(fileRequest);
        } catch (JMSException e) {
            LOG.error("Failed to resend FileRequest to broker", e);
        }
    }
}
