/*
 * Decompiled with CFR 0.152.
 */
package com.github.netty.protocol;

import com.github.netty.core.AbstractNettyServer;
import com.github.netty.core.AbstractProtocol;
import com.github.netty.core.util.LoggerFactoryX;
import com.github.netty.core.util.LoggerX;
import com.github.netty.protocol.mqtt.MemoryQueueRepository;
import com.github.netty.protocol.mqtt.MemoryRetainedRepository;
import com.github.netty.protocol.mqtt.MemorySubscriptionsRepository;
import com.github.netty.protocol.mqtt.MqttIdleTimeoutChannelHandler;
import com.github.netty.protocol.mqtt.MqttLoggerChannelHandler;
import com.github.netty.protocol.mqtt.MqttPostOffice;
import com.github.netty.protocol.mqtt.MqttServerChannelHandler;
import com.github.netty.protocol.mqtt.MqttSessionRegistry;
import com.github.netty.protocol.mqtt.config.BrokerConfiguration;
import com.github.netty.protocol.mqtt.config.FileResourceLoader;
import com.github.netty.protocol.mqtt.interception.BrokerInterceptor;
import com.github.netty.protocol.mqtt.interception.InterceptHandler;
import com.github.netty.protocol.mqtt.security.ACLFileParser;
import com.github.netty.protocol.mqtt.security.AcceptAllAuthenticator;
import com.github.netty.protocol.mqtt.security.DenyAllAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.security.IAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.security.PermitAllAuthorizatorPolicy;
import com.github.netty.protocol.mqtt.subscriptions.CTrieSubscriptionDirectory;
import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.mqtt.MqttDecoder;
import io.netty.handler.codec.mqtt.MqttEncoder;
import io.netty.handler.codec.mqtt.MqttPublishMessage;
import io.netty.handler.timeout.IdleStateHandler;
import java.text.ParseException;

/**
 * MQTT protocol plug-in: detects MQTT connections on a shared port and installs the
 * MQTT codec and broker handlers on the channel pipeline.
 *
 * <p>The broker state (subscriptions, sessions, retained messages, queues) is built from the
 * in-memory repository implementations when the server starts, see
 * {@link #onServerStart(AbstractNettyServer)}.
 *
 * <p>The metrics related properties are plain stored values; nothing in this class reads them.
 */
public class MqttProtocol extends AbstractProtocol {
    /** Default value handed to {@link MqttDecoder} as the maximum message size, in bytes. */
    private static final int DEFAULT_MESSAGE_MAX_LENGTH = 8092;
    /** Default reader idle time, in seconds, handed to {@link IdleStateHandler}. */
    private static final int DEFAULT_READER_IDLE_TIME_SECONDS = 10;
    /** Value returned by {@link #getOrder()}. */
    private static final int ORDER = 300;
    /** {@link #canSupport(ByteBuf)} refuses to decide until at least this many bytes are readable. */
    private static final int MIN_DETECT_BYTES = 9;
    /** The MQTT "Remaining Length" field occupies between one and this many bytes. */
    private static final int MAX_REMAINING_LENGTH_BYTES = 4;
    /** Set on a "Remaining Length" byte when another length byte follows. */
    private static final int CONTINUATION_BIT = 0x80;
    /** Size of the big-endian length prefix that precedes the protocol name. */
    private static final int PROTOCOL_NAME_LENGTH_PREFIX = 2;
    /** Number of bytes in the protocol name "MQTT". */
    private static final int PROTOCOL_NAME_BYTES = 4;

    private final LoggerX logger = LoggerFactoryX.getLogger(MqttProtocol.class);
    private int messageMaxLength;
    private int nettyReaderIdleTimeSeconds;
    private boolean enableMetrics = false;
    private String metricsLibratoEmail;
    private String metricsLibratoToken;
    private String metricsLibratoSource;
    // These handler instances are reused for every channel passed to addPipeline.
    private final MqttIdleTimeoutChannelHandler timeoutHandler = new MqttIdleTimeoutChannelHandler();
    private final MqttLoggerChannelHandler mqttMessageLoggerChannelHandler = new MqttLoggerChannelHandler();
    private final BrokerInterceptor interceptor = new BrokerInterceptor(1);
    // Both are created in onServerStart and stay null until then.
    private MqttServerChannelHandler mqttServerChannelHandler;
    private MqttPostOffice mqttPostOffice;

