PLINQ 也是一种对数据开始展览并行处理的编制程序模型,PLINQ 也是1种对数码实行并行处理的编制程序模型

简介

当要求为多核机器实行优化的时候,最棒先检查下您的主次是否有处理能够分割开来开始展览并行处理。(例如,有2个宏大的数量集合,在那之中的要素必要三个二个拓展互动独立的耗费时间计量)。

.net framework 四 中提供了 Parallel.ForEach 和 PLINQ
来接济我们开始展览并行处理,本文研商这五头的反差及适用的景观。

原作者: Pamela Vagata, Parallel Computing Platform Group, Microsoft
Corporation

Parallel.ForEach

Parallel.ForEach 是 foreach
的二十八线程实现,他们都能对 IEnumerable<T>
类型对象举办遍历,Parallel.ForEach
的特种之处在于它利用多线程来实行循环体内的代码段。

Parallel.ForEach 最常用的样式如下:

public static ParallelLoopResult ForEach<TSource>(
    IEnumerable<TSource> source,
    Action<TSource> body)

原文pdf:http://download.csdn[.NET](http://lib.csdn.net/base/dotnet)/detail/sqlchen/7509513

PLINQ

PLINQ 也是一种对数码举办并行处理的编程模型,它经过 LINQ 的语法来完成类似
Parallel.ForEach 的拾2线程并行处理。

 

场地壹:简单数据 之 独立操作的并行处理(使用 Parallel.ForEach)

演示代码:

public static void IndependentAction(IEnumerable<T> source, Action<T> action)
{
    Parallel.ForEach(source, element => action(element));
}

理由:

  1. 即使 PLINQ 也提供了二个类似的 ForAll
    接口,但它对于简易的独自操作太重量化了。

  2. 接纳 Parallel.ForEach 你还是可以够够设定
    ParallelOptions.马克斯DegreeOfParalelism
    参数(钦赐最多需要多少个线程),那样当 ThreadPool
    财富贫乏(甚至当可用线程数<马克斯DegreeOfParalelism)的时候, Parallel.ForEach
    照旧可以顺遂运转,并且当后续有更多可用线程出现时,Parallel.ForEach
    也能立即地应用那一个线程。PLINQ 只好通过WithDegreeOfParallelism
    方法来供给固定的线程数,即:需要了多少个就是多少个,不会多也不会少。

====================================================================

景况贰:顺序数据 之 并行处理(使用 PLINQ 来维持数据顺序)

当输出的数据连串需求有限支撑原有的壹最近选择 PLINQ 的 AsOrdered
方法十分简单高效。

示范代码:

public static void GrayscaleTransformation(IEnumerable<Frame> Movie)
{
    var ProcessedMovie =
        Movie
        .AsParallel()
        .AsOrdered()
        .Select(frame => ConvertToGrayscale(frame));

    foreach (var grayscaleFrame in ProcessedMovie)
    {
        // Movie frames will be evaluated lazily
    }
}

理由:

  1. Parallel.ForEach
    达成起来供给绕1些弯路,首先你须要运用以下的重载在章程:

    public static ParallelLoopResult ForEach(

     IEnumerable<TSource> source,
     Action<TSource, ParallelLoopState, Int64> body)
    

本条重载的 Action 多含有了 index
 参数,那样你在出口的时候就能动用那么些值来维系原来的队列顺序。请看上面包车型客车例证:

public static double [] PairwiseMultiply(double[] v1, double[] v2)
{
    var length = Math.Min(v1.Length, v2.Lenth);
    double[] result = new double[length];
    Parallel.ForEach(v1, (element, loopstate, elementIndex) =>
        result[elementIndex] = element * v2[elementIndex]);
    return result;
}

你恐怕早已意识到此处有个确定的主题素材:大家运用了固定长度的数组。如若传入的是
IEnumerable 那么您有6个缓解方案:

