Parallel.For
使用Parallel.For来完成并行循环的执行。
Parallel.For(0, 10, index => {
Console.WriteLine("Task ID {0}processing index: {1}",
Task.CurrentId, index);
});
Parallel.Foreach的使用
List<string> dataList = new List<string> {
"a", "bc", "cda","ddqf", "eqw", "af"
};
// process the elements of thecollection
// using a parallel foreach loop
Parallel.ForEach(dataList, item =>{
Console.WriteLine("Item {0} has{1} characters",
item, item.Length);
});
ParallelOptions
可以设置MaxDegreeOfParallelism:最大并发Task数。用法:
ParallelOptions options
= new ParallelOptions() { MaxDegreeOfParallelism= 1 };
Parallel.For(0, 10, options, index=> {
Console.WriteLine("For Index {0}started", index);
Thread.Sleep(500);
Console.WriteLine("For Index {0}finished", index);
});
终止并行循环
Parallel.For(0, 10, (int index,ParallelLoopState loopState) => {
if (index %2 == 0) {
loopState.Stop();
}
});
还可以使用loopState.Break()。区别在于,使用Break()终止的话,可以在外部获取LowestBreakIteration.Value和LowestBreakIteration.HasValue的值来判断循环终止时的值。
还可以将CancellationToken传入ParallelOption中:
ParallelOptions loopOptions =
new ParallelOptions() {
CancellationToken = tokenSource.Token
};
try {
Parallel.For(0, Int64.MaxValue,loopOptions, index => {
// do something just to occupy the cpufor a little
double result = Math.Pow(index, 3);
// write out the current index
Console.WriteLine("Index {0},result {1}", index, result);
// put the thread to sleep, just toslow things down
Thread.Sleep(100);
});
} catch (OperationCanceledException) {
Console.WriteLine("Caughtcancellation exception...");
}
在外部就可以捕捉OperationCanceledException这个异常。
在并行循环中使用TLS(Thread Local Storage)
int total = 0;
Parallel.For(
0,
101,
() => 0,
(int index, ParallelLoopStateloopState, int tlsValue) => {
tlsValue += index;
return tlsValue;
},
value => Interlocked.Add(ref total,value));
Console.WriteLine("Total:{0}", total);
// wait for input before exiting
Console.WriteLine("Press enter tofinish");
Console.ReadLine();
在以上代码中,并行实现从1加到100的过程。
使用PLINQ
int[] sourceData = new int[100];
for (int i = 0; i <sourceData.Length; i++) {
sourceData[i] = i;
}
IEnumerable<int> results =
from item in sourceData.AsParallel()
where item % 2 == 0
select item;
foreach (int item in results) {
Console.WriteLine("Item{0}", item);
}
并行LINQ的关键方法:AsParallel()。由于没有执行ToList()、ToArray()和ToDictionary(),因此在循环拿Result时才会执行这个PLINQ。
其他选项
如果需要并行执行后结果是排序好的可以使用:sourceData.AsParallel().AsOrdered()。
如果要强制并发执行可以使用:
sourceData.AsParallel() .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
如果要限制并发量:
sourceData.AsParallel() .WithDegreeOfParallelism(2)
处理异常和Task是一样的,在外部捕捉AggregateException异常,不过要注意LINQ延迟执行的特点,要在foreach这里捕捉异常:
try {
foreach (double d in results) {
Console.WriteLine("Result{0}", d);
}
} catch (AggregateException aggException) {
aggException.Handle(exception => {
Console.WriteLine("Handledexception of type: {0}",
exception.GetType());
return true;
});
}
向PLINQ传入CancellationToken
void Main()
{
CancellationTokenSource tokenSource
= new CancellationTokenSource();
int[] sourceData = new int[1000];
for(int i = 0;i < sourceData.Length; i++) {
sourceData[i]= i;
}
//define a query that supports cancellation
IEnumerable<double> results = sourceData
.AsParallel()
.WithCancellation(tokenSource.Token)
.Select(item=> {
return Math.Pow(item, 2);
Thread.Sleep(200);
});
Task.Factory.StartNew(()=> {
Thread.Sleep(2000);
tokenSource.Cancel();
Console.WriteLine("Token source cancelled");
});
try{
//enumerate the query results
foreach(double d in results) {
Console.WriteLine("Result:{0}", d);
}
} catch(OperationCanceledException) {
Console.WriteLine("Caught cancellation exception");
}
}