下面是使用TPL数据流的非常简化的代码示例:
//This is collection where I register all items that need to be processed by pipeline
//MyData is simple class with 2 properties = int Id, bool IsCompleted
private ConcurrentBag<MyData> completedItems = new ConcurrentBag<MyData>();
//This method may be called multiple times in a very short time frame which means that
//few pipelines may be running simultaneously.
public void InitiateProcess(List<MyData> inputData)
{
inputData.ForEach(ent => completedItems.Add(ent));
StartPipeline(inputData);
}
public void StartPipeline(List<MyData> inputData)
{
//Here goes TransformBlock downloadBlock=...
//Here goes TransfromBlock processBlock=...
//In the resultBlock I would like to update corresponding item in completedItems bag.
var resultBlock = new ActionBlock<MyData>(data =>
{
var completedItem = completedItems.FirstOrDefault(ent => ent.Id == data.Id);
if (completedItem != null)
completedItem.IsCompleted = true;
});
}
我的主要目标是注册已经成功完成的项目。 不是非常复杂的,但是我对并行编程学得越多,我就越明白它有多复杂,使用它应该非常小心。 我理解可能有多个不同的线程试图同时访问completeditems
集合。 我做了一些研究,使用concurrentbag
来跟踪这些项目似乎是一个不错的方法。 所以我的问题是,使用这种方法是否有潜在的危险?
根据Microsoft文档
ConcurrentBag的所有公共成员和受保护成员都是线程安全的,可以从多个线程并发使用。
这意味着concurrentbag
被设计成可由多个线程访问,而不必担心它们会相互干扰
编辑:我刚刚注意到您正在使用firstordefault
,这也是根据文档
但是,通过ConcurrentBag实现的接口之一访问的成员(包括扩展方法)不能保证是线程安全的,并且可能需要由调用方进行同步。
因此,FirstorDefault
可能不是线程安全的,而ConcurrentBag
允许存在重复项。 在您的情况下,切换到concurrentDictionary
或使用带有lock
语句的list
会更好