(一) 调用 IEnumerable.Count()
来获取数据长度,然后用这一个值实例化3个恒定长度的数组,然后采纳上例的代码。

(二) The second option would be to materialize the original collection
before using it; in the event that your input data set is prohibitively
large, neither of the first two options will be
feasible.(没看懂贴原来的书文)

(3)
第三种艺术是采纳再次回到三个哈希集合的点子,那种方式下1般需求至少二倍于传播数据的内部存款和储蓄器,所以拍卖大数目时请慎用。

(4) 自身达成排序算法(保障传入数据与传播数据通过排序后次序一致)

  1. 相比之下 PLINQ 的 AsOrdered
    方法这么简单,而且该措施能处理流式的多少,从而允许传入数据是延迟达成的(lazy materialized)

简介

当须求为多核机器进行优化的时候,最棒先反省下您的程序是还是不是有处理能够分割开来张开并行处理。(例如,有多个巨大的数据集合,当中的要素必要二个贰个进行互动独立的耗费时间划算)。

.net framework 4 中提供了 Parallel.ForEach 和 PLINQ
来扶持我们实行并行处理,本文研商那两者的出入及适用的气象。

情况叁:流数据 之 并行处理(使用 PLINQ)

PLINQ 能输出流数据,那么些个性在转手场所特别实惠:

壹.
结出集不需假如1个完全的处理达成的数组,即:任何时间点下内部存款和储蓄器中仅维持数组中的部分音信

  1. 您能够在三个单线程上遍历输出结果(就像他们早已存在/处理完了)

示例:

public static void AnalyzeStocks(IEnumerable<Stock> Stocks)
{
    var StockRiskPortfolio =
        Stocks
        .AsParallel()
        .AsOrdered()
        .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})
        .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));

    foreach (var stockRisk in StockRiskPortfolio)
    {
        SomeStockComputation(stockRisk.Risk);
        // StockRiskPortfolio will be a stream of results
    }
}

那里运用二个单线程的 foreach 来对 PLINQ 的出口进行一连处理,平日意况下
foreach 不须要等待 PLINQ 处理完全部数据就能开首运转。

PLINQ 也同意钦点输出缓存的办法,具体可参考 PLINQ 的 WithMergeOptions
方法,及 ParallelMergeOptions 枚举

Parallel.ForEach

Parallel.ForEach 是 foreach
的拾二线程达成,他们都能对 IEnumerable<T>
类型对象开始展览遍历,Parallel.ForEach
的不一样平常之处在于它选用十2线程来实行循环体内的代码段。

Parallel.ForEach 最常用的花样如下:

public static ParallelLoopResult ForEach<TSource>(  IEnumerable<TSource> source,        Action<TSource> body)   

境况4:处理七个汇聚(使用 PLINQ)

PLINQ 的 Zip
方法提供了同时遍历多少个集合并开始展览整合元算的艺术,并且它能够与其他查询处理操作结合,完成相当复杂的效力。

示例:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    return
        a
        .AsParallel()
        .AsOrdered()
        .Select(element => ExpensiveComputation(element))
        .Zip(
            b
            .AsParallel()
            .AsOrdered()
            .Select(element => DifferentExpensiveComputation(element)),
            (a_element, b_element) => Combine(a_element,b_element));
}

演示中的七个数据源能够并行处理,当双方都有一个可用成分时提供给 Zip
举行延续处理(Combine)。

Parallel.ForEach 也能实现类似的 Zip 处理:

public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)
{
    var numElements = Math.Min(a.Count(), b.Count());
    var result = new T[numElements];
    Parallel.ForEach(a,
        (element, loopstate, index) =>
        {
            var a_element = ExpensiveComputation(element);
            var b_element = DifferentExpensiveComputation(b.ElementAt(index));
            result[index] = Combine(a_element, b_element);
        });
    return result;
}

理所当然使用 Parallel.ForEach
后您就得本身明确是或不是要保持原来种类,并且要专注数组越界访问的主题素材。