    /**
     * Creates the protocol with a maximum message length of 8092 bytes, a reader idle time of
     * 10 seconds and an auto flush idle value of 0.
     */
    public MqttProtocol() {
        this(DEFAULT_MESSAGE_MAX_LENGTH, DEFAULT_READER_IDLE_TIME_SECONDS, 0);
    }

    /**
     * @param messageMaxLength           maximum message size passed to {@link MqttDecoder}
     * @param nettyReaderIdleTimeSeconds reader idle time, in seconds, passed to {@link IdleStateHandler}
     * @param autoFlushIdleTime          value forwarded unchanged to {@code setAutoFlushIdleMs}
     */
    public MqttProtocol(int messageMaxLength, int nettyReaderIdleTimeSeconds, int autoFlushIdleTime) {
        this.messageMaxLength = messageMaxLength;
        this.nettyReaderIdleTimeSeconds = nettyReaderIdleTimeSeconds;
        this.setAutoFlushIdleMs(autoFlushIdleTime);
    }

    /**
     * @return the constant protocol name {@code "mqtt"}
     */
    @Override
    public String getProtocolName() {
        return "mqtt";
    }

    /**
     * Decides whether the readable bytes of {@code msg} look like the start of an MQTT packet
     * whose variable header begins with the protocol name "MQTT".
     *
     * <p>Expected layout, relative to the reader index: one fixed header byte, one to four
     * "Remaining Length" bytes, a two byte protocol name length, then the four name bytes.
     * The buffer is only inspected; its reader and writer indexes are not moved.
     *
     * @param msg the first bytes received on a connection
     * @return {@code true} if at least 9 bytes are readable and the protocol name "MQTT" is found
     *         at the position implied by the length encoding, otherwise {@code false}
     */
    @Override
    public boolean canSupport(ByteBuf msg) {
        if (msg.readableBytes() < MIN_DETECT_BYTES) {
            return false;
        }
        // getByte takes absolute indexes, so every offset is computed from the reader index.
        int index = msg.readerIndex() + 1; // skip the fixed header's first byte
        int lengthBytes = 0;
        byte encoded;
        // Skip the variable-length "Remaining Length" field; the readable-bytes guard above
        // guarantees that all four candidate length bytes can be read.
        do {
            encoded = msg.getByte(index++);
            lengthBytes++;
        } while ((encoded & CONTINUATION_BIT) != 0 && lengthBytes < MAX_REMAINING_LENGTH_BYTES);
        if ((encoded & CONTINUATION_BIT) != 0) {
            // A fifth length byte would be required, which is not a valid encoding.
            return false;
        }
        int nameIndex = index + PROTOCOL_NAME_LENGTH_PREFIX;
        if (msg.writerIndex() - nameIndex < PROTOCOL_NAME_BYTES) {
            // Longer length encodings push the name further back; not enough bytes received yet.
            return false;
        }
        return msg.getByte(nameIndex) == 'M'
                && msg.getByte(nameIndex + 1) == 'Q'
                && msg.getByte(nameIndex + 2) == 'T'
                && msg.getByte(nameIndex + 3) == 'T';
    }

    /**
     * Installs the MQTT handlers on the channel: the idle state handler and idle event handler
     * at the head of the pipeline, then decoder, encoder, message logger and broker handler
     * at the tail.
     *
     * <p>The broker handler is the instance created by {@link #onServerStart(AbstractNettyServer)}.
     *
     * @param channel the channel whose pipeline is configured
     * @throws Exception if the superclass implementation fails
     */
    @Override
    public void addPipeline(Channel channel) throws Exception {
        super.addPipeline(channel);
        ChannelPipeline pipeline = channel.pipeline();
        // Only reader idleness is monitored; writer and all-idle timeouts are disabled (0).
        pipeline.addFirst("idleStateHandler", new IdleStateHandler(this.nettyReaderIdleTimeSeconds, 0, 0));
        pipeline.addAfter("idleStateHandler", "idleEventHandler", this.timeoutHandler);
        pipeline.addLast("decoder", new MqttDecoder(this.messageMaxLength));
        pipeline.addLast("encoder", MqttEncoder.INSTANCE);
        pipeline.addLast("messageLogger", this.mqttMessageLoggerChannelHandler);
        pipeline.addLast("handler", this.mqttServerChannelHandler);
    }

    /**
     * @return the constant order value {@code 300}
     */
    @Override
    public int getOrder() {
        return ORDER;
    }

