Parallel.ForEach 的三十二线程并行管理,Parallel.ForEach 的多线程并行处理

简介

当供给为多核机器举办优化的时候,最棒先检查下你的次序是不是有处理能够分割开来实行并行管理。(举个例子,有四个光辉的多寡集结,在那之中的成分要求叁个二个开展互相独立的耗费时间划算)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ
来赞助大家举办并行管理,本文研究这两个的差距及适用的情状。

原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft
Corporation

Parallel.ForEach

Parallel.ForEach 是 foreach
的二十四线程完结,他们都能对 IEnumerable<T>
类型对象进行遍历,Parallel.ForEach
的特有之处在于它选择二十四线程来实践循环体内的代码段。

Parallel.ForEach 最常用的花样如下:

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> body)

原文pdf:http://download.csdn[.NET](http://lib.csdn.net/base/dotnet)/detail/sqlchen/7509513

PLINQ

PLINQ 也是一种对数码举办并行管理的编制程序模型,它通过 LINQ 的语法来贯彻类似
Parallel.ForEach 的四线程并行管理。

 

气象一:轻松数据 之 独立操作的并行管理(使用 Parallel.ForEach)

演示代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
    Parallel.ForEach(source, element => action(element));
}

理由:

  1. 即使 PLINQ 也提供了八个近似的 ForAll
    接口,但它对于简易的独立操作太重量化了。

  2. 行使 Parallel.ForEach 你还是能够设定
    ParallelOptions.马克斯DegreeOfParalelism
    参数(钦命最多须求有个别个线程),这样当 ThreadPool
    能源紧缺(以致当可用线程数<马克斯DegreeOfParalelism)的时候, Parallel.ForEach
    还是能够胜利运行,并且当后续有更多可用线程出现时,Parallel.ForEach
    也能立刻地利用那么些线程。PLINQ 只好通过WithDegreeOfParallelism
    方法来供给一定的线程数,即:须要了几个正是多少个,不会多也不会少。

====================================================================

场景二:顺序数据 之 并行管理(使用 PLINQ 来维周到据顺序)

当输出的数码体系必要保持原本的各种时使用 PLINQ 的 AsOrdered
方法非常轻巧高效。

演示代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
    var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
        // Movie frames will be evaluated lazily
    }
}

理由:

  1. Parallel.ForEach
    完结起来供给绕一些弯路,首先你要求选拔以下的重载在措施:

    public static ParallelLoopResult ForEach(

     IEnumerable<TSource> source,
     Action<TSource, ParallelLoopState, Int64> body)
    

那几个重载的 Action 多带有了 index
 参数,那样您在出口的时候就能够应用那么些值来维系原来的行列顺序。请看下边包车型客车事例:

public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
    var length = Math.Min(v1.Length, v2.Lenth);
    double[] result = new double[length];
    Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
        result[elementIndex] = element * v2[elementIndex]);
    return result;
}

您大概早已意识到此处有个赫赫有名的难题:大家利用了恒久长度的数组。尽管传入的是
IEnumerable 那么你有4个缓和方案:

(1) 调用 IEnumerable.Count()
来获取数据长度,然后用这一个值实例化四个一定长度的数组,然后使用上例的代码。

(2) The second option would be to materialize the original collection
before using it; in the event that your input data set is prohibitively
large, neither of the first two options will be
feasible.(没看懂贴原版的书文)

(3)
第三种格局是利用重回一个哈希集结的法子,这种模式下一般要求至少2倍于传播数据的内部存款和储蓄器,所以拍卖大额时请慎用。

(4) 自身实现排序算法(保险传入数据与传播数据经过排序后次序一致)

  1. 绝对来讲 PLINQ 的 AsOrdered
    方法这么归纳,并且该格局能处理流式的数量,进而允许传入数据是延迟兑现的(lazy materialized)

简介

当须求为多核机器实行优化的时候,最棒先检查下你的次序是或不是有管理能够分割开来打开并行管理。(比如,有八个宏伟的数目集合,当中的因素供给二个三个举行相互独立的耗费时间计量)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ
来赞助我们开展并行管理,本文斟酌这两者的差距及适用的情景。