PLINQ

PLINQ 也是一种对数据开始展览并行处理的编制程序模型,它通过 LINQ 的语法来促成类似
Parallel.ForEach 的10二线程并行处理。

场景5:线程局地变量

Parallel.ForEach 提供了1个线程局地变量的重载,定义如下:

public static ParallelLoopResult ForEach<TSource, TLocal>(
    IEnumerable<TSource> source,
    Func<TLocal> localInit,
    Func<TSource, ParallelLoopState, TLocal,TLocal> body,
    Action<TLocal> localFinally)

应用的言传身教:

public static List<R> Filtering<T,R>(IEnumerable<T> source)
{
    var results = new List<R>();
    using (SemaphoreSlim sem = new SemaphoreSlim(1))
    {
        Parallel.ForEach(source,
            () => new List<R>(),
            (element, loopstate, localStorage) =>
            {
                bool filter = filterFunction(element);
                if (filter)
                    localStorage.Add(element);
                return localStorage;
            },
            (finalStorage) =>
            {
                lock(myLock)
                {
                    results.AddRange(finalStorage)
                };
            });
    }
    return results;
}

线程局地变量有啥样优势呢?请看上边包车型地铁例子(二个网页抓取程序):

public static void UnsafeDownloadUrls ()
{
    WebClient webclient = new WebClient();
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

万般第壹版代码是如此写的,可是运维时会报错“System.NotSupportedException
-> WebClient does not support concurrent I/O
operations.”。那是因为八个线程不能同时做客同2个 WebClient
对象。所以大家会把 WebClient 对象定义到线程中来:

public static void BAD_DownloadUrls ()
{
    Parallel.ForEach(urls,
        (url,loopstate,index) =>
        {
            WebClient webclient = new WebClient();
            webclient.DownloadFile(url, filenames[index] + ".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
        });
}

修改之后还是有标题,因为你的机器不是服务器,大量实例化的 WebClient
急忙达到你机器允许的虚拟连接上限数。线程局地变量能够缓解这些难点:

public static void downloadUrlsSafe()
{
    Parallel.ForEach(urls,
        () => new WebClient(),
        (url, loopstate, index, webclient) =>
        {
            webclient.DownloadFile(url, filenames[index]+".dat");
            Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            return webclient;
        },
            (webclient) => { });
}

如此的写法保险了我们能收获丰富的 WebClient 实例,同时那么些 WebClient
实例互相隔绝仅仅属于个别关联的线程。

虽说 PLINQ 提供了 ThreadLocal<T> 对象来兑现类似的功用:

public static void downloadUrl()
{
    var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());
    var res =
        urls
        .AsParallel()
        .ForAll(
            url =>
            {
                webclient.Value.DownloadFile(url, host[url] +".dat"));
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);
            });
}

可是请小心:ThreadLocal<T> 相对来说开销越来越大!

场馆一:轻巧数据 之 独立操作的并行处理(使用 Parallel.ForEach)

演示代码:

    public static void IndependentAction(IEnumerable<T> source, Action<T> action)  
    {  
        Parallel.ForEach(source, element => action(element));  
    }  

 理由:

  1. 就算 PLINQ 也提供了贰个类似的 ForAll
    接口,但它对于简易的独自操作太重量化了。
  2. 利用 Parallel.ForEach 你还能够够设定
    ParallelOptions.马克斯DegreeOfParalelism
    参数(内定最多供给有些个线程),那样当 ThreadPool
    财富不足(甚至当可用线程数<马克斯DegreeOfParalelism)的时候, Parallel.ForEach
    依旧能够顺遂运营,并且当后续有越多可用线程出现时,Parallel.ForEach
    也能登时地使用这么些线程。PLINQ 只可以通过WithDegreeOfParallelism
    方法来须要一定的线程数,即:要求了多少个正是几个,不会多也不会少。

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载注解如下,个中蕴含一个 ParallelLoopState 对象:

