/*
 * Decompiled with CFR 0.152.
 */
package com.github.netty.protocol.mqtt;

import com.github.netty.core.AbstractChannelHandler;
import com.github.netty.core.AutoFlushChannelHandler;
import com.github.netty.protocol.mqtt.MqttConnection;
import com.github.netty.protocol.mqtt.MqttInflightResenderChannelHandler;
import com.github.netty.protocol.mqtt.MqttPostOffice;
import com.github.netty.protocol.mqtt.MqttSessionRegistry;
import com.github.netty.protocol.mqtt.MqttUtil;
import com.github.netty.protocol.mqtt.config.BrokerConfiguration;
import com.github.netty.protocol.mqtt.interception.BrokerInterceptor;
import com.github.netty.protocol.mqtt.security.IAuthenticator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.mqtt.MqttMessage;
import io.netty.util.AttributeKey;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;

@ChannelHandler.Sharable
public class MqttServerChannelHandler
extends AbstractChannelHandler<MqttMessage, Object> {
    private static final AttributeKey<MqttConnection> ATTR_KEY_CONNECTION = AttributeKey.valueOf((String)(MqttConnection.class + "#MQTTConnection"));
    private final BrokerConfiguration brokerConfig;
    private final IAuthenticator authenticator;
    private final MqttSessionRegistry sessionRegistry;
    private final MqttPostOffice postOffice;
    private final BrokerInterceptor interceptor;

    public MqttServerChannelHandler(BrokerInterceptor interceptor, BrokerConfiguration brokerConfig, IAuthenticator authenticator, MqttSessionRegistry sessionRegistry, MqttPostOffice postOffice) {
        super(true);
        this.interceptor = interceptor;
        this.brokerConfig = brokerConfig;
        this.authenticator = authenticator;
        this.sessionRegistry = sessionRegistry;
        this.postOffice = postOffice;
    }

    private MqttConnection mqttConnection(Channel channel) {
        return (MqttConnection)channel.attr(ATTR_KEY_CONNECTION).get();
    }

    private void mqttConnection(Channel channel, MqttConnection connection) {
        channel.attr(ATTR_KEY_CONNECTION).set((Object)connection);
    }

    @Override
    public void onMessageReceived(ChannelHandlerContext ctx, MqttMessage msg) throws Exception {
        if (msg.fixedHeader() == null) {
            throw new IOException("Unknown packet");
        }
        MqttConnection mqttConnection = this.mqttConnection(ctx.channel());
        mqttConnection.setAuthFlushed(AutoFlushChannelHandler.isAutoFlush(ctx.pipeline()));
        try {
            mqttConnection.handleMessage(msg);
        }
        catch (Throwable ex) {
            this.logger.error("Error processing protocol message: " + msg.fixedHeader().messageType(), ex);
            ctx.channel().close().addListener((GenericFutureListener)new ChannelFutureListener(){

                public void operationComplete(ChannelFuture future) {
                    MqttServerChannelHandler.this.logger.debug("Closed client channel due to exception in processing");
                }
            });
        }
    }

    public void channelActive(ChannelHandlerContext ctx) {
        Channel channel = ctx.channel();
        MqttConnection connection = new MqttConnection(this.interceptor, channel, this.brokerConfig, this.authenticator, this.sessionRegistry, this.postOffice);
        connection.setAuthFlushed(AutoFlushChannelHandler.isAutoFlush(ctx.pipeline()));
        this.mqttConnection(channel, connection);
    }

    public void channelInactive(ChannelHandlerContext ctx) {
        MqttConnection mqttConnection = this.mqttConnection(ctx.channel());
        mqttConnection.handleConnectionLost();
    }

    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        this.logger.error("Unexpected exception while processing MQTT message. Closing Netty channel. CId=" + MqttUtil.clientID(ctx.channel()), cause);
        ctx.close().addListener((GenericFutureListener)ChannelFutureListener.CLOSE_ON_FAILURE);
    }

    public void channelWritabilityChanged(ChannelHandlerContext ctx) {
        MqttConnection mqttConnection = this.mqttConnection(ctx.channel());
        mqttConnection.writabilityChanged();
        ctx.fireChannelWritabilityChanged();
    }

    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
        if (evt instanceof MqttInflightResenderChannelHandler.ResendNotAckedPublishes) {
            MqttConnection mqttConnection = this.mqttConnection(ctx.channel());
            mqttConnection.resendNotAckedPublishes();
        }
        ctx.fireUserEventTriggered(evt);
    }
}

