package org.apache.iotdb.db.mpp.execution.datatransfer;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.commons.lang3.Validate;
import org.apache.iotdb.common.rpc.thrift.TEndPoint;
import org.apache.iotdb.commons.client.IClientManager;
import org.apache.iotdb.commons.client.sync.SyncDataNodeDataBlockServiceClient;
import org.apache.iotdb.db.mpp.execution.datatransfer.DataBlockManager;
import org.apache.iotdb.db.mpp.execution.memory.LocalMemoryManager;
import org.apache.iotdb.mpp.rpc.thrift.TEndOfDataBlockEvent;
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
import org.apache.iotdb.mpp.rpc.thrift.TNewDataBlockEvent;
import org.apache.iotdb.tsfile.read.common.block.TsBlock;
import org.apache.iotdb.tsfile.read.common.block.column.TsBlockSerde;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle.class */
public class SinkHandle implements ISinkHandle {
    private static final Logger logger = LoggerFactory.getLogger(SinkHandle.class);
    public static final int MAX_ATTEMPT_TIMES = 3;
    private static final long DEFAULT_RETRY_INTERVAL_IN_MS = 1000;
    private final TEndPoint remoteEndpoint;
    private final TFragmentInstanceId remoteFragmentInstanceId;
    private final String remotePlanNodeId;
    private final TFragmentInstanceId localFragmentInstanceId;
    private final LocalMemoryManager localMemoryManager;
    private final ExecutorService executorService;
    private final TsBlockSerde serde;
    private final DataBlockManager.SinkHandleListener sinkHandleListener;
    private final IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> dataBlockServiceClientManager;
    private final LinkedHashMap<Integer, TsBlock> sequenceIdToTsBlock = new LinkedHashMap<>();
    private volatile ListenableFuture<Void> blocked = Futures.immediateFuture((Object) null);
    private int nextSequenceId = 0;
    private long bufferRetainedSizeInBytes = 0;
    private boolean aborted = false;
    private boolean noMoreTsBlocks = false;
    private long retryIntervalInMs = DEFAULT_RETRY_INTERVAL_IN_MS;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/iotdb/db/mpp/execution/datatransfer/SinkHandle$SendNewDataBlockEventTask.class */
    public class SendNewDataBlockEventTask implements Runnable {
        private final int startSequenceId;
        private final List<Long> blockSizes;

        SendNewDataBlockEventTask(int i, List<Long> list) {
            Validate.isTrue(i >= 0, "Start sequence ID should be greater than or equal to zero, but was: " + i + ".", new Object[0]);
            this.startSequenceId = i;
            this.blockSizes = (List) Validate.notNull(list);
        }

        @Override // java.lang.Runnable
        public void run() {
            SinkHandle.logger.info("{} send new data block event [{}, {})", new Object[]{SinkHandle.this, Integer.valueOf(this.startSequenceId), Integer.valueOf(this.startSequenceId + this.blockSizes.size())});
            int i = 0;
            TNewDataBlockEvent tNewDataBlockEvent = new TNewDataBlockEvent(SinkHandle.this.remoteFragmentInstanceId, SinkHandle.this.remotePlanNodeId, SinkHandle.this.localFragmentInstanceId, this.startSequenceId, this.blockSizes);
            while (i < 3) {
                i++;
                try {
                    SyncDataNodeDataBlockServiceClient syncDataNodeDataBlockServiceClient = (SyncDataNodeDataBlockServiceClient) SinkHandle.this.dataBlockServiceClientManager.borrowClient(SinkHandle.this.remoteEndpoint);
                    try {
                        syncDataNodeDataBlockServiceClient.onNewDataBlockEvent(tNewDataBlockEvent);
                        if (syncDataNodeDataBlockServiceClient != null) {
                            syncDataNodeDataBlockServiceClient.close();
                        }
                        return;
                    } finally {
                    }
                } catch (Throwable th) {
                    SinkHandle.logger.error("{} failed to send new data block event due to {}, attempt times: {}", new Object[]{SinkHandle.this, th.getMessage(), Integer.valueOf(i), th});
                    if (i == 3) {
                        SinkHandle.this.sinkHandleListener.onFailure(SinkHandle.this, th);
                    }
                    try {
                        Thread.sleep(SinkHandle.this.retryIntervalInMs);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        SinkHandle.this.sinkHandleListener.onFailure(SinkHandle.this, th);
                    }
                }
            }
        }
    }

