這個做法在說明將方法轉成平行化內部發生例外錯誤後該如何處理。
在平行作業執行的過程中,如果一個子執行緒發生錯誤,那麼該執行緒可能會被直接中止,並且由於 Exceptions 沒辦法跨執行緒所以這個 Exceptions 也沒有辦法拋出到主執行緒的 AggregateException 當中,也就代表呼叫的執行緒沒辦法做出相對應的處理。
另外一個可能性是使用超過一個平行運算,結果會導致多個 AggregateExceptions 嵌套在一起增加處理難度,例如下面就需要透過 一個 AggregateException 的 InnerExceptions 遞迴執行直到回傳出真正的錯誤。
try
{
urls.RunAsync(
url => startDownload(url),
task => finishDownload(
task.AsyncState.ToString(), task.Result)
);
}
catch (AggregateException problems)
{
ReportAggregateError(problems);
}
private static void ReportAggregateError(AggregateException aggregate)
{
foreach (var exception in aggregate.InnerExceptions)
if (exception is AggregateException agEx)
ReportAggregateError(agEx);
else
Console.WriteLine(exception.Message);
}
但上面並不是非常好,我們可以寫出另一個方法只對關注的 Exception 進行處理就好,下面這個方法就是透過 Dictionary
將
想要處理的 Exception 類型綁定一個 Action,確保特定的 Exception 能夠進行特殊處理。
並且注意到 throw
是拋出原始的 AggregateException 避免失去錯誤資訊。
try
{
urls.RunAsync(
url => startDownload(url),
task => finishDownload(task.AsyncState.ToString(),
task.Result));
}
catch (AggregateException problems)
{
var handlers = new Dictionary<Type, Action<Exception>>();
handlers.Add(typeof(WebException),
ex => Console.WriteLine(ex.Message));
if (!HandleAggregateError(problems, handlers))
throw;
}
private static bool HandleAggregateError(
AggregateException aggregate,
Dictionary<Type, Action<Exception>> exceptionHandlers)
{
foreach (var exception in aggregate.InnerExceptions)
{
if (exception is AggregateException agEx)
{
if (!HandleAggregateError(agEx, exceptionHandlers))
{
return false;
}
else
{
continue;
}
}
else if (exceptionHandlers.ContainsKey(exception.GetType()))
{
exceptionHandlers[exception.GetType()](exception);
}
else
return false;
}
return true;
}
另外也可以使用之前學過的 TaskCompletionSource
提供的 TrySetException
和 TrySetResult
,將某些錯誤進行特定處理。
下面這個寫法修改了原有的 startDownload
方法,將 WebException 特別挑出來視為正確的流程確保它不會中止程式的運行。
private static Task<byte[]> startDownload(string url)
{
var tcs = new TaskCompletionSource<byte[]>(url);
var wc = new WebClient();
wc.DownloadDataCompleted += (sender, e) =>
{
if (e.Error is WebException)
tcs.TrySetResult(new byte[0]);
else if (e.Error != null)
tcs.TrySetException(e.Error);
else
tcs.TrySetResult(e.Result);
};
wc.DownloadDataAsync(new Uri(url), tcs);
return tcs.Task;
}
在 PLINQ 中大不分的工作都會分散到背景的子執行緒身上,所以處理上與傳統的 LINQ 不同,建議是使用 try/catch 將 PLINQ 語法包起來, 並且要注意 catch 的只能用 AggregateException,而不是某些特定的錯誤。
try
{
var results = data.AsParallel()
.Where(x => x < 100)
.Select(x => Compute(x));
foreach (var result in results)
{
Console.WriteLine(result);
}
}
catch (AggregateException ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Summary
這個做法講解在撰寫平行運算方法的時候要考慮到幾個關於錯誤處理的設計,首先要將錯誤分類成可允許的錯誤與重大錯誤,可以把可允許的錯誤挑出來 避免影響應用程式運行,也可以搭配 Dictionary 與 Action 將特定的錯誤挑出來處理,還有要注意執行 PLINQ 程式最好是用 try/catch 將語法包裝起來, 還有只能捕捉 AggregateException。