producer-consumer

Consistently hashing producer work to consumers via a message queue?

I have a producer that I want to distribute work consistently across consumers by consistent hashing. For example, with consumer nodes X and Y, tasks A, B, C should always go to consumer X, and D, E, F to consumer Y. But that may shift a little if Z joins the pool of consumers. I didn't want to deal with writing my own logic to connect ...
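The routing described above can be sketched as a hash ring. This is a minimal illustration in Python, not the asker's code: the `HashRing` class, its `replicas` parameter, and the use of MD5 for ring positions are all assumptions made for the example. In a message-queue setup, the returned consumer name could serve as a routing key or per-consumer queue name, but that wiring is not shown here.

```python
import bisect
import hashlib


def _position(key: str) -> int:
    """Map a string to a point on the ring (MD5 is an arbitrary choice here)."""
    return int(hashlib.md5(key.encode("utf-8")).hexdigest(), 16)


class HashRing:
    """Hypothetical consistent-hash ring with virtual nodes."""

    def __init__(self, nodes=(), replicas=100):
        self.replicas = replicas  # virtual nodes per consumer
        self._positions = []      # sorted ring positions
        self._owners = {}         # ring position -> consumer name
        for node in nodes:
            self.add(node)

    def add(self, node: str) -> None:
        """Place `replicas` virtual nodes for this consumer on the ring."""
        for i in range(self.replicas):
            pos = _position(f"{node}#{i}")
            if pos in self._owners:
                continue  # extremely unlikely collision; keep the first owner
            bisect.insort(self._positions, pos)
            self._owners[pos] = node

    def remove(self, node: str) -> None:
        """Drop every virtual node owned by this consumer."""
        self._positions = [p for p in self._positions if self._owners[p] != node]
        self._owners = {p: self._owners[p] for p in self._positions}

    def route(self, task: str) -> str:
        """Return the consumer owning the first ring position after the task."""
        if not self._positions:
            raise LookupError("no consumers on the ring")
        idx = bisect.bisect(self._positions, _position(task)) % len(self._positions)
        return self._owners[self._positions[idx]]


ring = HashRing(["X", "Y"])
assignment = {task: ring.route(task) for task in "ABCDEF"}
ring.add("Z")  # only tasks that now fall before a Z position change owner
```

With this layout a task keeps its consumer until a newly added consumer claims the arc it sits on, so adding Z moves some tasks to Z and never shuffles tasks between X and Y.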

Possible memory leak?

I'm profiling the code below inside a singleton and found that a lot of Rate objects are kept in memory although I clear them. protected void FetchingRates() { int count = 0; while (true) { try { if (m_RatesQueue.Count > 0) { List<RateLog> temp = null; lock (m_RatesQueue) { te...

Possible memory leak in a singleton?

I've asked this question before with no real answer. Can anybody help? I'm profiling the below code inside a singleton and found that a lot of Rate objects (List<Rate>) are kept in memory although I clear them. protected void FetchingRates() { int count = 0; while (true) { try { if (m_RatesQueue.Count > 0) { ...

Irregular Transmission Problem with Python Twisted Push Producer

I want to transmit data from a Queue using Twisted. I currently use a push producer to poll the queue for items and write to the transport. class Producer: implements(interfaces.IPushProducer) def __init__(self, protocol, queue): self.queue = queue self.protocol = protocol def resumeProducing(self): ...

ActiveMQ: Slow processing consumers

Concerning ActiveMQ: I have a scenario where I have one producer which sends small (around 10KB) files to the consumers. Although the files are small, the consumers need around 10 seconds to analyze them and return the result to the producer. I've researched a lot, but I still cannot find answers to the following questions: How do I ma...

Does Monitor.Wait need synchronization?

Hi, I have developed a generic producer-consumer queue that signals with Monitor in the following way. The enqueue: public void EnqueueTask(T task) { _workerQueue.Enqueue(task); Monitor.Pulse(_locker); } The dequeue: private T Dequeue() { T dequeueItem; if (_workerQueue.Count > 0) ...

Multithreading producer-consumer pattern using .Net TPL discussion

Firstly, my scenario: an application with data input, data processing and result aggregation. Input data can be huge, so it is cut into batches. Batches are independent of one another. Data processing therefore becomes batch processing, which is a relatively time-consuming procedure. Finally, processing each batch produces a batch result. All batch results ar...

Producer Consumer queue does not dispose

Hi, I have built a producer-consumer queue wrapping a .NET 4.0 ConcurrentQueue, with ManualResetEventSlim signaling between the producing side (Enqueue) and the consuming side (a while(true) loop on a thread). The queue looks like: public class ProducerConsumerQueue<T> : IDisposable, IProducerConsumerQueue<T> { private bool _IsActive=true; pu...

Synchronizing producer, consumer and a producer queue.

Hello! I have a producer and a consumer. The producer fills its internal queue with objects, and the consumer takes these objects one by one. I want to synchronize the consumer with the producer, so that the consumer blocks when there are no objects ready, and I want to synchronize the producer with itself, so that it stops producing when the queue i...
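The two-sided blocking described above (consumer waits when nothing is ready, producer waits when the queue is at capacity) can be sketched with a bounded queue. This is a minimal Python illustration under stated assumptions: the excerpt does not show the asker's language or code, and the names `SENTINEL`, `produce`, `consume`, and `run` are hypothetical.

```python
import queue
import threading

SENTINEL = object()  # hypothetical end-of-stream marker


def produce(q: queue.Queue, items) -> None:
    for item in items:
        q.put(item)      # blocks while the queue is full
    q.put(SENTINEL)      # tell the consumer that no more items will arrive


def consume(q: queue.Queue, out: list) -> None:
    while True:
        item = q.get()   # blocks while the queue is empty
        if item is SENTINEL:
            break
        out.append(item)


def run(items, capacity: int = 2) -> list:
    """Push items through a queue holding at most `capacity` entries."""
    q = queue.Queue(maxsize=capacity)
    out = []
    consumer = threading.Thread(target=consume, args=(q, out))
    producer = threading.Thread(target=produce, args=(q, items))
    consumer.start()
    producer.start()
    producer.join()
    consumer.join()
    return out
```

Here `queue.Queue(maxsize=...)` supplies both waits, so neither side needs its own condition variable. A hand-rolled version would typically pair one lock with "not empty" and "not full" conditions.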

Boost condition deadlock using wait() in producer-consumer code

I have implemented a basic threaded producer-consumer (thread 1 = producer, thread 2 = consumer) using Boost threads and conditions. I am getting stuck in wait() indefinitely quite often. I can't really see what could be wrong here. Below is some pseudo-code: // main class class Main { public: void AddToQueue(...someData...) { b...

Very unusual exception for BlockingCollection on .NET 4.0

I'm using BlockingCollection for a producer-consumer pattern and I got an exception so rare I could write a patent on it: only two results on Google! The exception is "CompleteAdding may not be used concurrently with additions to the collection" and it happens when I call TryAdd on the BlockingCollection as follows: public void EnqueueTask(T...

Producer-Consumer model - binary semaphore or mutex?

This is mainly about understanding the concept, which confuses me. A mutex means that one thread takes control of access to a shared resource, performs its operations and unlocks it; only then can another thread acquire the lock. With a binary semaphore, a thread can gain access to the shared resource but gaining acces...
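One commonly cited difference between the two is ownership: a mutex is normally released by the thread that acquired it, while a semaphore may be released by any thread. The sketch below illustrates that in Python, with the assumption that `threading.RLock` stands in for an owned mutex (Python's plain `threading.Lock` does not enforce ownership). The helper `release_from_other_thread` is hypothetical.

```python
import threading


def release_from_other_thread(primitive):
    """Acquire here, attempt the release in a second thread.

    Returns the RuntimeError raised by the release, or None if it succeeded.
    """
    primitive.acquire()
    outcome = {}

    def worker():
        try:
            primitive.release()
            outcome["error"] = None
        except RuntimeError as exc:
            outcome["error"] = exc

    t = threading.Thread(target=worker)
    t.start()
    t.join()
    return outcome["error"]


# A binary semaphore is a signal: any thread may release it.
semaphore_error = release_from_other_thread(threading.Semaphore(1))

# An owned lock rejects a release from a thread that does not hold it.
mutex_error = release_from_other_thread(threading.RLock())
```

This is why semaphores suit signaling between producer and consumer (one side releases what the other acquired), while a mutex suits protecting the buffer itself.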

Variable scope in multithreading: why is my object reference lost?

Briefly: in a single-producer, single-consumer scenario, I used a mutable object for synchronization and for passing data and messages between producer and consumer. The shared buffer is a ConcurrentQueue of byte arrays. To implement a circular buffer and prevent heap fragmentation and frequent object swapping by the GC, I used a ConcurrentBag of ...

Need a sample of single producer / single consumer pattern with .NET 4.0 new features

.NET 4.0 added new concurrency features under System.Collections.Concurrent, along with some synchronization classes. Is there a good sample of the single-producer, single-consumer pattern using these features? (Actually, I will add a circular buffer pattern to it if it doesn't already implement one as the shared buffer) ...

Multithreading: classical Producer Consumer algorithm

Hi all, there is something I don't get about the classical algorithm for the producer-consumer problem (from Wikipedia): semaphore mutex = 1 semaphore fillCount = 0 semaphore emptyCount = BUFFER_SIZE procedure producer() { while (true) { item = produceItem() down(emptyCount) down(mutex) putItemIn...
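For reference, the three-semaphore scheme quoted above can be sketched in runnable Python. The quoted pseudocode is cut off after `putItemIn...`, so everything past that point (the releases in the producer and the whole consumer) follows the usual textbook form and is an assumption, not a copy of the excerpt. The `BoundedBuffer` class and `run` helper are hypothetical names.

```python
import threading
from collections import deque


class BoundedBuffer:
    """Buffer guarded by the mutex / fillCount / emptyCount semaphores."""

    def __init__(self, size: int):
        self._items = deque()
        self._mutex = threading.Semaphore(1)         # protects the deque
        self._fill_count = threading.Semaphore(0)    # items ready to consume
        self._empty_count = threading.Semaphore(size)  # free slots

    def put(self, item) -> None:
        self._empty_count.acquire()  # down(emptyCount): wait for a free slot
        self._mutex.acquire()        # down(mutex)
        self._items.append(item)
        self._mutex.release()        # up(mutex)
        self._fill_count.release()   # up(fillCount): one more item available

    def get(self):
        self._fill_count.acquire()   # down(fillCount): wait for an item
        self._mutex.acquire()        # down(mutex)
        item = self._items.popleft()
        self._mutex.release()        # up(mutex)
        self._empty_count.release()  # up(emptyCount): one more free slot
        return item


def run(items, size: int = 4) -> list:
    """Move every item through the buffer with one producer and one consumer."""
    buf = BoundedBuffer(size)
    out = []
    producer = threading.Thread(target=lambda: [buf.put(i) for i in items])
    consumer = threading.Thread(
        target=lambda: out.extend(buf.get() for _ in range(len(items)))
    )
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    return out
```

The order of the two `acquire` calls matters: taking the mutex before waiting on `emptyCount` or `fillCount` could leave a thread holding the mutex while it sleeps, which blocks the other side from ever making progress.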