views: 166

answers: 2

I have a situation where I have a single activemq broker with 2 queues, Q1 and Q2. I have two ruby-based consumers using activemessaging. Let's call them C1 and C2. Both consumers subscribe to each queue. I'm setting activemq.prefetchSize=1 when subscribing to each queue. I'm also setting ack=client.
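For illustration, here is roughly what those two subscriptions look like when issued directly with the Ruby stomp gem (a sketch only; the connection details, queue names, and the process call are placeholders, and ActiveMessaging sets these headers through its own configuration rather than an explicit subscribe call like this):

    require 'stomp'

    # Sketch: login/passcode/host/port are placeholders.
    client = Stomp::Client.new("", "", "localhost", 61613)

    # Each subscription carries its own prefetch limit and ack mode, which is
    # why the limit applies per subscription rather than per connection.
    ["/queue/Q1", "/queue/Q2"].each do |queue|
      client.subscribe(queue, "ack" => "client", "activemq.prefetchSize" => "1") do |msg|
        process(msg)            # hypothetical job handler
        client.acknowledge(msg) # ack only after the work is done
      end
    end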

Consider the following sequence of events:

1) A message that triggers a long-running job is published to queue Q1. Call this M1.

2) M1 is dispatched to consumer C1, kicking off a long operation.

3) Two messages that trigger short jobs are published to queue Q2. Call these M2 and M3.

4) M2 is dispatched to C2 which quickly runs the short job.

5) M3 is dispatched to C1, even though C1 is still running M1. It's able to dispatch to C1 because prefetchSize=1 is set on the queue subscription, not on the connection. So the fact that a Q1 message has already been dispatched doesn't stop one Q2 message from being dispatched.

Since activemessaging consumers are single-threaded, the net result is that M3 sits and waits on C1 for a long time until C1 finishes processing M1. So, M3 is not processed for a long time, despite the fact that consumer C2 is sitting idle (since it quickly finishes with message M2).

Essentially, whenever a long Q1 job is run and then a whole bunch of short Q2 jobs are created, exactly one of the short Q2 jobs gets stuck on a consumer waiting for the long Q1 job to finish.

Is there a way to set prefetchSize at the connection level rather than at the subscription level? I really don't want any messages dispatched to C1 while it is processing M1. The other alternative is that I could create a consumer dedicated to processing Q1 and then have other consumers dedicated to processing Q2. But, I'd rather not do that since Q1 messages are infrequent--Q1's dedicated consumers would sit idle most of the day tying up memory.

+1  A: 

The activemq.prefetchSize header is only available on a SUBSCRIBE frame, not a CONNECT frame, according to the ActiveMQ docs for their extended STOMP headers (http://activemq.apache.org/stomp.html). Here is the relevant info:

verb: SUBSCRIBE

header: activemq.prefetchSize

type: int

description: Specifies the maximum number of pending messages that will be dispatched to the client. Once this maximum is reached no more messages are dispatched until the client acknowledges a message. Set to 1 for very fair distribution of messages across consumers where processing messages can be slow.

My reading of this, and my experience with it, is that since M1 has not been ack'd (because you have client ack turned on), M1 should be the one message allowed by prefetchSize=1 on that subscription. I am surprised to hear that it didn't work, but perhaps I need to run a more detailed test. Your settings should be correct for the behavior you want.

I have heard others report flakiness with the ActiveMQ dispatch, so it is possible this is a bug in the version you are using.

One suggestion I would have is to either sniff the network traffic to see if M1 is getting ack'd for some reason, or to throw some puts statements into the Ruby stomp gem to watch the communication (this is what I usually end up doing when debugging STOMP problems).
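For example, a low-tech way to watch the frames is to patch the stomp gem's connection so it prints everything it sends and receives; a rough sketch, assuming a stomp gem version where Stomp::Connection has transmit and receive methods (names can differ between versions):

    require 'stomp'

    # Debugging sketch only: adjust the method names to match your stomp gem version.
    class Stomp::Connection
      alias_method :orig_transmit, :transmit
      alias_method :orig_receive,  :receive

      def transmit(*args)
        puts ">>> #{args.inspect}"   # outgoing frame (any ACKs would show up here)
        orig_transmit(*args)
      end

      def receive
        msg = orig_receive
        puts "<<< #{msg.inspect}"    # incoming frame (dispatched MESSAGEs)
        msg
      end
    end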

If I get a chance to try this out, I'll update my comment with my own results.

One more thing to consider: it is very possible that multiple long-processing messages could be sent, and if the number of long-processing messages exceeds your number of consumer processes, you'll be back in this fix where quick-processing messages are left waiting.

I tend to have at least one dedicated process that just does quick jobs, or, to put it another way, to dedicate a fixed number of processes to just the longer jobs. Having all poller consumer processes listen to both long and short destinations can end up with sub-optimal results no matter what dispatch does. Processor groups are the way to configure a consumer to listen to only a subset of destinations: http://code.google.com/p/activemessaging/wiki/Configuration

processor_group name, *list_of_processors

A processor group is a way to run the poller so that it only executes a subset of the processors, by passing the name of the group in the poller command line arguments.

You specify the name of the processor as its underscored, lowercase version. So if you have a FooBarProcessor and a BarFooProcessor in a processor group, it would look like this:

    ActiveMessaging::Gateway.define do |s|
      ...
      s.processor_group :my_group, :foo_bar_processor, :bar_foo_processor
    end

The processor group is passed into the poller like the following:

    ./script/poller start -- process-group=my_group
Andrew Kuklewicz
Thanks for the response. We're using processor groups to distinguish between high-priority and low-priority jobs. High-priority jobs are kicked off by our UI, and low-priority jobs are kicked off by cron. We could use a processor group here, but the long-running job is actually only kicked off once per day. We could remove it from ActiveMQ altogether. For the long job, we're mainly just using ActiveMQ for fault tolerance, since we have consumers on multiple servers.
Clint Miller
A: 

I'm not sure if ActiveMessaging supports this, but you could unsubscribe your other consumers when the long-processing message arrives and then re-subscribe them after it gets processed.

It should give you the desired effect.
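A minimal sketch of that idea with the plain Ruby stomp gem, assuming client is a Stomp::Client connection and the handler names are placeholders (ActiveMessaging may not expose unsubscribe/resubscribe this directly from a processor):

    # Sketch: the consumer drops its Q2 subscription while it works on a long Q1 job.
    short_handler = lambda do |msg|
      handle_short_job(msg)       # hypothetical short-job handler
      client.acknowledge(msg)
    end

    client.subscribe("/queue/Q2", "ack" => "client", &short_handler)

    client.subscribe("/queue/Q1", "ack" => "client") do |msg|
      client.unsubscribe("/queue/Q2")   # stop taking short jobs while busy
      handle_long_job(msg)              # hypothetical long-running handler
      client.acknowledge(msg)
      client.subscribe("/queue/Q2", "ack" => "client", &short_handler)  # resume
    end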

Hiram Chirino