package org.apache.iotdb.db.sync.sender.pipe;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.iotdb.commons.exception.MetadataException;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.engine.modification.Deletion;
import org.apache.iotdb.db.exception.sync.PipeException;
import org.apache.iotdb.db.qp.physical.PhysicalPlan;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.DeletionPipeData;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.SchemaPipeData;
import org.apache.iotdb.db.sync.pipedata.TsFilePipeData;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
import org.apache.iotdb.db.sync.sender.manager.SchemaSyncManager;
import org.apache.iotdb.db.sync.sender.manager.TsFileSyncManager;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.sender.recovery.TsFilePipeLogger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/sender/pipe/TsFilePipe.class */
public class TsFilePipe implements Pipe {
    private static final Logger logger = LoggerFactory.getLogger(TsFilePipe.class);
    private final long createTime;
    private final String name;
    private final PipeSink pipeSink;
    private final long dataStartTime;
    private final boolean syncDelOp;
    private final BufferedPipeDataQueue historyQueue;
    private final BufferedPipeDataQueue realTimeQueue;
    private long maxSerialNumber;
    private final SchemaSyncManager schemaSyncManager = SchemaSyncManager.getInstance();
    private final TsFileSyncManager tsFileSyncManager = TsFileSyncManager.getInstance();
    private final TsFilePipeLogger pipeLog = new TsFilePipeLogger(this);
    private final ReentrantLock collectRealTimeDataLock = new ReentrantLock();
    private boolean isCollectingRealTimeData = false;
    private Pipe.PipeStatus status = Pipe.PipeStatus.STOP;

