병렬로 중첩이 대기합니다.
메트로 앱에서는 여러 WCF 호출을 실행해야합니다. 많은 호출이 이루어져야하므로 병렬 루프에서 호출해야합니다. 문제는 WCF 호출이 모두 완료되기 전에 병렬 루프가 종료된다는 것입니다.
예상대로 작동하도록 이것을 리팩토링 하시겠습니까?
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customers = new System.Collections.Concurrent.BlockingCollection<Customer>();
Parallel.ForEach(ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
foreach ( var customer in customers )
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
배후의 전체 아이디어 Parallel.ForEach()
는 스레드 세트가 있고 각 스레드가 콜렉션의 일부를 처리한다는 것입니다. 알다시피, 이것은 비동기 호출 기간 동안 스레드를 해제하려는 async
- 와 함께 작동하지 않습니다 await
.
ForEach()
쓰레드 를 막아서“수정”할 수는 있지만 async
- 의 전체 요점을 무너 뜨 await
립니다.
할 수있는 일은 대신 비동기식을 지원 하는 TPL Dataflow 를 사용 Parallel.ForEach()
하는 Task
것입니다.
특히, TransformBlock
각 ID를 Customer
사용하는 async
람다 로 변환 하는 을 사용하여 코드를 작성할 수 있습니다 . 이 블록은 병렬로 실행되도록 구성 할 수 있습니다. 해당 블록을 콘솔에 ActionBlock
쓰는 블록에 연결합니다 Customer
. 블록 네트워크를 설정 한 후 Post()
각 ID를에 연결할 수 있습니다 TransformBlock
.
코드에서 :
var ids = new List<string> { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var getCustomerBlock = new TransformBlock<string, Customer>(
async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
}, new ExecutionDataflowBlockOptions
{
MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
});
var writeCustomerBlock = new ActionBlock<Customer>(c => Console.WriteLine(c.ID));
getCustomerBlock.LinkTo(
writeCustomerBlock, new DataflowLinkOptions
{
PropagateCompletion = true
});
foreach (var id in ids)
getCustomerBlock.Post(id);
getCustomerBlock.Complete();
writeCustomerBlock.Completion.Wait();
비록 당신은 아마도 TransformBlock
약간의 상수로 의 병렬성을 제한하고 싶을 것입니다 . 또한 예를 들어 모음이 너무 큰 경우 등을 TransformBlock
사용하여 용량을 제한하고 항목을 비동기식으로 추가 할 수 있습니다 SendAsync()
.
코드와 비교할 때 추가 이점으로 (작동하는 경우) 단일 항목이 완료되는 즉시 쓰기가 시작되고 모든 처리가 완료 될 때까지 기다리지 않는다는 것입니다.
svick의 답변 은 (평소대로) 훌륭합니다.
그러나 실제로 대량의 데이터를 전송할 때 Dataflow가 더 유용하다는 것을 알았습니다. 또는 async
호환 가능한 대기열 이 필요할 때 .
귀하의 경우 더 간단한 해결책은 async
-style 병렬 처리를 사용하는 것입니다 .
var ids = new List<string>() { "1", "2", "3", "4", "5", "6", "7", "8", "9", "10" };
var customerTasks = ids.Select(i =>
{
ICustomerRepo repo = new CustomerRepo();
return repo.GetCustomer(i);
});
var customers = await Task.WhenAll(customerTasks);
foreach (var customer in customers)
{
Console.WriteLine(customer.ID);
}
Console.ReadKey();
svick이 제안한대로 DataFlow를 사용하는 것은 과도 할 수 있으며 Stephen의 답변은 작업의 동시성을 제어하는 수단을 제공하지 않습니다. 그러나 그것은 간단하게 달성 할 수 있습니다.
public static async Task RunWithMaxDegreeOfConcurrency<T>(
int maxDegreeOfConcurrency, IEnumerable<T> collection, Func<T, Task> taskFactory)
{
var activeTasks = new List<Task>(maxDegreeOfConcurrency);
foreach (var task in collection.Select(taskFactory))
{
activeTasks.Add(task);
if (activeTasks.Count == maxDegreeOfConcurrency)
{
await Task.WhenAny(activeTasks.ToArray());
//observe exceptions here
activeTasks.RemoveAll(t => t.IsCompleted);
}
}
await Task.WhenAll(activeTasks.ToArray()).ContinueWith(t =>
{
//observe exceptions in a manner consistent with the above
});
}
ToArray()
호출은 배열 대신 목록을 사용하여 완료된 작업을 대체하여 최적화 할 수 있습니다,하지만 난 그것을 훨씬 대부분의 경우에서 차이 만들 것이라고 의심한다. OP 질문에 따른 샘플 사용량 :
RunWithMaxDegreeOfConcurrency(10, ids, async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
});
편집 Fellow SO 사용자와 TPL wiz Eli Arbel 은 Stephen Toub 의 관련 기사를 알려주었습니다 . 평소와 같이, 그의 구현은 우아하고 효율적입니다.
public static Task ForEachAsync<T>(
this IEnumerable<T> source, int dop, Func<T, Task> body)
{
return Task.WhenAll(
from partition in Partitioner.Create(source).GetPartitions(dop)
select Task.Run(async delegate {
using (partition)
while (partition.MoveNext())
await body(partition.Current).ContinueWith(t =>
{
//observe exceptions
});
}));
}
질문이 처음 게시되었을 때 4 년 전에 존재하지 않았던 새로운 AsyncEnumerator NuGet 패키지를 사용하면 노력을 절약 할 수 있습니다 . 병렬 처리 수준을 제어 할 수 있습니다.
using System.Collections.Async;
...
await ids.ParallelForEachAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = await repo.GetCustomer(i);
customers.Add(cust);
},
maxDegreeOfParallelism: 10);
면책 조항 : 저는 공개 소스이며 MIT에 따라 라이센스가 부여 된 AsyncEnumerator 라이브러리의 작성자이며 커뮤니티를 돕기 위해이 메시지를 게시하고 있습니다.
랩 Parallel.Foreach
로 Task.Run()
대신의 await
키워드 사용[yourasyncmethod].Result
(UI 스레드를 차단하지 않으려면 Task.Run 작업을 수행해야합니다)
이 같은:
var yourForeachTask = Task.Run(() =>
{
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
var cust = repo.GetCustomer(i).Result;
customers.Add(cust);
});
});
await yourForeachTask;
이것은 전체 TPL Dataflow를 작동시키는 것보다 매우 효율적이고 쉬워야합니다.
var customers = await ids.SelectAsync(async i =>
{
ICustomerRepo repo = new CustomerRepo();
return await repo.GetCustomer(i);
});
...
public static async Task<IList<TResult>> SelectAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector, int maxDegreesOfParallelism = 4)
{
var results = new List<TResult>();
var activeTasks = new HashSet<Task<TResult>>();
foreach (var item in source)
{
activeTasks.Add(selector(item));
if (activeTasks.Count >= maxDegreesOfParallelism)
{
var completed = await Task.WhenAny(activeTasks);
activeTasks.Remove(completed);
results.Add(completed.Result);
}
}
results.AddRange(await Task.WhenAll(activeTasks));
return results;
}
나는 파티에 조금 늦었지만 동기화 컨텍스트에서 비동기 코드를 실행하기 위해 GetAwaiter.GetResult () 사용을 고려할 수도 있지만 아래와 같이 병렬화됩니다.
Parallel.ForEach(ids, i =>
{
ICustomerRepo repo = new CustomerRepo();
// Run this in thread which Parallel library occupied.
var cust = repo.GetCustomer(i).GetAwaiter().GetResult();
customers.Add(cust);
});
많은 헬퍼 메소드를 도입 한 후 다음과 같은 간단한 구문으로 병렬 쿼리를 실행할 수 있습니다.
const int DegreeOfParallelism = 10;
IEnumerable<double> result = await Enumerable.Range(0, 1000000)
.Split(DegreeOfParallelism)
.SelectManyAsync(async i => await CalculateAsync(i).ConfigureAwait(false))
.ConfigureAwait(false);
여기서 발생하는 일은 소스 컬렉션을 10 개의 청크로 분할 .Split(DegreeOfParallelism)
한 다음 각 항목을 하나씩 처리하는 10 개의 작업을 실행 .SelectManyAsync(...)
하고 ( ) 단일 목록으로 다시 병합하는 것입니다.
더 간단한 접근 방식이 있다고 말할 가치가 있습니다.
double[] result2 = await Enumerable.Range(0, 1000000)
.Select(async i => await CalculateAsync(i).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
그러나 예방 조치 가 필요합니다 . 너무 큰 소스 컬렉션이있는 Task
경우 모든 항목에 대해 즉시 일정을 예약하면 성능이 크게 저하 될 수 있습니다.
위 예제에서 사용 된 확장 방법은 다음과 같습니다.
public static class CollectionExtensions
{
/// <summary>
/// Splits collection into number of collections of nearly equal size.
/// </summary>
public static IEnumerable<List<T>> Split<T>(this IEnumerable<T> src, int slicesCount)
{
if (slicesCount <= 0) throw new ArgumentOutOfRangeException(nameof(slicesCount));
List<T> source = src.ToList();
var sourceIndex = 0;
for (var targetIndex = 0; targetIndex < slicesCount; targetIndex++)
{
var list = new List<T>();
int itemsLeft = source.Count - targetIndex;
while (slicesCount * list.Count < itemsLeft)
{
list.Add(source[sourceIndex++]);
}
yield return list;
}
}
/// <summary>
/// Takes collection of collections, projects those in parallel and merges results.
/// </summary>
public static async Task<IEnumerable<TResult>> SelectManyAsync<T, TResult>(
this IEnumerable<IEnumerable<T>> source,
Func<T, Task<TResult>> func)
{
List<TResult>[] slices = await source
.Select(async slice => await slice.SelectListAsync(func).ConfigureAwait(false))
.WhenAll()
.ConfigureAwait(false);
return slices.SelectMany(s => s);
}
/// <summary>Runs selector and awaits results.</summary>
public static async Task<List<TResult>> SelectListAsync<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, Task<TResult>> selector)
{
List<TResult> result = new List<TResult>();
foreach (TSource source1 in source)
{
TResult result1 = await selector(source1).ConfigureAwait(false);
result.Add(result1);
}
return result;
}
/// <summary>Wraps tasks with Task.WhenAll.</summary>
public static Task<TResult[]> WhenAll<TResult>(this IEnumerable<Task<TResult>> source)
{
return Task.WhenAll<TResult>(source);
}
}
An extension method for this which makes use of SemaphoreSlim and also allows to set maximum degree of parallelism
/// <summary>
/// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
/// </summary>
/// <typeparam name="T">Type of IEnumerable</typeparam>
/// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
/// <param name="action">an async <see cref="Action" /> to execute</param>
/// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
/// Must be grater than 0</param>
/// <returns>A Task representing an async operation</returns>
/// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
public static async Task ForEachAsyncConcurrent<T>(
this IEnumerable<T> enumerable,
Func<T, Task> action,
int? maxDegreeOfParallelism = null)
{
if (maxDegreeOfParallelism.HasValue)
{
using (var semaphoreSlim = new SemaphoreSlim(
maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
{
var tasksWithThrottler = new List<Task>();
foreach (var item in enumerable)
{
// Increment the number of currently running tasks and wait if they are more than limit.
await semaphoreSlim.WaitAsync();
tasksWithThrottler.Add(Task.Run(async () =>
{
await action(item).ContinueWith(res =>
{
// action is completed, so decrement the number of currently running tasks
semaphoreSlim.Release();
});
}));
}
// Wait for all tasks to complete.
await Task.WhenAll(tasksWithThrottler.ToArray());
}
}
else
{
await Task.WhenAll(enumerable.Select(item => action(item)));
}
}
Sample Usage:
await enumerable.ForEachAsyncConcurrent(
async item =>
{
await SomeAsyncMethod(item);
},
5);
참고URL : https://stackoverflow.com/questions/11564506/nesting-await-in-parallel-foreach
'IT박스' 카테고리의 다른 글
bash 스크립트에서 파일 이름의 확장자를 확인하는 방법은 무엇입니까? (0) | 2020.06.12 |
---|---|
Java에서 현재 타임 스탬프를 문자열 형식으로 얻는 방법은 무엇입니까? (0) | 2020.06.10 |
문자열 변수 보간 자바 (0) | 2020.06.10 |
Laravel 5-View 내 저장소에 업로드 된 이미지에 액세스하는 방법 (0) | 2020.06.10 |
원래 배열을 변경하지 않고 Javascript로 배열을 뒤집습니다. (0) | 2020.06.10 |