views:

364

answers:

1

I am using service broker as my messaging system to schedule and run jobs. Eash job is composed of multiple tasks or steps called engines.

My service broker objects are:

  • MessageTypes: SubmitJob, JobResponse, SubmitTask, TaskResponse
  • Contracts: JobContract, TaskContract
  • Queues: ClientQueue, JobQueue, EngineQueue, ExternalActivatorQueue
  • Services: ClientService, JobService, EngineService, ExternalActivatorService
  • Event Notification: EventNotificationEngineQueue

I have internal activation (stored proc) on the jobqueue. For SubmitJob MessageTypes the stored proc gets the first task for that job, starts a dialog with the EngineService and sends a message to that Queue (StartTask) For TaskResponses MessageType, I check to see if there are any more task for this job, if there are then they get submitted to the EngineQueue, if not then the task for this job are complete (send message and clean up.)

That all seems to be working great. However, I want to have a external app (engine) that will process the EngineQueue messages. So I am using Microsoft's external activation mechanism (ssbeas.exe.) It took a long time but I finally got it to work. A message goes into the EngineQueue, the EventNotificationEngineQueue fires up my application and drains the queue. So far so good. However, my app seems to be running multiple times. My test application is configured to send an email when it completes. Even though I only send one job with one task I get multiple emails (indicating the program ran multiple times.)

