[Spread-users] Flow Control

John Schultz jschultz at d-fusion.net
Thu May 16 15:39:48 EDT 2002


I have appended a message to the bottom of this email that describes a
couple of flow control algorithms that don't require every member to
ACK every message. In particular, take a look at the algorithm called
"FC Method 4" in the message. This algorithm might give you some ideas
on how to solve your flow control problem. In general, most methods
will slow the flow to the slowest member. If a member is "too slow" by
your standards, your implementation of FC will have to somehow exclude
that member from the algorithm.

Why are you using Berkeley DB for a message queue? If you are only doing
flow control and not process recovery / resynchronization, there should
be no need for a persistent store of your messages, so you don't need
the overhead of writing to BDB for flow control. For FC you don't need
to keep the whole messages in memory, just unique identifiers, which
shouldn't be too memory intensive.
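For example, ACK-based FC only needs a set of pending message IDs
(a minimal sketch in Python; the class and method names are my own
illustrative choices, not part of any Spread or BDB API):

```python
class PendingAcks:
    """Track outstanding messages by ID only -- no payloads retained."""

    def __init__(self):
        self.pending = set()

    def on_send(self, msg_id):
        # Record the identifier of a message awaiting acknowledgment.
        self.pending.add(msg_id)

    def on_ack(self, msg_id):
        # discard() tolerates duplicate or unexpected ACKs.
        self.pending.discard(msg_id)

    def outstanding(self):
        return len(self.pending)
```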

If you do need BDB for other reasons, then you might want to disable
synchronous log flushes on commit. You can do this by adding the
following line to your DB_CONFIG file (there are other methods too;
check BDB's docs):

set_flags DB_TXN_NOSYNC

This changes the Berkeley DB behavior so that the log files are not 
flushed when transactions are committed. Although this change will 
greatly increase your transaction throughput, it means that transactions 
will exhibit the ACI (atomicity, consistency, and isolation) properties, 
but not D (durability). Database integrity will be maintained, but it is 
possible that some number of the most recently committed transactions 
may be undone during recovery instead of being redone. If your app can 
tolerate that behavior then do it. If you do this you might want to set 
up another thread that periodically (once a minute or even slower) calls
txn_checkpoint() with large time and data values (check the docs).
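A minimal sketch of such a checkpointing thread (Python for brevity;
the `env` handle and the `txn_checkpoint(kbyte, min)` signature are
assumed to follow the Berkeley DB bindings, and the interval and
threshold values are illustrative):

```python
import threading

def start_checkpoint_thread(env, interval_s=60, kbyte=1024, minutes=5):
    """Periodically checkpoint a Berkeley DB environment.

    `env` is assumed to expose txn_checkpoint(kbyte, min) as in the
    BDB C API: the checkpoint is skipped unless at least `kbyte` KB of
    log data or `minutes` minutes have accumulated since the last one.
    """
    stop = threading.Event()

    def loop():
        # Event.wait() returns False on timeout, True once stop is set,
        # so this loops every interval_s seconds until shutdown.
        while not stop.wait(interval_s):
            env.txn_checkpoint(kbyte, minutes)

    threading.Thread(target=loop, daemon=True).start()
    return stop  # call stop.set() to shut the thread down
```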

Anyway, I hope this helps.

John Schultz
Co-Founder, Lead Engineer
D-Fusion, Inc. (http://www.d-fusion.net)
Phn: 443-838-2200 Fax: 707-885-1055

koorosh.alahiari at ids.allianz.com wrote:

> Hi All,
> This is more of a tweak-your-brain kind of question than a Spread one!
> Does anyone have any bright ideas on how to do flow control (and
> reliable delivery) at the application level?
> I have implemented an ACK mechanism that is not good enough (because
> the overhead degrades my clients' user programs by a factor that I am
> not happy with). Here is how I do it:
> I use Berkeley DB for persisting my message queue (which adds some
> overhead even if it is done in a separate thread). For every message
> that an application sends, my interface expects an ACK message (which
> it does NOT deliver to the application). It only deletes the original
> message when it receives the ACK. This is straightforward enough, BUT
> the problematic part arises due to the group-wide mode of operation.
> Should I wait for all members to acknowledge? What would I do with a
> slow member (which is going to slow down the whole "club")? This is
> also true for flow control: do I do flow control on the fastest
> member, the slowest member, etc.?
> If someone could point me in the direction of some publications - or
> even better some practical solutions - I would appreciate it.
> I probably have not explained things very well, but I am sure that
> you have come across this issue many times before and can work out
> what I am trying to say!
> Many thanks for this fine piece of software (spread) & regards,
> Koorosh
> _______________________________________________
> Spread-users mailing list
> Spread-users at lists.spread.org
> http://lists.spread.org/mailman/listinfo/spread-users

 > Begin Included Message:

I don't know how much everyone on this list knows about flow control,
but this email is an attempt to foster discussion over how best to do
flow control in Spread. I have heard complaints about application level
flow control before and the general response is that "you have to do
it." But I have yet to see any learning resources provided by
Spread/CNDS that actually help an application builder truly handle this
problem. So, this email is an attempt at providing such a resource --
feel free to add suggestions, better algorithms, questions, critiques or
correct any errors I make.

I think, if there is enough interest, someone (possibly I) should
implement a flow control library that works on top of the normal Spread
library. This library should implement the best flow control algorithm
this discussion comes up with. Then users of Spread who need flow
control can simply use this library and group-level, application flow
control will no longer be a problem. Those who don't need it, won't be
bothered by it.

I'm sure that Yair, Jonathan, Claudiu, and most of the people in the
CNDS can propose methods at least as good as what follows...

Recommendations: First, to reduce the chances of having your application
kicked by Spread in general, do the following:

(1) Highly Recommended: Edit spread_params.h, raise the parameter
MAX_SESSION_MESSAGES and recompile Spread. For my applications I have
the parameter set to 10,000 instead of the usual 1,000. This will make
the Spread Daemon have a larger memory footprint, but it generally isn't
that big of a deal. This will make Spread/your application more
resilient to bursts of msgs, but won't solve any real underlying flow
control problem.

(2) Highly Recommended: In your application, create a high priority
thread (or an equivalent mechanism) that does nothing but call
SP_receive and _ONLY_ queues messages (in memory) that it receives for 
other threads to process. Again, this will make your program more 
resilient to burstiness, but will not solve any real underlying flow 
control problems.
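A minimal sketch of such a receive thread, assuming a blocking
`sp_receive` wrapper (hypothetical) around the C SP_receive call that
returns None when the session closes:

```python
import threading

def start_receiver_thread(sp_receive, inbox):
    """Drain Spread as fast as possible into an in-memory queue.

    Worker threads process messages from `inbox` at their own pace, so
    bursts land in application memory instead of piling up in the
    daemon's session buffer.
    """
    def loop():
        while True:
            msg = sp_receive()
            if msg is None:   # assumed end-of-session sentinel
                break
            inbox.put(msg)

    t = threading.Thread(target=loop, daemon=True)
    t.start()
    return t
```

Workers then simply call `inbox.get()` and do the real processing.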

(3) Lowly Recommended: If possible, you may want to collapse
application-level "messages" into a smaller number of Spread messages,
which are then unpacked at the receiver's end. Spread 3 does some
packing already, but it only collapses msgs in the time it takes for a
token to make one circuit around your Spread ring, which only takes a
few milliseconds. Your application may be able to do more compacting
over a longer period of time. Again, this doesn't really solve your
underlying flow control problem; it just makes you more resilient to
bursts.
After all of that you still may need to address real underlying flow
control problems. I would say that most Spread applications currently
use one of a few different forms of flow control:

FC Method 1 (Hope and Pray): I know that my particular application
doesn't generate more messages than my receiver can handle under normal
operating conditions. Furthermore, bursts of messages won't exceed my
applications' buffers (a combo of any internal application buffering and
Spread buffering).

This is the current method I think you are using, and as you can see it
doesn't work when extremely large numbers of messages are being generated.

FC Method 2 (Pipelining Senders): My sending application is a member of
the group to which it is sending. It keeps track of how many messages
it has sent out that it has not yet received back from Spread. I allow
each sender some floating maximum number of outstanding messages. When
it hits that maximum, it will not send any more messages to the group
until it receives back one of its own outstanding msgs. At that point,
it allows itself to send the next message that needs to go out. If the
sender (optionally) processes all msgs like a receiver, all the members
are of approximately equal strength+load, and your window of
outstanding messages isn't too big, then this (along w/ buffering)
should usually keep Spread from kicking your applications. This method
doesn't guarantee that a strong sender won't run merrily along while
driving a slower receiver into the ground.
(Note: what is a "good" window size? In small groups I often use
MAX_SESSION_MESSAGES / (2 * current # of members in the group).)

This is the method used by the flooder application that comes with Spread.
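The pipelining scheme, including the window heuristic from the note
above, can be sketched like this (Python; the class, the `multicast`
callback, and the defaults are illustrative, not Spread's API):

```python
class PipelinedSender:
    """Sender-side window of outstanding self-addressed messages.

    A message is "outstanding" from the time it is multicast until the
    sender receives its own copy back from Spread (the sender must be
    a member of the group it sends to).
    """

    def __init__(self, max_session_messages=1000, group_size=2):
        # Heuristic from the text: MAX_SESSION_MESSAGES / (2 * n).
        self.window = max(1, max_session_messages // (2 * group_size))
        self.outstanding = 0

    def try_send(self, multicast):
        # multicast is the caller's send function, e.g. a wrapper
        # around SP_multicast. Returns False when the window is full.
        if self.outstanding >= self.window:
            return False
        multicast()
        self.outstanding += 1
        return True

    def on_self_delivery(self):
        # Called when we receive one of our own messages back.
        self.outstanding -= 1
```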

FC Method 3 (Yelling Fire!): My receiving applications implement
Recommendation 2 from above. They notice when their internal buffers
are growing too fast and/or getting too large. When they detect this
condition, they send a message to the group warning all of the other
members. This warning should make your senders slow down or stop their
sending. You could have the senders lower their maximum-outstanding-
msgs parameter if you are using the "Pipelining Senders" method, or
stop altogether. After the condition has cleared, the member who
yelled fire can indicate an "All Clear," which could raise the max
outstanding messages or allow senders to begin sending again. This
method depends on the receiver receiving fast enough and yelling
"FIRE!" early enough that Spread doesn't kick it anyway.
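A sketch of the high/low watermark logic described above (the
thresholds, control strings, and `send_to_group` callback are made up
for illustration; in practice the control message would go out via
SP_multicast):

```python
class FireAlarmReceiver:
    """Receiver-side watermarks for "FC Method 3" (illustrative)."""

    def __init__(self, send_to_group, high_water=8000, low_water=2000):
        # Thresholds should sit well below the point at which Spread
        # would kick the client (e.g. below MAX_SESSION_MESSAGES).
        self.send_to_group = send_to_group
        self.high_water = high_water
        self.low_water = low_water
        self.backlog = 0       # messages queued but not yet processed
        self.alarmed = False

    def on_enqueue(self):
        self.backlog += 1
        if not self.alarmed and self.backlog >= self.high_water:
            self.alarmed = True
            self.send_to_group("FIRE")       # senders: slow down / stop

    def on_processed(self):
        self.backlog -= 1
        if self.alarmed and self.backlog <= self.low_water:
            self.alarmed = False
            self.send_to_group("ALL_CLEAR")  # senders: resume
```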

FC Method 4 ("Real" Application Flow Control): At this level we begin
implementing flow control that is similar to what exists between the
Spread daemons.

One moderately simple (not great) method is for each sender to mark
every message with a local counter that it increments per message
sent. Each receiver keeps an nxn ACK matrix where rows represent
receivers, columns represent senders, and entry x = (row i, col j)
indicates that receiver i has _PROCESSED_ (not just received) up to
msg x from sender j. Each receiver immediately updates its own row
after each message it finishes processing. Receivers periodically, and
possibly piggy-backed on other messages, announce their own ACK vector
(row) to the group. Senders also use the "Pipelining Senders" max
outstanding messages method.

When an application wants to send a msg, it first looks at its ACK
matrix and determines whether any receiver is in danger of being too far
behind from its point of view. It does this by computing the difference
between each receiver's ACK vector and its own ACK vector (these results
should be intelligently cached between send executions and modified only
upon receiving ACK vectors from other receivers, or after updating its
own ACK vector). If the difference is greater than some set threshold
(which should be some percentage of MAX_SESSION_MESSAGES, and probably
equal to our max outstanding window) then the send is blocked.
Otherwise, if we still have room in our outstanding msg window, the
sender is allowed to send.

The max outstanding window ensures that even if all of the applications
are sending at full speed, we shouldn't be overrunning receivers. The
nxn matrix allows us to pessimistically check whether or not we believe
we are actually overrunning receivers.

If you use only AGREED messages, this method can be improved. Rather
than keeping ACK vectors, each member tracks only the most recent
message it has handled (an ACK singleton). Since all members receive
those messages in the same order, we can see how far behind they are
based on this information alone.
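The ACK-matrix check can be sketched as follows (all names are my own
illustrative choices; I sum the per-sender lag into one total as the
"difference", which is one reasonable reading of the method, and with
AGREED-only traffic each row would collapse to a single counter):

```python
class AckMatrixFC:
    """Sketch of "FC Method 4": member-local view of group progress.

    matrix[receiver][sender] holds the highest message counter from
    `sender` that `receiver` is known to have PROCESSED.
    """

    def __init__(self, members, threshold):
        # threshold should be some fraction of MAX_SESSION_MESSAGES,
        # e.g. equal to the max outstanding window.
        self.threshold = threshold
        self.matrix = {r: {s: 0 for s in members} for r in members}

    def on_processed(self, me, sender, counter):
        # Update our own row as soon as we finish processing a message.
        row = self.matrix[me]
        row[sender] = max(row[sender], counter)

    def on_ack_vector(self, receiver, vector):
        # A peer announced its row, periodically or piggy-backed.
        self.matrix[receiver].update(vector)

    def may_send(self, me):
        # Block if any receiver looks too far behind our own view.
        mine = self.matrix[me]
        for r, row in self.matrix.items():
            if r == me:
                continue
            lag = sum(mine[s] - row[s] for s in mine)
            if lag > self.threshold:
                return False
        return True
```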

I just thought this last method up while writing this email (although
I'm sure it's not original), so there are some details to be filled in,
such as how to handle membership changes and the like. Also, I need to
think more to determine whether or not this _really_ solves the flow
control problem in all cases, but my gut tells me that it does, because
in my understanding it is similar to the flow control in Spread 3
(although that flow control is token based).

Other General Comments:

All flow control methods depend on your application being able to block
the generation of "application-level" messages before feeding them into
your flow control system. If it can't block the generation of those
messages and the average generation rate exceeds your processing
capability, you are hosed in the long run no matter what.

Finally, some of the above methods can still fail due to flow control
failures, and your application can be kicked from Spread for myriad
other reasons (your TCP/IP link breaks, or the OS kills Spread due to
memory consumption on the system). So regardless of the discussion
above, your application or Spread can die at any point due to external
influences. Therefore, recovery from crashes is an important issue
orthogonal to flow control. Either your application can tolerate
missing messages, or it must recover important state from surviving
members or some other external means.

Please comment back with suggestions, questions, interest, etc.

John Schultz
Co-Founder, Lead Engineer
D-Fusion, Inc. (http://www.d-fusion.net)
Phn: 443-838-2200 Fax: 707-885-1055
