views:

112

answers:

1

Currently, I have a RingBuffer which is run by a producer and a consumer thread.

In looking for a method of terminating them orderly, I thought I'd use a flag to indicate when the producer had finished and then check that flag in my consumer along with the number of ring buffer slots that need to be written. If the producer has finished and the ring buffer has no slots that need to be written the consumer can terminate.

That works well.

However, if I artificially lengthen the time the producer takes by inserting a sleep, the consumer does not terminate. I believe this is a consequence of the semaphores being used.

Here is the code I'm working with. Notice that the program will "hang" after all slots have been written. The producer terminates, but the consumer "hangs".

Any advice on terminating both in an orderly fashion would be greatly appreciated.

Edit - Updated code with Henk's suggestion of using a Queue. +1000 points to the first person to suggest a better method of terminating the consumer/producer threads than either knowing the exact amount of items being worked with or returning a value such as null/nothing indicating that no more items exist in the queue (though this doesn't mean they aren't still being produced.)

Edit - I believe I've figured it out. Simply pass null or nothing to RingBuffer.Enqueue for each consumer and catch the null or nothing object in the consumer to terminate it. Hopefully someone finds this useful.

Imports System.Collections

Module Module1

    Public Class RingBuffer

        Private m_Capacity As Integer
        Private m_Queue As Queue

        Public Sub New(ByVal Capacity As Integer)

            m_Capacity = Capacity
            m_Queue = Queue.Synchronized(New Queue(Capacity))

        End Sub

        Public Sub Enqueue(ByVal value As Object)

            SyncLock m_Queue.SyncRoot

                If m_Queue.Count = m_Capacity Then
                    Threading.Monitor.Wait(m_Queue.SyncRoot)
                End If

                m_Queue.Enqueue(value)

                Threading.Monitor.PulseAll(m_Queue.SyncRoot)

            End SyncLock

        End Sub

        Public Function Dequeue() As Object

            Dim value As Object = Nothing

            SyncLock m_Queue.SyncRoot

                If m_Queue.Count = 0 Then
                    Threading.Monitor.Wait(m_Queue.SyncRoot)
                End If

                value = m_Queue.Dequeue()

                Console.WriteLine("Full Slots: {0} - Open Slots: {1}", m_Queue.Count, m_Capacity - m_Queue.Count)

                Threading.Monitor.PulseAll(m_Queue.SyncRoot)

            End SyncLock

            Return value

        End Function

    End Class

    Public Class Tile

        Public buffer() As Byte

        Public Sub New()

            buffer = New Byte(1023) {}

        End Sub

    End Class

    Public Sub Producer(ByVal rb As RingBuffer)

        Dim enq As Integer = 0
        Dim rng As New System.Security.Cryptography.RNGCryptoServiceProvider

        For i As Integer = 0 To 1023
            Dim t As New Tile
            rng.GetNonZeroBytes(t.buffer)
            rb.Enqueue(t)
            enq += 1
            Threading.Thread.Sleep(10)
        Next i
        rb.Enqueue(Nothing)

        Console.WriteLine("Total items enqueued: " & enq.ToString())
        Console.WriteLine("Done Producing!")

    End Sub

    Public Sub Consumer(ByVal rb As RingBuffer)

        Dim deq As Integer = 0

        Using fs As New IO.FileStream("c:\test.bin", IO.FileMode.Create)
            While True
                Dim t As Tile = rb.Dequeue()
                If t Is Nothing Then Exit While                
                fs.Write(t.buffer, 0, t.buffer.Length)
                deq += 1
                Threading.Thread.Sleep(30)
            End While
        End Using

        Console.WriteLine("Total items dequeued: " & deq.ToString())
        Console.WriteLine("Done Consuming!")

    End Sub

    Sub Main()

        Dim rb As New RingBuffer(1000)

        Dim thrdProducer As New Threading.Thread(AddressOf Producer)
        thrdProducer.SetApartmentState(Threading.ApartmentState.STA)
        thrdProducer.Name = "Producer"
        thrdProducer.IsBackground = True
        thrdProducer.Start(rb)

        Dim thrdConsumer As New Threading.Thread(AddressOf Consumer)
        thrdConsumer.SetApartmentState(Threading.ApartmentState.STA)
        thrdConsumer.Name = "Consumer"
        thrdConsumer.IsBackground = True
        thrdConsumer.Start(rb)

        Console.ReadKey()

    End Sub

End Module
A: 

If I look at the Consumer function:

If rb.FullSlots = 0 And Threading.Interlocked.Read(ProducerFinished) = 0 Then
   Exit While
End If
Dim t As Tile = rb.Read()

The consumer could find rb.FullSlots = 0 but ProducerFinished = False and continue to Read(). Inside Read() it waits for the writerSemaphore but in the mean time the Producer could finish and never release the writerSemaphore.

So (at least) the producer should take steps to let the readers continue after it decreases the ProducerFinished.

But I think you get a better design if you move this 'Closing' logic to the Ring buffer. There you can combine it with the Data-available logic.

Henk Holterman
I guess as long as I know how many items are going to be produced ahead of time I can simply expect that many in the producer/consumer threads, but there must be a method of doing this for an unknown number of items...
I would advice to go simpler - build a class around a Queue(of T) and a simple boolean for Closing. Use an internal syncRoot and use Wait and Pulse to deal with an empty Queue. Your current approach seems to be based on a lock-free design (but with locks :-)
Henk Holterman
Indeed it is :) Thanks Henk, I'll give it a try.