public static ParallelLoopResult ForEach<TSource >(
    IEnumerable<TSource> source,
    Action<TSource, ParallelLoopState> body)

ParallelLoopState.Stop()
提供了退出循环的主意,那种艺术要比其它二种方法更加快。那么些法子布告循环不要再起步实行新的迭代,并尽量快的出产循环。

ParallelLoopState.IsStopped 属性可用来决断别的迭代是还是不是调用了 Stop 方法。

示例:

public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var matchFound = false;
    Parallel.ForEach(TSpace,
        (curValue, loopstate) =>
            {
                if (curValue.Equals(match) )
                {
                    matchFound = true;
                    loopstate.Stop();
                }
            });
    return matchFound;
}

ParallelLoopState.Break() 公告循环继续实践本元素前的迭代,但不实践本成分之后的迭代。最前调用
Break 的起效用,并被记录到 ParallelLoopState.LowestBreakIteration
属性中。这种处理情势平日被应用在八个静止的搜求处理中,比如你有多个排序过的数组,你想在个中查找相称元素的蝇头
index,那么可以利用以下的代码:

public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>
{
    var loopResult = Parallel.ForEach(source,
        (curValue, loopState, curIndex) =>
        {
            if (curValue.Equals(match))
            {
                loopState.Break();
            }
         });
    var matchedIndex = loopResult.LowestBreakIteration;
    return matchedIndex.HasValue ? matchedIndex : -1;
}

此情此景二:顺序数据 之 并行处理(使用 PLINQ 来保持数据顺序)

当输出的数额类别须求保持原来的逐近日利用 PLINQ 的 AsOrdered
方法非凡轻松高效。

演示代码:

    public static void GrayscaleTransformation(IEnumerable<Frame> Movie)  
    {  
        var ProcessedMovie =  
            Movie  
            .AsParallel()  
            .AsOrdered()  
            .Select(frame => ConvertToGrayscale(frame));  

        foreach (var grayscaleFrame in ProcessedMovie)  
        {  
            // Movie frames will be evaluated lazily  
        }  
    }  

 理由:

  1. Parallel.ForEach
    实现起来供给绕壹些弯路,首先你供给运用以下的重载在艺术:

     public static ParallelLoopResult ForEach<TSource >(  
         IEnumerable<TSource> source,  
         Action<TSource, ParallelLoopState, Int64> body)  
    

 那些重载的 Action 多带有了 index
 参数,那样你在输出的时候就能选择那么些值来维持原来的队列顺序。请看上边包车型地铁例子:

    public static double [] PairwiseMultiply(double[] v1, double[] v2)  
    {  
        var length = Math.Min(v1.Length, v2.Lenth);  
        double[] result = new double[length];  
        Parallel.ForEach(v1, (element, loopstate, elementIndex) =>  
            result[elementIndex] = element * v2[elementIndex]);  
        return result;  
    }  

 
你大概早已意识到此处有个醒目标主题材料:大家运用了固定长度的数组。借使传入的是
IEnumerable 那么您有多少个缓解方案:

(1) 调用 IEnumerable.Count()
来获取数据长度,然后用那些值实例化3个稳固长度的数组,然后选用上例的代码。

(二) The second option would be to materialize the original collection
before using it; in the event that your input data set is prohibitively
large, neither of the first two options will be
feasible.(没看懂贴原版的书文)

(三)
第三种格局是运用重返一个哈希集合的办法,那种措施下一般需求至少2倍于传播数据的内部存款和储蓄器,所以拍卖大数据时请慎用。

(肆)
自个儿完结排序www.888000ff.com,算法(保障传入数据与传播数据经过排序后次序1致)

  1. 对待 PLINQ 的 AsOrdered
    方法这么归纳,而且该办法能处理流式的多少,从而允许传入数据是延迟贯彻的(lazy materialized)

气象三:流数据 之 并行处理(使用 PLINQ)

