B4J Question MQTT slows down B4J app when sending 2 topics

aaronk

Well-Known Member
Licensed User
Longtime User
Hi,

I have created a B4J non-UI app that will receive a UDP message and then send the message by MQTT.

Everything had been working fine, and then all of a sudden I am noticing a strange thing happening.

As the UDP message arrives, I am then sending it by MQTT. If the message contains a value, then I want to send it to a second MQTT topic.

If I make it so that it only sends the message to a single MQTT topic, then it works fine. Soon as I add the second MQTT topic then I am noticing all MQTT messages are delayed in being sent for a few minutes and evenly stops all together.

I'm not sure if this is due to too many incoming UDP messages or some other bottleneck. I also don't know how to resolve this issue.

I am using Mosquitto as the MQTT broker.
The MQTT broker and B4J is on the same VPS server running on Linux Ubuntu.

Here is how I am doing It:

Main Module:
'Non-UI application (console / server application)
#Region Project Attributes
    #CommandLineArgs:
    #MergeLibraries: True
#End Region

Sub Process_Globals
    Private working As Boolean = True
    Public mqtt As MqttClient    'jMQTT Lib v1.25
    Private username As String = "Mqtt_username_here"
    Public MQTT_password As String = $"My_MQTT_Password_here"$
    Private MQTT_Protocol As String = "tcp" ' using TCP
    Private MQTT_Server As String = "localhost"
    Private MQTT_Port As String = "1883"
    
    Public Server_ID As String = "server1"
    
    Type MQTTMSG (message As String)
    
    Dim serializator As B4XSerializator         ' jRandomAccessFile Lib v2.34
    Dim su As StringUtils                        ' jStringUtils Lib 1.03
    
End Sub

Sub AppStart (Args() As String)
    
    MQTTInitialize
    
    StartMessageLoop
End Sub


Sub MQTTInitialize
    
    working = True

    Do While working
        If mqtt.IsInitialized Then mqtt.Close
        mqtt.Initialize("mqtt", MQTT_Protocol & "://" & MQTT_Server & ":" & MQTT_Port, "cloud_1_" & Rnd(0, 999999999))
        Dim mo As MqttConnectOptions    'jMQTT Lib v1.25
        mo.Initialize(username, MQTT_password)
        Log("MQTT Trying to connect")
        mqtt.Connect2(mo)
        Wait For Mqtt_Connected (Success As Boolean)
        If Success Then
            Log("MQTT Connected")
            incomingUDP.Initialize ' Start listening to UDP messages
            Do While working And mqtt.Connected
                mqtt.Publish2("Cloud_1_Ping", Array As Byte(0), 1, False)
                Sleep(5000)
            Loop
            Log("MQTT Disconnected")
        Else
            Log("MQTT Error connecting.")
        End If
        Sleep(5000)
    Loop
    
End Sub

Sub SendMessageMQTT(Topic As String, msg As String)
    Try
        If mqtt.Connected = False Then
            Log("MQTTClient is not connected.")
            Return
        Else
            mqtt.Publish2(Topic, CreateMessage(msg), 0,False)
        End If
    Catch
        Log("error: SendMessageMQTT")
        Log(LastException.Message)
    End Try
End Sub

public Sub CreateMessage(msg As String) As Byte()
    Dim m As MQTTMSG
    m.Initialize
    m.message = EncryptText(msg)
    Return serializator.ConvertObjectToBytes(m)
End Sub

Public Sub EncryptText(text As String) As String
    Dim c As B4XCipher    'B4XEncryption Lib v1.00
    Return su.EncodeBase64(c.Encrypt(text.GetBytes("utf8"), "MySecurePasswordHere"))
End Sub

incomingUDP:
Sub Process_Globals
    Dim Port As String = "5422"
    Dim EventSockUDP As UDPSocket        ' jNetwork Lib v1.20
End Sub

Sub Initialize

    EventSockUDP.Initialize("UDP",Port,600)

End Sub

