package org.apache.iotdb.db.protocol.mqtt;

import io.moquette.interception.AbstractInterceptHandler;
import io.moquette.interception.messages.InterceptConnectMessage;
import io.moquette.interception.messages.InterceptDisconnectMessage;
import io.moquette.interception.messages.InterceptPublishMessage;
import io.netty.buffer.ByteBuf;
import java.time.ZoneId;
import java.util.List;
import org.apache.iotdb.common.rpc.thrift.TSStatus;
import org.apache.iotdb.commons.path.PartialPath;
import org.apache.iotdb.db.conf.IoTDBConfig;
import org.apache.iotdb.db.qp.physical.crud.InsertRowPlan;
import org.apache.iotdb.db.query.control.SessionManager;
import org.apache.iotdb.db.service.IoTDB;
import org.apache.iotdb.service.rpc.thrift.TSProtocolVersion;
import org.apache.thrift.TException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/iotdb/db/protocol/mqtt/PublishHandler.class */
public class PublishHandler extends AbstractInterceptHandler {
    private final SessionManager SESSION_MANAGER = SessionManager.getInstance();
    private long sessionId;
    private static final Logger LOG = LoggerFactory.getLogger(PublishHandler.class);
    private final PayloadFormatter payloadFormat;

    public PublishHandler(IoTDBConfig ioTDBConfig) {
        this.payloadFormat = PayloadFormatManager.getPayloadFormat(ioTDBConfig.getMqttPayloadFormatter());
    }

    protected PublishHandler(PayloadFormatter payloadFormatter) {
        this.payloadFormat = payloadFormatter;
    }

    public String getID() {
        return "iotdb-mqtt-broker-listener-" + this.sessionId;
    }

    public void onConnect(InterceptConnectMessage interceptConnectMessage) {
        try {
            this.sessionId = this.SESSION_MANAGER.openSession(interceptConnectMessage.getUsername(), new String(interceptConnectMessage.getPassword()), ZoneId.systemDefault().toString(), TSProtocolVersion.IOTDB_SERVICE_PROTOCOL_V3).getSessionId();
        } catch (TException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    public void onDisconnect(InterceptDisconnectMessage interceptDisconnectMessage) {
        this.SESSION_MANAGER.closeSession(this.sessionId);
    }

    public void onPublish(InterceptPublishMessage interceptPublishMessage) {
        String clientID = interceptPublishMessage.getClientID();
        ByteBuf payload = interceptPublishMessage.getPayload();
        LOG.debug("Receive publish message. clientId: {}, username: {}, qos: {}, topic: {}, payload: {}", new Object[]{clientID, interceptPublishMessage.getUsername(), interceptPublishMessage.getQos(), interceptPublishMessage.getTopicName(), payload});
        List<Message> format = this.payloadFormat.format(payload);
        if (format == null) {
            return;
        }
        for (Message message : format) {
            if (message != null) {
                boolean z = false;
                try {
                    InsertRowPlan insertRowPlan = new InsertRowPlan(new PartialPath(message.getDevice()), message.getTimestamp().longValue(), (String[]) message.getMeasurements().toArray(new String[0]), (String[]) message.getValues().toArray(new String[0]));
                    TSStatus checkAuthority = this.SESSION_MANAGER.checkAuthority(insertRowPlan, this.sessionId);
                    if (checkAuthority != null) {
                        LOG.warn(checkAuthority.message);
                    } else {
                        z = IoTDB.serviceProvider.executeNonQuery(insertRowPlan);
                    }
                } catch (Exception e) {
                    LOG.warn("meet error when inserting device {}, measurements {}, at time {}, because ", new Object[]{message.getDevice(), message.getMeasurements(), message.getTimestamp(), e});
                }
                LOG.debug("event process result: {}", Boolean.valueOf(z));
            }
        }
    }
}
