More Effective C# 32.使用 Task 物件合成非同步工作 More Effective C# 32.使用 Task 物件合成非同步工作(Compose Asynchronous Work Using Task Objects)

這個做法講解了 Task 背後執行的原理還有介紹一些常用的方法。

在前幾篇做法提到的 async/await 其實就是 Task 的語法糖,Task 可以理解成抽象化的 Thread,一般來說現在都只要使用 Task 就好並不需要 用到更底層的 Thread,並且 Task 內部會自行去 ThreadPool 取得執行緒,使用 Thread 就真的只會創建必須要用到 ThreadPool.QueueUserWorkItem 才能有 ThreadPool 的功能。

首先我們看看這個非同步方法,雖然可以正常運作但是沒有必要在每次 foreach 都等待 await ReadSymbol 執行結果。

public static async Task<IEnumerable<StockResult>> ReadStockTicker(IEnumerable<string> symbols)
{
	var results = new List<StockResult>();
	foreach (var symbol in symbols)
	{
		var result = await ReadSymbol(symbol);
		results.Add(result);
	}
	return results;
}

可以修改成以下模式,透過 List 一次運行所有的任務,最後在使用 Task.WhenAll 等待所有任務運行完成,這種方式可以更快達成任務。

public static async Task<IEnumerable<StockResult>> ReadStockTicker(IEnumerable<string> symbols)
{
   var resultTasks = new List<Task<StockResult>>();
   foreach (var symbol in symbols)
   {
       resultTasks.Add(ReadSymbol(symbol));
   }
   var results = await Task.WhenAll(resultTasks);
   return results.OrderBy(s => s.Price);
}

有時候你可能需要發出多個相同任務並只要留下最快完成的那筆任務的結果就可以用到 Task.WhenAny, 像是下面這段讀取 Stock 的程式碼,可以從多個來源中讀取相同一個股票的價格,並只等待最快的那筆結果。

public static async Task<StockResult> ReadStockTicker(string symbol, IEnumerable<string> sources)
{
   var resultTasks = new List<Task<StockResult>>();
   foreach (var source in sources)
   {
       resultTasks.Add(ReadSymbol(symbol, source));
   }
   return await Task.WhenAny(resultTasks);
}

如果想要在每個任務完成後讀取它們的結果來進行後續操作,你可能會這樣寫。

public static async Task<IEnumerable<StockResult>> ReadStockTicker(IEnumerable<string> symbols)
{
	var resultTasks = new List<Task<StockResult>>();
	var results = new List<StockResult>();
	foreach (var symbol in symbols)
	{
		resultTasks.Add(ReadSymbol(symbol));
	}
	foreach (var task in resultTasks)
	{
		var result = await task;
		results.Add(result);
	}
	return results;
}

但是這種寫法非常沒有效率,因為任務完成的順序跟你 foreach 結果的順序並不相同,所以可能有任務清單所有完成的任務都在等待其中一個任務完成的情況發生, 這時候就可以搭配 Task.WhenAnywhile 來改善這個問題,但這種寫法代表每次迴圈都會建立一個 Task 所以並不是最好的解法。

public static async Task<IEnumerable<StockResult>> ReadStockTicker(IEnumerable<string> symbols)
{
	var resultTasks = new List<Task<StockResult>>();
	var results = new List<StockResult>();
	foreach (var symbol in symbols)
	{
		resultTasks.Add(ReadSymbol(symbol));
	}
	while (resultTasks.Any())
	{
		Task<StockResult> finishedTask = await Task.WhenAny(resultTasks); // 建立新的 Task
		var result = await finishedTask;
		resultTasks.Remove(finishedTask);
		results.Add(result);
	}
	var first = await Task.WhenAny(resultTasks);
	return await first;
}

下面這個解決方案使用了 TaskCompletionSource 類別,原理是使用 Task.ContinueWith 確保傳入的 sourceTasks 執行完成後繼續執行其他操作,也就是下面 continuation 這個 Action,他的目的是將完成任務的結果按照 thread-safe 的順序 放入到 completionSources 裡面。

簡單來說就是 foreach sourceTasks 的時候會幫所有任務都綁定 continuation 這個 Action,來確保任務執行完會把結果放到 completionSources 裡面,並且使用 Interlocked.Increment 保證修改的時候的線程安全。

public static Task<T>[] OrderByCompletion<T>(this IEnumerable<Task<T>> tasks)
{
	var sourceTasks = tasks.ToList();
	var completionSources = new TaskCompletionSource<T>[sourceTasks.Count];
	var outputTasks = new Task<T>[completionSources.Length];
	for (int i = 0; i < completionSources.Length; i++)
	{
		completionSources[i] = new TaskCompletionSource<T>();
		outputTasks[i] = completionSources[i].Task;
	}

	int nextTaskIndex = -1;
	Action<Task<T>> continuation = completed =>
	{
		var bucket = completionSources[Interlocked.Increment(ref nextTaskIndex)];
		if (completed.IsCompleted)
			bucket.TrySetResult(completed.Result);
		else if (completed.IsFaulted)
			bucket.TrySetException(completed.Exception);
	};

	foreach (var inputTask in sourceTasks)
	{
		inputTask.ContinueWith(continuation,
			CancellationToken.None,
			TaskContinuationOptions.ExecuteSynchronously,
			TaskScheduler.Default
		);
	}
	return outputTasks;
}

使用上只需要傳入一個任務清單,最後就會按照任務完成的時間輸出排序過後的結果。

async Task Main()
{
	var results = new List<int>();
	var t = CreateTaskAsync(3);
	var t1 = CreateTaskAsync(2);
	var t2 = CreateTaskAsync(4);
	var t3 = CreateTaskAsync(1);

	var tl = new List<Task<int>>() {t, t1, t2 ,t3};

	foreach (var task in OrderByCompletion<int>(tl))
	{
		var result = await task;
		results.Add(result);
	}

	Console.WriteLine(results);
}

public async Task<int> CreateTaskAsync(int id)
{
	await Task.Delay(TimeSpan.FromSeconds(id));
	return id;
}

Summary

這個做法建議現在只使用 Task 因為它提供了多種方便的 API 讓我們管理非同步的任務,同時介紹了 Task.WhenAllTask.WhenAnyTask.ContinueWith 這些常用方法,並介紹 TaskCompletionSource 類別通常用在事件與委派可以讓呼叫端知道任務已經完成。