场地三:流数据 之 并行处理(使用 PLINQ)

PLINQ 能输出流数据,这几个特点在刹那间场子非常平价:

1.
结实集不需即使三个完好的处理完成的数组,即:任什么时候刻点下内部存款和储蓄器中仅维持数组中的部分音讯

  1. 你可见在二个单线程上遍历输出结果(就就如他们早已存在/管理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
    var StockRiskPortfolio =
        Stocks
        .AsParallel()
        .AsOrdered()
        .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
        .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
        SomeStockComputation(stockRisk.Risk);
        // StockRiskPortfolio will be a stream of results
    }
}

那边运用二个单线程的 foreach 来对 PLINQ 的输出进行三翻五次管理,经常状态下
foreach 不须要等待 PLINQ 管理完全部数据就能够开头运作。

PLINQ 也允许内定输出缓存的方法,具体可参照 PLINQ 的 WithMergeOptions
方法,及 ParallelMergeOptions 枚举

Parallel.ForEach

Parallel.ForEach 是 foreach
的多线程完成,他们都能对 IEnumerable<T>
类型对象进行遍历,Parallel.ForEach
的例外之处在于它选用四线程来实行循环体内的代码段。

Parallel.ForEach 最常用的款式如下:

public static ParallelLoopResult ForEach<TSource>(  IEnumerable<TSource> source,        Action<TSource> body)   

场景四:管理八个汇聚(使用 PLINQ)

PLINQ 的 Zip
方法提供了并且遍历七个集结併开展重组元算的章程,何况它可以与其余查询管理操作结合,达成极其复杂的作用。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    return
        a
        .AsParallel()
        .AsOrdered()
        .Select(element => ExpensiveComputation(element))
        .Zip(
            b
            .AsParallel()
            .AsOrdered()
            .Select(element => DifferentExpensiveComputation(element)),
            (a_element, b_element) => Combine(a_element,b_element));
}

亲自过问中的五个数据源能够并行处理,当互相都有叁个可用成分时提须求 Zip
实行一连处理(Combine)。

Parallel.ForEach 也能完结类似的 Zip 管理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    var numElements = Math.Min(a.Count(), b.Count());
    var result = new T[numElements];
    Parallel.ForEach(a,
        (element, loopstate, index) =>
        {
            var a_element = ExpensiveComputation(element);
            var b_element = DifferentExpensiveComputation(b.ElementAt(index));
            result[index] = Combine(a_element, b_element);
        });
    return result;
}

不移至理使用 Parallel.ForEach
后你就得本身认然而否要维持原有连串,何况要专一数组越界访谈的难点。

PLINQ

PLINQ 也是一种对数据实行并行管理的编制程序模型,它通过 LINQ 的语法来达成类似
Parallel.ForEach 的多线程并行管理。

场景五:线程局地变量

Parallel.ForEach 提供了二个线程局地变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

利用的亲自过问:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
    var results = new List<R>();
    using (SemaphoreSlim sem = new SemaphoreSlim(1))
    {
        Parallel.ForEach(source,
            () => new List<R>(),
            (element, loopstate, localStorage) =>
            {
                bool filter = filterFunction(element);
                if (filter)
                    localStorage.Add(element);
                return localStorage;
            },
            (finalStorage) =>
            {
                lock(myLock)
                {
                    results.AddRange(finalStorage)
                };
            });
    }
    return results;
}

线程局部变量有如何优势呢?请看上面包车型大巴事例(叁个网页抓取程序):

public static void UnsafeDownloadUrls ()
{
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

常见第一版代码是那般写的,可是运维时会报错“System.NotSupportedException
-> WebClient does not support concurrent I/O
operations.”。那是因为两个线程不恐怕同不常间做客同叁个 WebClient
对象。所以大家会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            WebClient webclient = new WebClient();
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

修改之后依旧有失常态,因为你的机器不是服务器,大批量实例化的 WebClient
火速到达你机器允许的虚构连接上限数。线程局地变量能够消除这几个难点:

public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index]+".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;
        },
            (webclient) => { });
}

