Thursday, June 18, 2009

Parallel ForEach Loop in C# 3.5

You can take advantage of multi-core processors by executing foreach loops in parallel.  This works especially well when fetching multiple chunks of data from external sources such as a database, web service, or RESTful API, where most of the time is spent waiting.  I modeled this on the parallel features coming in .NET 4.0, but for now they’re in beta and I’ve had trouble using them, so this method is a simple implementation of the same idea.

Usage (not that this example makes any sense…)

string[] names = { "cartman", "stan", "kenny", "kyle" };
names.EachParallel(name =>
{
    Console.WriteLine(name);
});

Of course you’ll probably want some exception handling inside the action.  An exception that escapes a thread-pool thread can’t be caught by the calling code, and since .NET 2.0 it terminates the whole process.

string[] names = { "cartman", "stan", "kenny", "kyle" };
names.EachParallel(name =>
{
    try
    {
        Console.WriteLine(name);
    }
    catch { /* handle exception */ }
});
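
If swallowing exceptions in an empty catch is too blunt, one option is to record them and inspect them after the loop finishes. The following is only a sketch: ActionGuards and CollectErrors are hypothetical names that are not part of the code in this post, and the error list is guarded with a lock because the wrapped action may run on several threads at once.

```csharp
using System;
using System.Collections.Generic;

public static class ActionGuards
{
    // Hypothetical helper (not part of the original post): wraps an action so
    // that any exception it throws is recorded instead of escaping the thread.
    public static Action<T> CollectErrors<T>(Action<T> action, List<Exception> errors)
    {
        return item =>
        {
            try
            {
                action(item);
            }
            catch (Exception ex)
            {
                // List<T> is not thread-safe, so serialize access to it
                lock (errors)
                {
                    errors.Add(ex);
                }
            }
        };
    }
}
```

With this in place, the call would look something like names.EachParallel(ActionGuards.CollectErrors<string>(name => Console.WriteLine(name), errors)), with errors being a List<Exception> that you check once EachParallel returns.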

Here is the source

Update: Added special handling for the case when the enumerable only contains one element.  It just executes the method directly (in serial) to avoid the overhead of creating a thread and using the WaitHandle, since that will essentially execute it in serial anyway.

Update: WaitHandle.WaitAll can wait on at most 64 handles at a time.  An anonymous developer posted a solution in the comments.  I integrated his changes and made some modifications.  It breaks up the enumerable into 64-item chunks and processes the items of each chunk in parallel, one chunk after another.

/// <summary>
/// Enumerates through each item in a list in parallel
/// </summary>
public static void EachParallel<T>(this IEnumerable<T> list, Action<T> action)
{
    // enumerate the list so it can't change during execution
    list = list.ToArray();
    var count = list.Count();

    if (count == 0)
    {
        return;
    }
    else if (count == 1)
    {
        // if there's only one element, just execute it
        action(list.First());
    }
    else
    {
        // Launch each method in its own thread
        const int MaxHandles = 64;

        // Stop once every item is covered; an empty trailing chunk would make
        // WaitHandle.WaitAll throw on an empty array
        for (var offset = 0; offset * MaxHandles < count; offset++)
        {
            // break up the list into 64-item chunks because of a limitation
            // in WaitHandle
            var chunk = list.Skip(offset * MaxHandles).Take(MaxHandles);

            // Initialize the reset events to keep track of completed threads
            var resetEvents = new ManualResetEvent[chunk.Count()];

            // spawn a thread for each item in the chunk
            int i = 0;
            foreach (var item in chunk)
            {
                resetEvents[i] = new ManualResetEvent(false);
                ThreadPool.QueueUserWorkItem(new WaitCallback((object data) =>
                {
                    int methodIndex = (int)((object[])data)[0];

                    // Execute the method and pass in the enumerated item
                    action((T)((object[])data)[1]);

                    // Tell the calling thread that we're done
                    resetEvents[methodIndex].Set();
                }), new object[] { i, item });
                i++;
            }

            // Wait for all threads to execute
            WaitHandle.WaitAll(resetEvents);
        }
    }
}
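
The chunking above exists only because WaitHandle.WaitAll is limited to 64 handles. A possible alternative, sketched here under my own assumptions rather than taken from the code above, is to use a single ManualResetEvent together with a counter that each work item decrements when it finishes. ParallelSketch and EachParallelCountdown are hypothetical names. Note that an exception thrown by the action is still unhandled here, so the action needs its own try/catch.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

public static class ParallelSketch
{
    // Hypothetical variant (not the method from this post): waits on one event
    // instead of one event per item, so the 64-handle limit never applies.
    public static void EachParallelCountdown<T>(this IEnumerable<T> list, Action<T> action)
    {
        // Snapshot the sequence so it can't change during execution
        T[] items = list.ToArray();
        if (items.Length == 0)
        {
            return;
        }

        // Number of work items that have not finished yet
        int pending = items.Length;

        using (var done = new ManualResetEvent(false))
        {
            foreach (T item in items)
            {
                // Copy the loop variable so each closure sees its own item
                T current = item;
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        action(current);
                    }
                    finally
                    {
                        // The last work item to finish releases the caller
                        if (Interlocked.Decrement(ref pending) == 0)
                        {
                            done.Set();
                        }
                    }
                });
            }

            // Block until every queued work item has run
            done.WaitOne();
        }
    }
}
```

The trade-off is that all items are queued to the thread pool at once instead of 64 at a time, so the pool alone decides how many run concurrently.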

13 comments:

  1. Hi, I was testing this code and got the following exception when iterating over a list with more than 64 items:

    "The number of WaitHandles must be less than or equal to 64."

  2. Ju4n5n - Thanks for finding this. He said that you get an error if the number of items in the list is > 64. The solution would be to process the list in 64-item chunks, so there would be at most 64 threads at one time. I'll post an update (eventually) to fix this. It's not so hard, so try to implement this solution yourself.

  3. So what's the solution? Could you please post one?

  4. for(var offset = 0; offset < list.Count() /64; offset++){
        int i = 0;
        foreach (var item in list.Skip(offset*64).Take(64))
        {
            resetEvents[i] = new ManualResetEvent(false);
            ThreadPool.QueueUserWorkItem(new WaitCallback((object data) =>
            {
                int methodIndex = (int)((object[])data)[0];

                // Execute the method and pass in the enumerated item
                action((T)((object[])data)[1]);

                // Tell the calling thread that we're done
                resetEvents[methodIndex].Set();
            }), new object[] { i, item });
            i++;
        }

        // Wait for all threads to execute
        WaitHandle.WaitAll(resetEvents);
    }

  5. This worked really nicely for me. Good work.

    Is there an easy way of breaking out of the parallel for loop if a request comes to stop it? In my original for loop I had an interface that checked, each time it completed an operation, whether the user had cancelled, and could then break out of the loop. Is there any way you can still have this 'break' functionality in this loop, which would of course also kill all concurrent threads?

    Thanks again,

    Matthew

  6. I posted an update that breaks up the list into 64-item chunks. I used the solution that somebody posted above. Thanks!

  7. Nice idea, thanks.

  8. Any sample that makes sense? Thanks.

  9. Great stuff!
    I've added a counter and the max handles as an argument:

    public static void EachParallel(this IEnumerable list, Action action, int MaxHandles)
    {
        // enumerate the list so it can't change during execution
        list = list.ToArray();
        var count = list.Count();

        if (count == 0)
        {
            return;
        }
        else if (count == 1)
        {
            // if there's only one element, just execute it
            action(list.First(),0);
        }
        else
        {
            ManualResetEvent[] resetEvents = new ManualResetEvent[MaxHandles];
            int counter = 0;

            // Launch each method in its own thread
            for (var offset = 0; offset < list.Count() / MaxHandles; offset++)
            {
                int i = 0;
                foreach (var item in list.Skip(offset * MaxHandles).Take(MaxHandles))
                {
                    resetEvents[i] = new ManualResetEvent(false);
                    ThreadPool.QueueUserWorkItem(new WaitCallback((object data) =>
                    {
                        int methodIndex = (int)((object[])data)[0];

                        counter++;
                        // Execute the method and pass in the enumerated item
                        action((T)((object[])data)[1], counter);

                        // Tell the calling thread that we're done
                        resetEvents[methodIndex].Set();
                    }), new object[] { i, item });
                    i++;
                }

                // Wait for all threads to execute
                WaitHandle.WaitAll(resetEvents);
            }
        }
    }

  10. "this IEnumerable list" should have type arguments, right?

    did this compile?

  11. System.Collections.Generic.IEnumerable

  12. Convert to VB.NET please.
    Thanks.

  13. I used this code in an ASP.NET app and IIS crashed!
