More Effective C# 35.學習 PLINQ 如何實作平行演算法 More Effective C# 35.學習 PLINQ 如何實作平行演算法(Learn How PLINQ Implements Parallel Algorithms)

這個做法講解 PLINQ 是怎麼做到平行運算的,以及與 LINQ 相比之下多做了什麼事情。

要將一般的 LINQ 語法轉換成 PLINQ 非常簡單,只要加上 AsParallel() 就好。

void Main()
{
	var source = Enumerable.Range(1, 100000);
	var result = source.AsParallel().Where(x => x % 2 == 0);
	var count = result.Count();
}

但要了解並不是轉換成平行處理的版本就能獲得更好的性能,首先要了解 PLINQ 操作的是 ParallelQuery 類別而不是一般的 Enumerable,它只能處理 LINQ to Objects 這種針對記憶體的操作,沒有辦法平行化處理 LINQ to SQL 或 EntityFramework。

Enumerable 類別的擴充方法類似,在 PLINQ 中則是透過 ParallelEnumerable 類別提供許多常用的擴充方法,使用上基本與 LINQ 相同, 所以可以直接把 LINQ 的知識套用在 PLINQ 上。

當我們呼叫 AsParallel() 方法時背後會透過 ParallelEnumerableWrapper 把傳入的 IEnumerable 包裝成 ParallelQuery 物件, 這樣就可以直接使用 ParallelEnumerable 類別裡面的擴充方法了,下面是包裝的程式碼。

// ParallelEnumerable.cs
public static ParallelQuery AsParallel(this IEnumerable source)
{
    ArgumentNullException.ThrowIfNull(source);

    return new ParallelEnumerableWrapper(source);
}

// ParallelEnumerableWrapper.cs
internal sealed class ParallelEnumerableWrapper : ParallelQuery<object?>
{
    private readonly IEnumerable _source; // The wrapped enumerable object.

    //-----------------------------------------------------------------------------------
    // Instantiates a new wrapper object.
    //

    internal ParallelEnumerableWrapper(Collections.IEnumerable source)
        : base(QuerySettings.Empty)
    {
        Debug.Assert(source != null);
        _source = source;
    }

    internal override IEnumerator GetEnumeratorUntyped()
    {
        return _source.GetEnumerator();
    }

    public override IEnumerator<object?> GetEnumerator()
    {
        return new EnumerableWrapperWeakToStrong(_source).GetEnumerator();
    }
}

要平行處理任務第一個步驟就是將一群任務切割成一小塊才能分配給多個執行緒處理,而這個分割演算法總共有四種,分別是:

  1. Range Partitioning
  2. Chunk Partitioning
  3. Striped Partitioning
  4. Hash Partitioning

Range Partitioning

這個分割演算法比較容易理解,假如有 1000 個任務在一台四核心的機器上運行,那麼就會分割出四個 Range 並且每個 Range 包含 250 個任務, 使用 Range Partitioning 要求傳入的資料源必須是 List<T>arraysIList<T> 這種支援 Index 的集合,通常傳入集合的時候預設就是用這個分割方法。

Chunk Partitioning

這個分割演算法通常用在不固定長度的資料來源上,首先會將系統會將資料來源切割成一個小的 Chunk 並分配給多個執行緒運行,接下來會將 Chunk 逐漸增大, 例如有 50 個任務在一台四核心的機器上運行,那麼可能會將任務切割成下面這樣,這種分割演算法的好處是可以平均分配每個核心的壓力,確保所有任務 都能在差不多的時間內完成。

1 1 1 1 
2 2 2 2 
4 4 4 4 
8 8 6	

Chunk Partitioning

這個分割演算法使用了 SkipWhileTakeWhile 將資料來源進行分組,原理是根據核心的數量分配起始值與跳過的值, 例如有多個任務在一台四核心的機器上運行,那麼可能會將任務切割成下面這樣

核心 1: 0 4 8 12
核心 2: 1 5 9 13
核心 3: 2 6 10 14
核心 4: 3 7 11 15

Hash Partitioning

當你的查詢語法有包含 JoinGroupJoinGroupByDistinctExceptUnionIntersect 才會使用 Hash Partitioning, 它會確保有相同 hash 值的運算能夠被同一個執行緒處理,避免共享的資料跨執行緒溝通。


PLINQ 還額外使用了三種執行模型來處理平行化任務,分別是:

  1. Pipelining
  2. Stop & Go
  3. Inverted Enumeration

Pipelining

Pipelining 是預設使用的,它會建立一個執行緒專門負責枚舉工作(foreach),其他執行緒負責處理查詢工作。

Stop & Go

這個模型需要搭配 ToList()ToArray() 方法使用,通常是用在要求查詢語法立即執行或者是需要做排序與搜尋這類需要提前知道結果的工作, 下面這兩個查詢語法就使用到 Stop & Go,優點是可以獲得更好的性能但會提高記憶體消耗。

var stopAndGoArray = (from n in data.AsParallel()
					  where n < 150
					  select Factorial(n)).ToArray();
