Currently, I have a RingBuffer which is run by a producer and a consumer thread.
In looking for a method of terminating them orderly, I thought I'd use a flag to indicate when the producer had finished and then check that flag in my consumer along with the number of ring buffer slots that need to be written. If the producer has finished and the ring buffer has no slots that need to be written the consumer can terminate.
That works well.
However, if I artificially lengthen the time the producer takes by inserting a sleep, the consumer does not terminate. I believe this is a consequence of the semaphores being used.
Here is the code I'm working with. Notice that the program will "hang" after all slots have been written. The producer terminates, but the consumer "hangs".
Any advice on terminating both in an orderly fashion would be greatly appreciated.
Edit - Updated code with Henk's suggestion of using a Queue. +1000 points to the first person to suggest a better method of terminating the consumer/producer threads than either knowing the exact amount of items being worked with or returning a value such as null/nothing indicating that no more items exist in the queue (though this doesn't mean they aren't still being produced.)
Edit - I believe I've figured it out. Simply pass null or nothing to RingBuffer.Enqueue for each consumer and catch the null or nothing object in the consumer to terminate it. Hopefully someone finds this useful.
Imports System.Collections
Module Module1
Public Class RingBuffer
Private m_Capacity As Integer
Private m_Queue As Queue
Public Sub New(ByVal Capacity As Integer)
m_Capacity = Capacity
m_Queue = Queue.Synchronized(New Queue(Capacity))
End Sub
Public Sub Enqueue(ByVal value As Object)
SyncLock m_Queue.SyncRoot
If m_Queue.Count = m_Capacity Then
Threading.Monitor.Wait(m_Queue.SyncRoot)
End If
m_Queue.Enqueue(value)
Threading.Monitor.PulseAll(m_Queue.SyncRoot)
End SyncLock
End Sub
Public Function Dequeue() As Object
Dim value As Object = Nothing
SyncLock m_Queue.SyncRoot
If m_Queue.Count = 0 Then
Threading.Monitor.Wait(m_Queue.SyncRoot)
End If
value = m_Queue.Dequeue()
Console.WriteLine("Full Slots: {0} - Open Slots: {1}", m_Queue.Count, m_Capacity - m_Queue.Count)
Threading.Monitor.PulseAll(m_Queue.SyncRoot)
End SyncLock
Return value
End Function
End Class
Public Class Tile
Public buffer() As Byte
Public Sub New()
buffer = New Byte(1023) {}
End Sub
End Class
Public Sub Producer(ByVal rb As RingBuffer)
Dim enq As Integer = 0
Dim rng As New System.Security.Cryptography.RNGCryptoServiceProvider
For i As Integer = 0 To 1023
Dim t As New Tile
rng.GetNonZeroBytes(t.buffer)
rb.Enqueue(t)
enq += 1
Threading.Thread.Sleep(10)
Next i
rb.Enqueue(Nothing)
Console.WriteLine("Total items enqueued: " & enq.ToString())
Console.WriteLine("Done Producing!")
End Sub
Public Sub Consumer(ByVal rb As RingBuffer)
Dim deq As Integer = 0
Using fs As New IO.FileStream("c:\test.bin", IO.FileMode.Create)
While True
Dim t As Tile = rb.Dequeue()
If t Is Nothing Then Exit While
fs.Write(t.buffer, 0, t.buffer.Length)
deq += 1
Threading.Thread.Sleep(30)
End While
End Using
Console.WriteLine("Total items dequeued: " & deq.ToString())
Console.WriteLine("Done Consuming!")
End Sub
Sub Main()
Dim rb As New RingBuffer(1000)
Dim thrdProducer As New Threading.Thread(AddressOf Producer)
thrdProducer.SetApartmentState(Threading.ApartmentState.STA)
thrdProducer.Name = "Producer"
thrdProducer.IsBackground = True
thrdProducer.Start(rb)
Dim thrdConsumer As New Threading.Thread(AddressOf Consumer)
thrdConsumer.SetApartmentState(Threading.ApartmentState.STA)
thrdConsumer.Name = "Consumer"
thrdConsumer.IsBackground = True
thrdConsumer.Start(rb)
Console.ReadKey()
End Sub
End Module