B4J Question MQTT: Unknow packet - limitation on messagesize ?

wl

Well-Known Member
Licensed User
Longtime User
I have my own MqttBroker running (v2 in B4J).

B4X:
    broker.Initialize("", 51041) 'first parameter is the event name. It is currently not used.
    broker.DebugLog = True
    broker.Start
    StartMessageLoop 'Non-UI app

When I send a message like this one:
B4X:
Type MQTTFilePartMessage (PartNumber As Long, PartCount As Long, Chunksize As Long, Data() As Byte) 'topic = "sendfile"

All is working well when the data byte array is <= approximately 8 KB.
As I understood the maximum size a MQTT packet could have is around 250 MB ?

I can't seem to set any option in this respect on the broker.

Thanks


B4X:
java.io.IOException: Unknown packet
        at io.moquette.broker.metrics.MQTTMessageLogger.logMQTTMessage(MQTTMessageLogger.java:63)
        at io.moquette.broker.metrics.MQTTMessageLogger.logMQTTMessageRead(MQTTMessageLogger.java:50)
        at io.moquette.broker.metrics.MQTTMessageLogger.channelRead(MQTTMessageLogger.java:45)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.moquette.broker.metrics.MessageMetricsHandler.channelRead(MessageMetricsHandler.java:50)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:297)
        at io.netty.handler.codec.ReplayingDecoder.callDecode(ReplayingDecoder.java:350)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:265)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Thread.java:748)
Channel closed <1627832362704>
 

wl

Well-Known Member
Licensed User
Longtime User
Erel, it seems this helped a bit.
I set the maxsize to 1 MB, but now I receive a disconnect on the publisher side ((EOFException) java.io.EOFException) when the size of a message exceeds 64 KB.

The server throws this exception:

B4X:
C<-B PUBREC <1627853902921> packetID <2>

Error processing protocol message: PUBLISH

java.util.NoSuchElementException
        at java.util.AbstractQueue.remove(Unknown Source)
        at io.moquette.broker.Session.drainQueueToConnection(Session.java:319)
        at io.moquette.broker.Session.sendPublishQos2(Session.java:256)
        at io.moquette.broker.Session.sendPublishOnSessionAtQos(Session.java:217)
        at io.moquette.broker.PostOffice.publish2Subscribers(PostOffice.java:228)
        at io.moquette.broker.PostOffice.receivedPublishQos2(PostOffice.java:253)
        at io.moquette.broker.MQTTConnection.processPublish(MQTTConnection.java:350)
        at io.moquette.broker.MQTTConnection.handleMessage(MQTTConnection.java:75)
        at io.moquette.broker.NewNettyMQTTHandler.channelRead(NewNettyMQTTHandler.java:61)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.moquette.broker.metrics.MQTTMessageLogger.channelRead(MQTTMessageLogger.java:46)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.moquette.broker.metrics.MessageMetricsHandler.channelRead(MessageMetricsHandler.java:50)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:310)
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:284)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.moquette.broker.metrics.BytesMetricsHandler.channelRead(BytesMetricsHandler.java:51)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:286)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:86)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:340)
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1414)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:362)
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:348)
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:945)
        at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:146)
        at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:645)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:580)
        at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:497)
        at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:459)
        at io.netty.util.concurrent.SingleThreadEventExecutor$5.run(SingleThreadEventExecutor.java:886)
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
        at java.lang.Thread.run(Unknown Source)
C->B PUBREC <1627853900086> packetID <1>

Unexpected exception while processing MQTT message. Closing Netty channel. CId=1627853902921
 
Last edited:
Upvote 0

DonManfred

Expert
Licensed User
Longtime User
On Monday (5 days ago) he told to check it NEXT WEEK.
It starts on monday (still in Future).

You need to be much more patient!
 
Upvote 0

Erel

B4X founder
Staff member
Licensed User
Longtime User
I've tested it with two programs and I'm able to send 100 megabytes messages:
Program #1
B4X:
Sub Process_Globals
    Private broker1 As MqttBroker
    Private client As MqttClient
End Sub

Sub AppStart (Args() As String)
    Log("Hello world!!!")
    test
    StartMessageLoop
    
End Sub

Private Sub test
    broker1.Initialize("", 51042)
    SetMaxMessageSize(broker1, 100 * 1024 * 1024) '100M
    broker1.Start
    client.Initialize("client", "tcp://localhost:51042", "1")
    client.Connect
    Wait For Client_Connected (Success As Boolean)
    Log(Success)
    If Success Then
        client.Subscribe("test", 0)
    End If
End Sub

Private Sub Client_MessageArrived (Topic As String, Payload() As Byte)
    Log("Payload length: " & Payload.Length)
End Sub

Sub SetMaxMessageSize(broker As MqttBroker, MaxSize As String)
    broker.As(JavaObject).GetFieldJO("config").RunMethod("setProperty", Array("netty.mqtt.message_size", MaxSize))
End Sub

Program #2
B4X:
Sub Process_Globals
    Private client As MqttClient
End Sub


Sub AppStart (Args() As String)
    Log("Hello world!!!")
    test
    StartMessageLoop
    
End Sub

Private Sub test
    client.Initialize("client", "tcp://localhost:51042", "2")
    client.Connect
    Wait For Client_Connected (Success As Boolean)
    Log(Success)
    If Success Then
        Dim msg(99 * 1024 * 1024) As Byte
        client.Publish("test", msg)
    End If
End Sub
 
Upvote 0
Top