B4J Question MQTT Publish Acknowledge

aminoacid

Active Member
Licensed User
Longtime User
Is there any way to determine whether the broker has sent a publish ACK back after a call to "mqtt.publish"?

I ask because I need to buffer data packets while the MQTT connection is down. When the connection comes back up, I want to publish all the buffered packets at once (if possible). If I write a simple "for" loop around "mqtt.publish" without checking for a publish ACK each time, it seems that it could overwhelm the server. I assume the proper way is to publish and then wait for the ACK before publishing again, especially in a loop that sends out hundreds of saved packets at once.

I would appreciate any advice on this.
Thanks.
 

aminoacid

Active Member
Licensed User
Longtime User
I am using Mosquitto and plan to use the default QoS, which I believe is 1.
My concern is that if I send a publish before Mosquitto has had a chance to process and ACK the previous one, it may disconnect the client.
Thanks!
 

aminoacid

Active Member
Licensed User
Longtime User
OK... so I wrote some test code to see how the Mosquitto Broker reacts if we send publish messages continuously. Basically, all I am doing is sending a 58-byte packet to the Broker over and over without waiting for any PUB-ACK. In the packet, I place some dummy data as well as a sequence number that increments with every packet sent. The packet is also check-summed. That way I can check whether there was any packet loss or corruption. I use a separate MQTT client subscribed to the same topic so that I can see the packets that were published.

Please excuse some bad programming practices, I did this in a hurry :(


B4X:
Sub AppStart (Args() As String)
    ' wsnpkt, seqno, csum, clientID, ctopic, working and mqtt are globals declared elsewhere (not shown).
    ' make8 and checksum are helper subs that are also not shown.
    Private i,j As Int
    clientID = "BUFTEST" & Rnd(0, 999999999) & DateTime.Now

    working=True
    ConnectAndReconnect                     ' Connect to MQTT Server

    Do While mqtt.Connected=False           ' Busy-wait until the client reports a connection
    Loop

    For i= 0 To 57
        wsnpkt(i)=72                        ' CREATE a 58 byte packet - fill with 72 (dummy data)
    Next

    ' Fill out the fields that remain the same
    wsnpkt(0)=0xFF                          ' Start of Frame
    wsnpkt(1)=0xFF
    wsnpkt(56)=0xAA                         ' End of Frame
    wsnpkt(57)=0xAA

    seqno=0
    For i = 1 To 3000
        wsnpkt(2)=make8(seqno,1)            ' insert Seq# - High Byte
        wsnpkt(3)=make8(seqno,0)            ' insert Seq# - Low Byte

        csum=checksum(wsnpkt)               ' Calculate a checksum
        wsnpkt(54)=make8(csum,1)            ' insert checksum - High Byte
        wsnpkt(55)=make8(csum,0)            ' insert checksum - Low Byte
        If mqtt.Connected Then
            Try
                mqtt.Publish(ctopic,wsnpkt)
                seqno=seqno+1               ' Only advance the sequence number if Publish did not throw
            Catch
                Log ("Exception..")
            End Try
            For j = 1 To 10000              ' 10000 seems to be the right delay
                'delay
            Next
        End If
    Next

    Log("DONE    ...")
    StartMessageLoop

End Sub

Sub ConnectAndReconnect
    Do While working
        If mqtt.IsInitialized Then mqtt.Close
        mqtt.Initialize("mqtt", server, clientID)
        Private mo As MqttConnectOptions
        mo.Initialize(user, password)
        Log("Connecting to MQTT Server: " & server)
        mqtt.Connect2(mo)
        Wait For mqtt_Connected (Success As Boolean)
        If Success Then
            Log("Mqtt is connected")
            Do While working And mqtt.Connected
                Sleep(10000)
            Loop
            Log("Mqtt is Disconnected")
        Else
            Log("Mqtt Error connecting.")
        End If
        Sleep(10000)
    Loop
End Sub

Here are my results:

1. The broker accepts and preserves packets published this quickly, but only if there is a delay between packets. With no delay, or too short a delay, you get considerable packet loss and the following exceptions (which are trapped) as the code executes:

Error occurred on line: 58
Too many publishes in progress (32202)


2. For some reason you cannot send more than approximately 2500 packets continuously. The program crashes with the message (java.lang.RuntimeException: java.net.SocketException: Socket closed) after about 2489 packets have been published. I cannot figure out how to trap this exception. I looked at the Mosquitto Broker console when this happened and could see that the socket connection definitely got disconnected, but I can't figure out who initiated the disconnect (client or broker).

So in summary, if there is no way to check for a PUB-ACK, you can send the packets continuously with a delay between them. I wish there was a more precise way to set the delay (sleep() does not work), since the time taken by a delay loop will vary from one computer to another. I guess using a timer would be better. You also have to limit the number of packets sent to under 2500, though this limit may vary with the broker and/or the size of the packet.
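
A possible reason sleep() appears not to work in the test above: in a non-UI B4J app, Sleep and Wait For can only resume once StartMessageLoop is running, and in that code the publish loop runs inside AppStart before StartMessageLoop is reached. The following is a minimal sketch, assuming the loop is moved into its own resumable sub. The broker address, the topic and the sub name SendAll are placeholders and not part of the original test, and the 10 ms pause is a guess that would need tuning.

```b4x
Sub Process_Globals
    Private mqtt As MqttClient
    Private ctopic As String = "test/buffer"    ' placeholder topic
End Sub

Sub AppStart (Args() As String)
    ' placeholder broker address and client id
    mqtt.Initialize("mqtt", "tcp://127.0.0.1:1883", "BUFTEST" & Rnd(0, 999999999))
    mqtt.Connect
    SendAll                 ' returns here at its first Wait For
    StartMessageLoop        ' needed so that Wait For and Sleep can resume
End Sub

' Hypothetical resumable sub: publishes 3000 packets with a pause after each one
Sub SendAll
    Wait For mqtt_Connected (Success As Boolean)
    If Success = False Then
        Log("Mqtt Error connecting.")
        Return
    End If
    For seq = 0 To 2999
        Dim pkt(58) As Byte                             ' fresh array for every packet
        pkt(2) = Bit.And(Bit.ShiftRight(seq, 8), 0xFF)  ' sequence number - high byte
        pkt(3) = Bit.And(seq, 0xFF)                     ' sequence number - low byte
        Try
            mqtt.Publish(ctopic, pkt)
        Catch
            Log(LastException)
        End Try
        Sleep(10)                                       ' pause without blocking the main thread
    Next
    Log("DONE    ...")
End Sub
```

As far as I know, the "Too many publishes in progress (32202)" error is raised on the client side by the Paho library that the B4X MQTT client is built on, when too many QoS 1 messages are still waiting for their ACK. Pausing between publishes gives those ACKs time to arrive, which would explain why a delay helps.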
 

aminoacid

Active Member
Licensed User
Longtime User
- Update to the above Post -

If you use a timer instead of a delay loop, you get MUCH better results, and more than 2500 packets can be sent. I have tried a timer interval of 10 ms per publish, which works out fine. Here's the code:

B4X:
Sub AppStart (Args() As String)
.
.
    seqno=0
    pcount=0
    
    Timer1.Initialize("Timer1",10)
    Timer1.Enabled=True
.
.
End Sub

Sub Timer1_tick

    wsnpkt(2)=make8(seqno,1)                ' insert Seq# - High Byte
    wsnpkt(3)=make8(seqno,0)

    csum=checksum(wsnpkt)                   ' Calculate a checksum
    wsnpkt(54)=make8(csum,1)                ' insert checksum - High Byte
    wsnpkt(55)=make8(csum,0)

    If mqtt.Connected Then
        Try
            mqtt.Publish(ctopic,wsnpkt)
            seqno=seqno+1
        Catch
            Log ("Exception..")
        End Try
        'Log("Sending ...")
    End If
    pcount=pcount+1
    If pcount > 3000 Then
        Timer1.Enabled=False
        Log("SENT ALL PACKETS    ...")
    End If

End Sub
 

barx

Well-Known Member
Licensed User
Longtime User
How often do you 'need' to send the data? I'm no MQTT expert, but if you are looking for a constant stream I'm not sure MQTT is the right choice for you.
 

aminoacid

Active Member
Licensed User
Longtime User
I agree, MQTT is not a good choice for sending a stream of data. However, in this case I am sending small 58-byte packets every 5 to 15 minutes. The network connection at a remote location can go down for hours and sometimes days. When it comes back up, all the accumulated packets need to be sent out in the same protocol and format. Hence this need. Typically we would look for the Publish ACK message that the Broker returns each time it receives a PUB packet. However, as Erel confirmed, there are no events in B4X to detect a PUB-ACK.

But after doing some more testing with my code above (timer version), I am quite impressed by how well the B4X MQTT library can handle this situation without any PUB-ACK feedback. It works very well.
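
The drain-after-reconnect idea can be sketched as a queue that a timer empties one packet per tick. This is only a sketch under assumptions. The names pending, dup, InitBuffer, QueuePacket and DrainTimer are hypothetical, the topic is a placeholder, and mqtt is assumed to be initialized and connected elsewhere (for example by ConnectAndReconnect above). A packet is treated as sent as soon as Publish returns without an exception, which is not the same as a confirmed PUB-ACK.

```b4x
Sub Process_Globals
    Private mqtt As MqttClient                  ' initialized and connected elsewhere
    Private DrainTimer As Timer                 ' hypothetical name
    Private pending As List                     ' hypothetical buffer of Byte() packets
    Private ctopic As String = "test/buffer"    ' placeholder topic
End Sub

' Call once from AppStart, before StartMessageLoop
Sub InitBuffer
    pending.Initialize
    DrainTimer.Initialize("DrainTimer", 10)     ' one publish attempt every 10 ms
    DrainTimer.Enabled = True
End Sub

' Stores a copy so that the caller can keep reusing its own array
Sub QueuePacket (pkt() As Byte)
    Dim dup(pkt.Length) As Byte
    For i = 0 To pkt.Length - 1
        dup(i) = pkt(i)
    Next
    pending.Add(dup)
End Sub

Sub DrainTimer_Tick
    If pending.Size = 0 Or mqtt.Connected = False Then Return
    Try
        Dim pkt() As Byte = pending.Get(0)
        mqtt.Publish(ctopic, pkt)
        pending.RemoveAt(0)                     ' only reached if Publish did not throw
    Catch
        Log(LastException)                      ' packet stays queued and is retried on the next tick
    End Try
End Sub
```

Removing the packet only after Publish returns means a "Too many publishes in progress" exception leaves it at the head of the queue, so nothing is skipped and the order of the sequence numbers is preserved.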
 

barx

Well-Known Member
Licensed User
Longtime User
Ah ok, I see.
I agree it would be useful for that to happen. Kind of an important part of the QoS system.

Maybe you can simply use the _Disconnected event of the MQTT client object to detect that the connection has dropped and begin buffering?
Also, if you are using B4J, there appears to be a .Connected property. I don't know exactly how these options work as I don't have the source code, but it might also be usable to decide when to start buffering.

I guess you will have to test. Start publishing to a good broker, then disconnect the broker in a dirty manner and see what happens and how reliable it is.
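
A minimal sketch of that suggestion, assuming the client's event name is "mqtt" as in the code earlier in the thread. PublishOrBuffer and backlog are hypothetical names, and backlog is assumed to be initialized in AppStart.

```b4x
Sub Process_Globals
    Private mqtt As MqttClient      ' initialized with the event name "mqtt"
    Private backlog As List         ' hypothetical buffer, initialized in AppStart
End Sub

' Raised by the client when the connection is lost
Sub mqtt_Disconnected
    Log("Mqtt is Disconnected, new packets will be buffered")
End Sub

' Hypothetical helper: publish now if possible, otherwise keep the packet for later.
' The caller should pass a fresh array, because the list stores a reference to it.
Sub PublishOrBuffer (topic As String, pkt() As Byte)
    If mqtt.Connected Then
        Try
            mqtt.Publish(topic, pkt)
            Return                  ' handed over to the client, nothing to buffer
        Catch
            Log(LastException)      ' fall through and buffer the packet
        End Try
    End If
    backlog.Add(pkt)
End Sub
```

Without a PUB-ACK event, this cannot tell whether a packet handed to Publish just before a dirty disconnect actually reached the broker, so the dirty-disconnect test is still worth running.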
 

DonManfred

Expert
Licensed User
Longtime User

aminoacid

Active Member
Licensed User
Longtime User
Yeah... agreed. Thanks for your suggestions.
Buffering the data is not a problem. That has been implemented and is working fine. Getting it to send the buffered data all at once without a PUB-ACK is the issue. I'm testing using a local Mosquitto Broker and simulating disconnects. So far the code I have above with the timer seems to be doing the job. So I'm happy :)
 