PLINQ 能输出流数据,这几个特点在转手场合尤其实用:

1.
结出集不需借使1个整机的处理完成的数组,即:任何时刻点下内部存款和储蓄器中仅维持数组中的部分音讯

  1. 您可见在1个单线程上遍历输出结果(就类似他们早已存在/处理完了)

示例:

    public static void AnalyzeStocks(IEnumerable<Stock> Stocks)  
    {  
        var StockRiskPortfolio =  
            Stocks  
            .AsParallel()  
            .AsOrdered()  
            .Select(stock => new { Stock = stock, Risk = ComputeRisk(stock)})  
            .Where(stockRisk => ExpensiveRiskAnalysis(stockRisk.Risk));  

        foreach (var stockRisk in StockRiskPortfolio)  
        {  
            SomeStockComputation(stockRisk.Risk);  
            // StockRiskPortfolio will be a stream of results  
        }  
    }  

 

那边运用3个单线程的 foreach 来对 PLINQ 的出口举办三番五次处理,平日境况下
foreach 不供给拭目以俟 PLINQ 处理完全部数据就能初叶运行。

PLINQ 也同意钦点输出缓存的格局,具体可参考 PLINQ 的 WithMergeOptions
方法,及 ParallelMergeOptions 枚举

现象四:处理五个聚众(使用 PLINQ)

PLINQ 的 Zip
方法提供了同时遍历四个集合并开始展览组合元算的艺术,并且它能够与其他查询处理操作结合,完结非凡复杂的功能。

示例:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        return  
            a  
            .AsParallel()  
            .AsOrdered()  
            .Select(element => ExpensiveComputation(element))  
            .Zip(  
                b  
                .AsParallel()  
                .AsOrdered()  
                .Select(element => DifferentExpensiveComputation(element)),  
                (a_element, b_element) => Combine(a_element,b_element));  
    }  

 示例中的八个数据源能够并行处理,当双方都有二个可用成分时提须要 Zip
进行持续处理(Combine)。

Parallel.ForEach 也能促成类似的 Zip 处理:

    public static IEnumerable<T> Zipping<T>(IEnumerable<T> a, IEnumerable<T> b)  
    {  
        var numElements = Math.Min(a.Count(), b.Count());  
        var result = new T[numElements];  
        Parallel.ForEach(a,  
            (element, loopstate, index) =>  
            {  
                var a_element = ExpensiveComputation(element);  
                var b_element = DifferentExpensiveComputation(b.ElementAt(index));  
                result[index] = Combine(a_element, b_element);  
            });  
        return result;  
    }  

 当然使用 Parallel.ForEach
后您就得和谐肯定是或不是要保险原来种类,并且要留意数组越界访问的标题。

场景伍:线程局地变量