如此的写法有限支撑了大家能博得丰盛的 WebClient 实例,同期那一个 WebClient
实例互相隔开仅仅属于个别关联的线程。

固然如此 PLINQ 提供了 ThreadLocal<T> 对象来贯彻类似的遵循:

public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    var res =
        urls
        .AsParallel()
        .ForAll(
            url =>
            {
                webclient.Value.DownloadFile(url, host[url] +".dat"));
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            });
}

不过请留意:ThreadLocal<T> 相对来讲开支更加大!

情景一:不难数据 之 独立操作的并行管理(使用 Parallel.ForEach)

演示代码:

    public static void IndependentAction(IEnumerable<T> source, Action<T> action)  
    {  
        Parallel.ForEach(source, element => action(element));  
    }  

 理由:

  1. 就算 PLINQ 也提供了多个临近的 ForAll
    接口,但它对于简易的独自操作太重量化了。
  2. 选择 Parallel.ForEach 你还可以够设定
    ParallelOptions.马克斯DegreeOfParalelism
    参数(内定最多须要有个别个线程),那样当 ThreadPool
    财富紧缺(乃至当可用线程数<MaxDegreeOfParalelism)的时候, Parallel.ForEach
    依旧能够顺遂运营,况且当后续有更加多可用线程出现时,Parallel.ForEach
    也能立时地使用这个线程。PLINQ 只好通过WithDegreeOfParallelism
    方法来供给固定的线程数,即:须要了多少个正是多少个,不会多也不会少。

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载申明如下,在那之中包涵二个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop()
提供了退出循环的点子,这种措施要比别的二种方法更快。这些法子通告循环不要再起步实行新的迭代,并尽量快的推出循环。

ParallelLoopState.IsStopped 属性可用来剖断其余迭代是或不是调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace,
        (curValue, loopstate) =>
            {
                if (curValue.Equals(match) )
                {
                    matchFound = true;
                    loopstate.Stop();
                }
            });
    return matchFound;
}

ParallelLoopState.Break() 通告循环继续实施本成分前的迭代,但不举办本成分之后的迭代。最前调用
Break 的起功能,并被记录到 ParallelLoopState.LowestBreakIteration
属性中。这种处理情势平常被利用在二个里丑捧心的物色管理中,比方您有一个排序过的数组,你想在里边查找相称元素的小小
index,那么能够应用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var loopResult = Parallel.ForEach(source,
        (curValue, loopState, curIndex) =>
        {
            if (curValue.Equals(match))
            {
                loopState.Break();
            }
         });
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex : -1;
}

境况二:顺序数据 之 并行管理(使用 PLINQ 来维周密据顺序)

当输出的数据种类需求保障原本的顺序时行使 PLINQ 的 AsOrdered
方法特别轻便高效。

演示代码:

    public static void GrayscaleTransformation(IEnumerable<Frame> Movie)  
    {  
        var ProcessedMovie =  
            Movie  
            .AsParallel()  
            .AsOrdered()  
            .Select(frame => ConvertToGrayscale(frame));  

        foreach (var grayscaleFrame in ProcessedMovie)  
        {  
            // Movie frames will be evaluated lazily  
        }  
    }  

 理由:

  1. Parallel.ForEach
    达成起来要求绕一些弯路,首先你需求选择以下的重载在点子:

     public static ParallelLoopResult ForEach<TSource >(  
         IEnumerable<TSource> source,  
         Action<TSource, ParallelLoopState, Int64> body)  
    

 那几个重载的 Action 多带有了 index
 参数,这样您在输出的时候就能够动用这几个值来维系原本的行列顺序。请看上面的事例:

    public static double [] PairwiseMultiply(double[] v1, double[] v2)  
    {  
        var length = Math.Min(v1.Length, v2.Lenth);  
        double[] result = new double[length];  
        Parallel.ForEach(v1, (element, loopstate, elementIndex) =>  
            result[elementIndex] = element * v2[elementIndex]);  
        return result;  
    }  

 