    public TsFilePipe(long j, String str, PipeSink pipeSink, long j2, boolean z) {
        this.createTime = j;
        this.name = str;
        this.pipeSink = pipeSink;
        this.dataStartTime = j2;
        this.syncDelOp = z;
        this.historyQueue = new BufferedPipeDataQueue(SyncPathUtil.getSenderHistoryPipeLogDir(str, j));
        this.realTimeQueue = new BufferedPipeDataQueue(SyncPathUtil.getSenderRealTimePipeLogDir(str, j));
        this.maxSerialNumber = Math.max(0L, this.realTimeQueue.getLastMaxSerialNumber());
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void start() throws PipeException {
        if (this.status == Pipe.PipeStatus.DROP) {
            throw new PipeException(String.format("Can not start pipe %s, because the pipe has been drop.", this.name));
        }
        if (this.status == Pipe.PipeStatus.RUNNING) {
            return;
        }
        try {
            if (!this.pipeLog.isCollectFinished()) {
                this.pipeLog.clear();
                collectData();
                this.pipeLog.finishCollect();
            }
            if (!this.isCollectingRealTimeData) {
                registerMetadata();
                registerTsFile();
                this.isCollectingRealTimeData = true;
            }
            this.status = Pipe.PipeStatus.RUNNING;
        } catch (IOException e) {
            logger.error(String.format("Clear pipe dir %s error.", SyncPathUtil.getSenderPipeDir(this.name, this.createTime)), e);
            throw new PipeException("Start error, can not clear pipe log.");
        }
    }

    private void collectData() {
        registerMetadata();
        List<PhysicalPlan> collectHistoryMetadata = collectHistoryMetadata();
        List<File> registerAndCollectHistoryTsFile = registerAndCollectHistoryTsFile();
        this.isCollectingRealTimeData = true;
        int size = collectHistoryMetadata.size();
        int size2 = registerAndCollectHistoryTsFile.size();
        for (int i = 0; i < size; i++) {
            this.historyQueue.offer(new SchemaPipeData(collectHistoryMetadata.get(i), ((1 - size2) - size) + i));
        }
        for (int i2 = 0; i2 < size2; i2++) {
            long j = (1 - size2) + i2;
            File file = registerAndCollectHistoryTsFile.get(i2);
            this.historyQueue.offer(new TsFilePipeData(file.getParent(), file.getName(), j));
        }
    }

    private void registerMetadata() {
        this.schemaSyncManager.registerSyncTask(this);
    }

    private void deregisterMetadata() {
        this.schemaSyncManager.deregisterSyncTask();
    }

    private List<PhysicalPlan> collectHistoryMetadata() {
        return this.schemaSyncManager.collectHistoryMetadata();
    }

    public void collectRealTimeMetaData(PhysicalPlan physicalPlan) {
        this.collectRealTimeDataLock.lock();
        try {
            this.maxSerialNumber++;
            this.realTimeQueue.offer(new SchemaPipeData(physicalPlan, this.maxSerialNumber));
        } finally {
            this.collectRealTimeDataLock.unlock();
        }
    }

    private void registerTsFile() {
        this.tsFileSyncManager.registerSyncTask(this);
    }

    private void deregisterTsFile() {
        this.tsFileSyncManager.deregisterSyncTask();
    }

    private List<File> registerAndCollectHistoryTsFile() {
        return this.tsFileSyncManager.registerAndCollectHistoryTsFile(this, this.dataStartTime);
    }

    public File createHistoryTsFileHardlink(File file, long j) {
        this.collectRealTimeDataLock.lock();
        try {
            try {
                if (this.pipeLog.isHardlinkExist(file)) {
                    return null;
                }
                File createTsFileAndModsHardlink = this.pipeLog.createTsFileAndModsHardlink(file, j);
                this.collectRealTimeDataLock.unlock();
                return createTsFileAndModsHardlink;
            } catch (IOException e) {
                logger.error(String.format("Create hardlink for history tsfile %s error.", file.getPath()), e);
                this.collectRealTimeDataLock.unlock();
                return null;
            }
        } finally {
            this.collectRealTimeDataLock.unlock();
        }
    }

    public void collectRealTimeDeletion(Deletion deletion) {
        this.collectRealTimeDataLock.lock();
        try {
            try {
                if (!this.syncDelOp) {
                    this.collectRealTimeDataLock.unlock();
                    return;
                }
                Iterator<PartialPath> it = this.schemaSyncManager.splitPathPatternByDevice(deletion.getPath()).iterator();
                while (it.hasNext()) {
                    Deletion deletion2 = new Deletion(it.next(), deletion.getFileOffset(), deletion.getStartTime(), deletion.getEndTime());
                    this.maxSerialNumber++;
                    this.realTimeQueue.offer(new DeletionPipeData(deletion2, this.maxSerialNumber));
                }
                this.collectRealTimeDataLock.unlock();
            } catch (MetadataException e) {
                logger.warn(String.format("Collect deletion %s error.", deletion), e);
                this.collectRealTimeDataLock.unlock();
            }
        } catch (Throwable th) {
            this.collectRealTimeDataLock.unlock();
            throw th;
        }
    }

    public void collectRealTimeTsFile(File file) {
        this.collectRealTimeDataLock.lock();
        try {
            try {
            } catch (IOException e) {
                logger.warn(String.format("Create Hardlink tsfile %s on disk error, serial number is %d.", file.getPath(), Long.valueOf(this.maxSerialNumber)), e);
                this.collectRealTimeDataLock.unlock();
            }
            if (this.pipeLog.isHardlinkExist(file)) {
                this.collectRealTimeDataLock.unlock();
                return;
            }
            this.maxSerialNumber++;
            File createTsFileHardlink = this.pipeLog.createTsFileHardlink(file);
            this.realTimeQueue.offer(new TsFilePipeData(createTsFileHardlink.getParent(), createTsFileHardlink.getName(), this.maxSerialNumber));
            this.collectRealTimeDataLock.unlock();
        } catch (Throwable th) {
            this.collectRealTimeDataLock.unlock();
            throw th;
        }
    }

    public void collectRealTimeResource(File file) {
        try {
            this.pipeLog.createTsFileResourceHardlink(file);
        } catch (IOException e) {
            logger.warn(String.format("Record tsfile resource %s on disk error.", file.getPath()), e);
        }
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeData take() throws InterruptedException {
        return !this.historyQueue.isEmpty() ? this.historyQueue.take() : this.realTimeQueue.take();
    }

    public List<PipeData> pull(long j) {
        ArrayList arrayList = new ArrayList();
        if (!this.historyQueue.isEmpty()) {
            arrayList.addAll(this.historyQueue.pull(j));
        }
        if (j > 0) {
            arrayList.addAll(this.realTimeQueue.pull(j));
        }
        return arrayList;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public void commit() {
        if (!this.historyQueue.isEmpty()) {
            this.historyQueue.commit();
        }
        this.realTimeQueue.commit();
    }

    public void commit(long j) {
        if (!this.historyQueue.isEmpty()) {
            this.historyQueue.commit(j);
        }
        if (j > 0) {
            this.realTimeQueue.commit(j);
        }
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void stop() throws PipeException {
        if (this.status == Pipe.PipeStatus.DROP) {
            throw new PipeException(String.format("Can not stop pipe %s, because the pipe is drop.", this.name));
        }
        if (!this.isCollectingRealTimeData) {
            registerMetadata();
            registerTsFile();
            this.isCollectingRealTimeData = true;
        }
        this.status = Pipe.PipeStatus.STOP;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized void drop() throws PipeException {
        if (this.status == Pipe.PipeStatus.DROP) {
            return;
        }
        clear();
        this.status = Pipe.PipeStatus.DROP;
    }

    private void clear() {
        deregisterMetadata();
        deregisterTsFile();
        this.isCollectingRealTimeData = false;
        try {
            this.historyQueue.clear();
            this.realTimeQueue.clear();
            this.pipeLog.clear();
        } catch (IOException e) {
            logger.warn(String.format("Clear pipe %s %d error.", this.name, Long.valueOf(this.createTime)), e);
        }
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public void close() throws PipeException {
        if (this.status == Pipe.PipeStatus.DROP) {
            return;
        }
        deregisterMetadata();
        deregisterTsFile();
        this.isCollectingRealTimeData = false;
        this.historyQueue.close();
        this.realTimeQueue.close();
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public String getName() {
        return this.name;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public PipeSink getPipeSink() {
        return this.pipeSink;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public long getCreateTime() {
        return this.createTime;
    }

    @Override // org.apache.iotdb.db.sync.sender.pipe.Pipe
    public synchronized Pipe.PipeStatus getStatus() {
        return this.status;
    }

    public String toString() {
        return "TsFilePipe{createTime=" + this.createTime + ", name='" + this.name + "', pipeSink=" + this.pipeSink + ", dataStartTime=" + this.dataStartTime + ", syncDelOp=" + this.syncDelOp + ", pipeLog=" + this.pipeLog + ", isCollectingRealTimeData=" + this.isCollectingRealTimeData + ", maxSerialNumber=" + this.maxSerialNumber + ", status=" + this.status + '}';
    }

    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (obj == null || getClass() != obj.getClass()) {
            return false;
        }
        TsFilePipe tsFilePipe = (TsFilePipe) obj;
        return this.createTime == tsFilePipe.createTime && Objects.equals(this.name, tsFilePipe.name) && Objects.equals(this.pipeSink, tsFilePipe.pipeSink);
    }

    public int hashCode() {
        return Objects.hash(Long.valueOf(this.createTime), this.name, this.pipeSink);
    }
}