var stopAndGoList = (from n in data.AsParallel()
					 where n < 150
					 select Factorial(n)).ToList();

Inverted Enumeration

這個模型需要搭配 ForAll() 方法使用,它的特點是不生成完整結果,但能直接對結果執行操作,所以它基本上是耗的記憶體最少並執行更快的模型。

var nums2 = from n in data.AsParallel()
           where n < 150
           select Factorial(n);
nums2.ForAll(item => Console.WriteLine(item));

接下來看一下使用 AsParallel() 程式會有什麼改變,首先下面這段是一個簡單的 LINQ 範例。

void Main()
{
	var answers = from n in Enumerable.Range(0, 30)
				  where n.SomeTest()
				  select n.SomeProjection();

	var iter = answers.GetEnumerator();
	Console.WriteLine("About to start iterating");
	while (iter.MoveNext())
	{
		Console.WriteLine("called MoveNext");
		Console.WriteLine(iter.Current);
	}
}

public static class Extensions{
	public static bool SomeTest(this int inputValue)
	{
		Console.WriteLine($"testing element: {inputValue}");
		return inputValue % 5 == 0;
	}
	public static string SomeProjection(this int input)
	{
		Console.WriteLine($"projecting an element: {input}");
		return $"Delivered {input} at {DateTime.Now:T}";
	}
}
About to start iterating
testing element: 0
projecting an element: 0
called MoveNext
Delivered 0 at 下午 03:01:32
testing element: 1
testing element: 2
testing element: 3
testing element: 4
testing element: 5
projecting an element: 5
called MoveNext
Delivered 5 at 下午 03:01:32
testing element: 6
testing element: 7
testing element: 8
testing element: 9
testing element: 10
projecting an element: 10
called MoveNext
Delivered 10 at 下午 03:01:32
testing element: 11
testing element: 12
testing element: 13
testing element: 14
testing element: 15
projecting an element: 15
called MoveNext
Delivered 15 at 下午 03:01:32
testing element: 16
testing element: 17
testing element: 18
testing element: 19
testing element: 20
projecting an element: 20
called MoveNext
Delivered 20 at 下午 03:01:32
testing element: 21
testing element: 22
testing element: 23
testing element: 24
testing element: 25
projecting an element: 25
called MoveNext
Delivered 25 at 下午 03:01:32
testing element: 26
testing element: 27
testing element: 28
testing element: 29

下面改成用 ParallelEnumerable 以平行的方式處理,會發現並不是從 0 開始而是隨機的。

var answers = from n in ParallelEnumerable.Range(0, 30)
			  where n.SomeTest()
			  select n.SomeProjection();
About to start iterating
testing element: 6
testing element: 4
testing element: 7
testing element: 5
testing element: 10
testing element: 0
testing element: 2
testing element: 3
testing element: 12
testing element: 13
testing element: 14
testing element: 15
testing element: 8
testing element: 9
testing element: 18
testing element: 19
testing element: 20
testing element: 16
testing element: 17
testing element: 24
testing element: 25
projecting an element: 5
projecting an element: 15
testing element: 26
projecting an element: 10
testing element: 28
projecting an element: 25
testing element: 11
testing element: 22
testing element: 23
testing element: 27
projecting an element: 0
projecting an element: 20
called MoveNext
testing element: 21
Delivered 5 at 下午 03:05:29
testing element: 29
testing element: 1
called MoveNext
Delivered 10 at 下午 03:05:29
called MoveNext
Delivered 15 at 下午 03:05:29
called MoveNext
Delivered 20 at 下午 03:05:29
called MoveNext
Delivered 25 at 下午 03:05:29
called MoveNext
Delivered 0 at 下午 03:05:29

可以使用 WithDegreeOfParallelism 限制執行緒的使用數量。

var answers = (from n in ParallelEnumerable.Range(0, 30).WithDegreeOfParallelism(1)
				   where n.SomeTest()
				   select n.SomeProjection());

使用 AsSequential 可以將 ParallelQuery 轉回 IEnumerable。

var answers = (from n in ParallelEnumerable.Range(0, 30).AsSequential()
			   where n.SomeTest()
			   select n.SomeProjection());

使用 WithExecutionMode 可以強迫 PLINQ 平行化所有查詢,預設情況下 PLINQ 只會平行化有效益的操作。

var answers = (from n in ParallelEnumerable.Range(0, 30).WithExecutionMode(ParallelExecutionMode.ForceParallelism)
				   where n.SomeTest()
				   select n.SomeProjection());

使用 WithMergeOptions 來設定是否要啟動 buffer,目的是要將分散在各個執行緒的結果結合在一起顯示給呼叫者看。

var answers = (from n in ParallelEnumerable.Range(0, 30).WithMergeOptions(ParallelMergeOptions.AutoBuffered)
			   where n.SomeTest()
			   select n.SomeProjection());

Summary

這個做法介紹了 PLINQ 背後的原理以及常用的擴充方法,基本使用上與 LINQ 相同,但是平行化後的查詢是否能提升效能還是要自行測量結果 ,誤用 AsParallel() 可能會對效能造成影響。