您也许早就意识到这里有个鲜明的难题:我们接纳了原则性长度的数组。假诺传入的是
IEnumerable 那么你有4个缓慢解决方案:

(1) 调用 IEnumerable.Count()
来获取数据长度,然后用那一个值实例化叁个定点长度的数组,然后利用上例的代码。

(2) The second option would be to materialize the original collection
before using it; in the event that your input data set is prohibitively
large, neither of the first two options will be
feasible.(没看懂贴原来的文章)

(3)
第三种格局是行使重回二个哈希会集的不二等秘书技,这种措施下一般须求至少2倍于传播数据的内部存款和储蓄器,所以拍卖大数据时请慎用。

(4)
自个儿完结排序算法(保险传入数据与传播数据经过排序后次序一致)

  1. 相比较 PLINQ 的 AsOrdered
    方法这么简约,何况该办法能管理流式的数据,进而允许传入数据是延迟贯彻的(lazy materialized)

景况三:流数据 之 并行管理(使用 PLINQ)

PLINQ 能输出流数据,那些特点在弹指间场所非常实用:

1.
结果集不需假若四个总体的处理达成的数组,即:任几时间点下内部存储器中仅维持数组中的部分音信

  1. 您可见在三个单线程上遍历输出结果(就临近他们早已存在/管理完了)

示例:

    public static void AnalyzeStocks(IEnumerable<Stock> Stocks)  
    {  
        var StockRiskPortfolio =  
            Stocks  
            .AsParallel()  
            .AsOrdered()  
            .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})  
            .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));  

        foreach (var stockRisk in StockRiskPortfolio)  
        {  
            SomeStockComputation(stockRisk.Risk);  
            // StockRiskPortfolio will be a stream of results  
        }  
    }  

 

那边运用贰个单线程的 foreach 来对 PLINQ 的出口实行三回九转管理,常常境况下
foreach 不须求静观其变 PLINQ 管理完全部数据就会先河运营。

PLINQ 也允许钦定输出缓存的章程,具体可参照他事他说加以考察 PLINQ 的 WithMergeOptions
方法,及 ParallelMergeOptions 枚举

气象四:管理三个聚众(使用 PLINQ)

PLINQ 的 Zip
方法提供了而且遍历多个集结併开展组合元算的点子,并且它能够与别的查询管理操作结合,达成极其复杂的机能。

示例:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        return  
            a  
            .AsParallel()  
            .AsOrdered()  
            .Select(element => ExpensiveComputation(element))  
            .Zip(  
                b  
                .AsParallel()  
                .AsOrdered()  
                .Select(element => DifferentExpensiveComputation(element)),  
                (a_element, b_element) => Combine(a_element,b_element));  
    }  

 示例中的八个数据源能够并行管理,当互相都有二个可用元素时提需求 Zip
进行持续管理(Combine)。

Parallel.ForEach 也能促成类似的 Zip 管理:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        var numElements = Math.Min(a.Count(), b.Count());  
        var result = new T[numElements];  
        Parallel.ForEach(a,  
            (element, loopstate, index) =>  
            {  
                var a_element = ExpensiveComputation(element);  
                var b_element = DifferentExpensiveComputation(b.ElementAt(index));  
                result[index] = Combine(a_element, b_element);  
            });  
        return result;  
    }  

 当然使用 Parallel.ForEach
后你就得温馨承认是不是要保障原有体系,何况要细心数组越界访谈的难点。

场景五:线程局地变量