Sub UDP_PacketArrived (Packet As UDPPacket)
    Try
        ' incomining messages will look like this: xxxxxxxx//0//xxxxx.....xxxxx

        Dim msg As String     ' This will hold the incoming message
        msg = BytesToString(Packet.Data, Packet.Offset, Packet.Length, "Windows-1252")
    
        Dim msg_string() As String = Regex.Split("//",msg)
        
        Try
            If msg_string.Length < 3 Then
                Log("Invalid Message: " & msg)
                Log("Length is too short.. Ignore Message")
                Log("IP: " & Packet.Host & ", Port: " & Packet.Port)
                Return
            End If
        Catch
            Log(LastException.Message)
        End Try
        
        '     0      1         2
        '  xxxxxxxx//0//xxxxx.....xxxxx
        '            ^
        If msg_string(1) = "1" Then
            ' Ignore any that are set to 1
            Return
        End If
        
        '     0      1         2
        '  xxxxxxxx//0//xxxxx.....xxxxx
        '                      ^
        msg = msg_string(2)
        
        ' If the message is nothing then ignore message
        If msg = Null Then Return
        If msg = "" Then Return
    
        ' remove Carriage Return Line Feed
        msg = msg.Replace(Chr(10),"")
        msg = msg.Replace(Chr(13),"")
        
        ' Send MQTT Message
        ' msg_string(0) = Topic
        ' msg_string(0)//msg = Message to send
        Main.SendMessageMQTT(msg_string(0),msg_string(0) & "//" & msg)
        
        ' If the message (MSG) 3 and 4 value is "AA" Or "BB" then send a second MQTT message to a different MQTT Topic
        If msg.SubString2(2,4) = "AA" Or msg.SubString2(2,4) = "AB" Then
            Main.SendMessageMQTT("FCM",msg_string(0) & "//" & msg)
        End If
        
    Catch
        Log(LastException.Message)
        Log("IP: " & Packet.Host & ", Port: " & Packet.Port)
    End Try

End Sub

The above code used to work, but now it runs really slow and the it will crash my B4J app.

If I remove (or comment out) the following, then the code works fine. The first MQTT message is sent in almost realtime.
B4X:
Main.SendMessageMQTT("FCM",msg_string(0) & "//" & msg)

If I add the above line back in again, the first MQTT message is delayed by many minutes before my B4J crashes.

If I run the code as above (so it sends the 2 MQTT messages) and only one UDP message being received to the B4J app then it works fine and both MQTT topics are sent fine.

Soon as many messages are received then it crashes.

To work out how many MQTT messages are being sent, I added a Global int in the main module and commented out the MQTT being sent.

The first MQTT message is sent 450 messages per second (on average).
The second MQTT message is sent 320 messages per second (on average).

I am trying to learn and work out why it is causing this issue when I try and send the second MQTT message.
 

Erel

B4X founder
Staff member
Licensed User
Longtime User
message is delayed by many minutes before my B4J crashes
Post the logs when it crashes.

The first MQTT message is sent 450 messages per second (on average).
The second MQTT message is sent 320 messages per second (on average).
Is this expected? 700 messages per second is quite rapid.

Are you testing in release mode?
 
Upvote 0

aaronk

Well-Known Member
Licensed User
Longtime User
Are you testing in release mode?
Yes it's running in release mode when running on the Linux VPS.


Post the logs when it crashes.

I can't make it crash when it's running in debug mode on my Windows computer since only one device is sending the UDP data to my computer during testing.

When running on the VPS it creates the dump on the server but I have deleted that file by mistake. I will wait for it to crash again so it create the dump file.

From what I can tell (based on my quick tests I have done today), it's like there are too many MQTT message being sent and it can't keep up with the demand.

I did read online that it should be able to handle 82k messages per second. So I wasn't sure if that was the issue or not.


Is this expected? 700 messages per second is quite rapid.
Yes, it's expected that many.

Strange thing is, it's running OK now. I assume it's because there it is less messages now. Which is normal due to being a quite time of day.

The first MQTT message is sent 250 messages per second (on average).
The second MQTT message is sent 100 messages per second (on average).
 
Upvote 0

aaronk

Well-Known Member
Licensed User
Longtime User
How do I increase the heap size ?

I am running the B4J app as a service like:

[Unit]
Description=CloudServer Service
After=network.target

[Service]
Type=simple
ExecStart=/usr/bin/java -Xmx4G -Xms4G -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=/opt/dump.hprof -jar /opt/CloudServer/CloudServer.jar
StandardOutput=syslog
StandardError=syslog
Restart=on-failure

SuccessExitStatus=143
Restart=on-abort

[Install]
WantedBy=multi-user.target

My VPS did have 16GB memory but I increased it 2 days ago to 32GB. Today I am noticing my B4J is having the issue as per post 1 all of a sudden.

Based on the service, I think it only has 4GB of allocated memory. If I change it to -Xmx8G -Xms8G would that mean it would now have 8GB or do I need to do something else ?

It currently using:
36% of 8 vCPU
14.42GB of 32GB of memory
12.65GB of 60GB storage
 
Upvote 0
Top