Parallel.ForEach 提供了三个线程局地变量的重载,定义如下:

    public static ParallelLoopResult ForEach<TSource, TLocal>(  
        IEnumerable<TSource> source,  
        Func<TLocal> localInit,  
        Func<TSource, ParallelLoopState, TLocal,TLocal> body,  
        Action<TLocal> localFinally)  

 使用的演示:

    public static List<R> Filtering<T,R>(IEnumerable<T> source)  
    {  
        var results = new List<R>();  
        using (SemaphoreSlim sem = new SemaphoreSlim(1))  
        {  
            Parallel.ForEach(source,  
                () => new List<R>(),  
                (element, loopstate, localStorage) =>  
                {  
                    bool filter = filterFunction(element);  
                    if (filter)  
                        localStorage.Add(element);  
                    return localStorage;  
                },  
                (finalStorage) =>  
                {  
                    lock(myLock)  
                    {  
                        results.AddRange(finalStorage)  
                    };  
                });  
        }  
        return results;  
    }  

 线程局地变量有怎样优势呢?请看上面包车型客车例证(1个网页抓取程序):

    public static void UnsafeDownloadUrls ()  
    {  
        WebClient webclient = new WebClient();  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 平常第壹版代码是这般写的,然则运营时会报错“System.NotSupportedException
-> WebClient does not support concurrent I/O
operations.”。这是因为四个线程一点都不大概同时做客同2个 WebClient
对象。所以我们会把 WebClient 对象定义到线程中来:

    public static void BAD_DownloadUrls ()  
    {  
        Parallel.ForEach(urls,  
            (url,loopstate,index) =>  
            {  
                WebClient webclient = new WebClient();  
                webclient.DownloadFile(url, filenames[index] + ".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
            });  
    }  

 修改之后照旧有标题,因为你的机器不是服务器,大量实例化的 WebClient
急忙到达你机器允许的杜撰连接上限数。线程局地变量能够消除那些主题材料:

    public static void downloadUrlsSafe()  
    {  
        Parallel.ForEach(urls,  
            () => new WebClient(),  
            (url, loopstate, index, webclient) =>  
            {  
                webclient.DownloadFile(url, filenames[index]+".dat");  
                Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                return webclient;  
            },  
                (webclient) => { });  
    }  

 那样的写法保险了我们能收获丰富的 WebClient 实例,同时那几个 WebClient
实例互相隔开仅仅属于个别关联的线程。

就算 PLINQ 提供了 ThreadLocal<T> 对象来贯彻类似的功效:

    public static void downloadUrl()  
    {  
        var webclient = new ThreadLocal<WebClient>(()=> new WebClient ());  
        var res =  
            urls  
            .AsParallel()  
            .ForAll(  
                url =>  
                {  
                    webclient.Value.DownloadFile(url, host[url] +".dat"));  
                    Console.WriteLine("{0}:{1}", Thread.CurrentThread.ManagedThreadId, url);  
                });  
    }  

 然而请留意:ThreadLocal<T> 相对来讲费用更加大!

场景五:退出操作 (使用 Parallel.ForEach)

Parallel.ForEach 有个重载申明如下,个中带有二个 ParallelLoopState 对象:

 

    public static ParallelLoopResult ForEach<TSource >(  
        IEnumerable<TSource> source,  
        Action<TSource, ParallelLoopState> body)  

ParallelLoopState.Stop()
提供了脱离循环的章程,那种办法要比此外三种格局越来越快。那几个点子布告循环不要再开发银行实行新的迭代,并尽量快的推出循环。

ParallelLoopState.IsStopped 属性可用来剖断其余迭代是还是不是调用了 Stop
方法。

示例:

    public static boolean FindAny<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var matchFound = false;  
        Parallel.ForEach(TSpace,  
            (curValue, loopstate) =>  
                {  
                    if (curValue.Equals(match) )  
                    {  
                        matchFound = true;  
                        loopstate.Stop();  
                    }  
                });  
        return matchFound;  
    }  

 ParallelLoopState.Break() 布告循环继续实施本成分前的迭代,但不施行本成分之后的迭代。最前调用
Break 的起成效,并被记录到 ParallelLoopState.LowestBreakIteration
属性中。那种处理方式常常被运用在贰个稳步的检索处理中,比如您有二个排序过的数组,你想在其间查找相称成分的纤维
index,那么能够选择以下的代码:

    public static int FindLowestIndex<T,T>(IEnumerable<T> TSpace, T match) where T: IEqualityComparer<T>  
    {  
        var loopResult = Parallel.ForEach(source,  
            (curValue, loopState, curIndex) =>  
            {  
                if (curValue.Equals(match))  
                {  
                    loopState.Break();  
                }  
             });  
        var matchedIndex = loopResult.LowestBreakIteration;  
        return matchedIndex.HasValue ? matchedIndex : -1;  
    }  

 尽管 PLINQ 也提供了脱离的体制(cancellation
token
),但相对来讲退出的时机并从未
Parallel.ForEach 那么及时。

 

相关文章