Here is the code for my app (vb.net) (broker is a object that encapsulates the service broker services (Send, Receive, etc.) :

While True

            oBroker.tran = oBroker.cnn.BeginTransaction

            oBroker.Receive("EMGQueue", msgType, msg, serviceInstance, dialogHandle)

            If dialogHandle = System.Guid.Empty Then
                'Console.WriteLine("An Error Occurred. Program Terminated.")
                oBroker.tran.Commit()
                Exit While
            End If

            ConsoleWriteLine("Received: " & msgType)

            If (msg Is Nothing) Then
                ConsoleWriteLine("commiting and exiting")
                oBroker.tran.Commit()
                Exit While

            Else
                Select Case (msgType)
                    Case "SubmitTask"
                        ProcessMsg(oBroker.cnn, oBroker.tran, msgType, msg, iTaskID, iTaskKey)
                        oBroker.Send(dialogHandle, "<TaskStatus>1</TaskStatus>'")

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog"
                        oBroker.EndDialog(dialogHandle)

                    Case "http://schemas.microsoft.com/SQL/ServiceBroker/Error"""
                        oBroker.EndDialog(dialogHandle)

                End Select
            End If


            ConsoleWriteLine("commiting...")
            oBroker.tran.Commit()

        End While

I dont understand why the app is running multiple times but beyond that I dont understand why the subsequent versions are still able to see the message in the queue. After all, the first incarnation should have locked the message in the queue. It does lock the queue because I was able to test using query manager to try and receive a message while my app was running and it was blocked.

I have tried playing with the concurrency values in the EAService.config. When I set it to min="0" and max="1" I did reduced the number times the app seem to be running down to two Previously, using min="0" and max="10", it was running it seemed like 18 copies.

I hope that made sense and sorry about the length. Anyone have any ideas what is happening here? Have I made a mistake in my .net app coding?

Thanks Martin

Edit: adding log that is created after the Engine has run:

2010-02-08 09:31:39 - Main
2010-02-08 09:31:39 - Received: SubmitTask
2010-02-08 09:31:39 - ProcessMsg
2010-02-08 09:31:39 - <Task><TaskID> 5</TaskID><TaskKey>2</TaskKey></Task>
2010-02-08 09:31:39 - DoWork
2010-02-08 09:31:39 - Sending Email
2010-02-08 09:31:40 - commiting...
2010-02-08 09:31:40 - Sleeping
2010-02-08 09:32:10 - Sleeping Completed.
2010-02-08 09:32:10 - Main Complete
2010-02-08 09:32:10 - External activated application succeeds and terminates now.
2010-02-08 09:32:10 - Main
2010-02-08 09:32:10 - Received: SubmitTask
2010-02-08 09:32:10 - ProcessMsg
2010-02-08 09:32:10 - <Task><TaskID> 5</TaskID><TaskKey> 2</TaskKey></Task>
2010-02-08 09:32:10 - DoWork
2010-02-08 09:32:10 - Sending Email
2010-02-08 09:32:10 - commiting...
2010-02-08 09:32:10 - Sleeping
2010-02-08 09:32:40 - Sleeping Completed.
2010-02-08 09:32:40 - Main Complete
2010-02-08 09:32:40 - External activated application succeeds and terminates now.

You can see it goes through the whole app twice (main, received, dowork, sendemail, complete.)

Edit 2: here is the latest incarnation of the stored procedure (debuggin statements and all) that gets activated when a job is submitted to the queue:

ALTER PROCEDURE [dbo].[pr_ProcessJob] AS BEGIN

    DECLARE     @message_type_name      sysname
    DECLARE     @dialog             uniqueidentifier 
    DECLARE     @message_sequence_number    bigint
    DECLARE     @error_message_sequence_number  bigint
    DECLARE     @message_body           xml
    DECLARE     @cgid               uniqueidentifier
    DECLARE     @JobID              int
    DECLARE     @Params             varchar(MAX)

    DECLARE     @ErrorNumber            bigint
    DECLARE     @ErrorText          nvarchar(MAX)

    DECLARE         @TaskID             int 
    DECLARE     @TaskService            varchar(100)
    DECLARE     @TaskKey            int
    DECLARE     @chEngine           uniqueidentifier
    DECLARE     @Step               int
    DECLARE     @NextStep           int
    DECLARE     @jobch              uniqueidentifier
    DECLARE     @EngineMsg          XML
    DECLARE     @TimeStarted            datetime
    DECLARE     @TaskStatus         int

    --  This procedure will just sit in a loop processing Task messages in the queue
    --  until the queue is empty
    SET NOCOUNT ON
    SET @error_message_sequence_number = -100


    PRINT 'pr_ProcessJob: Start'

    WHILE (1=1) BEGIN

        BEGIN TRY

            PRINT 'pr_ProcessJob: BEGIN TRANSACTION'
            BEGIN TRANSACTION

            -- first lets get the conversation group id for the next message.
            WAITFOR (
                GET CONVERSATION GROUP @cgid FROM [JobQueue]
            ), TIMEOUT 1000

            IF (@@ROWCOUNT = 0) BEGIN

                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (GET CONVERSATION)'
                ROLLBACK TRANSACTION
                BREAK
            END

            PRINT @CGID

            -- Inner Loop (Message Processing)
            WHILE (1=1) BEGIN

                -- Receive the next available message
                PRINT 'Receiving Message.'
                WAITFOR (
                    RECEIVE top(1) -- just handle one message at a time
                        @message_type_name=message_type_name,  --the type of message received
                        @message_body=CAST(message_body AS XML),      -- the message contents
                        @message_sequence_number=message_sequence_number,
                        @dialog = conversation_handle    -- the identifier of the dialog this message was received on
                        FROM [JobQueue]
                        WHERE conversation_group_id=@cgid
                ), TIMEOUT 3000  -- if the queue is empty for three seconds, give up and go away

                -- If we didn't get anything, the queue is empty so bail out

                IF (@@ROWCOUNT = 0) BEGIN               
                    PRINT 'pr_ProcessJob::WaitFor - No messages for conversation group bailing out'
                    BREAK
                END --IF (@@ROWCOUNT = 0)

                PRINT 'Message Received: ' + @message_type_name
                SAVE TRANSACTION MessageReceivedSavePoint

                -- Handle the End Conversation Message
                IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                    -- When we receive an End Dialog, we need to end also.
                    PRINT 'ENDING CONVERSATION'
                    END CONVERSATION @dialog
                END -- IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog') BEGIN
                ELSE BEGIN
                    -- Handle the Conversation Error Message
                    IF (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error') BEGIN
                        -- We can't return anything here because the dialog at the other end is closed so just log 
                        -- an error and close our end of the conversation.

                        PRINT 'ENDING CONVERSATION w/Error'
                        END CONVERSATION @dialog
                    END -- (@message_type_name = 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                    ELSE BEGIN
                        IF (@message_type_name = 'SubmitJob')  BEGIN -- Process normal Job messages..   

                            PRINT 'pr_ProcessJob:: Message Type SubmitJob received.'

                            -- Pull the information out of the task message with XQuery
                            SELECT @JobID = @message_body.value('(/Job/JobID)[1]', 'int'),
                                @Params = @message_body.value('(/Job/Params)[1]', 'varchar(MAX)')

                            PRINT 'pr_ProcessJob::@JobID = ' + cast(@jobID as varchar(10))
                            PRINT 'pr_ProcessJob::@Params = ' + @Params


                            SELECT  @ErrorNumber = 0, @ErrorText = N''

                            -- Do something with the job
                            -- save state

                            -- we are looking for the first step 
                            SET @Step=1

                            PRINT 'Selecting from JobTask'
                            ---------------------------------------------------------
                            -- Get the next task
                            ---------------------------------------------------------
                            SELECT  TOP 1
                                @TaskID=task.TaskID, 
                                @TaskService=tt.TaskService, 
                                @TaskKey =Task.TaskKey
                            FROM JobTask task INNER JOIN TaskType tt
                                ON task.TaskTypeID = tt.TaskTypeID      
                            WHERE task.jobID=@JobID AND task.enabled=1 and task.step>=@step
                            ORDER BY Task.step
                            ---------------------------------------------------------
                            PRINT 'Selecting from JobTask: complete'

                            PRINT 'Step='+cast(@step as varchar(max))               
                            PRINT 'TaskID='+cast(@TaskID as varchar(max))
                            PRINT 'TaskService='+cast(@TaskService as varchar(max))
                            PRINT'TaskKey='+cast(@TaskKey as varchar(max))                      

                            PRINT 'BEGIN DIALOG with ' + @TaskService   

                            BEGIN DIALOG @chEngine
                                FROM SERVICE [JobService]
                                TO SERVICE @TaskService
                                ON CONTRACT [TaskContract]
                                WITH RELATED_CONVERSATION=@dialog;

                            PRINT 'BEGIN DIALOG with ' + @TaskService+' completed.'

                            SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                            PRINT CAST(@EngineMsg as varchar(max))

                            PRINT 'Sending Message Type SubmitTask to Engine.';

                            SEND ON CONVERSATION @chEngine
                                MESSAGE TYPE SubmitTask
                                (@EngineMsg)

                            PRINT 'Inserting into jobstate'

                            INSERT INTO JobState(cgid, jobch, jobID, step) VALUES(@cgid, @dialog, @jobid, @step)

                        END -- IF (@message_type_name = 'SubmitJob') 
                        ELSE BEGIN

                            IF (@message_type_name = 'TaskResponse')  BEGIN 

                                PRINT 'Processing MessageType TaskResponse'

                                SELECT @TaskStatus = @message_body.value('(/TaskStatus)[1]', 'int')

                                PRINT 'pr_ProcessJob::@TaskStatus = ' + cast(@TaskStatus as varchar(10))

                                PRINT 'Loading State'

                                --LoadState
                                SELECT  @JobID=jobid, 
                                    @Step=Step, 
                                    @jobch=jobch,
                                    @TimeStarted=sysdate    
                                FROM Jobstate
                                WHERE cgid=@cgid

                                PRINT 'Loading State complete' 

                                PRINT @jobch

                                PRINT 'Selecting from JobTask'
                                ---------------------------------------------------------
                                -- Get the next task
                                ---------------------------------------------------------
                                SELECT  TOP 1
                                    @TaskID=task.TaskID, 
                                    @TaskService=tt.TaskService, 
                                    @TaskKey =task.TaskKey,
                                    @NextStep = task.Step

                                FROM JobTask task INNER JOIN TaskType tt
                                    ON task.TaskTypeID = tt.TaskTypeID      
                                WHERE task.jobID=@JobID AND task.enabled=1 and task.step>@step
                                ORDER BY Task.step
                                ---------------------------------------------------------
                                PRINT 'Selecting from JobTask: complete'

                                PRINT 'NextTask: ['+@TaskService+']'

                                if (@TaskService is null) BEGIN

                                    PRINT '@TaskService is NULL: BEGIN'

                                    -- no more tasks
                                    --END CONVERSATION @jobch

                                    PRINT 'Removing from state table'                               
                                    DELETE FROM JobState 
                                    WHERE @cgid=cgid
                                    PRINT @@ROWCOUNT
                                    PRINT 'Removing from state table-completed'

                                    DECLARE @ResponseDoc xml

                                    -- Send a response message saying we're done
                                    DECLARE @Time nvarchar(100)                 
                                    SET @Time = cast(getdate() as nvarchar(100))

                                    DECLARE @TimeStartedText nvarchar(100)
                                    SET @TimeStartedText = cast(@TimeStarted as nvarchar(100))

                                    SET @ResponseDoc = N'<Job/>'
                                    SET @ResponseDoc.modify(
                                    'insert (<JobID>{ sql:variable("@JobID") }</JobID>, 
                                    <JobStatus>{ sql:variable("@ErrorNumber") }</JobStatus>,
                                    <ErrorNumber>{ sql:variable("@ErrorNumber") }</ErrorNumber>,
                                    <ErrorText>{ sql:variable("@ErrorText") }</ErrorText>,
                                    <TimeStarted>{ sql:variable("@TimeStartedText") }</TimeStarted>, 
                                    <TimeCompleted>{ sql:variable("@Time") }</TimeCompleted>) 
                                    as last into /Job [1]'); 

                                    SEND ON CONVERSATION @jobch 
                                        MESSAGE TYPE [JobResponse] (@ResponseDoc)

                                    END CONVERSATION @jobch             

                                    PRINT '@TaskService is NULL: END'

                                END --if (@TaskService is null) BEGIN
                                ELSE BEGIN
                                    -- there are more tasks 
                                    PRINT '@TaskService is not null: BEGIN'

                                    PRINT 'BEGIN DIALOG with ' + @TaskService

                                    --another task
                                    BEGIN DIALOG @chEngine
                                        FROM SERVICE [JobService]
                                        TO SERVICE @TaskService
                                        ON CONTRACT [TaskContract]
                                        WITH RELATED_CONVERSATION=@dialog;

                                    SET @EngineMsg = CAST('<Task><TaskID>'+ str(@TaskID)+'</TaskID><TaskKey>'+ str(@Taskkey)+'</TaskKey></Task>' as XML);

                                    PRINT 'SEND ' +cast(@EngineMsg as varchar(max));

                                    SEND ON CONVERSATION @chEngine
                                    MESSAGE TYPE SubmitTask (@EngineMsg)                                    

                                    PRINT 'SAVING State: ' +str(@step)

                                    -- save state
                                    Update JobState
                                        SET step = @NextStep
                                    FROM JobState
                                    WHERE cgid=@cgid

                                    PRINT '@TaskService is not null: END'

                                END -- ELSE (@TaskService is NOT NULL)

                                PRINT 'Processing MessageType TaskResponse...Complete'

                            END -- IF (@message_type_name = 'TaskCompleted')

                        END -- ELSE IF (@message_type_name <> 'JobRequest') 
                    END -- ELSE  (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/Error')
                END -- ELSE (@message_type_name <> 'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog')
            END -- WHILE (1=1) BEGIN

            PRINT 'COMMIT TRANSACTION'
            COMMIT TRANSACTION

        END TRY
        BEGIN CATCH

            --rollback transaction

            DECLARE @ErrNum int
            DECLARE @ErrMsg varchar(max)


                SELECT
                    ERROR_NUMBER() AS ErrorNumber
                    ,ERROR_SEVERITY() AS ErrorSeverity
                    ,ERROR_STATE() AS ErrorState
                    ,ERROR_PROCEDURE() AS ErrorProcedure
                    ,ERROR_LINE() AS ErrorLine
                    ,ERROR_MESSAGE() AS ErrorMessage;

            PRINT 'pr_ProcessJob: ROLLBACK (CATCH)'


            if (error_number()=1205) BEGIN
                -- a deadlock occurred. We can try it again.
                PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                ROLLBACK TRANSACTION
                --CONTINUE
            END --if (error_number()=1205)
            ELSE BEGIN
                if (error_number()=9617) BEGIN
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION (CATCH)'
                    ROLLBACK TRANSACTION
                END
                ELSE BEGIN -- (error_number()<>9617) 
                    -- another error occurred.  The message cant be procesed sucessfully
                    PRINT 'pr_ProcessJob: ROLLBACK TRANSACTION to MessageReceivedSavePoint (CATCH)'
                    ROLLBACK TRANSACTION MessageReceivedSavePoint

                END --ELSE (error_number()<>9617)       
            END -- if (error_number()<>1205)
        END CATCH

    END -- while loop

PRINT 'pr_ProcessJob: Complete' 

END -- CREATE PROCEDURE [dbo].[ProcessJobProc]
A: 

If the subsequent instances of your app are able to see the message, it means only one thing: the previous instance must have rolled back the receive. At a first glance the code you provided looks OK, so I would go and look for errors in the object model you're using. If one of its methods throws an exception, the app will be terminated and the transaction that received the message automatically rolled back.

If you want only a single instance of your app running at a time, keep the Max setting at 1, otherwise it will by default run more instances concurrently to keep up with the load.

Pawel Marciniak
First, thanks for the help. I also thought it was some kind of exception/rollback at the app level so I started logging to a file (I added an example of the log to my question because it would not fit here.) As you can see, it cycles through the app twice including the commit statement. btw, it only does this upon external activation. If I run the app in VS or a command line it only runs once. I'm baffled. Thanks again.
Are you sure you're not actually sending two messages? What's the logic (code) that sends these Task messages?
Pawel Marciniak
I have a job queue that has internal activation turned on. The stored procedure gets the next task based on the job and current state (which is saves off.) The tasks are processed based on type and sent to the appropriate queue. The SP is to big to post here but I will see if I can edit it into my orig post.
The last thing would be to check how the situation looks like from External Activator's point of view. If you enable the following tracing options in EA's config file, you will get a detailed description of its actions in the trace file.<LogSettings> <LogFilter> <TraceFlag>All Levels</TraceFlag> <TraceFlag>All Modules</TraceFlag> <TraceFlag>All Entities</TraceFlag> </LogFilter></LogSettings>
Pawel Marciniak