    /**
     * Builds the broker components from the in-memory repositories and stores the resulting
     * post office and channel handler for later use by {@link #addPipeline(Channel)} and
     * {@link #internalPublish(MqttPublishMessage, String)}.
     *
     * @param server the server that is starting (not used here)
     */
    @Override
    public <T extends AbstractNettyServer> void onServerStart(T server) throws Exception {
        IAuthorizatorPolicy authorizatorPolicy = this.initializeAuthorizatorPolicy();
        CTrieSubscriptionDirectory subscriptions = new CTrieSubscriptionDirectory(new MemorySubscriptionsRepository());
        MqttSessionRegistry sessions = new MqttSessionRegistry(subscriptions, new MemoryQueueRepository());
        this.mqttPostOffice = new MqttPostOffice(subscriptions, authorizatorPolicy, new MemoryRetainedRepository(), sessions, this.interceptor);
        this.mqttServerChannelHandler = new MqttServerChannelHandler(this.interceptor, new BrokerConfiguration(), new AcceptAllAuthenticator(), sessions, this.mqttPostOffice);
    }

    /**
     * Stops the broker interceptor.
     *
     * @param server the server that is stopping (not used here)
     */
    @Override
    public <T extends AbstractNettyServer> void onServerStop(T server) throws Exception {
        this.interceptor.stop();
    }

    /**
     * Chooses the authorization policy handed to the post office.
     *
     * <p>The ACL file path is currently hard-coded to {@code null}, so this implementation always
     * returns a {@link PermitAllAuthorizatorPolicy}. The file based branch is kept for when a
     * path becomes available: the ACL file is parsed, and if parsing fails the error is logged
     * and a {@link DenyAllAuthorizatorPolicy} is used instead.
     *
     * @return the policy to apply
     */
    protected IAuthorizatorPolicy initializeAuthorizatorPolicy() {
        String aclFilePath = null;
        if (aclFilePath == null || aclFilePath.isEmpty()) {
            return new PermitAllAuthorizatorPolicy();
        }
        try {
            FileResourceLoader resourceLoader = new FileResourceLoader();
            return ACLFileParser.parse(resourceLoader.loadResource(aclFilePath));
        } catch (ParseException pex) {
            this.logger.error("Unable to parse ACL file. path=" + aclFilePath, pex);
            // An unreadable ACL file must not silently grant access.
            return new DenyAllAuthorizatorPolicy();
        }
    }

    /**
     * Hands a publish message directly to the post office.
     *
     * @param msg      the message to publish
     * @param clientId client identifier, used only in the trace log line
     * @throws IllegalStateException if the post office has not been created yet, i.e.
     *                               {@link #onServerStart(AbstractNettyServer)} has not run
     */
    public void internalPublish(MqttPublishMessage msg, String clientId) {
        MqttPostOffice postOffice = this.mqttPostOffice;
        if (postOffice == null) {
            throw new IllegalStateException("MQTT broker is not started; internalPublish requires onServerStart to have run");
        }
        int messageID = msg.variableHeader().packetId();
        this.logger.trace("Internal publishing message CId: {}, messageId: {}", clientId, messageID);
        postOffice.internalPublish(msg);
    }

    /**
     * Registers an intercept handler with the broker interceptor.
     *
     * @param interceptHandler the handler to add
     */
    public void addInterceptHandler(InterceptHandler interceptHandler) {
        this.logger.info("Adding MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.interceptor.addInterceptHandler(interceptHandler);
    }

    /**
     * Unregisters an intercept handler from the broker interceptor.
     *
     * @param interceptHandler the handler to remove
     */
    public void removeInterceptHandler(InterceptHandler interceptHandler) {
        this.logger.info("Removing MQTT message interceptor. InterceptorId={}", interceptHandler.getID());
        this.interceptor.removeInterceptHandler(interceptHandler);
    }

    public boolean isEnableMetrics() {
        return this.enableMetrics;
    }

    public void setEnableMetrics(boolean enableMetrics) {
        this.enableMetrics = enableMetrics;
    }

    public String getMetricsLibratoEmail() {
        return this.metricsLibratoEmail;
    }

    public void setMetricsLibratoEmail(String metricsLibratoEmail) {
        this.metricsLibratoEmail = metricsLibratoEmail;
    }

    public String getMetricsLibratoToken() {
        return this.metricsLibratoToken;
    }

    public void setMetricsLibratoToken(String metricsLibratoToken) {
        this.metricsLibratoToken = metricsLibratoToken;
    }

    public String getMetricsLibratoSource() {
        return this.metricsLibratoSource;
    }

    public void setMetricsLibratoSource(String metricsLibratoSource) {
        this.metricsLibratoSource = metricsLibratoSource;
    }
}

