I have written the following code to perform some simultaneous HTTP posting and file archiving:

Dim t1 As New Threading.Thread(New Threading.ThreadStart(AddressOf ProcessNTSMessageQueue))
Dim t2 As New Threading.Thread(New Threading.ThreadStart(AddressOf ProcessNTSMessageQueue))
Dim t3 As New Threading.Thread(New Threading.ThreadStart(AddressOf ProcessSuccessfulNTSMessageQueue))

t1.Start()
t2.Start()
t3.Start()

t1.Join()
t2.Join()
t3.Join()

I have two threads (1 & 2) reading a queue of XML messages and posting the messages via HTTP to a web server. Once a message is posted, it is dequeued from this queue and added to a second queue. Thread 3 reads this second queue and simply writes the XML out to a file.

I would expect to see XML files gradually appearing as threads 1 & 2 process the message queue (sometimes the message queue can take about 40 minutes to process). What I am actually seeing is that no XML files appear. Am I misunderstanding the code?

*Edit. Here's the code for the four relevant threaded methods:

Private Sub ProcessNTSMessageQueue()
        While True
            Dim msg As String = Nothing
            SyncLock _MessageQueue

                If _MessageQueue.Count > 0 Then
                    msg = _MessageQueue.Dequeue()
                Else
                    Exit Sub
                End If

            End SyncLock

            'Post the message
            'it's important to do this outside lock() 
            If msg <> Nothing Then
                Me.PostXMLToNTS(msg)
            End If
        End While

    End Sub

Private Sub ProcessSuccessfulNTSMessageQueue()
        While True
            Dim msg As String = Nothing
            SyncLock _SentMessageQueue

                If _SentMessageQueue.Count > 0 Then
                    msg = _SentMessageQueue.Dequeue()
                Else
                    Exit Sub
                End If

            End SyncLock

            'Post the message
            'it's important to do this outside lock() 
            If msg <> Nothing Then
                Me.ArchiveXMLAsFile(Guid.NewGuid.ToString, msg)
                _SuccessfulMessageCount += 1
            End If

        End While
    End Sub

Private Function PostXMLToNTS(ByVal XMLString As String) As Boolean

        Try

            Dim result As Net.HttpStatusCode = NTSPoster.PostXml(XMLString, _Settings.NTSPostURLCurrent, _Settings.NTSPostUsernameCurrent, _Settings.NTSPostPasswordCurrent)

            Select Case result
                Case Net.HttpStatusCode.Accepted, Net.HttpStatusCode.OK 'Good

                    If _SentMessageQueue Is Nothing Then
                        _SentMessageQueue = New Queue(Of String)
                    End If
                    _SentMessageQueue.Enqueue(XMLString)

                    Return True

                Case Else 'Probably bad

                    If _FailedMessageQueue Is Nothing Then
                        _FailedMessageQueue = New Queue(Of String)
                    End If
                    _FailedMessageQueue.Enqueue(XMLString)

                    Return False

            End Select

        Catch ex As Exception
            Throw
        End Try
    End Function

Private Sub ArchiveXMLAsFile(ByVal name As String, ByVal XML As String)

        Try
            'Create directory in archive location based on todays date
            Dim dir As New DirectoryInfo(Me.TodaysXMLArchiveLocation)

            'If the directory does not already exist then create it
            If Not dir.Exists Then
                dir.Create()
            End If

            Dim FileName As String = String.Format("{0}.xml", name)
            Using sw As StreamWriter = New StreamWriter(Path.Combine(dir.FullName, FileName))
                sw.Write(XML)
                sw.Close()
            End Using

        Catch ex As Exception
            Throw
        End Try

    End Sub
A: 

No, you're not misunderstanding the code; to me, it looks like there's a bug in one of your functions (or ProcessNTSMessageQueue is not written in a thread-safe way and, hence, cannot be parallelized).

Have you tried single-stepping your code in Visual Studio? You can hit the pause button and then select a thread from the drop-down list in the tool bar.

Heinzi
+2  A: 

If the code of ProcessNTSMessageQueue and ProcessSuccessfulNTSMessageQueue is correct, you should see your expected results.

Things to check:

  • Is your Queue threadsafe?
  • How are you dequeuing objects?
  • How does the reader thread get notified of new writes in the queue?
Jorge Córdoba
hmm, your third point is interesting - I'm not sure if it is getting notified.
Simon
+3  A: 

This section of ProcessSuccessfulNTSMessageQueue exits the method (and terminates the thread) the first time through, since there probably aren't any messages on the queue when the thread begins:

If _SentMessageQueue.Count > 0 Then
    msg = _SentMessageQueue.Dequeue()
Else
    Exit Sub
End If

The question is, how else can you know when the third thread is done, right? The solution is to make another class-level variable to signal when the first two threads are done. In ProcessNTSMessageQueue, you'd set it here:

If _MessageQueue.Count > 0 Then
    msg = _MessageQueue.Dequeue()
Else
    _IsStartQueueEmpty = True
    Exit Sub
End If

And in ProcessSuccessfulNTSMessageQueue, you'd use it like this:

If _SentMessageQueue.Count > 0 Then
    msg = _SentMessageQueue.Dequeue()
Else
    If _IsStartQueueEmpty Then
        Exit Sub
    End If
End If
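
For what it's worth, here is a minimal sketch of how the archiving loop could look with that flag in place. Only the field and method names come from the question; the module name, the Console.WriteLine stand-in for ArchiveXMLAsFile, the sleep intervals, and the Main driver are assumptions for illustration. The flag is set under the same SyncLock as the queue so the archiving thread is guaranteed to see the change, and the loop drains any remaining messages before it exits.

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

Module ArchiveLoopSketch
    ' Stand-ins for the question's class-level fields (assumed declarations).
    Private ReadOnly _SentMessageQueue As New Queue(Of String)()
    Private _IsStartQueueEmpty As Boolean = False
    Private _SuccessfulMessageCount As Integer = 0

    Private Sub ProcessSuccessfulNTSMessageQueue()
        While True
            Dim msg As String = Nothing
            Dim finished As Boolean = False

            SyncLock _SentMessageQueue
                If _SentMessageQueue.Count > 0 Then
                    msg = _SentMessageQueue.Dequeue()
                ElseIf _IsStartQueueEmpty Then
                    ' Queue is empty and no more messages are coming.
                    finished = True
                End If
            End SyncLock

            If finished Then Exit Sub

            If msg IsNot Nothing Then
                ' The real code would call ArchiveXMLAsFile here.
                Console.WriteLine("Archived: " & msg)
                _SuccessfulMessageCount += 1
            Else
                ' Nothing to archive yet: wait briefly instead of spinning.
                Thread.Sleep(50)
            End If
        End While
    End Sub

    Sub Main()
        Dim archiver As New Thread(AddressOf ProcessSuccessfulNTSMessageQueue)
        archiver.Start()

        ' Simulate the posting threads handing over messages.
        For i As Integer = 1 To 3
            SyncLock _SentMessageQueue
                _SentMessageQueue.Enqueue("<msg id=""" & i & """/>")
            End SyncLock
            Thread.Sleep(100)
        Next

        ' Signal completion only after all posting work is done.
        SyncLock _SentMessageQueue
            _IsStartQueueEmpty = True
        End SyncLock

        archiver.Join()
        Console.WriteLine("Archived count: " & _SuccessfulMessageCount)
    End Sub
End Module
```

One thing to watch with two posting threads: if the flag is set inside ProcessNTSMessageQueue, the first thread to find the queue empty sets it while the other may still be mid-post, so setting it after both posting threads have finished (as the sketch does) may be safer.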
Jeff Sternal
I'd say that's what's happening.
Jorge Córdoba
Should probably be continue (or VB equivalent)
Jorge Córdoba
this works exactly as I wanted. Thanks very much.
Simon
+2  A: 

I agree with @Jeff Sternal as to the likely problem, namely that ProcessSuccessfulNTSMessageQueue() is exiting early.

Beyond that, it looks like you've got a race condition regarding the _SentMessageQueue variable. In ProcessSuccessfulNTSMessageQueue(), you lock on the queue. But it looks like the queue is created in the PostXMLToNTS() method. If this is the case, then you run the risk that the SyncLock statement in ProcessSuccessfulNTSMessageQueue() will throw an ArgumentNullException if the queue has not been created yet.

In addition, you are not synchronizing either Enqueue() operation in the PostXMLToNTS() method. These need to be synchronized like the Dequeue() operation is in the ProcessSuccessfulNTSMessageQueue() method.
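
A rough sketch of both fixes, under some assumptions: the queue is created where it is declared, so SyncLock never sees Nothing, and every Enqueue() goes through the same lock. EnqueueSent, PostMany, and the module itself are hypothetical names used only for this illustration; in the real code the SyncLock would wrap the Enqueue() calls inside PostXMLToNTS().

```vbnet
Imports System
Imports System.Collections.Generic
Imports System.Threading

Module EnqueueSketch
    ' Created up front, so the reference is never Nothing when locked on.
    Private ReadOnly _SentMessageQueue As New Queue(Of String)()

    ' Hypothetical helper: what PostXMLToNTS might do on a successful post.
    Private Sub EnqueueSent(ByVal xml As String)
        SyncLock _SentMessageQueue
            _SentMessageQueue.Enqueue(xml)
        End SyncLock
    End Sub

    ' Stand-in for a posting thread that succeeds 1000 times.
    Private Sub PostMany()
        For i As Integer = 1 To 1000
            EnqueueSent("<msg/>")
        Next
    End Sub

    Sub Main()
        Dim t1 As New Thread(AddressOf PostMany)
        Dim t2 As New Thread(AddressOf PostMany)
        t1.Start()
        t2.Start()
        t1.Join()
        t2.Join()

        ' With the lock in place, no enqueues are lost: prints 2000.
        Console.WriteLine(_SentMessageQueue.Count)
    End Sub
End Module
```

The same treatment would apply to _FailedMessageQueue, since both posting threads can enqueue to it as well.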

Matt Davis
Good finds - I was too narrowly focused on the main question. These will need to be fixed too.
Jeff Sternal