Parallel.ForEach 提供了多个线程局地变量的重载,定义如下:

    public static ParallelLoopResult ForEach<TSource, TLocal>(  
        IEnumerable<TSource> source,  
        Func<TLocal> localInit,  
        Func<TSource, ParallelLoopState, TLocal,TLocal> body,  
        Action<TLocal> localFinally)  

 使用的示范:

    public static List<R> Filtering<T,R>(IEnumerable<T> source)  
    {  
        var results = new List<R>();  
        using (SemaphoreSlim sem = new SemaphoreSlim(1))  
        {  
            Parallel.ForEach(source,  
                () => new List<R>(),  
                (element, loopstate, localStorage) =>  
                {  
                    bool filter = filterFunction(element);  
                    if (filter)  
                        localStorage.Add(element);  
                    return localStorage;  
                },  
                (finalStorage) =>  
                {  
                    lock(myLock)  
                    {  
                        results.AddRange(finalStorage)  
                    };  
                });  
        }  
        return results;  
    }  

 线程局地变量有何优势呢?请看下边包车型大巴例子(一个网页抓取程序):

    public static void UnsafeDownloadUrls ()  
    {  
        WebClient webclient = new WebClient();  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 常常第一版代码是这么写的,可是运维时会报错“System.NotSupportedException
-> WebClient does not support concurrent I/O
operations.”。那是因为多少个线程无法相同的时间做客同一个 WebClient
对象。所以我们会把 WebClient 对象定义到线程中来:

    public static void BAD_DownloadUrls ()  
    {  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                WebClient webclient = new WebClient();  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 修改以往还是不平时,因为您的机械不是服务器,大批量实例化的 WebClient
飞快达到你机器允许的虚构连接上限数。线程局地变量能够消除那么些难点:

    public static void downloadUrlsSafe()  
    {  
        Parallel.ForEach(urls,  
            () => new WebClient(),  
            (url, loopstate, index, webclient) =>  
            {  
                webclient.DownloadFile(url, filenames[index]+".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                return webclient;  
            },  
                (webclient) => { });  
    }  

 那样的写法保证了大家能赢得丰硕的 WebClient 实例,同不经常间那个 WebClient
实例彼此隔离仅仅属于个别关联的线程。

即便 PLINQ 提供了 ThreadLocal<T> 对象来落实类似的功用:

    public static void downloadUrl()  
    {  
        var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());  
        var res =  
            urls  
            .AsParallel()  
            .ForAll(  
                url =>  
                {  
                    webclient.Value.DownloadFile(url, host[url] +".dat"));  
                    Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                });  
    }  

 可是请留意:ThreadLocal<T> 相对来讲开支越来越大!

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载证明如下,当中满含八个 ParallelLoopState 对象:

 

    public static ParallelLoopResult ForEach<TSource >(  
        IEnumerable<TSource> source,  
        Action<TSource, ParallelLoopState> body)  

ParallelLoopState.Stop()
提供了退出循环的主意,这种艺术要比别的三种方法越来越快。那个法子通告循环不要再起步实行新的迭代,并尽量快的生产循环。

ParallelLoopState.IsStopped 属性可用来判别别的迭代是还是不是调用了 Stop
方法。

示例:

    public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var matchFound = false;  
        Parallel.ForEach(TSpace,  
            (curValue, loopstate) =>  
                {  
                    if (curValue.Equals(match) )  
                    {  
                        matchFound = true;  
                        loopstate.Stop();  
                    }  
                });  
        return matchFound;  
    }  

 ParallelLoopState.Break() 公告循环继续推行本成分前的迭代,但不进行本成分之后的迭代。最前调用
Break 的起作用,并被记录到 ParallelLoopState.LowestBreakIteration
属性中。这种管理格局常常被运用在一个稳步的索求管理中,举个例子您有一个排序过的数组,你想在里面查找相称成分的矮小
index,那么能够应用以下的代码:

    public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var loopResult = Parallel.ForEach(source,  
            (curValue, loopState, curIndex) =>  
            {  
                if (curValue.Equals(match))  
                {  
                    loopState.Break();  
                }  
             });  
        var matchedIndex = loopResult.LowestBreakIteration;  
        return matchedIndex.HasValue ? matchedIndex : -1;  
    }  

 纵然 PLINQ 也提供了退出的建制(cancellation
token
),但相对来讲退出的时机并从未
Parallel.ForEach 那么及时。

 

相关文章