package org.apache.iotdb.db.sync.sender.service;

import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
import org.apache.iotdb.commons.concurrent.ThreadName;
import org.apache.iotdb.db.exception.SyncConnectionException;
import org.apache.iotdb.db.sync.conf.SyncConstant;
import org.apache.iotdb.db.sync.sender.pipe.IoTDBPipeSink;
import org.apache.iotdb.db.sync.sender.pipe.Pipe;
import org.apache.iotdb.db.sync.transport.client.ITransportClient;
import org.apache.iotdb.db.sync.transport.client.TransportClient;
import org.apache.iotdb.service.transport.thrift.RequestType;
import org.apache.iotdb.service.transport.thrift.SyncRequest;
import org.apache.iotdb.service.transport.thrift.SyncResponse;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/sync/sender/service/TransportHandler.class */
/**
 * Drives the sender side of one sync pipe: it runs the pipe's {@link ITransportClient} on a
 * dedicated single-thread executor and periodically sends heartbeat requests to the receiver on a
 * separate single-thread scheduled executor.
 *
 * <p>A debug instance can be installed with {@link #setDebugTransportHandler(TransportHandler)};
 * while one is installed, {@link #getNewTransportHandler(Pipe, IoTDBPipeSink)} returns it instead
 * of creating a new handler.
 */
public class TransportHandler {
    private static final Logger logger = LoggerFactory.getLogger(TransportHandler.class);

    /** Optional handler override; {@code null} means "create a real handler per pipe". */
    private static TransportHandler DEBUG_TRANSPORT_HANDLER = null;

    // Not final: resetTransportClient(Pipe) re-binds both values when the debug handler is reused.
    private String pipeName;
    private long createTime;

    /** Local host address, or {@link SyncConstant#UNKNOWN_IP} when it could not be resolved. */
    private final String localIp;

    protected ITransportClient transportClient;
    protected ExecutorService transportExecutorService;
    private Future<?> transportFuture;
    protected ScheduledExecutorService heartbeatExecutorService;
    private Future<?> heartbeatFuture;

    /**
     * Creates a handler for the given pipe, building a {@link TransportClient} that targets the
     * ip and port of the given sink. No task is submitted until {@link #start()} is called.
     *
     * @param pipe the pipe whose name and create time identify every request sent by this handler
     * @param ioTDBPipeSink the sink providing the receiver's ip and port
     */
    public TransportHandler(Pipe pipe, IoTDBPipeSink ioTDBPipeSink) {
        this.pipeName = pipe.getName();
        this.createTime = pipe.getCreateTime();
        this.transportClient = new TransportClient(pipe, ioTDBPipeSink.getIp(), ioTDBPipeSink.getPort());
        this.transportExecutorService = IoTDBThreadPoolFactory.newSingleThreadExecutor(ThreadName.SYNC_SENDER_PIPE.getName() + "-" + this.pipeName);
        this.heartbeatExecutorService = IoTDBThreadPoolFactory.newSingleThreadScheduledExecutor(ThreadName.SYNC_SENDER_HEARTBEAT.getName() + "-" + this.pipeName);
        this.localIp = resolveLocalIp();
    }

