Howto wait for multiple queues (Queue.py)? 
 Howto wait for multiple queues (Queue.py)?

Hi,

as I prefer to use queues for interthread communication, I need
something like a 'mainloop' for my threads in the sense that every
thread can have multiple input queues and I want to be able to block
the consumer thread until input within one (or more) queues is
available.  This mechanism should be somewhat comparable to 'select()'
or 'MsgWaitForMultipleObjects()' (on w32).

I've googled a bit but couldn't find a solution.  The only things I
can come up with are the following:

1) Polling:

   Iterating over all relevant queues and issuing a nonblocking 'get'
   on each one, possibly yielding after each iteration to be a bit
   more polite.

   Pro:

        - straightforward

   Con:

        - performance penalty (wasting cpu time)

        - implicitly imposes a priority on the queues; this could even
          lead to the point where only the inputs of a single queue
          (say the first one in the loop) are handled while all others
          are ignored (if this single queue is *very* busy).  To avoid
          this I'd need some 'scheduling'/prioritization for the
          relevant queues, which could easily get massive (depending
          on the number of queues).

         - very ugly ;-B

   Conclusion:  This is lousy.
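For concreteness, approach 1 might look like the following sketch (using the modern 'queue' module; in the Python of this thread the module was still called 'Queue', and the function name here is made up for illustration):

```python
import queue
import time

def poll_queues(queues, idle_sleep=0.01):
    """Busy-wait over several queues; return (index, item) for the
    first queue found non-empty."""
    while True:
        for i, q in enumerate(queues):
            try:
                return i, q.get_nowait()   # nonblocking 'get'
            except queue.Empty:
                continue
        time.sleep(idle_sleep)             # be polite between iterations

q1, q2 = queue.Queue(), queue.Queue()
q2.put("hello")
result = poll_queues([q1, q2])
print(result)   # -> (1, 'hello')
```

Note how the loop order silently prioritizes q1 over q2 -- exactly the implicit-priority problem described above.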

2) Synchronization (take I):

   I associate some sort of a synchronization item with a Queue
   derived class.  Every 'put' increases a counter and every 'get'
   decreases it.  If the counter drops to 0 on a 'get', the
   synchronization item becomes unsignaled; if it is 1 after a 'put',
   it becomes signaled.  Several queues can share the same synch item.
   consumer blocks on the unsignaled synch item and awakes when it
   gets signaled.  The simplest synch item I can come up with for this
   is a Semaphore.  Every (successful) DerivedQueue.put does a
   Semaphore.release and every (successful) DerivedQueue.get does a
   Semaphore.acquire.  The consumer does a Semaphore.acquire and has
   to issue a Semaphore.release immediately afterwards (to always have
   the semaphore counter reflect the correct number of items within
   all related queues and to avoid a deadlock on the next
   DerivedQueue.get anyway ;-)

   Pro:

        - consumer doesn't waste cpu cycles

   Con:

        - performance hit on 'get' and 'put'

        - consumer still has to test all related queues to find out
          which one signaled the synch item (some 'scheduling'
          algorithm needed as with polling)

        - when the consumer awakens from a synch item wait, there is
          no guarantee that it actually gets something from one of the
          queues due to multithreading (this might be acceptable for
          me)

        - two threads can share no queues at all or only the very same
          set of queues unless I let DerivedQueue accept several synch
          items (which makes another lock necessary within the
          DerivedQueue to maintain the list of synch items)

        - quite ugly

   Conclusion:  anything but optimal
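A minimal sketch of this semaphore scheme (the names CountedQueue and wait_any are mine, not from any library; modern 'queue' module assumed):

```python
import queue
import threading

class CountedQueue(queue.Queue):
    """Queue sharing a semaphore whose count mirrors the total number
    of items across all queues attached to that semaphore."""
    def __init__(self, sem):
        queue.Queue.__init__(self)
        self._sem = sem
    def put(self, item, block=True, timeout=None):
        queue.Queue.put(self, item, block, timeout)
        self._sem.release()                 # one more item somewhere
    def get(self, block=True, timeout=None):
        item = queue.Queue.get(self, block, timeout)
        self._sem.acquire()                 # one item fewer
        return item

def wait_any(sem):
    sem.acquire()   # block until at least one related queue has an item
    sem.release()   # restore the count for the consumer's actual get

sem = threading.Semaphore(0)
a, b = CountedQueue(sem), CountedQueue(sem)
b.put(42)
wait_any(sem)                # returns immediately: b is non-empty
got = None
for q in (a, b):             # still have to scan for the signaling queue
    try:
        got = q.get(block=False)
        break
    except queue.Empty:
        continue
print(got)   # -> 42
```

The scan after wait_any is the "consumer still has to test all related queues" drawback made visible.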

3) Synchronization (take II):

   I implement a QueueContainer which accepts derived Queues
   (similarly as above).  The 'get' and 'put' methods call back the
   container.  The consumer uses a QueueContainer.get to get items out
   of one of its queues.  The DerivedQueue is able to call back
   different containers.  Furthermore DerivedQueue must expose an
   external lock so that 'qsize' can be used reliably when a new
   container is attached.  The question of which queue issued a signal
   can be solved by maintaining a FIFO within the container (a stack
   would reverse the order): DerivedQueue.put corresponds to a push at
   one end, DerivedQueue.get to a pop at the other.  Thus all events
   are handled in the order they were delivered to the input queues.

   Pro:

        - solves most of the problems from above

        - simple interface for consumers

   Con:

        - performance hit

        - what is the semantics of the QueueContainer.get when several
          threads are blocking on it?

        - deadlocks lurking everywhere

        - not all too nice either

   Conclusion:  somewhat full featured but also somewhat bloated :-B
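As a sketch of approach 3 (the class names are mine; the race between the internal ready-FIFO and the member queue under multiple consumers is ignored here for brevity, hence the plain get_nowait):

```python
import queue

class QueueContainer:
    """Container whose member queues report every put, so a consumer
    can take events in overall arrival order."""
    def __init__(self):
        self._ready = queue.Queue()   # FIFO of members with new items
    def _notify(self, q):
        self._ready.put(q)
    def get(self, block=True, timeout=None):
        q = self._ready.get(block, timeout)   # which member signaled?
        return q, q.get_nowait()

class MemberQueue(queue.Queue):
    def __init__(self, container):
        queue.Queue.__init__(self)
        self._container = container
    def put(self, item, block=True, timeout=None):
        queue.Queue.put(self, item, block, timeout)
        self._container._notify(self)         # call back the container

box = QueueContainer()
commands, events = MemberQueue(box), MemberQueue(box)
events.put("ping")
source, item = box.get()
print(item)              # -> ping
print(source is events)  # -> True
```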

I would very much appreciate any suggestions for a better
(performance-wise, more 'pythonic', whatever) implementation -- or
have I overlooked an already existing solution to my problem?

TIA,

andreas



Sun, 08 May 2005 19:50:07 GMT  
 Howto wait for multiple queues (Queue.py)?

Quote:

> as I prefer to use queues for interthread communication, I need

Queues are GREAT for interthread communication, BUT:

Quote:
> something like a 'mainloop' for my threads in the sense that every
> thread can have multiple input queues and I want to be able to block

...this doesn't follow -- just make all requests to a thread onto
a single Queue (tag the requests with 'kind' or whatever as needed)
and live happily.  A Queue instance *IS* implicitly, automatically
and intrinsically "synchronized" -- several threads can be reading
and/or writing to the same Queue instance and you're guaranteed that
they'll never interfere with each other.  That's just about the
whole POINT of the Queue class.
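A minimal sketch of that single tagged queue (modern 'queue' module; the tags and handlers are invented for illustration):

```python
import queue

inbox = queue.Queue()   # the thread's one and only input queue

# producers tag each request with its kind
inbox.put(("command", "shutdown"))
inbox.put(("event", "key-pressed"))

# the consumer dispatches on the tag instead of selecting among queues
handled = []
handlers = {
    "command": lambda payload: handled.append("cmd:" + payload),
    "event":   lambda payload: handled.append("evt:" + payload),
}
while not inbox.empty():
    kind, payload = inbox.get()
    handlers[kind](payload)
print(handled)   # -> ['cmd:shutdown', 'evt:key-pressed']
```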

Alex



Mon, 09 May 2005 01:16:31 GMT  
 Howto wait for multiple queues (Queue.py)?

    Andreas> ... I want to be able to block the consumer thread until input
    Andreas> within one (or more) queues is available.

How about you maintain a single Queue which holds the other Queues?  When a
Queue goes empty it gets removed from the main Queue.  When something is
put() into it, it's added to the main Queue if it's not already there.  Your
consumers then wait on the main Queue.

Seems simple enough to subclass Queue and tweak its get and put methods.
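One way to sketch that subclass (simplified: every put adds one entry to the main queue rather than only on the empty-to-non-empty transition, which sidesteps the bookkeeping; modern 'queue' module assumed):

```python
import queue

class NotifyingQueue(queue.Queue):
    """On every put, also drop a reference to this queue into a shared
    main queue that all consumers wait on."""
    def __init__(self, main):
        queue.Queue.__init__(self)
        self._main = main
    def put(self, item, block=True, timeout=None):
        queue.Queue.put(self, item, block, timeout)
        self._main.put(self)

main = queue.Queue()
q1, q2 = NotifyingQueue(main), NotifyingQueue(main)
q2.put("work")
ready = main.get()            # blocks until some queue received a put
item = ready.get_nowait()
print(item)         # -> work
print(ready is q2)  # -> True
```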

Perhaps this was one of the alternatives you described but I didn't
understand.

--

http://www.mojam.com/
http://www.musi-cal.com/



Mon, 09 May 2005 01:33:52 GMT  
 Howto wait for multiple queues (Queue.py)?

Quote:

> ...this doesn't follow -- just make all requests to a thread onto
> a single Queue (tag the requests with 'kind' or whatever as needed)
> and live happily.  A Queue instance *IS* implicitly, automatically
> and intrinsically "synchronized" -- several threads can be reading
> and/or writing to the same Queue instance and you're guaranteed that
> they'll never interfere with each other.  That's just about the
> whole POINT of the Queue class.

Sorry, this confuses me a bit.  If you understood my mail such that I
want to synchronize the access of several threads to one queue, my
mail must have been totally ambiguous.

What I really want to achieve is that several threads can consume
items from several (possibly identical) _sets_ of queues.  As I read
the docs and looked at the source in Queue.py, I don't doubt that this
queue implementation is thread-safe.

What I don't understand is the reasoning, *why* every thread should
only have exactly one input queue.  This has several disadvantages
from my point of view.  Several threads can't share the same input
queue and I can't distinguish between several input channels within
one thread.  For example in my case the threads should have input
channels for commands and for events (such as command responses or
unsolicited events).  Furthermore there can be different functional
units which write commands and/or events to the same thread.  It is
clear to me that this can be achieved by giving the different
commands/events different types/kinds and use one input queue for all
of them.  But I have at least two reservations against this:

- The single input queue could be a bottleneck.  If you choose to
  avoid the bottleneck by providing several threads which understand
  the same commands, the producers have to decide about the
  scheduling, i.e. which thread gets which command.

- It seems more intuitive to me to have different input channels for
  different purposes.

This is comparable to the 'select()' function which is widely used.

If you say there is no good implementation for waiting on several
queues in Python, then OK; in this case I admit that using a single
input queue per thread might be the way to go (I'm also not satisfied
by the three alternatives I have enumerated in the original post; I
think I will try alternative 3 for now).

cheers,

andreas



Mon, 09 May 2005 17:56:19 GMT  
 Howto wait for multiple queues (Queue.py)?

Quote:

> How about you maintain a single Queue which holds the other Queues?  When a
> Queue goes empty it gets removed from the main Queue.  When something is
> put() into it, it's added to the main Queue if it's not already there.  Your
> consumers then wait on the main Queue.

I think this would be another possibility which would probably look
similar to alternative 3 in my post.  My main concern is the
performance of this approach as the queue is one of *the* central
concepts in my server.

Thanks,

andreas



Mon, 09 May 2005 18:01:08 GMT  
 Howto wait for multiple queues (Queue.py)?

Quote:


>> ...this doesn't follow -- just make all requests to a thread onto
>> a single Queue (tag the requests with 'kind' or whatever as needed)
>> and live happily.  A Queue instance *IS* implicitly, automatically
>> and intrinsically "synchronized" -- several threads can be reading
>> and/or writing to the same Queue instance and you're guaranteed that
>> they'll never interfere with each other.  That's just about the
>> whole POINT of the Queue class.

> Sorry, this confuses me a bit.  If you understood my mail such that I
> want to synchronize the access of several threads to one queue, my
> mail must have been totally ambiguous.

> What I really want to achieve is that several threads can consume
> items from several (possibly identical) _sets_ of queues.  As I read
> the docs and looked at the source in Queue.py, I don't doubt that this
> queue implementation is thread-safe.

Why?  How would it advantage your application to have a thread peel
events off many queues rather than one?

Quote:
> What I don't understand is the reasoning, *why* every thread should
> only have exactly one input queue.  This has several disadvantages

To avoid the complications that come from having many -- why else?

Quote:
> from my point of view.  Several threads can't share the same input
> queue

Of course they can, why shouldn't they?  You claim you understand
how Queue instances are threadsafe, so why would you think you
can't have several threads peeling events from the same Queue?

Quote:
> and I can't distinguish between several input channels within
> one thread.  

But of course you can, it's trivial!  If it's too much bother to
have the posters of events tag them directly, wrap the one Queue
with as many wrapping-with-identification Channel instances as
you will, e.g.

class Channel:
    def __init__(self, identifier, queue):
        self.identifier = identifier
        self.queue = queue
    def put(self, whatever):
        self.queue.put((self.identifier, whatever))

there -- what's the problem?  It's quite easy to similarly
delegate put_nowait, get's, etc, etc, of course.
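For instance, a runnable version of the wrapper in use (modern 'queue' module; channel names invented):

```python
import queue

class Channel:
    """Tag every item with this channel's identifier before putting it
    on the shared queue."""
    def __init__(self, identifier, q):
        self.identifier = identifier
        self.queue = q
    def put(self, whatever):
        self.queue.put((self.identifier, whatever))

inbox = queue.Queue()
commands = Channel("command", inbox)
events = Channel("event", inbox)
commands.put("shutdown")
events.put("tick")
first = inbox.get()
second = inbox.get()
print(first)    # -> ('command', 'shutdown')
print(second)   # -> ('event', 'tick')
```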

Quote:
> For example in my case the threads should have input
> channels for commands and for events (such as command responses or
> unsolicited events).  Furthermore there can be different functional
> units which write commands and/or events to the same thread.  It is
> clear to me that this can be achieved by giving the different
> commands/events different types/kinds and use one input queue for all
> of them.  But I have at least two reservations against this:

> - The single input queue could be a bottleneck.  If you choose to
>   avoid the bottleneck by providing several threads which understand
>   the same commands, the producers have to decide about the
>   scheduling, i.e. which thread gets which command.

Not at all, the several working threads can perfectly well peel events
off the same queue.  That's quite a common arrangement, to have a
pool of threads all peeling work requests off the same queue.
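A minimal sketch of such a pool (the sentinel-based shutdown is one common convention, not the only one):

```python
import queue
import threading

work = queue.Queue()
results = queue.Queue()

def worker():
    while True:
        item = work.get()     # all workers block on the same queue
        if item is None:      # sentinel: time to shut down
            break
        results.put(item * 2)

threads = [threading.Thread(target=worker) for _ in range(4)]
for t in threads:
    t.start()
for n in range(10):
    work.put(n)
for _ in threads:
    work.put(None)            # one sentinel per worker
for t in threads:
    t.join()
collected = sorted(results.get() for _ in range(10))
print(collected)   # -> [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

No producer-side scheduling is needed: whichever worker is free simply takes the next request.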

Quote:
> - It seems more intuitive to me to have different input channels for
>   different purposes.

So wrap the above Channel -- several instances thereof around the
one queue -- and let your intuition be happy.

Quote:
> This is comparable to the 'select()' function which is widely used.

At low-levels, yes, but it's best wrapped in a more abstract
Reactor Pattern for practical use.  Twisted hinges on that, and
so do such frameworks as ACE.

Quote:
> If you say, there is no good implementation for waiting on several
> queues in python, then ok, in this case I admit that using a single
> input queue per thread might be the way to go (I'm also not satisfied
> by the three alternatives I have enumerated in the original post; I
> think I will try alternative 3 for now).

There's no good implementation (that I know of), AND there is
absolutely no real need for it, so it's unlikely that anybody would
sweat to make such a "good implementation" happen -- why would they?

Alex



Mon, 09 May 2005 19:43:33 GMT  
 Howto wait for multiple queues (Queue.py)?

Quote:
>Hi,

>as I prefer to use queues for interthread communication, I need
>something like a 'mainloop' for my threads in the sense that every
>thread can have multiple input queues and I want to be able to block
>the consumer thread until input within one (or more) queues is
>available.  This mechanism should be somewhat comparable to 'select()'
>or 'MsgWaitForMultipleObjects()' (on w32).

[...<ideas long on possible hows, short on whys>...]

ISTM there are a lot of questions that need to be answered before anyone can
get a grasp of what your problem really is, without making a lot of assumptions.

Step back from the implementation issues a moment. You have multiple threads. Ok.
They are to communicate. Ok. But what is the possible graph of their communications?
Pipeline? Acyclic? Static? etc. Do produced messages have types that sometimes or always
require direction to a[?] certain consumer[s?]? Are there ordered subsequences of messages
that must arrive in original order at single consumers, even though several consumers might be
able to process them? Or, e.g., could a thread simply requeue an item it can't process?

I could go on, but I imagine you get the point: the specs are not all visible from here ;-)

Regards,
Bengt Richter



Tue, 10 May 2005 00:46:53 GMT  
 Howto wait for multiple queues (Queue.py)?
Andreas Ames fed this fish to the penguins on Thursday 21 November 2002
02:01 am:

Quote:

> I think this would be another possibility which would probably look
> similar to alternative 3 in my post.  My main concern is the
> performance of this approach as the queue is one of *the* central
> concepts in my server.

        I suspect Alex Martelli (sp?) has the proper argument, but may have
obscured it some.

        You seem to be asking for something similar to Ada's selective
rendezvous (not actual Ada shown below):

        Accept EventQueue()
                ....
        or accept CommandQueue()
                ....

The problem with Ada is that those accepts are specific to the task
(thread) and not shared by a pool.

        In pseudo-Python you seem to want:

        while 1:
                try:
                        c = Command.get_nowait()
                        #command process
                        continue
                except Queue.Empty:
                        pass
                try:
                        e = Event.get_nowait()
                        #event process
                except Queue.Empty:
                        pass

WITHOUT the busy wait.  (Note that 'get_nowait' raises Queue.Empty
rather than returning a false value when the queue is empty, so a
plain 'if c:' test wouldn't work.)

        So... Rather than use two queues, one for each type of data, why not
pass the data type as the first item of the entry?

        while 1:
                d = CEQueue.get()       #use the blocking mode
                if d[0] == "COMMAND":
                        #command process
                elif d[0] == "EVENT":
                        #event process

        All the "writers" use:

        CEQueue.put(("COMMAND", data))
or
        CEQueue.put(("EVENT", data))

        IOWs; just take your current queue data, and make a tuple with that
data AND a determinant for the type of data.

        Ada would need the selective accept because of its strict static type
checking.  Python Queues can accept any data with each entry.

--
 > ============================================================== <
 >        Bestiaria Home Page: http://www.beastie.dm.net/         <
 >            Home Page: http://www.dm.net/~wulfraed/             <



Tue, 10 May 2005 09:40:04 GMT  
 