Visualizing Google Suggest with Web Seer

Unfortunately this was down at the time I found it, but I like how it can show relationships between the results of different queries. 

 

WebGL Globe

Found this globe data visualizer on the web. Free, and really awesome.

 

Aurora from a few weeks back

A little scenery from behind my house a few weeks back.  Pretty good pictures by my son Mikael with just the DSLR, not the 6" scope.

 

I really like this one, later that night, on top of the aurora. The bright streak is the ISS as it made a 4-minute transit across the sky (I'd say this was a 30-second exposure). The dotted line is an airplane flying by. Thankfully, they didn't crash!

 

Code from Philly.NET Session on Concurrency

For those of you who showed interest, here is the code I demonstrated. In particular, you'll be interested in the NDC.Parallel.WinRTHost project. There's probably not a single comment in it, but I'll get around to explaining it on the blog very soon.

NDC.Parallel.ConsoleHost.zip (15.96 MB)

NDC 2013 Concurrency Talk Content

For anyone looking for the code and presentation, you can find them here.

 

TPL Cancellation Pattern #1: Cancellation by Polling

The orderly shutdown of threads in a multithreaded application is important for disposing of resources properly and for completing work items without losing data. It is usually handled by setting a shared variable to a signaled value for the thread(s) to monitor or, in extreme cases, by calling the Thread.Abort method. Both approaches are hard to control and to get right, especially once the number of threads grows beyond one! And even then, determining when all threads have actually stopped is fraught with problems.

To alleviate these problems, the TPL provides a new mechanism for controlling the shutdown of tasks: the CancellationToken. CancellationTokens provide a model of cooperative shutdown, giving a well-defined means of signaling tasks to cancel and of letting the signaler know that all tasks have indeed shut down.

Additionally, TPL tasks are not necessarily implemented by threads, so a new mechanism is required. By default, tasks are scheduled on the thread pool, but other task schedulers can be written to execute code by other means, for example on the GPU, which would not use the normal threading APIs. The cancellation token model therefore also provides a unified cancellation model that works across multiple task implementations and can coordinate the shutdown of tasks running in different implementations.

Understanding the different patterns for implementing shutdown is important to interoperating successfully in a coordinated model of cancellation. As a general practice, you should not assume that your task works in isolation; it may need to be cancelled together with other tasks at the request of a supervisory task. Your task should therefore be implemented so that it can always recognize a cancellation request and stop in a timely manner.

To be a good citizen of the TPL, task implementations and their coordination should follow a number of patterns, each of which has slightly different ramifications for how cancellation is orchestrated. The purpose of this sequence of posts is to introduce the reader to these patterns.

The first pattern we'll examine I refer to as "cancellation via polling". The following code exemplifies the pattern. A function is declared that will run as a task. One of its parameters is a CancellationToken, which the function uses to detect that cancellation has been signaled. The function in this example loops continuously, pausing for 100ms in each iteration to simulate some work and incrementing a count. When it detects cancellation, the function returns the number of iterations performed.

            var f = new Func<CancellationToken, int>(
                (token) =>
                {
                    var count = 0;
                    while (!token.IsCancellationRequested)
                    {
                        count++;
                        Task.Delay(100).Wait();
                    }
                    return count;
                }
                );

            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            var task = Task.Factory.StartNew(() => f(ct));
            Task.Delay(500).Wait();

            cts.Cancel();
            task.Wait();

            Console.WriteLine("{0}", task.Status);
            if (task.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task.Result);

Cancellation tokens are not normally instantiated directly. Instead, they are created via a factory class called CancellationTokenSource, and the token is available via the source's Token property. The token is then passed to methods, which track cancellation via the token's IsCancellationRequested property. The signal for cancellation is set on the cancellation token source, not on the token. Cancellation is also one-way: calling Cancel again has no further effect, and a cancellation request can't be reversed.
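Before continuing with the polling example, here is a minimal standalone sketch (not part of the original demo code) of the source/token split using only CancellationTokenSource and CancellationToken. It assumes C# 9 or later for top-level statements.

```csharp
using System;
using System.Threading;

// The source owns the ability to signal; the token can only observe.
var cts = new CancellationTokenSource();
CancellationToken token = cts.Token;

Console.WriteLine(token.IsCancellationRequested);   // False

cts.Cancel();   // request cancellation
cts.Cancel();   // calling it again is harmless; the request simply stays set

// CancellationToken is a struct, but every copy observes the same source.
CancellationToken copy = token;
Console.WriteLine(copy.IsCancellationRequested);    // True
Console.WriteLine(cts.IsCancellationRequested);     // True

// CancellationToken.None is a token that can never be canceled.
Console.WriteLine(CancellationToken.None.CanBeCanceled);   // False
```

Because the token carries no Cancel method of its own, code that only receives a token can observe cancellation but never trigger it.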

The polling example creates a cancellation token, then runs the method as a task, passing it the token. It then delays for 500ms, which gives the task time to do roughly 5 iterations. After this short delay, cancellation is signaled via the Cancel method of the token source, and the task sees the signal the next time it checks its loop condition, which in this case may take up to 100ms. Note that signaling cancellation does not terminate or abort any tasks; it just sets a value that every task holding this token can observe.

After setting the cancellation signal, this code waits for the task to complete. If the task is implemented to cooperate with cancellation, it will finish its current work and complete, so the Wait call returns in a timely manner. In this example, that could take up to 100ms.

The example then writes the task's status to the console, followed by its Result property. The output from this example should be the following (the count may vary slightly due to timing differences).

RanToCompletion
5
Press any key to continue . . .

This is the expected result. While the main thread waits for 500ms, the sub-task iterates five times, sees the signal, exits the while loop, and returns the value. TPL sees the method return and transitions the task to the RanToCompletion state.

This example does exhibit a problem: it may take up to 100ms for the signal to be noticed. This can be rectified by passing the token to the Task.Delay method (modified method shown):

            var f = new Func<CancellationToken, int>(
                (token) =>
                    {
                        var count = 0;
                        while (!token.IsCancellationRequested)
                        {
                            count++;
                            Task.Delay(100, token).Wait();
                        }
                        return count;
                    }
                );

The Delay task will then watch the cancellation token and complete as soon as cancellation is signaled, rather than after the full interval. When running, the output is then:

Unhandled Exception: System.AggregateException: One or more errors occurred. ---> System.AggregateException: One or more
 errors occurred. ---> System.Threading.Tasks.TaskCanceledException: A task was canceled.
   --- End of inner exception stack trace ---
   at System.Threading.Tasks.Task.ThrowIfExceptional(Boolean includeTaskCanceledExceptions)
   at System.Threading.Tasks.Task.Wait(Int32 millisecondsTimeout, CancellationToken cancellationToken)

Whoopsies! What's going on?

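The answer is that, in many cases, it is considered proper form when processing a cancellation signal to call the token's ThrowIfCancellationRequested method. This throws an OperationCanceledException and exits the method. Task.Delay behaves in the same spirit: when its token is signaled, the Delay task ends in the Canceled state, and waiting on it throws an AggregateException wrapping a TaskCanceledException (a subclass of OperationCanceledException). Since the code does not catch the exception, the task faults, the main thread's task.Wait() rethrows it wrapped in yet another AggregateException (hence the nesting in the output), and the app crashes.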
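The exception types involved are easy to mix up. The following minimal standalone sketch (C# 9+ top-level statements, not from the original demo) shows what ThrowIfCancellationRequested throws, and what surfaces when you wait on a Task.Delay whose token has already been canceled.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
cts.Cancel();

// 1. Polling: ThrowIfCancellationRequested throws an OperationCanceledException
//    that carries the token that was checked.
Exception polled = null;
try
{
    cts.Token.ThrowIfCancellationRequested();
}
catch (OperationCanceledException ex)
{
    polled = ex;
}
Console.WriteLine(polled.GetType().Name);   // OperationCanceledException
Console.WriteLine(((OperationCanceledException)polled).CancellationToken == cts.Token);   // True

// 2. Waiting: a Task.Delay given a canceled token ends up Canceled, and Wait()
//    throws an AggregateException wrapping a TaskCanceledException.
Exception waited = null;
try
{
    Task.Delay(100, cts.Token).Wait();
}
catch (AggregateException ex)
{
    waited = ex.InnerExceptions[0];
}
Console.WriteLine(waited.GetType().Name);               // TaskCanceledException
Console.WriteLine(waited is OperationCanceledException); // True
```

Since TaskCanceledException derives from OperationCanceledException, a single catch of OperationCanceledException handles both cases once the AggregateException wrapper has been unwrapped.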

To handle the exception, there are three sub-patterns you can follow. The first is to wrap the code that waits on the task, which is where the exception propagates, in a try/catch. Such a modification could look like the following:

            try
            {
                cts.Cancel();
                task.Wait();
            }
            catch (AggregateException ex)
            {
                Console.WriteLine(ex.Message);
                Console.WriteLine(ex.InnerExceptions[0].InnerException.Message);
            }

And the results would be the following:

One or more errors occurred.
A task was canceled.
Faulted
Press any key to continue . . .

The exception thrown in this case is an AggregateException, an exception class that wraps one or more other exceptions. Each task that fails contributes its exception to InnerExceptions. Here the single inner exception is itself the AggregateException raised by Task.Delay(...).Wait() inside the task, and its InnerException is the TaskCanceledException.
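Digging through InnerExceptions[0].InnerException by hand gets brittle as nesting grows. AggregateException.Flatten() collapses nested aggregates into one level. A minimal standalone sketch (C# 9+ top-level statements; Task.Run stands in for the post's Task.Factory.StartNew):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();

// The outer task waits on a Task.Delay that gets canceled, so the failure is
// wrapped twice: once by the inner Wait() and once by the outer Wait().
var outer = Task.Run(() => Task.Delay(5000, cts.Token).Wait());

Thread.Sleep(100);
cts.Cancel();

AggregateException caught = null;
try
{
    outer.Wait();
}
catch (AggregateException ex)
{
    caught = ex;
}

// Flatten() collapses the nested AggregateExceptions into a single level.
foreach (var inner in caught.Flatten().InnerExceptions)
{
    Console.WriteLine(inner.GetType().Name);   // TaskCanceledException
}

Console.WriteLine(outer.Status);   // Faulted
```

The outer task is Faulted rather than Canceled because what escapes its delegate is an AggregateException, not an OperationCanceledException tied to the outer task's token.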

This pattern of aggregating exceptions is actually very convenient. It is a way for a task that has received the cancellation signal to notify the signaler that it has effectively canceled.

Let's look at how this can be used by modifying the example a little. The modification lets the caller pass the delay interval to the method, and has the method stop iterating after 3 iterations. Two tasks are then started, one with a 100ms interval and the other with 250ms. The same 500ms delay is performed, and then cancellation is signaled.

            var f = new Func<int, CancellationToken, int>(
                (interval, token) =>
                    {
                        var count = 0;
                        while (!token.IsCancellationRequested && count < 3)
                        {
                            count++;
                            Task.Delay(interval, token).Wait();
                        }
                        return count;
                    }
                );

            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            var task1 = Task.Factory.StartNew(() => f(100, ct));
            var task2 = Task.Factory.StartNew(() => f(250, ct));
            
            Task.Delay(500).Wait();

            try
            {
                cts.Cancel();
                Task.WaitAll(task1, task2);
            }
            catch (AggregateException)
            {
            }

            Console.WriteLine("task1.Status == {0}", task1.Status);
            if (task1.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task1.Result);
            Console.WriteLine("task2.Status == {0}", task2.Status);
            if (task2.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task2.Result);

When executed, the following is the result:

task1.Status == RanToCompletion
3
task2.Status == Faulted
Press any key to continue . . .

The first task can complete 3 iterations within 500ms and therefore moves to the RanToCompletion status. The second task does not: its Task.Delay wait throws an AggregateException wrapping a TaskCanceledException, and the task moves to the Faulted state. The exception can then be used to identify which of the tasks had not completed when it responded to the cancellation request. With raw threads, this would normally be a lot more work.
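Rather than printing statuses one by one, you can walk the task array after WaitAll and use each task's IsFaulted and Exception properties to see which ones failed. A minimal sketch, assuming C# 9+ top-level statements; the local function Work is a hypothetical stand-in for the f delegate above. Whether task2 faults depends on whether its second delay finishes before the 500ms cancel, so its outcome can vary between runs.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();

// Same shape as f above: at most 3 iterations, and Delay observes the token.
int Work(int interval, CancellationToken token)
{
    var count = 0;
    while (!token.IsCancellationRequested && count < 3)
    {
        count++;
        Task.Delay(interval, token).Wait();
    }
    return count;
}

var tasks = new[]
{
    Task.Factory.StartNew(() => Work(100, cts.Token)),
    Task.Factory.StartNew(() => Work(250, cts.Token))
};

Task.Delay(500).Wait();
cts.Cancel();

try { Task.WaitAll(tasks); }
catch (AggregateException) { /* inspected per task below */ }

// WaitAll only returns or throws once every task is done, so statuses are final.
for (var i = 0; i < tasks.Length; i++)
{
    var t = tasks[i];
    Console.WriteLine(t.IsFaulted
        ? $"task{i + 1} faulted: {t.Exception.Flatten().InnerExceptions[0].GetType().Name}"
        : $"task{i + 1} returned {t.Result}");
}
```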

What if we wanted to see how many iterations a task completed before it noticed the cancel signal? The method can be changed to catch the exception thrown when waiting on the Task.Delay task, and exit normally, as follows:

            var f = new Func<int, CancellationToken, int>(
                (interval, token) =>
                    {
                        var count = 0;
                        while (!token.IsCancellationRequested && count < 3)
                        {
                            count++;
                            try
                            {
                                Task.Delay(interval, token).Wait();
                            }
                            catch (AggregateException)
                            {
                            }
                        }
                        return count;
                    }
                );

            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            var task1 = Task.Factory.StartNew(() => f(100, ct));
            var task2 = Task.Factory.StartNew(() => f(250, ct));

            Task.Delay(500).Wait();

            try
            {
                cts.Cancel();
                Task.WaitAll(task1, task2);
            }
            catch (AggregateException)
            {
            }

            Console.WriteLine("task1.Status == {0}", task1.Status);
            if (task1.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task1.Result);
            Console.WriteLine("task2.Status == {0}", task2.Status);
            if (task2.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task2.Result);

And the results:

task1.Status == RanToCompletion
3
task2.Status == RanToCompletion
2
Press any key to continue . . .

Because the exception is caught inside the task and the method run by the task completes normally, both tasks now exit cleanly with no exceptions escaping, both move to RanToCompletion, and the result of each can be retrieved.

As if these weren't enough permutations of how to cancel, there are still more. We have not yet examined actually throwing an OperationCanceledException to get out of our task. Let's look at the following modification of the code.

            var f = new Func<int, CancellationToken, int>(
                (interval, token) =>
                    {
                        while (true)
                        {
                            Task.Delay(100).Wait();
                            token.ThrowIfCancellationRequested();
                        }
                    }
                );

            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            var task1 = Task.Factory.StartNew(() => f(100, ct));

            Task.Delay(500).Wait();

            try
            {
                cts.Cancel();
                task1.Wait();
            }
            catch (AggregateException ex)
            {
                Console.WriteLine("Caught exception: {0}", ex.InnerExceptions[0].Message);
            }

            Console.WriteLine("task1.Status == {0}", task1.Status);
            if (task1.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task1.Result);

The method executed by the task is now a while(true) loop, which delays for 100ms (without passing the token to the delay) and then calls the token's ThrowIfCancellationRequested method. So every 100ms this task checks the flag, and to exit it throws an exception. When this happens, the exception is caught and the task's status is reported as Faulted.

Caught exception: The operation was canceled.
task1.Status == Faulted
Press any key to continue . . .

Now let's change one single line of code: the one that starts the task. We now pass the cancellation token as a second parameter to the Task.Factory.StartNew method, which associates the task with that token. If the method being run by the task throws an OperationCanceledException that carries this token (the case in this example), TPL transitions the task to Canceled instead of Faulted.

            var f = new Func(
                (interval, token) =>
                    {
                        while (true)
                        {
                            Task.Delay(100).Wait();
                            token.ThrowIfCancellationRequested();
                        }
                    }
                );

            var cts = new CancellationTokenSource();
            var ct = cts.Token;

            var task1 = Task.Factory.StartNew(() => f(100, ct), ct);

            Task.Delay(500).Wait();

            try
            {
                cts.Cancel();
                task1.Wait();
            }
            catch (AggregateException ex)
            {
                Console.WriteLine("Caught exception: {0}", ex.InnerExceptions[0].Message);
            }

            Console.WriteLine("task1.Status == {0}", task1.Status);
            if (task1.Status == TaskStatus.RanToCompletion) Console.WriteLine("{0}", task1.Result);
Caught exception: A task was canceled.
task1.Status == Canceled
Press any key to continue . . .

The primary ramification is that if you want a task that responds to cancellation to move to its Canceled state, the task needs to throw an OperationCanceledException carrying the token it was started with, which is exactly what ThrowIfCancellationRequested does when the token's IsCancellationRequested flag is true.
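The token match matters: TPL only treats the exception as a cancellation when the OperationCanceledException carries the same token the task was started with and that token has actually been canceled. A minimal standalone sketch showing both outcomes (C# 9+ top-level statements; the variable names are made up for illustration):

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var taskCts = new CancellationTokenSource();    // token the task is started with
var otherCts = new CancellationTokenSource();   // an unrelated token
otherCts.Cancel();

// Throws an OperationCanceledException carrying otherCts.Token, which does NOT
// match the (never canceled) token passed to StartNew.
var mismatched = Task.Factory.StartNew(
    () => otherCts.Token.ThrowIfCancellationRequested(), taskCts.Token);

// Cancels its own token, then throws an exception carrying that same token.
var matchedCts = new CancellationTokenSource();
var matched = Task.Factory.StartNew(() =>
{
    matchedCts.Cancel();
    matchedCts.Token.ThrowIfCancellationRequested();
}, matchedCts.Token);

try { Task.WaitAll(mismatched, matched); }
catch (AggregateException) { /* statuses are inspected below */ }

Console.WriteLine(mismatched.Status);   // Faulted
Console.WriteLine(matched.Status);      // Canceled
```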

A second reason for passing the token to StartNew is that if cancellation has already been requested on the token, the task will not be run at all. The token may have been passed in from elsewhere, and by the time this code tries to start other tasks, cancellation may already have been requested by another task. Passing the token prevents those tasks from being started only to cancel immediately, saving the resources they would otherwise consume.
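A minimal standalone sketch of that second effect (C# 9+ top-level statements): the token is canceled before StartNew is called, and the delegate never executes.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
cts.Cancel();   // cancellation requested before the task is even created

var ran = false;
var task = Task.Factory.StartNew(() => { ran = true; }, cts.Token);

try { task.Wait(); }
catch (AggregateException ex)
{
    Console.WriteLine(ex.InnerExceptions[0].GetType().Name);   // TaskCanceledException
}

Console.WriteLine(task.Status);   // Canceled
Console.WriteLine(ran);           // False: the delegate never ran
```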

SUMMARY

Phew! That was a lot, and it is just one pattern of cancellation: polling. There are two more, which will be covered in the next posts in the series. Fortunately, I can show how those work without all of these combinations of handling cancellation flags, throwing exceptions, and try/catching. All of that still applies, but can be taken as given.

Also, here is a useful link to MSDN on cancellation for your reference.

TPL Pattern #2: Scatter and Gather with Timeout (Part 2: Adding Cancellation)

In the previous post in the series (TPL Pattern #2: Scatter and Gather with Timeout (Guarding with Task.Delay)), I demonstrated how to wrap sub-tasks with another task to detect a timeout. There was, however, a small problem: although the code could run a continuation after the timeout and before the sub-tasks completed, the sub-tasks would still run to completion beyond the timeout. So how do we stop those sub-tasks? Remember, there is no abort method on tasks, and we'd want a more elegant solution than aborting anyway.

This is the domain of the CancellationToken. Cancellation with tasks is a cooperative process, orchestrated through a CancellationToken. A single token can be passed to one or more tasks, and once cancellation is signaled, all of those tasks can observe the token's cancellation-requested state. Tasks need to actively check the token; simply signaling cancellation does not abort the task(s). If your task supports cancellation, it is your responsibility to code it to check this flag frequently and to shut down appropriately and quickly.

Let's look at how the code in the previous example is modified to cancel the tasks after the timeout. Here's the new code:

          
            var add = new Func<int, int, CancellationToken, int>(
                (a, b, t) =>
                {
                    log("add({0},{1})", a, b);
                    Task.Delay(1000, t).Wait();
                    log("add({0},{1})=={2}", a, b, a + b);
                    return a + b;
                });
            var mult = new Func<int, int, CancellationToken, int>(
                (a, b, t) =>
                    {
                        log("mult({0},{1})", a, b);
                        try
                        {
                            Task.Delay(5000, t).Wait();
                        }
                        catch (AggregateException ex)
                        {
                            log("Exception: {0}", ex.InnerExceptions[0].Message);
                        }
                        log("mult({0},{1})=={2}", a, b, a * b);
                        return a * b;
                    });

            var cts = new CancellationTokenSource();
            var token = cts.Token;

            var workers = new[]
                {
                    Task.Factory.StartNew(() => add(1, 2, token)),
                    Task.Factory.StartNew(() => mult(3, 4, token))
                };

            var guarded = new[]
                {
                    Task.Delay(1500, token),
                    Task.WhenAll(workers)
                };

            log("started guard");

            var guard = Task.WhenAny(guarded);

            guard.ContinueWith(
                completed =>
                {
                    log("guard task completed");
                    if (guarded[0].IsCompleted)
                    {
                        log("there was a timeout.");
                        
                        log("workers[0].Status=={0}, Result=={1}", workers[0].Status,
                            workers[0].Status != TaskStatus.RanToCompletion
                                ? workers[0].Status.ToString()
                                : workers[0].Result.ToString());
                        log("workers[1].Status=={0}, Result=={1}", workers[1].Status,
                            workers[1].Status != TaskStatus.RanToCompletion
                                ? workers[1].Status.ToString()
                                : workers[1].Result.ToString());

                        cts.Cancel();
                        Task.WaitAll(workers);
                        log(workers[1].Status.ToString());
                    }
                    else
                    {
                        log("results == {0} {1}", workers[0].Result, workers[1].Result);
                    }

                    log("all done!");
                });

            log("falling through");

The first difference is in the functions that will be executed as tasks. They now take an additional parameter, a CancellationToken. The Thread.Sleep call, which was used to simulate a period of work, is replaced by Task.Delay([time], token).Wait(). Thread.Sleep has no concept of cancellation, so the code uses the task-based equivalent, Task.Delay, which creates another task (it does not delay the current task) that reaches RanToCompletion status after the specified time. Like many task methods, Delay also accepts a CancellationToken; if cancellation is signaled on that token, the Delay task ends early in the Canceled state, and the Wait() call on it throws an exception before the end of the specified interval.
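A minimal standalone sketch of that Delay behavior (C# 9+ top-level statements). CancellationTokenSource.CancelAfter is used here only to trigger cancellation after 100ms without writing a second task; the post's code calls Cancel directly.

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();
var sw = Stopwatch.StartNew();

// Task.Delay returns a new task; creating it does not block the caller.
var delay = Task.Delay(5000, cts.Token);
Console.WriteLine(delay.IsCompleted);   // False

// Request cancellation after 100ms instead of letting the full 5 seconds elapse.
cts.CancelAfter(100);

try
{
    delay.Wait();
}
catch (AggregateException ex)
{
    Console.WriteLine(ex.InnerExceptions[0].GetType().Name);   // TaskCanceledException
}

Console.WriteLine(delay.Status);                    // Canceled
Console.WriteLine(sw.ElapsedMilliseconds < 5000);   // True
```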

In the new code, the Task.Delay call in the mult function will notice the signal on the CancellationToken, and the Wait() on it will throw an exception, specifically an AggregateException. Therefore, the Task.Delay call in this method is wrapped in a try/catch. When the exception is thrown, it is caught, and the method exits promptly through the normal path of execution. Because it exits cleanly, the task will have a RanToCompletion status when done; if the exception were not caught, the status would be Faulted. We'll come back to the importance of this in a little.

The code then creates a CancellationToken (the cts and token declarations). You create a CancellationToken by creating an instance of CancellationTokenSource and retrieving its Token property. The token can then be passed to the methods when they are started as tasks, as is done when the workers array is built.

Code then executes as in the previous example until the call to cts.Cancel() in the continuation. At this point, it has been determined that there was a timeout and that the mult task is still running. To notify it to stop, we call the Cancel method of the token's source object. This sets the cancellation-requested state of the token, and any interested tasks can check this state.

The code then waits once more on all of the workers. Very quickly, the task running the mult function notices the cancellation request, exits cleanly, and changes its state to RanToCompletion, so the WaitAll call completes. We have now shut down the task(s) that ran past the timeout!

Running the code produces the following, which demonstrates this is the case:

60.5592 started guard
44.8436 add(1,2)
44.8436 mult(3,4)
45.8281 started guard
54.6067 falling through
1055.0915 add(1,2)==3
1545.0274 guard task completed
1545.0274 there was a timeout.
1551.9088 workers[0].Status==RanToCompletion, Result==3
1552.8641 workers[1].Status==Running, Result==Running
1594.8094 Exception: A task was canceled.
1594.8094 RanToCompletion
1595.7698 all done!

This is identical in flow to the previous example up through the line reporting workers[1] as Running. It is at this point that Cancel is signaled and the continuation waits for all of the workers to complete. The exception is caught and logged, and the mult task completes. WaitAll returns, the state of the mult task is logged (it has transitioned from Running to RanToCompletion), and the program exits with all tasks in a completed state.

Voila!

I should say that there are some subtleties of the task cancellation process that this example does not cover. There are actually several patterns for cancellation, each with different ramifications for how you signal, identify, and handle cancellation requests. Those will be the subject of the next post.
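For reference, here is a condensed, self-contained sketch of this post's timeout-plus-cancellation idea (C# 9+ top-level statements). It is not the original code: Add and Mult are hypothetical stand-ins for add and mult with shorter times, there is no log helper, and for simplicity it blocks the main thread on WhenAny instead of using a ContinueWith continuation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

var cts = new CancellationTokenSource();

// Hypothetical stand-ins for add and mult; both observe the token.
int Add(int a, int b, CancellationToken t) { Task.Delay(100, t).Wait(); return a + b; }

int Mult(int a, int b, CancellationToken t)
{
    try { Task.Delay(5000, t).Wait(); }
    catch (AggregateException) { /* canceled: finish through the normal path */ }
    return a * b;
}

var workers = new[]
{
    Task.Factory.StartNew(() => Add(1, 2, cts.Token)),
    Task.Factory.StartNew(() => Mult(3, 4, cts.Token))
};

var timeout = Task.Delay(500);
Task.WhenAny(timeout, Task.WhenAll(workers)).Wait();   // block until one of them finishes

if (timeout.IsCompleted)
{
    Console.WriteLine("timeout: cancelling the stragglers");
    cts.Cancel();
    Task.WaitAll(workers);   // returns quickly because Mult observes the token
}

Console.WriteLine($"{workers[0].Status} {workers[1].Status}");
```

Because Mult swallows the cancellation exception, both workers normally end in RanToCompletion, mirroring the log output above.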

TPL Pattern #2: Scatter and Gather with Timeout (Guarding with Task.Delay)

In my previous post in this series, A First Pattern of TPL: SCATTER AND GATHER, I introduced the scatter and gather pattern. While the code in the example demonstrates the pattern well, it’s not as robust as would often be needed.

This post discusses how to add robustness by introducing a timeout into the code. As implemented, the continuation (gather) will not run until all tasks have completed. What if we want to put a time limit on the completion of all tasks, so that the program can use whatever data was retrieved and handle the tasks that did not complete in the specified timeframe?

Fundamentally this is a fairly easy thing to accomplish in TPL, but like many things in programming, getting it exactly correct takes a little practice.  Let's examine the following code...

            var add = new Func<int, int, int>(
                (a, b) =>
                {
                    log("add({0},{1})", a, b);
                    Thread.Sleep(1000);
                    log("add({0},{1})=={2}", a, b, a + b);
                    return a + b;
                });
            var mult = new Func<int, int, int>(
                (a, b) =>
                {
                    log("mult({0},{1})", a, b);
                    Thread.Sleep(5000);
                    log("mult({0},{1})=={2}", a, b, a * b);
                    return a * b;
                });

            var workers = new[]
                {
                    Task.Factory.StartNew(() => add(1, 2)),
                    Task.Factory.StartNew(() => mult(3, 4))
                };

            var guarded = new[]
                {
                    Task.Delay(1500),
                    Task.WhenAll(workers)
                };

            log("started guard");

            Task.WhenAny(guarded)
                .ContinueWith(completed =>
                    {
                        log("guard task completed");
                        if (guarded[0].IsCompleted && !guarded[1].IsCompleted)
                        {
                            log("there was a timeout.");
                            log("workers[0].Status=={0}, Result=={1}", workers[0].Status, workers[0].Result);
                            log("workers[1].Status=={0}, Result=={1}", workers[1].Status, workers[1].Result);
                        }
                        else
                        {
                            log("results == {0} {1}", workers[0].Result, workers[1].Result);
                        }

                        log("all done!");
                    });

            log("falling through");

Let's walk through the code. First, the log method (not shown) writes the given string to the console, prefixed with the number of milliseconds since the start of the program. It's important to show when things execute, as some aspects of the code need this clarified.
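The log method isn't included in the post. A plausible reconstruction, purely as an assumption, is a Stopwatch started with the program plus a format-string helper; the four-decimal formatting is chosen to resemble the output shown later (C# 9+ top-level statements).

```csharp
using System;
using System.Diagnostics;

// Hypothetical reconstruction of the post's log helper: a stopwatch started
// when the program starts, and each message prefixed with elapsed milliseconds.
var clock = Stopwatch.StartNew();

void log(string format, params object[] args) =>
    Console.WriteLine("{0:F4} {1}", clock.Elapsed.TotalMilliseconds, string.Format(format, args));

log("add({0},{1})", 1, 2);   // prints the elapsed time followed by "add(1,2)"
```

Console.WriteLine is safe to call from multiple threads, so worker tasks and the continuation can all share this helper.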

The first two declarations in the listing create two functions to do some math: add sums two numbers and mult multiplies them. Both methods write to the console that they are starting, sleep for a period of time to simulate a lengthy calculation, then write the result to the console and return the value.

I gave the two methods different sleep durations so that it is easy to show what happens when the wait is cut off at a point where one task has completed its work and the other is still running.

Next, the workers array declares two tasks, scheduling execution of the add and mult methods.

The guarded array is where we introduce a task-based time "guard" around the two worker tasks. Its first element is a task that completes after 1.5 seconds, and its second is a task that completes when all of the worker tasks have completed.

The call to Task.WhenAny then does the magic. It states that when any of the tasks in the guarded array completes, another task (the continuation) should be executed. This is basically stating that the continuation code will run when either:

  • 1.5 seconds have expired, or
  • Both of the tasks running the add and mult methods complete

Therefore, we have set up the code to scatter/gather two or more tasks, and have also specified how to signal the app after a predetermined amount of time if all the tasks do not complete.

When run, the following output is produced.  It demonstrates that we have accomplished our goal, albeit with an unintended side-effect:

31.2514 add(1,2)
46.8225 mult(3,4)
46.8225 add(1,2)
46.8225 falling through
1057.4352 add(1,2)==3
1541.3348 guard task completed
1541.3348 there was a timeout.
1541.3348 workers[0].Status==RanToCompletion, Result==3
5057.1723 mult(3,4)==12
5057.1723 workers[1].Status==Running, Result==12
5057.1723 all done!

The mult and add tasks start almost immediately and then go into their respective sleeps. As a matter of fact, the guard/timer task is actually started before the mult task reports that it has started. The "falling through" line then shows that this code completes its straight-through execution, while a continuation task is scheduled to run when either the timer task or the calculation tasks (all three still executing) complete.

The next line shows the add task completing about 1.057 seconds into execution. Then comes "guard task completed", meaning the continuation has run, which means that either the delay task has completed or both of the calculation tasks have. But how do we determine which one it was? We can check the IsCompleted property of both of the tasks that were passed to WhenAny. The if statement in the continuation treats it as a timeout when the delay task is complete and the WhenAll task is not.
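An alternative to checking IsCompleted (not what the post's code does) is to use the value WhenAny produces: it is a Task<Task> whose result is the first task to finish, so a reference comparison identifies the winner. A minimal standalone sketch with made-up timings (C# 9+ top-level statements):

```csharp
using System;
using System.Threading.Tasks;

var timeout = Task.Delay(200);
var work = Task.Run(() => { Task.Delay(2000).Wait(); return 42; });

// WhenAny's result is whichever task finished first.
Task winner = Task.WhenAny(timeout, work).Result;

Console.WriteLine(winner == timeout ? "timeout" : $"result {work.Result}");   // timeout
```

Inside a ContinueWith continuation like the post's, the same value is available as the continuation's argument (completed.Result).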

In this run, that is the case. The timeout occurred, and the first worker log line reports the add task as RanToCompletion with a result of 3. But notice that the logging of the mult task's status and the "all done!" message don't appear until after 5 seconds have elapsed! What's going on?

This is one of the subtleties of TPL. The second worker log statement reads the Result property of workers[1], the mult task. Since that task has not completed, reading Result blocks until it does. The arguments are evaluated left to right, so the status had already been read as Running before the block, which is why the output reports "Running" alongside a result of 12. This is why output does not resume until 5 seconds into execution: first we see the exit log from the mult method, then the log that was waiting on that task's Result, and then the program completes.

In summary, if you try to access the Result property of a task that has not completed, the calling thread blocks until the task completes.
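A small sketch that makes this blocking visible, assuming a 500 ms unit of work: it starts a task, reads Result immediately, and times how long that takes to return. ResultBlocking and MillisecondsBlockedOnResult are hypothetical names used only for illustration:

```csharp
using System;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;

public static class ResultBlocking
{
    // Starts a task that sleeps for 'workMs' and then returns 42, reads its
    // Result right away, and returns the elapsed milliseconds from starting
    // the task until Result handed back the value.
    public static long MillisecondsBlockedOnResult(int workMs)
    {
        var stopwatch = Stopwatch.StartNew();
        Task<int> slow = Task.Factory.StartNew(() =>
        {
            Thread.Sleep(workMs);
            return 42;
        });

        int value = slow.Result;   // blocks here until 'slow' has completed
        stopwatch.Stop();

        Console.WriteLine("Result == {0} after {1} ms", value, stopwatch.ElapsedMilliseconds);
        return stopwatch.ElapsedMilliseconds;
    }
}
```

Calling MillisecondsBlockedOnResult(500) returns roughly 500 or a little more, because the thread reading Result simply waits for the task to finish.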

So how can we fix this? Let's change the code to the following:

            var add = new Func<int, int, int>(
                (a, b) =>
                {
                    log("add({0},{1})", a, b);
                    Thread.Sleep(1000);
                    log("add({0},{1})=={2}", a, b, a + b);
                    return a + b;
                });
            var mult = new Func<int, int, int>(
                (a, b) =>
                {
                    log("mult({0},{1})", a, b);
                    Thread.Sleep(5000);
                    log("mult({0},{1})=={2}", a, b, a * b);
                    return a * b;
                });

            var workers = new[]
                {
                    Task.Factory.StartNew(() => add(1, 2)),
                    Task.Factory.StartNew(() => mult(3, 4))
                };

            var guarded = new[]
                {
                    Task.Delay(1500),
                    Task.WhenAll(workers)
                };

            log("started guard");

            var guard = Task.WhenAny(guarded);

            guard.ContinueWith(
                completed =>
                    {
                        log("guard task completed");
                        if (guarded[0].IsCompleted)
                        {
                            log("there was a timeout.");
                            log("workers[0].Status=={0}, Result=={1}", workers[0].Status,
                                workers[0].Status == TaskStatus.Running
                                    ? "[still running!]"
                                    : workers[0].Result.ToString());
                            log("workers[1].Status=={0}, Result=={1}", workers[1].Status,
                                workers[1].Status == TaskStatus.Running
                                    ? "[still running!]"
                                    : workers[1].Result.ToString());
                        }
                        else
                        {
                            log("results == {0} {1}", workers[0].Result, workers[1].Result);
                        }

                        log("all done!");
                    });

            log("falling through");

Notice how I changed the log statements in the timeout branch to check whether the task's Status is Running before accessing the Result property. This changes the output to the following:

46.8706 started guard
46.8706 mult(3,4)
46.8706 add(1,2)
46.8706 falling through
1062.306 add(1,2)==3
1562.3686 guard task completed
1562.3686 there was a timeout.
1562.3686 workers[0].Status==RanToCompletion, Result==3
1562.3686 workers[1].Status==Running, Result==[still running!]
1577.3484 all done!
5062.8903 mult(3,4)==12

Now the log of the mult task reports that it is Running, and since in that case the program no longer accesses that task's Result property, the log statement does not block.
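Checking for TaskStatus.Running works for this run, but a task that has been queued and not yet started is in a different state (WaitingToRun, for example), and reading Result then blocks as well. A slightly more defensive sketch tests IsCompleted instead, and also avoids reading Result on a faulted or canceled task, where it would throw an AggregateException. DescribeResult is a hypothetical helper, not part of TPL:

```csharp
using System;
using System.Threading.Tasks;

public static class TaskReporting
{
    // Hypothetical helper: formats a task's outcome without ever blocking the caller.
    public static string DescribeResult<T>(Task<T> task)
    {
        if (!task.IsCompleted)
            return "[still running!]";              // Result would block here

        if (task.Status != TaskStatus.RanToCompletion)
            return "[" + task.Status + "]";         // Faulted or Canceled: Result would throw

        return string.Format("{0}", task.Result);   // safe: the task finished successfully
    }
}
```

With this helper the timeout branch could log workers[1] as "[still running!]" regardless of whether the task had actually been scheduled yet.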

But...

Why does the mult task report that it has finished its work at 5062 ms? Well, that is what we told it to do, and this gets to another important point about TPL. We did a WhenAny on the guarded tasks, the timeout completed, and the add task happened to have completed as well, but the mult task was not complete: it kept running even though the program went along its merry way. Completing a WhenAny does nothing to the tasks that lose the race.

Which leads to an issue... What if, in this scenario, I want to terminate any tasks still executing after the timeout fires? After all, tasks that are still running could be dragging down system performance. A thread programmer would jump to calling Thread.Abort on any task that does not have a RanToCompletion status. But there is no equivalent to this in TPL!

This will be the topic of my next post: using CancellationTokens to control task lifetime. In a nutshell, a task in TPL can't really "kill" or "terminate" another task; that is considered an inelegant and indeed problematic solution to the problem. Instead, TPL introduces the concept of cooperative coordination and cancellation via cancellation tokens. It's an important concept to understand, because if you want your code to integrate well with other TPL code, which you perhaps didn't write, it should be written to function properly in this model.
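As a preview, here's a minimal sketch of the cooperative model using the standard CancellationTokenSource and CancellationToken types (it is not the code from the upcoming post): the caller requests cancellation after a timeout with CancelAfter, and the long-running worker polls its token between small units of work and exits by throwing OperationCanceledException. The CooperativeCancellation class, its method names, and the 100 ms polling step are assumptions for illustration:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class CooperativeCancellation
{
    // Hypothetical worker: simulates 'totalMs' of work in 100 ms steps and
    // checks the token between steps so it can stop early when asked.
    public static int CancelableMult(int a, int b, int totalMs, CancellationToken token)
    {
        for (int elapsed = 0; elapsed < totalMs; elapsed += 100)
        {
            // Throws OperationCanceledException (carrying 'token') if cancellation was requested.
            token.ThrowIfCancellationRequested();
            Thread.Sleep(100);
        }
        return a * b;
    }

    // Runs the worker with a timeout and returns the worker task's final status.
    public static TaskStatus RunWithTimeout(int workMs, int timeoutMs)
    {
        using (var cts = new CancellationTokenSource())
        {
            cts.CancelAfter(timeoutMs);   // only signals the token; nothing is killed
            CancellationToken token = cts.Token;

            Task<int> worker = Task.Factory.StartNew(
                () => CancelableMult(3, 4, workMs, token), token);

            try
            {
                worker.Wait();
            }
            catch (AggregateException)
            {
                // Wait() surfaces cancellation as an AggregateException;
                // the task's Status below records what actually happened.
            }
            return worker.Status;
        }
    }
}
```

Because ThrowIfCancellationRequested throws with the same token that was passed to StartNew, TPL marks the worker as Canceled rather than Faulted. With a 5000 ms job and a 300 ms timeout, RunWithTimeout returns TaskStatus.Canceled shortly after the timeout, instead of leaving the work running in the background the way the mult task did above.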

This will be the focus of the next post in the series...

A First Pattern of TPL: SCATTER AND GATHER

This post is an article that I submitted to NDC Magazine. It is intended to be the first in a series of posts on practical uses of TPL in your applications.

OVERVIEW

TPL (the Task Parallel Library) and TPL Dataflow have been part of the .NET Framework for a while now, and I'm still surprised at how relatively obscure they are in the general .NET developer's knowledge set. Both provide elegant solutions to a number of hard problems in concurrent application development, making it possible to write much simpler code than with the regular threading libraries in .NET. This article is the first in a series of articles, blog posts, and conference presentations that I will be giving on using TPL and Dataflow to solve common concurrency patterns in desktop applications, and in particular desktop trading systems.

THE FIRST PATTERN OF TPL: SCATTER AND GATHER

This first article explains a pattern that I refer to as "scatter-gather", a common need of many financial systems (as well as other types of systems) when they start up. At startup, these applications commonly need to retrieve data from many external systems while still allowing the user to perform other actions as the data is gathered. Often there are dependencies in this data, which require all or a subset of it to be gathered before other data is requested, and before any of it is organized for the rest of the application.

This "scatter-gather" pattern has at least the following three requirements, and I'll discuss implementing each of them with TPL:

  • Execute 2 .. N data requests in parallel and preferably asynchronously
  • While requests are pending, the user can still perform other functions (UI is not blocked)
  • When data is received from all requests, execution continues to take action on all data items

Traditionally, coding a solution for this with threads tends to get complicated: you need to create multiple threads, monitor those threads for completion, pass data from the threads back through shared, concurrent data buffers, and synchronize with the user interface thread when the data is ready. TPL has been designed to take all of these issues into account without the need to explicitly code each of these constructs. Let's examine some of the constructs in TPL that help coordinate this complexity, using a few simple examples.

Tasks, unlike threads, can return data upon their completion. Threads are in a sense fire-and-forget. If you want to return data from them, you need to code the thread to put the data into memory somewhere. And if the threads servicing all of your work share a buffer, you need to provide synchronization around that buffer.
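To make the contrast concrete, here's a minimal sketch of the thread-based approach just described, assuming each unit of work is the same add-two-integers function used in the examples below: every thread stores its result in a shared list guarded by a lock, and the creator has to Join each thread before reading that list. The ThreadResults class and AddOnThreads method are illustrative names only:

```csharp
using System.Collections.Generic;
using System.Threading;

public static class ThreadResults
{
    // Adds each pair on its own thread and gathers the sums in a shared buffer.
    public static List<int> AddOnThreads(IEnumerable<int[]> pairs)
    {
        var results = new List<int>();   // shared buffer written by every thread
        var gate = new object();         // synchronization required around that buffer
        var threads = new List<Thread>();

        foreach (int[] pair in pairs)
        {
            // C# 5 and later give each iteration its own 'pair' variable to capture.
            var thread = new Thread(() =>
            {
                int sum = pair[0] + pair[1];
                lock (gate)
                {
                    results.Add(sum);    // a thread can't "return" a value, so it stores it
                }
            });
            threads.Add(thread);
            thread.Start();
        }

        foreach (Thread thread in threads)
            thread.Join();               // the creator must wait on each thread explicitly

        return results;                  // order depends on thread scheduling
    }
}
```

Even this small version needs a lock, explicit joins, and caller-side reordering if the order matters; the task-based examples below need none of that.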

In comparison, I like to think of tasks as "promises" to perform a unit of work at some time in the future and then potentially return a result (a task can also return no value, the equivalent of void). The unit of work may or may not be executed in parallel by TPL, but with the default task scheduler in .NET it runs on the thread pool. When the task finishes the unit of work, it provides a simple means of synchronizing with its creator and handing the result back to it.

If you think of the unit of work in terms of a function, then you are on your way to being able to utilize tasks effectively. Consider the following code:

            
            var f = new Func<int, int, int>(
                (a, b) =>
                    {
                        Thread.Sleep(1000);
                        return a + b;
                    });
            Console.WriteLine("Waiting for completion");
            var r = f(1, 2);
            Console.WriteLine(r);
            Console.WriteLine("Continuing on to do some other work");

This declares a function to add two integers (imposing a nominal delay of 1 second), adds 1 + 2, and writes the result to the console. The call f(1, 2) is executed synchronously, meaning the flow of code that calls the function halts until the function returns. This program will halt for one second before writing the result of the computation. This is normal program execution, albeit a little more functional in nature with the inline function being declared.

Now let’s change this to execute as a task to introduce asynchronous processing with TPL:

            var f = new Func<int, int, int>(
                (a, b) =>
                {
                    Thread.Sleep(1000);
                    return a + b;
                });
            Console.WriteLine("Waiting for completion");
            var task = Task.Factory.StartNew(() => f(1, 2));
            task.ContinueWith(t => Console.WriteLine(t.Result));
            Console.WriteLine("Continuing on to do some other work");
            Console.ReadLine();

Instead of directly calling the function, the program instructs the default task scheduler to execute the function asynchronously to the caller via the StartNew method on the static Factory property of the Task class. StartNew returns a task object, which represents the "promise" to do work and return a value in the future. Execution of this method continues, the console message that the program is continuing is displayed, and the task completes later on its own schedule (the Console.ReadLine call keeps the process alive long enough for the result to be printed).

So, how do we know when this task is complete? And how do we get the result of the task? This is the purpose of a task's ContinueWith method and its Result property. When a method executed via a task completes, TPL signals that fact, and you can hook into it by passing a method to the task's ContinueWith method; TPL executes that method when the task completes.

The method that TPL executes upon task completion is passed a single parameter: the task that has completed. In this example, 't' is a reference to the same object as the variable 'task'. This is convenient, as the continuation method may not be inline like this example, and the parameter provides the reference to the completed task. From that reference, the value returned by the method the task executed is available through the Result property, which in this case is an integer with the value 3.

It is worth noting how elegant this code is compared to using normal threads. To handle this situation in that model we would have to introduce one of the asynchronous patterns, either the Asynchronous Programming Model (APM) or the Event-based Asynchronous Pattern (EAP). For reference on these, please see http://msdn.microsoft.com/en-us/library/jj152938.aspx. The pattern that this program uses is the Task-based Asynchronous Pattern (TAP), and compared to EAP or APM, TAP has already saved us a lot of code and reduced potential errors.

But this is not what I consider the greatest benefit that TPL provides. The TPL model also allows us to compose tasks, providing simple orchestration of processing by taking the results of tasks and flowing them into other tasks. (There are other benefits, but they are beyond the scope of this article.)

Extending this example a little, let’s change the code to execute the function twice in parallel (adding 1+2, and 3+4), and when both are complete, add the results of those two values, which will result in output of 10 to the console.

            var f = new Func<int, int, int>(
                (a, b) =>
                {
                    Thread.Sleep(1000);
                    return a + b;
                });

            Console.WriteLine("Waiting for completion");

            var task1 = Task.Factory.StartNew(() => f(1, 2));
            var task2 = Task.Factory.StartNew(() => f(3, 4));
            var task = Task.WhenAll(task1, task2).ContinueWith(tasks => f(tasks.Result[0], tasks.Result[1]));
            task.ContinueWith(t => Console.WriteLine(t.Result));

            Console.WriteLine("Continuing on to do some other work");
            Console.ReadLine();

The program starts the two tasks much like the earlier example, but passes different values to each. It then uses the static Task.WhenAll method to create a task that completes when all of the specified tasks have completed, and attaches a continuation to it with ContinueWith. TPL executes that continuation as another task, passing it the WhenAll task, whose Result is an array of integers with one element per task passed to Task.WhenAll, in the same order. The continuation calls f to add those two results, returning that value as the result of this third task, which, when complete, writes its output to the console via another continuation.
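The same composition scales from two requests to N, which is the first scatter-gather requirement listed earlier. Here's a minimal sketch, assuming each "request" is simulated by the same delayed add function and that the gather step simply sums the results (the example above instead feeds the two results back into f). ScatterGather and SumOfPairs are hypothetical names:

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ScatterGather
{
    // Scatter: start one task per (a, b) pair. Gather: when all finish, sum the results.
    public static Task<int> SumOfPairs(params Tuple<int, int>[] pairs)
    {
        var f = new Func<int, int, int>((a, b) =>
        {
            Thread.Sleep(1000);   // simulated latency of an external request
            return a + b;
        });

        Task<int>[] scattered = pairs
            .Select(p => Task.Factory.StartNew(() => f(p.Item1, p.Item2)))
            .ToArray();

        // WhenAll yields a Task<int[]>; its Result holds one value per input task, in order.
        return Task.WhenAll(scattered).ContinueWith(all => all.Result.Sum());
    }
}
```

For example, SumOfPairs(Tuple.Create(1, 2), Tuple.Create(3, 4), Tuple.Create(5, 6)) completes after roughly one second (the three requests overlap) with a Result of 21.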

Reconstructing this with equivalent thread-based code is non-trivial compared to the previous example. There are a number of techniques to implement it with threads, all of which are nowhere near as simple as this TPL code, and all of which are more error-prone.

SUMMARY

Although trivial in its computations, this example has shown how simple it is to create tasks and compose their results into further processing actions. The simple calculations in this example can be replaced with much more complicated actions. A real-world example from the domain of trading systems is requesting current equity positions in parallel from multiple exchanges, combining them with enrichment data from internal systems once the market data is retrieved, and then creating the view model and view representations of the data.

This article has also focused only on this first pattern, and has not covered other valuable TPL concepts such as exception handling and automatic task synchronization with other contexts such as the UI thread. Also, composing tasks with continuations, although elegant in its simplicity, is limited in flexibility for more complicated constructs such as batching of results and choosing continuation paths based on the results of finished tasks. These actions are better suited to an additional framework, the TPL Dataflow Library.

Also not covered: the .NET 4.5 compilers provide the async/await constructs, which simplify this model of TPL code with continuations even further by providing direct compiler support for handling continuations as closures over code in the same method. But that's a story for another article…
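For a taste of that simplification, here's a sketch (not from the original article) of the WhenAll example above rewritten with async/await, assuming C# 5 or later. It uses Task.Run, which .NET 4.5 added as a shorthand for common Task.Factory.StartNew usage; the class and method names are hypothetical:

```csharp
using System.Threading;
using System.Threading.Tasks;

public static class AsyncScatterGather
{
    // The same delayed add function used in the examples above.
    private static int F(int a, int b)
    {
        Thread.Sleep(1000);
        return a + b;
    }

    // Same composition as the WhenAll/ContinueWith version, written with async/await.
    public static async Task<int> AddPairsThenSumAsync()
    {
        Task<int> task1 = Task.Run(() => F(1, 2));   // scatter: both start immediately
        Task<int> task2 = Task.Run(() => F(3, 4));

        // Gather: the method returns to its caller here and resumes when both finish.
        int[] results = await Task.WhenAll(task1, task2);

        // The compiler turns everything after each await into a continuation.
        return await Task.Run(() => F(results[0], results[1]));
    }
}
```

Calling AddPairsThenSumAsync() returns a Task<int> to the caller right away, and that task's eventual Result is 10, the same value the ContinueWith version writes to the console.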

I'm currently the top listed speaker for NDC Oslo :)

 

About the author

I'm a .NET, XAML, and iOS polyglot that loves playing with new things and making cool and innovative stuff.  I am also a Mac junkie.

I am Principal Technologist for SunGard Global Services in NYC, in their Advanced Technologies practice, and I work extensively with SunGard's energy and financial customers.

Note that the postings on this blog are my own and do not represent the positions, strategies or opinions of SGS.
