Android Tutorial [B4X] AsyncStreams Tutorial


AsyncStreams allows you to read data from an InputStream and write data to an OutputStream without blocking your program. The reading and writing are done with two separate threads.

When new data is available the NewData event is raised with the data.
When you write data to the OutputStream the data is added to an internal queue and then sent in the background.

AsyncStreams is very useful when working with slow streams like network streams or Bluetooth streams.

If you work with such streams using the main thread your program may hang or block while waiting for a value.
The approach used in the Serial tutorial and Network tutorial was a Timer that checks whether there are bytes waiting in the buffer. However, even if some bytes are available, there may not be enough of them, and the program will then hang or block until the rest arrive.
Using AsyncStreams is usually simpler and safer.

AsyncStreams can work in two modes: regular mode and "prefix mode". Both work as described above.

The following code demonstrates a simple program that sends text to a connected device or computer:
B4X:
Sub Process_Globals
    Dim AStreams As AsyncStreams
    Dim Server As ServerSocket
    Dim Socket1 As Socket
End Sub
Sub Globals
    Dim EditText1 As EditText
End Sub

Sub Activity_Create(FirstTime As Boolean)
    If FirstTime Then
        Server.Initialize(5500, "Server")
        Server.Listen
        Log("MyIp = " & Server.GetMyIP)
    End If
    EditText1.Initialize("EditText1")
    EditText1.ForceDoneButton = True
    Activity.AddView(EditText1, 10dip, 10dip, 300dip, 60dip)
End Sub

Sub Server_NewConnection (Successful As Boolean, NewSocket As Socket)
    If Successful Then
        ToastMessageShow("Connected", False)
        Socket1 = NewSocket
         'Can only use prefix mode if both sides of the connection implement the prefix protocol!!!
        AStreams.InitializePrefix(Socket1.InputStream, False, Socket1.OutputStream, "AStreams")
    Else
        ToastMessageShow(LastException.Message, True)
    End If
    Server.Listen
End Sub

Sub AStreams_NewData (Buffer() As Byte)
    Dim msg As String
    msg = BytesToString(Buffer, 0, Buffer.Length, "UTF8")
    ToastMessageShow(msg, False)
    Log(msg)
End Sub

Sub AStreams_Error
    ToastMessageShow(LastException.Message, True)
    Log("AStreams_Error")
End Sub

Sub AStreams_Terminated
    Log("AStreams_Terminated")
End Sub

'press on the Done button to send text
Sub EditText1_EnterPressed
    If AStreams.IsInitialized = False Then Return
    If EditText1.Text.Length > 0 Then
        Dim buffer() As Byte
        buffer = EditText1.Text.GetBytes("UTF8")
        AStreams.Write(buffer)
        EditText1.SelectAll
        Log("Sending: " & EditText1.Text)
    End If
End Sub

Sub Activity_Pause(UserClosed As Boolean)
    If UserClosed Then
        Log("closing")
        AStreams.Close
        Socket1.Close
    End If
End Sub
Once there is a connection we initialize the AsyncStreams object:
B4X:
AStreams.InitializePrefix(Socket1.InputStream, False, Socket1.OutputStream, "AStreams")
Then when there is new data available we convert the bytes to string and show it:
B4X:
Sub AStreams_NewData (Buffer() As Byte)
    Dim msg As String
    msg = BytesToString(Buffer, 0, Buffer.Length, "UTF8")
    ToastMessageShow(msg, False)
End Sub
When the user enters text in the EditText and presses the Done button, the text is sent:
B4X:
Sub EditText1_EnterPressed
    If AStreams.IsInitialized = False Then Return
    If EditText1.Text.Length > 0 Then
        Dim buffer() As Byte
        buffer = EditText1.Text.GetBytes("UTF8")
        AStreams.Write(buffer)
        EditText1.SelectAll
        Log("Sending: " & EditText1.Text)
    End If
End Sub
Prefix mode

When the object is initialized in prefix mode, the data is expected to adhere to the following protocol: every message (bytes array) should be prefixed with the bytes array length (as an Int, 4 bytes). So if another device sends us a message made of 100 bytes, the stream is expected to include 4 bytes with a value of 100 and then the 100 bytes.
The NewData event will be raised with the 100 bytes. It will not include the 4 prefix bytes.
When you send data with Write or Write2, the bytes length will be added automatically as the prefix of this message.
If you can work in prefix mode, which is usually only possible if you are implementing both sides, it is highly recommended to do so. Under the hood, AsyncStreams uses the message length prefix to make sure that the NewData event is always raised with complete messages. If, for example, it expects 100 bytes and only 60 bytes arrived, it will wait until the other 40 bytes arrive. In regular mode the event will be raised twice and you will need to handle the two parts of the message.
AsyncStreams also handles the case where 100 bytes are expected and more than 100 bytes arrived (which means that there are several messages).

Note that the WriteStream method, which is only available in prefix mode, uses an internal protocol that includes error detection.
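
To make the wire format concrete, here is a minimal sketch of how a sender that does not use AsyncStreams could frame a message by hand. It assumes the receiver called InitializePrefix with BigEndian = False, as in the example above, so the 4 length bytes go out least significant byte first. BuildPrefixedMessage is a hypothetical helper name, not part of the library.
B4X:
'Hypothetical helper: returns Payload preceded by its length as a 4-byte little-endian Int.
Sub BuildPrefixedMessage (Payload() As Byte) As Byte()
    Dim n As Int = Payload.Length
    Dim result(n + 4) As Byte
    For i = 0 To 3
        'Byte i holds bits 8*i .. 8*i+7 of the length (least significant byte first).
        result(i) = Bit.And(Bit.ShiftRight(n, 8 * i), 0xFF)
    Next
    'Copy the payload after the 4 prefix bytes.
    For i = 0 To n - 1
        result(4 + i) = Payload(i)
    Next
    Return result
End Sub
For the 100-byte message described above, the first four bytes of the result are 100, 0, 0, 0. If the receiver were initialized with BigEndian = True, the order of these four bytes would have to be reversed.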

Errors

The Error event will be raised if there is any error. You can use LastException to find the reason for the error. In most cases you will want to close the connection when an error occurs.

The Terminated event will be raised when the connection is unexpectedly terminated.
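
A minimal sketch of that advice, reusing the AStreams and Socket1 names from the example above. CloseConnection is a hypothetical helper name; whether you also want to reconnect or tell the user is up to your app.
B4X:
Sub AStreams_Error
    Log("AStreams_Error: " & LastException.Message)
    CloseConnection
End Sub

Sub AStreams_Terminated
    Log("AStreams_Terminated")
    CloseConnection
End Sub

'Hypothetical helper: releases the streams and the socket if they were created.
Sub CloseConnection
    If AStreams.IsInitialized Then AStreams.Close
    If Socket1.IsInitialized Then Socket1.Close
End Sub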

Related links:
AsyncStreamsText class offers an alternative to prefix mode when working with streams of text.
New example that uses the Starter service and B4XSerializator: https://www.b4x.com/android/forum/threads/network-asyncstreams-b4xserializator.72149/

Which mode / framework to choose?

There are three modes or frameworks available: AsyncStreams, AsyncStreams in prefix mode and AsyncStreamsText class.

Prefix mode can only be used if both sides of the connection follow the "prefix" protocol. In most cases you can only use this mode when you implement both sides of the connection.


If you are communicating with a non-B4X app (and cannot implement the prefix protocol) then you have two options:
- If the data sent and received is text based and each message ends with an end of line character(s), for example when you are connecting to an external GPS, then you should use AsyncStreamsText. Note that it is also possible to modify the class to support different delimiters.
The advantage of using AsyncStreamsText is that it will build the messages correctly. You will not receive partial messages (or multiple messages together).
- In other cases you should use AsyncStreams in regular mode. This means that there is no guarantee that each sent message will be received as a single message. In fact it is more or less guaranteed not to happen. So your code is responsible for correctly collecting and assembling the messages.
You can use BytesBuilder class to collect and parse the messages.
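
As a rough sketch of such collecting code in regular mode: the example below assumes the B4XBytesBuilder class from the B4XCollections library (a later packaging of the BytesBuilder class mentioned above) and its Append, Length and Remove members. It also assumes, purely for illustration, that every message is exactly 6 bytes long. MessageLength and HandleMessage are hypothetical names.
B4X:
'Add these two variables to Process_Globals:
'    Private bb As B4XBytesBuilder
'    Private MessageLength As Int = 6 'assumed fixed message size

Sub AStreams_NewData (Buffer() As Byte)
    If bb.IsInitialized = False Then bb.Initialize
    bb.Append(Buffer)
    'A chunk may hold part of a message, or several messages.
    Do While bb.Length >= MessageLength
        'Assumes Remove returns the removed bytes and that the end index is exclusive.
        Dim msg() As Byte = bb.Remove(0, MessageLength)
        HandleMessage(msg)
    Loop
End Sub

'Hypothetical handler: always called with exactly one complete message.
Sub HandleMessage (Msg() As Byte)
    Log("Complete message, " & Msg.Length & " bytes")
End Sub
Bytes that do not yet form a complete message simply stay in the builder until the next NewData event.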

Notes

When a new message is available the background thread that is responsible for reading data sends a message to the main thread message queue. This message causes the NewData event to be raised. If you are receiving many messages repeatedly and you show a modal dialog (like Msgbox) then it is possible that the order of events will be changed.
 

dcafuta

New Member
Licensed User
Longtime User
Problem with astream and arduino project

Connected Android over Bluetooth serial to Arduino. The Arduino receives the data and answers (9600 bps).
Created a global array data(50) As Byte and length As Int.

AStream_NewData contains this code (sorry for syntax errors):
For i=0 to buffer.length-1
Data[length+i]=buffer[i]
next
Length=length+buffer.length
End sub

Arduino is sending FF000281829C (checked with .net code on pc using the same bluetooth to serial adapter)
Android receives wrong results most of the time, usually rearranged data such as 0002818FF29C. The error is different every time.

I suspect that the thread with newer data is executed before the one with older data.

Is there any simpler and faster way to fill my global buffer (array data) so that I can check it on a timer tick?

Is there any way to postpone the NewData event (I can predict how many bytes I need to receive from the Arduino)?

The Arduino uses FF as the message start, then 0002, which dictates a length of 2 bytes, plus one last checksum byte. Can I rearrange the serial lib and make prefix mode compatible with this protocol?

Any help appreciated

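Prefix mode expects a 4-byte length at the very start of every message, so the FF + 2-byte length + payload + checksum format described in this post does not fit it. In regular mode the frames can be assembled by hand. The following is only a sketch under assumptions: the 2 length bytes are taken as most significant byte first (0002 = 2), the checksum byte is not verified because its algorithm is not given, and the buffer size of 256 is arbitrary. ParseFrames, RemoveFromStart and HandleFrame are hypothetical names.
B4X:
'Add to Process_Globals:
'    Private data(256) As Byte 'receive buffer, size is an assumption
'    Private dataLength As Int

Sub AStreams_NewData (Buffer() As Byte)
    'Append the new chunk byte by byte; note the (i) index on Buffer.
    For i = 0 To Buffer.Length - 1
        If dataLength < data.Length Then
            data(dataLength) = Buffer(i)
            dataLength = dataLength + 1
        End If
    Next
    ParseFrames
End Sub

Sub ParseFrames
    Do While dataLength >= 4
        'Bytes are signed in B4X, so mask with 0xFF before comparing.
        If Bit.And(data(0), 0xFF) <> 0xFF Then
            RemoveFromStart(1) 'resynchronize: skip until a start marker
            Continue
        End If
        Dim payloadLength As Int = Bit.And(data(1), 0xFF) * 256 + Bit.And(data(2), 0xFF)
        Dim frameLength As Int = 3 + payloadLength + 1 'start + length + payload + checksum
        If frameLength > data.Length Then
            RemoveFromStart(1) 'implausible length, treat the marker as noise
            Continue
        End If
        If dataLength < frameLength Then Return 'wait for the rest of the frame
        Dim frame(frameLength) As Byte
        For i = 0 To frameLength - 1
            frame(i) = data(i)
        Next
        RemoveFromStart(frameLength)
        HandleFrame(frame)
    Loop
End Sub

'Shifts the remaining bytes to the start of the buffer.
Sub RemoveFromStart (Count As Int)
    For i = Count To dataLength - 1
        data(i - Count) = data(i)
    Next
    dataLength = dataLength - Count
End Sub

'Hypothetical handler: receives one complete frame, checksum byte included.
Sub HandleFrame (Frame() As Byte)
    Log("Frame received, " & Frame.Length & " bytes")
End Sub
With the FF000281829C example, the payload length is 2, so the frame is complete once 6 bytes are in the buffer, however many NewData events it took to deliver them.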
 

flyingbag

Member
Licensed User
Longtime User
Sample working code posted

Hi,
As a beginner I know that it is difficult to wrap your head around Sockets, Server Sockets and Asyncstreams...

Here is sample code that can help you to debug and understand better - it does TCP/IP Socket communication with a server using B4A android app

Please find attached:
- SocketServer.zip (this has Java code for a socket that runs on a server, listens to the text that the client sends, and echoes it back)

- TestClientSocket.zip (this has B4A code; you can install it on your device and debug to test AsyncStreams as well as Socket connections etc.)

How to run?
- Unzip SocketServer.zip. Copy the two files inside this zip into a directory. You need Java installed on this machine. If you do not have Java installed, you can download Java from here
- On the command line, go to the folder which has the above two files
- In that folder, run on the command line: java SocketServer
- The server will now start and by default listen on port 9092
- If you want to choose a different port, run on the command line: java SocketServer <yourchosenportnumber>
- e.g.: java SocketServer 8081
- IMPORTANT: Make sure that the firewall on the machine where you install this server allows connections on the port you have chosen!

- Now unzip TestClientSocket.zip
- Open the B4A code and make sure that you set the "ip" and "port" fields in Sub Globals to match the IP of the machine running the above SocketServer and the port that it is listening on
- Install this on a real device (not an emulator)
- Connect to the server and type the text in the lower text box; you should see the echo from the server
- Debug and understand Connect, AsyncStreams Read and Write etc.

I spent a lot of time trying to understand this - eventually it was very simple. Hope this helps the beginners to get started quickly - just like others out here helped me :)

Enjoy
flyingbag
 

Attachments

  • SocketServer.zip (2.9 KB)
  • TestClientSocket.zip (7.2 KB)

Sbleck

Member
Licensed User
Longtime User
Is there any other way to check this ?

Hello,

Wanted to check this example, but encountered some problems that I mentioned below:

Microsoft Windows [Version 5.2.3790]
(C) Copyright 1985-2003 Microsoft Corp.

C:\Documents and Settings\Someone>cd\Scripts\Basic4Android

C:\Scripts\Basic4Android>dir *.class
Volume in drive C is XYZ
Volume Serial Number is A8B4-4854

Directory of C:\Scripts\Basic4Android

31/08/2012 22:29 2.594 ProcessClient.class
31/08/2012 22:37 2.137 SocketServer.class
2 File(s) 4.731 bytes
0 Dir(s) 64.361.914.368 bytes free

C:\Scripts\Basic4Android>java SocketServer
Exception in thread "main" java.lang.UnsupportedClassVersionError: SocketServer : Unsupported major.minor version 51.0
at java.lang.ClassLoader.defineClass1(Native Method)
at java.lang.ClassLoader.defineClassCond(Unknown Source)
at java.lang.ClassLoader.defineClass(Unknown Source)
at java.security.SecureClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.defineClass(Unknown Source)
at java.net.URLClassLoader.access$000(Unknown Source)
at java.net.URLClassLoader$1.run(Unknown Source)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
Could not find the main class: SocketServer. Program will exit.

C:\Scripts\Basic4Android>java -version
java version "1.6.0_35"
Java(TM) SE Runtime Environment (build 1.6.0_35-b10)
Java HotSpot(TM) Client VM (build 20.10-b01, mixed mode, sharing)

I would like to know what can be done to get the expected results. The post also does not mention which release of Java is needed.

Regards,
Sven
 

IanMc

Well-Known Member
Licensed User
Longtime User
"You can queue the messages and process them with a timer every x milliseconds."

Would you have any examples of how to do this?

I am sending a string of data from my Arduino but it always splits it up so that if for example I send the word hello I get

h
ello

I don't want to have to use prefix mode, is there anything I can easily do in the AStream_NewData Subroutine?
 

mc73

Well-Known Member
Licensed User
Longtime User
In the NewData sub, you can add your new data to a string, call it 'myBuffer', declared in globals. We can also declare a counter variable, call it 'ticksOfInactivity'.
B4X:
myBuffer=myBuffer & BytesToString(Buffer, 0, Buffer.Length, "UTF8")
At the same time, you enable a timer (after the addition) and set ticksOfInactivity to zero.
B4X:
ticksOfInactivity=0
timerStreams.enabled=true
At the timerStreams_tick, you increase the 'ticksOfInactivity', and if they reach a certain value, you process the 'myBuffer' string,empty it, disable the timer and set the ticks to zero (optionally).
B4X:
Sub timerStreams_Tick
    ticksOfInactivity = ticksOfInactivity + 1
    If ticksOfInactivity = ourSetMaxTicks Then
        'process myBuffer
        myBuffer = ""
        timerStreams.Enabled = False
    End If
End Sub
You have to set a reasonable interval for the timer, so that it waits a bit before deciding that we received a 'full' string.
Surely there are other, probably more efficient, ways to do this as well, because I am not sure what will happen if there is a 'sleep' interval and then data is received at the very same moment that the timer is processing the buffer ;)
 

IanMc

Well-Known Member
Licensed User
Longtime User
Thanks mc73 I'll study that code and see if I can use it.

Well it works!

But instead of using another 'ticksOfInactivity' variable I just used:

timerStreams.Initialize("timerStreams", 100)

initialising the timer to 100 milliseconds.

Thanks again!

Perhaps in the future Erel might make an AsyncStreams2 which has this timer function built in, but this works fine.
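
Put together, the timer approach from the two posts above might look like the sketch below. It is an assumption that the 100 ms timer is restarted on every chunk (the post does not show this), and decoding each chunk separately as UTF8 is only safe for plain ASCII text, because a multi-byte character could be split between two chunks.
B4X:
'Add to Process_Globals:
'    Private timerStreams As Timer
'    Private myBuffer As String
'Call once, for example in Activity_Create when FirstTime is True:
'    timerStreams.Initialize("timerStreams", 100)

Sub AStreams_NewData (Buffer() As Byte)
    myBuffer = myBuffer & BytesToString(Buffer, 0, Buffer.Length, "UTF8")
    'Restart the timer so that it fires 100 ms after the LAST chunk.
    timerStreams.Enabled = False
    timerStreams.Enabled = True
End Sub

Sub timerStreams_Tick
    'No data arrived for 100 ms: treat the collected text as one message.
    timerStreams.Enabled = False
    Log("Received: " & myBuffer)
    myBuffer = ""
End Sub
The interval is a trade-off: too short and a slow sender is still split into pieces, too long and every message is delayed by that amount.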
 

dddvvv

Member
Licensed User
Longtime User
asyncstreamprefix troubles

Hi everyone

I have been trying AsyncStreams on my Galaxy Tab. It works with the Bluetooth chat example, although I'm getting mixed data, like:

Bl
Uetooth

So I'm trying prefix mode, and not having any luck. The app force closes as soon as I connect.

I'm just confused about the requirements of the four prefix bytes.
How are they supposed to be formatted?
Is it : 0 0 0 3 "234"
OR : 3 3 3 3 "234"

I've been struggling with this for weeks, on and off. Data is coming from a PIC microcontroller, and can be formatted to support prefix mode.

Thanks
Ddd
 

derez

Expert
Licensed User
Longtime User
I wrote a basic4ppc program that communicates with asyncstream.
I found that the four bytes are converted to int32 with this:

B4X:
ArrayCopy(Q(),0,4,tbr(),0)
msgsize = bc.Int32FromBytes(tbr(),0)
where Q() is the big buffer that receives everything, and tbr() is a small buffer to hold the four bytes.
Byte 0 is the least significant byte of the four.
As can be understood, the Int32 value is the size of the message.

For sending data I use this code (b4ppc):
B4X:
Sub send_msg(st As String)
    Dim L As Integer
    tbuf() = stream.StringToBytes(st)
    L = ArrayLen(tbuf())
    Dim buffer(4+L) As byte
    tbs() = bc.Int32ToBytes(L)
    ArrayCopy(tbs(),0,4,buffer(),0)
    ArrayCopy(tbuf(),0,L,buffer(),4)
    stream.WriteBytes(buffer())
End Sub
 

dddvvv

Member
Licensed User
Longtime User
Thanks for the quick reply.

My problem is that I'm trying to receive bytes / a string from a microcontroller, not send bytes (at least, not yet). I'm still having the same issues. The app force closes, since I'm not sending the prefix bytes the right way.

How should I send them from the microcontroller? Hex, dec, ASCII?
Thanks
 

derez

Expert
Licensed User
Longtime User
My answer gives both ways, send and receive. The prefix is 4 bytes which form an Int32 (each byte is 8 bits; the least significant byte is the first of the four).
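
Applied to the "234" question above, and assuming the least-significant-byte-first order described here, the microcontroller would send the raw byte values 3, 0, 0, 0 followed by the three characters (0x32, 0x33, 0x34). The prefix bytes are binary values, not ASCII digits or hex text. A small sketch of reading such a prefix in B4X, where ReadPrefixLength is a hypothetical helper name:
B4X:
'Hypothetical helper: reads a 4-byte little-endian length starting at Offset.
Sub ReadPrefixLength (Data() As Byte, Offset As Int) As Int
    Dim result As Int = 0
    'Start from the most significant byte (the last of the four) and shift it up.
    For i = 3 To 0 Step -1
        result = Bit.Or(Bit.ShiftLeft(result, 8), Bit.And(Data(Offset + i), 0xFF))
    Next
    Return result
End Sub

'Usage: logs 3 for the message "234"
Sub TestPrefix
    Dim raw() As Byte = Array As Byte(3, 0, 0, 0, 0x32, 0x33, 0x34)
    Log(ReadPrefixLength(raw, 0))
End Sub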
 

rhouri

New Member
Licensed User
Longtime User
AsyncStream always not initialized

I'm having a problem with AsyncStreams. IsInitialized always returns false.
I tried the flyingbag example above and it does the same thing.
Any pointers to what the problem might be?
Thanks.
 

rhouri

New Member
Licensed User
Longtime User
I am running the exact program that was posted by flyingbag earlier in the posts.
I also have this code in a class which also fails. This one connects to a different server. The connection in both cases seems to be successful.
Thx



B4X:
'Class module
Sub Class_Globals
    Dim AStreams As AsyncStreams
    Dim TCPSocket As Socket
    Dim TCPAddress As String
    Dim TCPPort As Int
    Dim TCPDelay As Int
End Sub

'Initializes the object. You can add parameters to this method if needed.
Public Sub Initialize (ipAddress As String, port As Int, timeout As Int)
    TCPAddress = ipAddress
    TCPPort = port
    TCPDelay = timeout
    TCPSocket.Initialize("TCPSocket")
    TCPSocket.Connect(TCPAddress, TCPPort, TCPDelay)
End Sub

Public Sub TCPSocket_Connected (Successful As Boolean)
    If Successful = False Then
        ToastMessageShow("Connection Failed", False)
        Return 'do not initialize AStreams on a failed connection
    End If
    AStreams.Initialize(TCPSocket.InputStream, TCPSocket.OutputStream, "AStreams")
End Sub

Sub AStreams_Error
    ToastMessageShow(LastException.Message, True)
End Sub

Sub AStreams_NewData (Buffer() As Byte)
    Dim msg As String
    msg = BytesToString(Buffer, 0, Buffer.Length, "UTF8")
    ToastMessageShow(msg, False)
    Log(msg)
End Sub

Public Sub communicate
    Dim s As String
    s = "Hello"

    If AStreams.IsInitialized = False Then
        ToastMessageShow("AStreams not Initialized", False)
        Return
    End If

    Dim buffer() As Byte
    buffer = s.GetBytes("UTF8")
    AStreams.Write(buffer)
    Log("Sending: " & s)
End Sub


Public Sub Close
    AStreams.Close
    TCPSocket.Close
End Sub
 

rhouri

New Member
Licensed User
Longtime User
After messing around a bit, I re installed the original Network test from flyingbag and it now works correctly. I'll have to review my own code for errors.
Thanks
 

qsrtech

Active Member
Licensed User
Longtime User
AsyncStreams with multiple clients?

Hi, I'm trying to set up a client/server service. I'm a little perplexed about the use of AsyncStreams. The question is: how can you handle multiple clients with one stream/event? I don't see any property to determine which client is sending the data. Please advise. Thanks!

EDIT: Never mind, I built a small class to handle multiple instances. Will post for others eventually.
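
The class mentioned in the EDIT was not posted in this part of the thread. As an illustration only, one way such a class could look is sketched below: each client gets its own AsyncStreams object inside its own class instance, and the instance passes itself along with the data. ClientConnection, Client_NewData, Id, clients and nextId are all hypothetical names.
B4X:
'Class module: ClientConnection (hypothetical)
Sub Class_Globals
    Private AStreams As AsyncStreams
    Private mSocket As Socket
    Private mCallback As Object
    Public Id As Int
End Sub

Public Sub Initialize (Callback As Object, ClientId As Int, NewSocket As Socket)
    mCallback = Callback
    Id = ClientId
    mSocket = NewSocket
    AStreams.Initialize(mSocket.InputStream, mSocket.OutputStream, "AStreams")
End Sub

Private Sub AStreams_NewData (Buffer() As Byte)
    'Pass the instance along so the server knows which client sent the data.
    CallSub3(mCallback, "Client_NewData", Me, Buffer)
End Sub

Private Sub AStreams_Error
    Close
End Sub

Public Sub Send (Data() As Byte)
    AStreams.Write(Data)
End Sub

Public Sub Close
    AStreams.Close
    mSocket.Close
End Sub

'In the server module. Assumes clients is a List that was initialized
'and nextId is an Int, both declared in Process_Globals.
Sub Server_NewConnection (Successful As Boolean, NewSocket As Socket)
    If Successful Then
        Dim c As ClientConnection
        nextId = nextId + 1
        c.Initialize(Me, nextId, NewSocket)
        clients.Add(c)
    End If
    Server.Listen
End Sub

Sub Client_NewData (Client As ClientConnection, Buffer() As Byte)
    Log("Client " & Client.Id & ": " & BytesToString(Buffer, 0, Buffer.Length, "UTF8"))
End Sub
Because every instance has its own AStreams variable, the events of one client never mix with those of another, and the server can reply to a specific client with Client.Send.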
 

kiki78

Active Member
Licensed User
Longtime User
Hello Erel,

I use AsyncStreams to write data to a file.
To optimize memory use, my byte array argument is reused between calls to "Write" or "Write2".
My question is: is this safe?
In other words, when "Write" returns, has the array been copied into a temporary queue buffer, so that I can overwrite it?

Regards
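
This part of the thread does not answer the question. The tutorial only states that written data is added to an internal queue and sent in the background, so the bytes may not have been sent yet when Write returns. A cautious sketch that makes no assumption about whether the queue copies the array is to hand Write a fresh copy, at the cost of the extra allocation the post is trying to avoid. WriteCopy is a hypothetical helper name.
B4X:
'Hypothetical helper: writes a private copy so the caller can reuse Source immediately.
Sub WriteCopy (Source() As Byte, Length As Int)
    Dim copy(Length) As Byte
    For i = 0 To Length - 1
        copy(i) = Source(i)
    Next
    AStreams.Write(copy)
End Sub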
 