I use a BlockingCollection to implement a producer-consumer pattern in C# 4.0.
The BlockingCollection holds items that take up quite a lot of memory. I would like the consumer to take one item out of the BlockingCollection at a time and process it.
I was thinking that by using foreach on BlockingCollection.GetConsumingEnumerable(), the BlockingCollection would remove each item from the underlying queue (that is, together with its reference), so that at the end of the Process() method which processes the item, the item could be garbage collected.
But this is not true. It seems the foreach loop on BlockingCollection.GetConsumingEnumerable() holds references to all the items that have entered the queue. All items are kept alive (and thus prevented from being garbage collected) until the foreach loop exits.
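For reference, the foreach-based consumer described above has roughly the following shape. This is only a minimal sketch: the byte[] payload, the class name ForeachConsumerSketch, and the body of Process() are placeholders assumed for illustration, not my real code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class ForeachConsumerSketch
{
    // Placeholder for the real per-item work (assumption for illustration).
    static void Process(byte[] item)
    {
        Console.WriteLine("Processing {0} bytes.", item.Length);
    }

    static void Main()
    {
        var queue = new BlockingCollection<byte[]>();

        // Producer: adds ten large items, then signals that no more will come.
        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Add(new byte[1024 * 1024]);
            }
            queue.CompleteAdding();
        });

        // Consumer: GetConsumingEnumerable() removes each item from the
        // collection as it is yielded, and the loop ends once adding is
        // complete and the collection is empty.
        foreach (var item in queue.GetConsumingEnumerable())
        {
            Process(item);
        }

        producer.Wait();
    }
}
```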
Instead of the simple foreach loop on BlockingCollection.GetConsumingEnumerable(), I then used a while loop that tests the BlockingCollection.IsCompleted flag, and inside the loop I call BlockingCollection.Take() to grab a consumable item. I assumed that BlockingCollection.Take() has a similar effect to List.Remove(), removing the reference to the item from the BlockingCollection. But again this is wrong. All items are garbage collected only after the while loop exits.
So my question is: how can we easily implement this requirement, where the BlockingCollection may hold memory-consuming items and each item can be garbage collected as soon as the consumer has consumed it? Thank you very much for any help.
EDIT: as requested, simple demo code is added:
// Entity is what we are going to process.
// The finalizer will tell us when Entity is going to be garbage collected.
class Entity
{
    private static int counter_;
    private int id_;
    public int ID { get { return id_; } }
    public Entity() { id_ = counter_++; }
    ~Entity() { Console.WriteLine("Destroying entity {0}.", id_); }
}
...
private BlockingCollection<Entity> jobQueue_ = new BlockingCollection<Entity>();
private List<Task> tasks_ = new List<Task>();
// This is the method to launch and wait for the tasks to finish the work.
void Run()
{
    tasks_.Add(Task.Factory.StartNew(ProduceEntity));
    Console.WriteLine("Start processing.");
    tasks_.Add(Task.Factory.StartNew(ConsumeEntity));
    Task.WaitAll(tasks_.ToArray());
}
// The producer creates Entity instances and adds them to the BlockingCollection.
void ProduceEntity()
{
    for (int i = 0; i < 10; i++) // Add 10 entities in total.
    {
        var newEntity = new Entity();
        Console.WriteLine("Create entity {0}.", newEntity.ID);
        jobQueue_.Add(newEntity);
    }
    jobQueue_.CompleteAdding();
}
// The consumer takes an entity and processes it (and, what I need, destroys it).
void ConsumeEntity()
{
    while (!jobQueue_.IsCompleted)
    {
        Entity entity;
        if (jobQueue_.TryTake(out entity))
        {
            Console.WriteLine("Process entity {0}.", entity.ID);
            entity = null;
            // I would assume that after GC the entity is finalized and garbage collected, but it is not.
            GC.Collect();
            GC.WaitForPendingFinalizers();
            GC.Collect();
        }
    }
    Console.WriteLine("Finish processing.");
}
The output is all the creation and processing messages, followed by "Finish processing.", followed by all the destruction messages from the entities. The creation messages show Entity.ID from 0 to 9, and the destruction messages show Entity.ID from 9 to 0.
EDIT:
Even when I set the BlockingCollection's bounded capacity, every item that ever entered it is finalized only when the loop exits, which is weird.
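By "bounded capacity" I mean the constructor overload that takes an int. A minimal sketch of that setup follows; the capacity of 2, the byte[] payload, and the class name BoundedDemo are values assumed here for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

class BoundedDemo
{
    static void Main()
    {
        // Assumed capacity of 2: Add() blocks while two items are already queued.
        var queue = new BlockingCollection<byte[]>(2);

        // Producer: tries to add ten large items, waiting whenever the queue is full.
        var producer = Task.Factory.StartNew(() =>
        {
            for (int i = 0; i < 10; i++)
            {
                queue.Add(new byte[1024 * 1024]);
            }
            queue.CompleteAdding();
        });

        // Consumer: drains the queue and reports how many items are still waiting.
        var consumer = Task.Factory.StartNew(() =>
        {
            foreach (var item in queue.GetConsumingEnumerable())
            {
                Console.WriteLine("Took an item of {0} bytes; {1} still queued.",
                    item.Length, queue.Count);
            }
        });

        Task.WaitAll(producer, consumer);
    }
}
```

With the bound in place, the collection itself never holds more than two items at a time, which is why I expected earlier items to become collectable well before the loop ends.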