    public SinkHandle(TEndPoint tEndPoint, TFragmentInstanceId tFragmentInstanceId, String str, TFragmentInstanceId tFragmentInstanceId2, LocalMemoryManager localMemoryManager, ExecutorService executorService, TsBlockSerde tsBlockSerde, DataBlockManager.SinkHandleListener sinkHandleListener, IClientManager<TEndPoint, SyncDataNodeDataBlockServiceClient> iClientManager) {
        this.remoteEndpoint = (TEndPoint) Validate.notNull(tEndPoint);
        this.remoteFragmentInstanceId = (TFragmentInstanceId) Validate.notNull(tFragmentInstanceId);
        this.remotePlanNodeId = (String) Validate.notNull(str);
        this.localFragmentInstanceId = (TFragmentInstanceId) Validate.notNull(tFragmentInstanceId2);
        this.localMemoryManager = (LocalMemoryManager) Validate.notNull(localMemoryManager);
        this.executorService = (ExecutorService) Validate.notNull(executorService);
        this.serde = (TsBlockSerde) Validate.notNull(tsBlockSerde);
        this.sinkHandleListener = (DataBlockManager.SinkHandleListener) Validate.notNull(sinkHandleListener);
        this.dataBlockServiceClientManager = iClientManager;
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public synchronized ListenableFuture<Void> isFull() {
        if (this.aborted) {
            throw new IllegalStateException("Sink handle is aborted.");
        }
        return Futures.nonCancellationPropagating(this.blocked);
    }

    private void submitSendNewDataBlockEventTask(int i, List<Long> list) {
        this.executorService.submit(new SendNewDataBlockEventTask(i, list));
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public synchronized void send(List<TsBlock> list) {
        Validate.notNull(list, "tsBlocks is null", new Object[0]);
        if (this.aborted) {
            throw new IllegalStateException("Sink handle is aborted.");
        }
        if (!this.blocked.isDone()) {
            throw new IllegalStateException("Sink handle is blocked.");
        }
        if (this.noMoreTsBlocks) {
            return;
        }
        long j = 0;
        Iterator<TsBlock> it = list.iterator();
        while (it.hasNext()) {
            j += it.next().getRetainedSizeInBytes();
        }
        ArrayList arrayList = new ArrayList();
        int i = this.nextSequenceId;
        this.blocked = this.localMemoryManager.getQueryPool().reserve(this.localFragmentInstanceId.getQueryId(), j);
        this.bufferRetainedSizeInBytes += j;
        Iterator<TsBlock> it2 = list.iterator();
        while (it2.hasNext()) {
            this.sequenceIdToTsBlock.put(Integer.valueOf(this.nextSequenceId), it2.next());
            this.nextSequenceId++;
        }
        for (int i2 = i; i2 < this.nextSequenceId; i2++) {
            arrayList.add(Long.valueOf(this.sequenceIdToTsBlock.get(Integer.valueOf(i2)).getRetainedSizeInBytes()));
        }
        submitSendNewDataBlockEventTask(i, arrayList);
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public synchronized void send(int i, List<TsBlock> list) {
        throw new UnsupportedOperationException();
    }

    private void sendEndOfDataBlockEvent() throws Exception {
        int i;
        logger.info("{} send end of data block event", this);
        int i2 = 0;
        TEndOfDataBlockEvent tEndOfDataBlockEvent = new TEndOfDataBlockEvent(this.remoteFragmentInstanceId, this.remotePlanNodeId, this.localFragmentInstanceId, this.nextSequenceId - 1);
        while (i2 < 3) {
            i2++;
            try {
                SyncDataNodeDataBlockServiceClient syncDataNodeDataBlockServiceClient = (SyncDataNodeDataBlockServiceClient) this.dataBlockServiceClientManager.borrowClient(this.remoteEndpoint);
                try {
                    syncDataNodeDataBlockServiceClient.onEndOfDataBlockEvent(tEndOfDataBlockEvent);
                    if (syncDataNodeDataBlockServiceClient != null) {
                        syncDataNodeDataBlockServiceClient.close();
                    }
                    return;
                } finally {
                }
            } finally {
                if (i2 == i) {
                }
            }
        }
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public synchronized void setNoMoreTsBlocks() {
        logger.info("{} start to set no-more-tsblocks", this);
        if (this.aborted) {
            return;
        }
        try {
            sendEndOfDataBlockEvent();
            logger.info("{} set noMoreTsBlocks to true", this);
            this.noMoreTsBlocks = true;
            if (isFinished()) {
                logger.info("{} revoke onFinish() of sinkHandleListener", this);
                this.sinkHandleListener.onFinish(this);
            }
            logger.info("{} revoke onEndOfBlocks() of sinkHandleListener", this);
            this.sinkHandleListener.onEndOfBlocks(this);
        } catch (Exception e) {
            throw new RuntimeException("Send EndOfDataBlockEvent failed", e);
        }
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public synchronized void abort() {
        logger.info("{} is being aborted.", this);
        this.sequenceIdToTsBlock.clear();
        this.aborted = true;
        this.bufferRetainedSizeInBytes -= this.localMemoryManager.getQueryPool().tryCancel(this.blocked);
        if (this.bufferRetainedSizeInBytes > 0) {
            this.localMemoryManager.getQueryPool().free(this.localFragmentInstanceId.getQueryId(), this.bufferRetainedSizeInBytes);
            this.bufferRetainedSizeInBytes = 0L;
        }
        this.sinkHandleListener.onAborted(this);
        logger.info("{} is aborted", this);
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public boolean isAborted() {
        return this.aborted;
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public boolean isFinished() {
        return this.noMoreTsBlocks && this.sequenceIdToTsBlock.isEmpty();
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public long getBufferRetainedSizeInBytes() {
        return this.bufferRetainedSizeInBytes;
    }

    public int getNumOfBufferedTsBlocks() {
        return this.sequenceIdToTsBlock.size();
    }

    ByteBuffer getSerializedTsBlock(int i, int i2) {
        throw new UnsupportedOperationException();
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public ByteBuffer getSerializedTsBlock(int i) throws IOException {
        TsBlock tsBlock = this.sequenceIdToTsBlock.get(Integer.valueOf(i));
        if (tsBlock == null) {
            throw new IllegalStateException("The data block doesn't exist. Sequence ID: " + i);
        }
        return this.serde.serialize(tsBlock);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void acknowledgeTsBlock(int i, int i2) {
        long j = 0;
        synchronized (this) {
            if (this.aborted) {
                return;
            }
            Iterator<Map.Entry<Integer, TsBlock>> it = this.sequenceIdToTsBlock.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Integer, TsBlock> next = it.next();
                if (next.getKey().intValue() >= i) {
                    if (next.getKey().intValue() >= i2) {
                        break;
                    }
                    j += next.getValue().getRetainedSizeInBytes();
                    this.bufferRetainedSizeInBytes -= next.getValue().getRetainedSizeInBytes();
                    it.remove();
                }
            }
            if (isFinished()) {
                this.sinkHandleListener.onFinish(this);
            }
            this.localMemoryManager.getQueryPool().free(this.localFragmentInstanceId.getQueryId(), j);
        }
    }

    public TEndPoint getRemoteEndpoint() {
        return this.remoteEndpoint;
    }

    public TFragmentInstanceId getRemoteFragmentInstanceId() {
        return this.remoteFragmentInstanceId;
    }

    public String getRemotePlanNodeId() {
        return this.remotePlanNodeId;
    }

    @Override // org.apache.iotdb.db.mpp.execution.datatransfer.ISinkHandle
    public TFragmentInstanceId getLocalFragmentInstanceId() {
        return this.localFragmentInstanceId;
    }

    public String toString() {
        return String.format("Query[%s]-[%s-%s-SinkHandle]:", this.localFragmentInstanceId.queryId, Integer.valueOf(this.localFragmentInstanceId.fragmentId), this.localFragmentInstanceId.instanceId);
    }

    public void setRetryIntervalInMs(long j) {
        this.retryIntervalInMs = j;
    }
}
