Discussion:
[asio-users] strand-like priority queues
Rutger ter Borg
2009-08-21 13:51:13 UTC
Dear all,

Suppose I would like to implement time-prioritized handlers for serialized
execution that have to run through an ordinary io_service. A strand-like
interface based on priority queues would be desirable,

io_service.run();
...
io_service.post( prio.push( 1, ... ) );
io_service.post( prio.push( 0, ... ) );
io_service.post( prio.push( 2, ... ) );
// runs 0 // e.g., from low to high
// runs 1
// runs 2

In terms of interface and functionality, I'm looking for a combination of a
strand and a priority queue. I have looked in detail at the prioritized
handler example.

Given that example, insertion into the priority queue is delayed up to the
point of invocation. I guess for my example this would be too late --
execution priority should be determined at the point of "wrapping",
shouldn't it (otherwise you don't know if you have to wait during
invocation)? To use the terminology of the example, somehow the priority
ordering should be shared between all wrapped_handlers and queued_handlers.
Then, when invoking, run all queued_handlers as long as they have higher
priority than any other handler. Something like this?

How to build a container (prio queue) for unbound handlers? ... Is this a
matter of a whole bunch of deadline timers? Looks like storage of
unbound handlers is a recurring theme for me :-)
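
As an aside, the interface sketched above can be mocked up without Asio as a
plain priority queue of bound handlers. This is only an illustrative sketch:
the name handler_prio_queue and its members are assumptions, not an existing
Asio API. Handlers drain lowest value first, matching the "runs 0, 1, 2"
comments; in real code each drained handler would be handed to io_service
rather than called directly.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <vector>

// Hypothetical sketch of a priority queue of bound handlers.
class handler_prio_queue {
public:
  void push(int priority, std::function<void()> handler) {
    queue_.push({priority, std::move(handler)});
  }

  // Run all queued handlers, lowest priority value first.
  void drain() {
    while (!queue_.empty()) {
      queue_.top().handler();
      queue_.pop();
    }
  }

private:
  struct item {
    int priority;
    std::function<void()> handler;
    bool operator>(const item& other) const {
      return priority > other.priority;
    }
  };
  std::priority_queue<item, std::vector<item>, std::greater<item>> queue_;
};
```

Pushing in the order 1, 0, 2 and then draining runs the handlers in the
order 0, 1, 2.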

Thanks in advance for any suggestion,
Cheers,

Rutger
Christopher Kohlhoff
2009-08-31 12:06:18 UTC
Post by Rutger ter Borg
Dear all,
Suppose I would like to implement time-prioritized handlers for serialized
execution that have to run through an ordinary io_service. A strand-like
interface based on priority queues would be desirable,
io_service.run();
...
io_service.post( prio.push( 1, ... ) )
io_service.post( prio.push( 0, ... ) )
io_service.post( prio.push( 2, ... ) )
// runs 0 // e.g., from low to high
// runs 1
// runs 2
In terms of interface and functionality, I'm looking for a combination of a
strand and a priority queue. I have looked in detail at the prioritized
handler example.
Given that example, insertion into the priority queue is delayed up to the
point of invocation. I guess for my example this would be too late --
execution priority should be determined at the point of "wrapping",
shouldn't it (otherwise you don't know if you have to wait during
invocation)? To use the terminology of the example, somehow the priority
ordering should be shared between all wrapped_handlers and queued_handlers.
Then, when invoking, run all queued_handlers as long as they have higher
priority than any other handler. Something like this?
So just to clarify, do you mean if you have two long running async
operations:

sock1.async_read_some(..., prio.push(0, ...));
sock2.async_read_some(..., prio.push(1, ...));

then you:

- do want the async operations to run in parallel

- don't want the sock2 operation's handler to run until the sock1
handler has run

Is that right?
Post by Rutger ter Borg
How to build a container (prio queue) for unbound handlers? ... Is this a
matter of a whole bunch of deadline timers? Looks like storage of
unbound handlers is a recurring theme for me :-)
If you do mean the above, you don't need to store unbound handlers. The
async operations themselves already store the unbound handlers. You only
need to store the bound handlers after an operation has finished, just
like the prioritised_handlers example. However, in addition to the
example, you would need to keep track of the highest priority operation
that is still in progress (i.e. just the priority value, not the
handler). The function object at the top of the queue simply doesn't get
popped off until its priority exceeds the highest in-progress value.
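
A minimal sketch of that scheme follows, with no Asio dependency; the class
name gated_prio_queue and its members are made up for illustration, and the
convention that a lower value means higher priority (as in the first post) is
an assumption. Completed handlers are queued by priority, the priorities of
operations still in flight are tracked separately, and the top handler is
released only once no in-flight operation outranks it.

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <set>
#include <vector>

// Illustrative sketch, not an Asio API: a priority queue of completed
// (bound) handlers, gated by the priorities of operations still in flight.
class gated_prio_queue {
public:
  // Record that an operation with this priority has been initiated.
  void op_started(int priority) { in_progress_.insert(priority); }

  // The operation completed; queue its bound handler and release
  // whatever is now allowed to run.
  void op_finished(int priority, std::function<void()> handler) {
    in_progress_.erase(in_progress_.find(priority));
    ready_.push({priority, std::move(handler)});
    drain();
  }

private:
  void drain() {
    // Run handlers while no in-flight operation outranks the top one
    // (lower value = higher priority; begin() is the most urgent).
    while (!ready_.empty() &&
           (in_progress_.empty() ||
            ready_.top().priority <= *in_progress_.begin())) {
      ready_.top().handler();
      ready_.pop();
    }
  }

  struct item {
    int priority;
    std::function<void()> handler;
    bool operator>(const item& o) const { return priority > o.priority; }
  };
  std::multiset<int> in_progress_;  // priorities of operations in flight
  std::priority_queue<item, std::vector<item>, std::greater<item>> ready_;
};
```

If operations 0 and 1 are in flight and 1 finishes first, its handler is
held back until operation 0 completes, after which both run in priority
order.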

Cheers,
Chris
Rutger ter Borg
2009-09-01 07:31:31 UTC
Christopher Kohlhoff wrote:

[snip]
Post by Christopher Kohlhoff
- do want the async operations to run in parallel
- don't want the sock2 operation's handler to run until the sock1
handler has run
Is that right?
Yes, I guess so.
Post by Christopher Kohlhoff
If you do mean the above, you don't need to store unbound handlers. The
async operations themselves already store the unbound handlers. You just
need to store the bound handlers only after the operation has finished
just like the prioritised_handlers example. However, in addition to the
example, you would need to keep track of the highest priority operation
that is still in progress (i.e. just the priority value, not the
handler). The function object at the top of the queue simply doesn't get
popped off until its priority exceeds the highest in-progress value.
Clever, this should work.

Thanks,

Rutger
Rutger ter Borg
2009-09-10 07:43:32 UTC
Post by Christopher Kohlhoff
If you do mean the above, you don't need to store unbound handlers. The
async operations themselves already store the unbound handlers. You just
need to store the bound handlers only after the operation has finished
just like the prioritised_handlers example. However, in addition to the
example, you would need to keep track of the highest priority operation
that is still in progress (i.e. just the priority value, not the
handler). The function object at the top of the queue simply doesn't get
popped off until its priority exceeds the highest in-progress value.
Cheers,
Chris
Chris,

I do think I'm running into a bit of a problem due to uncertainties about
the guarantees provided by a strand.

Suppose I call the priority wrapper "next", which performs handling by some
priority. I think Asio was designed to enable nested wrapping, so I was
thinking of using that and allowing it. Looking at some cases a user might
write:

Assumptions:
* m_next knows about 2 operations before the async ops are initiated
* the priority is taken from their handler concept
* f is supposed to be called before g

// 1
// f gets called before g, correct order of (possibly parallel) execution
async_stuff( ..., m_next.wrap( boost::bind( f, _1 ) ) );
async_stuff( ..., m_next.wrap( boost::bind( g, _1 ) ) );

// 2
// strand(f) called before strand(g), order of execution: undetermined?
async_stuff( ..., m_next.wrap( m_strand.wrap( boost::bind( f, _1 ) ) ) );
async_stuff( ..., m_next.wrap( m_strand.wrap( boost::bind( g, _1 ) ) ) );

// 3
// either next(f) or next(g) gets called first, correct order of execution
async_stuff( ..., m_strand.wrap( m_next.wrap( boost::bind( f, _1 ) ) ) );
async_stuff( ..., m_strand.wrap( m_next.wrap( boost::bind( g, _1 ) ) ) );

I would really like to see case 2 have a determined order of execution.
I.e., I think it is necessary that a strand guarantees execution in
submission order. It improves consistency, and enables more elegant mixing
of concurrent and non-concurrent parts of the program.

What do you think?

Cheers,

Rutger
Christopher Kohlhoff
2009-09-10 08:40:41 UTC
Post by Rutger ter Borg
I do think I'm running into a bit of a problem due to uncertainties about
the guarantees provided by a strand.
...
Post by Rutger ter Borg
I would really like to see case 2 have a determined order of execution.
I.e., I think it is necessary that a strand guarantees execution in
submission order.
It improves consistency, and enables more elegant mixing of concurrent
and non-concurrent parts of the program.
What do you think?
The original intent was that only strand.post() would provide ordering
guarantees (e.g. see the TR2 proposal based on asio). This was to permit
dispatch() to speculatively invoke handlers if no handler was currently
executing, even if already queued. However, the current implementation
doesn't do this, and you're right about the utility of also providing
the guarantee for dispatch.

So, I'm happy to codify the following guarantee on strand ordering...

Given:

— a strand object "s"

— an object "a" meeting completion handler requirements

— an object "a1" which is an arbitrary copy of "a" made by the implementation

— an object "b" meeting completion handler requirements

— an object "b1" which is an arbitrary copy of "b" made by the implementation

if any of the following conditions are true:

- "s.post(a)" happens-before "s.post(b)"

- "s.post(a)" happens-before "s.dispatch(b)", where the latter is
performed outside the strand

- "s.dispatch(a)" happens-before "s.post(b)", where the former is
performed outside the strand

- "s.dispatch(a)" happens-before "s.dispatch(b)", where both are
performed outside the strand

then "asio_handler_invoke(a1, &a1)" happens-before
"asio_handler_invoke(b1, &b1)".

Note that in the following case:

async_op_1(..., s.wrap(a));
async_op_2(..., s.wrap(b));

the completion of the first async operation will perform
"s.dispatch(a)", and the second will perform "s.dispatch(b)", but the
order in which those are performed is unspecified. That is, you cannot
state whether one happens-before the other. Therefore none of the above
conditions are met and no ordering guarantee is made.

Cheers,
Chris
Rutger ter Borg
2009-09-10 09:10:31 UTC
Post by Christopher Kohlhoff
The original intent was that only strand.post() would provide ordering
guarantees (e.g. see the TR2 proposal based on asio). This was to permit
dispatch() to speculatively invoke handlers if no handler was currently
executing, even if already queued. However, the current implementation
doesn't do this, and you're right about the utility of also providing
the guarantee for dispatch.
So, I'm happy to codify the following guarantee on strand ordering...
...
That's great news, thanks a lot!

Cheers,

Rutger
Rutger ter Borg
2009-09-11 08:15:36 UTC
Post by Christopher Kohlhoff
then "asio_handler_invoke(a1, &a1)" happens-before
"asio_handler_invoke(b1, &b1)".
Suppose I'm queuing up bound handlers until their priority is at least that
of the operations in progress, so I now have

// inform next of the priorities in progress
m_next.push_in_progress( 1 );
m_next.push_in_progress( 2 );

// do some async stuff
// wrap signature: wrap( in_progress_prio, handler );
async_op( ..., m_next.wrap( 1, ... ) );
async_op( ..., m_next.wrap( 2, ... ) );

etc. It works very well, also in conjunction with strands.

With this, it is possible to, e.g., let a multi-timer behave like a single
timer with predefined intervals. The only thing is that your handlers will
be called with a delay, because the priority queue has to be sure that you
get the highest priority. But in practice this is not a problem.
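
The push_in_progress/wrap usage above could be sketched roughly as follows.
This is a guess at the interface from the post, not real code from the
thread or from Asio: wrap() returns a functor that, on completion of the
async operation, moves its priority from "in progress" to the ready queue
and releases every handler no longer outranked by an in-flight operation
(lower value = higher priority, as earlier in the thread).

```cpp
#include <cassert>
#include <functional>
#include <queue>
#include <set>
#include <vector>

// Hypothetical sketch of the "m_next" wrapper described above.
class next_wrapper {
public:
  // Register the priority of an operation about to be initiated.
  void push_in_progress(int priority) { in_progress_.insert(priority); }

  // Returns a completion functor to pass to an async operation.
  std::function<void()> wrap(int priority, std::function<void()> handler) {
    return [this, priority, handler] {
      // The operation completed: it is no longer in progress.
      in_progress_.erase(in_progress_.find(priority));
      ready_.push({priority, handler});
      // Release handlers that no in-flight operation outranks.
      while (!ready_.empty() &&
             (in_progress_.empty() ||
              ready_.top().priority <= *in_progress_.begin())) {
        ready_.top().handler();
        ready_.pop();
      }
    };
  }

private:
  struct item {
    int priority;
    std::function<void()> handler;
    bool operator>(const item& o) const { return priority > o.priority; }
  };
  std::multiset<int> in_progress_;
  std::priority_queue<item, std::vector<item>, std::greater<item>> ready_;
};
```

In real use the returned functor would be passed to an async operation
(possibly via a strand); here it can be invoked directly to see the gating.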

Now, on to my question :-), suppose I have queued up a couple of handlers
which are already invoked through asio_handler_invoke. I have a loop that
dispatches handlers when they're ready for it, like

while( ... ) {
m_prio_queue.top().m_handler();
m_prio_queue.pop();
}

then I assume these handlers will never run concurrently, in the case that
they have been queued up. If different threads call asio_handler_invoke,
then concurrent execution is possible. It would be nice to check whether an
idle thread is available, so the lock will not be held for long if many
handlers become dispatched. Are there facilities for that, or should I just
use io_service.dispatch?

Thanks,

Rutger
Shiv Chawla
2011-10-27 14:06:17 UTC
Post by Christopher Kohlhoff
However, in addition to the
example, you would need to keep track of the highest priority operation
that is still in progress (i.e. just the priority value, not the
handler). The function object at the top of the queue simply doesn't get
popped off until its priority exceeds the highest in-progress value.
Chris, how do I keep track of the highest priority operation? How do I
demarcate the in-progress function and its priority?

Thanks
Shiv
