Eduasync part 19: ordering by completion, ahead of time...
Today's post involves the MagicOrdering project in source control (project 28).
When I wrote part 16 of Eduasync, showing composition in the form of majority voting, one reader mailed me a really interesting suggestion. We don't really need to wait for any of the tasks to complete on each iteration of the loop - we only need to wait for the next task to complete. Now that sounds impossible - sure, it's great if we know the completion order of the tasks, but half the point of asynchrony is that many things can be happening at once, and we don't know when they'll complete. However, it's not as silly as it sounds.
If you give me a collection of tasks, I'll give you back another collection of tasks which will return the same results - but I'll order them so that the first returned task will have the same result as whichever of your original tasks completes first, and the second returned task will have the same result as whichever of your original tasks completes second, and so on. They won't be the same tasks as you gave me, reordered - but they'll be tasks with the same results. I'll propagate cancellation, exceptions and so on.
It still sounds impossible... until you realize that I don't have to associate one of my returned tasks with one of your original tasks until it has completed. Before anything has completed, all the tasks look the same. The trick is that as soon as I see one of your tasks complete, I can fetch the result and propagate it to the first of the tasks I've returned to you, using TaskCompletionSource<T>. When the second of your tasks completes, I propagate the result to the second of the returned tasks, etc. This is all quite easy using Task<T>.ContinueWith - barring a few caveats I'll mention later on.
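If you haven't used TaskCompletionSource<T> before, the following minimal sketch shows the property the trick relies on: the source hands out a task straight away, and whoever holds the source decides later what that task's result will be. The CompletionSourceDemo class is made up purely for illustration and isn't part of the MagicOrdering project.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSourceDemo
{
    public static void Main()
    {
        // The source exposes its task immediately, before any result exists.
        var source = new TaskCompletionSource<int>();
        Task<int> placeholder = source.Task;
        Console.WriteLine(placeholder.IsCompleted); // False

        // Later on, the holder of the source supplies the result.
        source.TrySetResult(42);
        Console.WriteLine(placeholder.IsCompleted); // True
        Console.WriteLine(placeholder.Result);      // 42
    }
}
```

Nothing ties the placeholder task to any particular piece of work until TrySetResult (or TrySetException, or TrySetCanceled) is called, which is exactly why the returned tasks can be handed out before we know which input task will finish first.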
Once we've built a method to do this, we can then really easily build a method which is the async equivalent of Parallel.ForEach (and indeed you could write multiple methods for the various overloads). This will execute a specific action on each task in turn, as it completes... it's like repeatedly calling Task.WhenAny, but we only actually need to wait for one task at a time, because we know that the first task in our "completion ordered" collection will be the first one to complete (duh).
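For contrast, here's a rough sketch of the "repeatedly calling WhenAny" approach. It isn't code from the MagicOrdering project: ForEachViaWhenAny is a hypothetical helper, and the sketch assumes .NET 4.5 or later, where the CTP's TaskEx.WhenAny and TaskEx.Delay are available as Task.WhenAny and Task.Delay.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class WhenAnyLoopDemo
{
    // Hypothetical helper: runs the action on each result as its task
    // completes, by asking WhenAny about every outstanding task each time.
    public static async Task ForEachViaWhenAny<T>(IEnumerable<Task<T>> tasks, Action<T> action)
    {
        var remaining = tasks.ToList();
        while (remaining.Count > 0)
        {
            // Waits on all the remaining tasks, on every iteration.
            Task<T> finished = await Task.WhenAny(remaining);
            remaining.Remove(finished);
            action(await finished);
        }
    }

    private static async Task<int> DelayAsync(int delayMillis)
    {
        await Task.Delay(delayMillis);
        return delayMillis;
    }

    public static void Main()
    {
        var tasks = new[] { 300, 100, 200 }.Select(DelayAsync);
        ForEachViaWhenAny(tasks, Console.WriteLine).Wait();
    }
}
```

Each pass through the loop hands the whole list of remaining tasks to WhenAny and then removes one from the list. With a completion-ordered collection, the loop body shrinks to a single await on the next task.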
Show me the code!
Enough description - let's look at how we'll demonstrate both methods, and then how we implement them.
private static async Task PrintDelayedRandomTasksAsync()
{
    Random rng = new Random();
    var values = Enumerable.Range(0, 10).Select(_ => rng.Next(3000)).ToList();
    Console.WriteLine("Initial order: {0}", string.Join(" ", values));

    // Select is lazy: the tasks are only created when OrderByCompletion
    // copies the sequence into a list.
    var tasks = values.Select(DelayAsync);
    var ordered = OrderByCompletion(tasks);

    Console.WriteLine("In order of completion:");
    await ForEach(ordered, Console.WriteLine);
}

// Waits for the given number of milliseconds, then returns that same number.
private static async Task<int> DelayAsync(int delayMillis)
{
    await TaskEx.Delay(delayMillis);
    return delayMillis;
}
The idea is that we're going to create 10 tasks which each just wait for some random period of time, and return the same time period back. We'll create them in any old order - but obviously they should complete in (at least roughly) ascending order of the returned numbers, regardless of the order in which they were created.
Once we've created the collection of tasks, we'll call OrderByCompletion to create a second collection of tasks, returning the same results but this time in completion order - so ordered.ElementAt(0) will be the first task to complete, for example.
Finally, we call ForEach and pass in the ordered task collection, along with Console.WriteLine as the action to take with each value. We await the resulting Task to mimic blocking until the foreach loop has finished. Note that we could make this a non-async method and just return the task returned by ForEach, given that that's our only await expression and it's right at the end of the method. This would be marginally faster, too - there's no need to build an extra state machine. See Stephen Toub's article about async performance for more information.
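As a rough illustration of that last point, here are two made-up methods which differ only in whether they await their final task or return it directly. The names (ReturnTaskDirectlyDemo, WaitWithAwaitAsync, WaitWithoutAwait) are hypothetical, and the sketch assumes .NET 4.5 or later, using Task.Delay rather than the CTP's TaskEx.Delay.

```csharp
using System;
using System.Threading.Tasks;

public static class ReturnTaskDirectlyDemo
{
    // Async version: the compiler generates a state machine for this method,
    // even though its only await is the very last thing it does.
    public static async Task WaitWithAwaitAsync(int delayMillis)
    {
        Console.WriteLine("Starting (async method)");
        await Task.Delay(delayMillis);
    }

    // Non-async version: no state machine, the inner task is returned as-is.
    public static Task WaitWithoutAwait(int delayMillis)
    {
        Console.WriteLine("Starting (plain method)");
        return Task.Delay(delayMillis);
    }

    public static void Main()
    {
        // Callers use both in the same way: each returns a Task to wait on.
        WaitWithAwaitAsync(50).Wait();
        WaitWithoutAwait(50).Wait();
        Console.WriteLine("Both finished");
    }
}
```

One difference worth bearing in mind: if the code before the final task throws, the async version captures the exception in the returned task, whereas the plain version throws it straight at the caller.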
ForEach
I'd like to get ForEach out of the way first, as it's so simple: it's literally just iterating over the tasks, awaiting them and propagating the result to the action. We get the "return a task which will wait until we've finished" for free by virtue of making it an async method.
private static async Task ForEach<T>(IEnumerable<Task<T>> tasks, Action<T> action)
{
    foreach (var task in tasks)
    {
        T value = await task;
        action(value);
    }
}
Simple, right? Let's get onto the meat...
OrderByCompletion
This is the tricky bit, and I've actually split it into two methods to make it slightly easier to comprehend. The PropagateResult method feels like it could be useful in other composition methods, too.
The basic plan is:
- Copy the input tasks to a list: we need to work out how many there are and iterate over them, so let's make sure we only iterate once
- Create a collection of TaskCompletionSource<T> references, one for each input task. Note that we're not associating any particular input task with any particular completion source - we just need the same number of them
- Declare an integer to keep track of "the next available completion source"
- Attach a continuation to each input task which will increment the counter we've just declared, and propagate the just-completed task's status to the completion source at that index
- Return a view onto the collection of TaskCompletionSource<T> values, projecting each one to its Task property
Once you're happy with the idea, the implementation isn't too surprising (although it is quite long):
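private static IEnumerable<Task<T>> OrderByCompletion<T>(IEnumerable<Task<T>> inputTasks)
{
    // Copy the input so that we only evaluate the sequence once.
    var inputTaskList = inputTasks.ToList();

    // One completion source per input task, not yet tied to any particular task.
    var completionSourceList = new List<TaskCompletionSource<T>>(inputTaskList.Count);
    for (int i = 0; i < inputTaskList.Count; i++)
    {
        completionSourceList.Add(new TaskCompletionSource<T>());
    }

    // Index of the completion source we've most recently filled. It starts at -1
    // because Interlocked.Increment returns the value after incrementing.
    int prevIndex = -1;

    // The same continuation is used for every input task.
    Action<Task<T>> continuation = completedTask =>
    {
        int index = Interlocked.Increment(ref prevIndex);
        var source = completionSourceList[index];
        PropagateResult(completedTask, source);
    };

    foreach (var inputTask in inputTaskList)
    {
        // TODO: Check that these arguments (particularly the scheduler)
        // are really the right ones to use.
        inputTask.ContinueWith(continuation,
                               CancellationToken.None,
                               TaskContinuationOptions.ExecuteSynchronously,
                               TaskScheduler.Default);
    }

    return completionSourceList.Select(source => source.Task);
}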
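
// Copies the outcome of a completed task (cancellation, failure or result)
// to the given completion source.
private static void PropagateResult<T>(Task<T> completedTask,
    TaskCompletionSource<T> completionSource)
{
    switch (completedTask.Status)
    {
        case TaskStatus.Canceled:
            completionSource.TrySetCanceled();
            break;
        case TaskStatus.Faulted:
            completionSource.TrySetException(completedTask.Exception.InnerExceptions);
            break;
        case TaskStatus.RanToCompletion:
            completionSource.TrySetResult(completedTask.Result);
            break;
        default:
            // TODO: Work out whether throwing is really appropriate here.
            // A continuation should only run once its task has completed.
            throw new ArgumentException("Task was not completed");
    }
}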
You'll notice there are a couple of TODO comments there. The exception in PropagateResult really shouldn't happen - the continuation shouldn't be called when the task hasn't completed. I still need to think carefully about how tasks should propagate exceptions though.
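To see the propagation in isolation, here's a small sketch which pushes an already-faulted task and an already-cancelled task through PropagateResult. The PropagationDemo class and its Main method are made up for illustration; PropagateResult is copied from above so that the example compiles on its own.

```csharp
using System;
using System.Threading.Tasks;

public static class PropagationDemo
{
    // Copied from the post so that this example is self-contained.
    private static void PropagateResult<T>(Task<T> completedTask,
        TaskCompletionSource<T> completionSource)
    {
        switch (completedTask.Status)
        {
            case TaskStatus.Canceled:
                completionSource.TrySetCanceled();
                break;
            case TaskStatus.Faulted:
                completionSource.TrySetException(completedTask.Exception.InnerExceptions);
                break;
            case TaskStatus.RanToCompletion:
                completionSource.TrySetResult(completedTask.Result);
                break;
            default:
                throw new ArgumentException("Task was not completed");
        }
    }

    public static void Main()
    {
        // A faulted input: the output task faults with the same inner exception.
        var faultedInput = new TaskCompletionSource<int>();
        faultedInput.SetException(new InvalidOperationException("Bang"));
        var faultedOutput = new TaskCompletionSource<int>();
        PropagateResult(faultedInput.Task, faultedOutput);
        Console.WriteLine(faultedOutput.Task.Status);                           // Faulted
        Console.WriteLine(faultedOutput.Task.Exception.InnerException.Message); // Bang

        // A cancelled input: the output task is cancelled too.
        var canceledInput = new TaskCompletionSource<int>();
        canceledInput.SetCanceled();
        var canceledOutput = new TaskCompletionSource<int>();
        PropagateResult(canceledInput.Task, canceledOutput);
        Console.WriteLine(canceledOutput.Task.Status);                          // Canceled
    }
}
```

Passing InnerExceptions rather than the AggregateException itself means the output task's exception wraps the original exceptions directly, instead of nesting one AggregateException inside another.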
The arguments to ContinueWith are more tricky: working through my TimeMachine class and some unit tests with Bill Wagner last week showed just how little I know about how SynchronizationContext, the task awaiters, task schedulers, and TaskContinuationOptions.ExecuteSynchronously all interact. I would definitely need to look into that more deeply before TimeMachine was really ready for heavy use... which means you should probably be looking at the TPL in more depth too.
Conclusion
Sure enough, when you run the code, the results appear in order, as the tasks complete. Here's one sample of the output:
Initial order: 335 468 1842 1991 2512 2603 270 2854 1972 1327
In order of completion:
270
335
468
1327
1842
1972
1991
2512
2603
2854
TODOs aside, the code in this post is remarkable (which I can say with modesty, as I've only refactored it from the code sent to me by another reader and Stephen Toub). It makes me smile every time I think about the seemingly-impossible job it accomplishes. I suspect this approach could be useful in any number of composition blocks - it's definitely one to remember.