package org.apache.iotdb.db.sync.receiver.collector;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.exception.sync.PipeDataLoadBearableException;
import org.apache.iotdb.db.exception.sync.PipeDataLoadException;
import org.apache.iotdb.db.sync.conf.SyncPathUtil;
import org.apache.iotdb.db.sync.pipedata.PipeData;
import org.apache.iotdb.db.sync.pipedata.queue.BufferedPipeDataQueue;
import org.apache.iotdb.db.sync.pipedata.queue.PipeDataQueueFactory;
import org.apache.iotdb.db.sync.receiver.manager.PipeMessage;
import org.apache.iotdb.db.sync.receiver.manager.ReceiverManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/receiver/collector/Collector.class */
public class Collector {
    private static final Logger logger = LoggerFactory.getLogger(Collector.class);
    private static final int WAIT_TIMEOUT = 2000;
    private ExecutorService executorService;
    private Map<String, Future> taskFutures = new ConcurrentHashMap();

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/iotdb/db/sync/receiver/collector/Collector$ScanTask.class */
    public class ScanTask implements Runnable {
        private final String pipeName;
        private final String remoteIp;
        private final long createTime;

        private ScanTask(String str, String str2, long j) {
            this.pipeName = str;
            this.remoteIp = str2;
            this.createTime = j;
        }

        @Override // java.lang.Runnable
        public void run() {
            BufferedPipeDataQueue bufferedPipeDataQueue = PipeDataQueueFactory.getBufferedPipeDataQueue(SyncPathUtil.getReceiverPipeLogDir(this.pipeName, this.remoteIp, this.createTime));
            while (!Thread.currentThread().isInterrupted()) {
                PipeData pipeData = null;
                try {
                    pipeData = bufferedPipeDataQueue.take();
                    Collector.logger.info("Start load pipeData with serialize number {} and type {},value={}", new Object[]{Long.valueOf(pipeData.getSerialNumber()), pipeData.getType(), pipeData});
                    pipeData.createLoader().load();
                    bufferedPipeDataQueue.commit();
                    Collector.logger.info("Commit pipeData with serialize number {}", Long.valueOf(pipeData.getSerialNumber()));
                } catch (InterruptedException e) {
                    Collector.logger.warn("Be interrupted when waiting for pipe data");
                    Thread.currentThread().interrupt();
                    return;
                } catch (PipeDataLoadBearableException e2) {
                    Collector.logger.warn(e2.getMessage());
                    ReceiverManager.getInstance().writePipeMessage(this.pipeName, this.remoteIp, this.createTime, new PipeMessage(PipeMessage.MsgType.WARN, e2.getMessage()));
                    bufferedPipeDataQueue.commit();
                } catch (PipeDataLoadException e3) {
                    String format = pipeData != null ? String.format("Cannot load pipeData with serialize number %d and type %s, because %s", Long.valueOf(pipeData.getSerialNumber()), pipeData.getType(), e3.getMessage()) : String.format("Cannot load pipeData because %s", e3.getMessage());
                    Collector.logger.error(format);
                    ReceiverManager.getInstance().writePipeMessage(this.pipeName, this.remoteIp, this.createTime, new PipeMessage(PipeMessage.MsgType.ERROR, format));
                    return;
                }
            }
        }
    }

    public void startCollect() {
        this.executorService = IoTDBThreadPoolFactory.newCachedThreadPool(ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
    }

    public void stopCollect() {
        Iterator<Future> it = this.taskFutures.values().iterator();
        while (it.hasNext()) {
            it.next().cancel(true);
        }
        if (this.executorService != null) {
            this.executorService.shutdownNow();
            int i = WAIT_TIMEOUT;
            while (!this.executorService.isTerminated()) {
                try {
                    if (!this.executorService.awaitTermination(2000L, TimeUnit.MILLISECONDS)) {
                        logger.info("{} thread pool doesn't exit after {}ms.", ThreadName.SYNC_RECEIVER_COLLECTOR.getName(), Integer.valueOf(i));
                    }
                    i += WAIT_TIMEOUT;
                } catch (InterruptedException e) {
                    logger.error("Interrupted while waiting {} thread pool to exit. ", ThreadName.SYNC_RECEIVER_COLLECTOR.getName());
                    Thread.currentThread().interrupt();
                }
            }
            this.executorService = null;
        }
    }

    public void startPipe(String str, String str2, long j) {
        String receiverPipeDirName = SyncPathUtil.getReceiverPipeDirName(str, str2, j);
        synchronized (receiverPipeDirName.intern()) {
            if (!this.taskFutures.containsKey(receiverPipeDirName)) {
                this.taskFutures.put(receiverPipeDirName, this.executorService.submit(new ScanTask(str, str2, j)));
            }
        }
    }

    public void stopPipe(String str, String str2, long j) {
        String receiverPipeDirName = SyncPathUtil.getReceiverPipeDirName(str, str2, j);
        logger.info("try stop task key={}", receiverPipeDirName);
        synchronized (receiverPipeDirName.intern()) {
            if (this.taskFutures.containsKey(receiverPipeDirName)) {
                this.taskFutures.get(receiverPipeDirName).cancel(true);
                this.taskFutures.remove(receiverPipeDirName);
                logger.info("stop task success, key={}", receiverPipeDirName);
            }
        }
    }
}