    /**
     * Resolves the local host address, falling back to {@link SyncConstant#UNKNOWN_IP} when the
     * local host name cannot be resolved.
     */
    private static String resolveLocalIp() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            // Pass the throwable itself so the stack trace is kept in the log.
            logger.error("Get local host error when create transport handler.", e);
            return SyncConstant.UNKNOWN_IP;
        }
    }

    /**
     * Submits the transport client to the transport executor and schedules the heartbeat task
     * with a fixed delay of {@link SyncConstant#HEARTBEAT_DELAY_SECONDS} seconds, starting
     * immediately.
     */
    public void start() {
        this.transportFuture = this.transportExecutorService.submit(this.transportClient);
        this.heartbeatFuture = this.heartbeatExecutorService.scheduleWithFixedDelay(this::sendHeartbeat, 0L, SyncConstant.HEARTBEAT_DELAY_SECONDS.longValue(), TimeUnit.SECONDS);
    }

    /**
     * Cancels the transport task and the heartbeat task (interrupting them if running). The
     * executors themselves stay alive; use {@link #close()} to shut them down.
     */
    public void stop() {
        if (this.transportFuture != null) {
            this.transportFuture.cancel(true);
        }
        if (this.heartbeatFuture != null) {
            this.heartbeatFuture.cancel(true);
        }
    }

    /**
     * Shuts down both executors and waits for each of them to terminate, for at most
     * {@link SyncConstant#DEFAULT_WAITING_FOR_STOP_MILLISECONDS} milliseconds each.
     *
     * @return {@code true} only if both executors terminated within their timeout
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public boolean close() throws InterruptedException {
        // Request shutdown of BOTH executors before blocking: if the first wait is interrupted,
        // the heartbeat executor must already have been told to stop, otherwise it leaks.
        this.transportExecutorService.shutdownNow();
        this.heartbeatExecutorService.shutdownNow();

        long timeoutMillis = SyncConstant.DEFAULT_WAITING_FOR_STOP_MILLISECONDS.longValue();
        boolean transportTerminated = this.transportExecutorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        // Always wait for the heartbeat executor too, even if the transport one timed out.
        boolean heartbeatTerminated = this.heartbeatExecutorService.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        return transportTerminated && heartbeatTerminated;
    }

    /**
     * Sends a request of the given type, tagged with this handler's pipe name, local ip and pipe
     * create time, through the transport client's {@code heartbeat} call.
     *
     * @param requestType the type of request to send
     * @return the response returned by the transport client
     * @throws SyncConnectionException if the transport client fails to deliver the request
     */
    public SyncResponse sendMsg(RequestType requestType) throws SyncConnectionException {
        return this.transportClient.heartbeat(new SyncRequest(requestType, this.pipeName, this.localIp, this.createTime));
    }

    /**
     * Periodic heartbeat task: sends a {@link RequestType#HEARTBEAT} request and forwards the
     * response to {@link SenderService}. Failures are logged and the round is skipped.
     */
    private void sendHeartbeat() {
        try {
            SyncRequest heartbeatRequest = new SyncRequest(RequestType.HEARTBEAT, this.pipeName, this.localIp, this.createTime);
            SyncResponse response = this.transportClient.heartbeat(heartbeatRequest);
            SenderService.getInstance().receiveMsg(response);
        } catch (SyncConnectionException e) {
            // Logged without a stack trace, as before; e.toString() keeps SLF4J from treating
            // the trailing argument as a throwable.
            logger.warn("Pipe {} sends heartbeat to receiver error, skip this time, because {}.", this.pipeName, e.toString());
        } catch (RuntimeException e) {
            // A task scheduled with scheduleWithFixedDelay is never run again once it throws,
            // so an unchecked exception must not escape or heartbeats would stop silently.
            logger.error("Pipe {} meets unexpected error when sending heartbeat, skip this time.", this.pipeName, e);
        }
    }

    /**
     * Returns the handler to use for the given pipe: a new {@link TransportHandler} normally, or
     * the installed debug handler (re-bound to the pipe's name and create time) if one was set.
     *
     * @param pipe the pipe the handler will serve
     * @param ioTDBPipeSink the sink providing the receiver's ip and port; unused when a debug
     *     handler is installed
     * @return the handler for the pipe
     */
    public static TransportHandler getNewTransportHandler(Pipe pipe, IoTDBPipeSink ioTDBPipeSink) {
        if (DEBUG_TRANSPORT_HANDLER == null) {
            return new TransportHandler(pipe, ioTDBPipeSink);
        }
        DEBUG_TRANSPORT_HANDLER.resetTransportClient(pipe);
        return DEBUG_TRANSPORT_HANDLER;
    }

    /**
     * Installs (or, with {@code null}, removes) the handler returned by
     * {@link #getNewTransportHandler(Pipe, IoTDBPipeSink)}.
     *
     * @param transportHandler the handler to return for every pipe, or {@code null} to restore
     *     normal creation
     */
    public static void setDebugTransportHandler(TransportHandler transportHandler) {
        DEBUG_TRANSPORT_HANDLER = transportHandler;
    }

    /**
     * Re-binds this handler to the given pipe by copying its name and create time. In this class
     * the transport client itself is left untouched.
     *
     * @param pipe the pipe to take the name and create time from
     */
    protected void resetTransportClient(Pipe pipe) {
        this.pipeName = pipe.getName();
        this.createTime = pipe.getCreateTime();
    }
}
