Hi, I'm new to Java and really need your help.

I am currently using a queue: a receiver thread puts data into it, and a parser thread reads it out. The problem is that the receiver can peak at an incredible rate, e.g. 3000 packets/sec, while the parser only handles about 100/sec.

EDIT: I have checked: the queue first stays at about 100 entries, after ten seconds it starts growing by about 100 per second, and the program crashes when it reaches 2000 or so. Could there be a memory leak?

My code (in a tight loop) is

byte[] data = new byte[1024];                            // new buffer allocated for every packet
System.arraycopy(udpPacket.getData(), 0, data, 0, 1024); // copies the full 1024 bytes, even if the packet is shorter
queue.offer(data);                                       // queue is unbounded, so offer() always succeeds

The heap fills up too quickly, and I get an OutOfMemoryError. I guess the problem is that the queue is backed by a linked list, and all its node pointers must be kept on the heap.

I know of a C version that does the same thing (using a buffer) with much better performance, but because of deployment issues we can only use Java.

A: 

Another way to do this would be to sample the data when the queue gets too big, and record the sampling rate so we can approximate the original data later.
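
A rough sketch of what I mean (the names and thresholds here are made up for illustration): once the queue backs up past a limit, keep only every Nth packet and record N so the parser can scale its statistics back up:

import java.util.concurrent.BlockingQueue;

class SamplingEnqueuer {
    private int sampleRate = 1; // recorded so the parser can multiply its counts back up
    private long counter = 0;

    void enqueue(BlockingQueue<byte[]> queue, byte[] data) throws InterruptedException {
        // degrade to 1-in-10 sampling once the queue backs up past 1000 entries
        sampleRate = (queue.size() > 1000) ? 10 : 1;
        if (counter++ % sampleRate == 0) {
            queue.put(data); // kept packets; the rest are dropped, not stored
        }
    }
}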

TiansHUo
+1  A: 

If you receive 3000/sec but only process 100/sec, sooner or later you will run out of memory. May I suggest using more threads to do the parsing?

Concerning the queue, have a look at LinkedBlockingDeque and LinkedBlockingQueue. These are both high-performance, thread-safe queue implementations.
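
For example (a minimal sketch, not the asker's actual code; parse() stands in for the real parsing logic): several threads share one queue, and each loops on take(), which blocks while the queue is empty:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ParserPool {
    // bounded queue; the capacity of 1000 is an assumption, tune it to your memory budget
    static final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(1000);

    public static void main(String[] args) {
        for (int i = 0; i < 4; i++) { // 4 parser threads; tune to your CPU count
            Thread parser = new Thread(new Runnable() {
                public void run() {
                    try {
                        while (true) {
                            byte[] packet = queue.take(); // blocks until data is available
                            parse(packet);
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt(); // exit cleanly on shutdown
                    }
                }
            });
            parser.setDaemon(true);
            parser.start();
        }
        // ... the receiver thread keeps calling queue.put(data) as before ...
    }

    static void parse(byte[] data) {
        // stand-in for the real parsing logic
    }
}

Note that this only helps if parsing is CPU-bound and you have spare cores; four threads at 100/sec each still won't keep up with a 3000/sec peak on their own.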

idrosid
Thanks for the quick answer, but a stupid question: how do I parse one queue using multiple threads? I don't understand that. Another problem is that I am running out of memory in a matter of seconds, which is totally unacceptable...
TiansHUo
I think idrosid means that you need to decouple removing items from the queue from processing them. So you read an item from the queue and hand it off to one of a number of worker threads to do the parsing, which lets you process more than one queue item at a time. Having said that, I think this will still just defer the point at which problems occur rather than eliminate it.
Why does it defer the problem instead of eliminating it?
TiansHUo
I have implemented this, and it works
TiansHUo
+2  A: 

Since data comes in 30 times faster than it is processed, you can enlarge the heap with

java -Xms<initial heap size> -Xmx<maximum heap size>

provided the transmission finishes before your memory is exhausted.

  • Or, as you yourself suggested, dump the data to disk and process it later.
  • Otherwise you would have to optimize your parser.
stacker
Yeah, optimizing the parser is a good idea, because when a peak happens the data is often homogeneous, so maybe I could keep statistics on the most frequent data and do some aggregation.
TiansHUo
@TiansHUo, this is not the answer. It may solve your problem now but you may have more problems in the future when even more data needs to be processed...
bruno conde
Huh? Could you clarify what problems will come up when more data needs to be processed? Or could you suggest a direction for improving the algorithm...
TiansHUo
@TiansHUo, imagine that you have an infinite source of data that constantly needs to be parsed. Increasing the memory won't solve the problem, because the producer's transfer rate is greater than the consumer's. The BlockingDeque suggested by me and @idrosid is a solution to this "memory leak" because it caps how much data sits in the queue, halting the producer when the limit is reached.
bruno conde
You have mistaken the problem: normally the consumer rate is higher than the producer's, but sometimes the producer peaks. Maybe putting data on disk is a way to implement a bigger cache?
TiansHUo
A: 

When you run java, you can use the -Xmx parameter to make more memory available to the virtual machine. For example, java -Xmx512m will allow the VM to allocate up to 512 MB of memory. (The default is fairly small.)

But if you're allocating memory and filling up a list with data and never removing it, eventually you're going to run out of memory, no matter which language you're using.

Ash
+1  A: 

If the producer produces more data than the consumer can handle, the data will start to accumulate and eventually you will run into OutOfMemory problems. How soon depends on (1) the rate difference between producer and consumer, and (2) the quantity of data you have to process.

I suggest you limit the number of items in the queue. Use a BlockingDeque (e.g. a LinkedBlockingDeque constructed with a fixed capacity) to limit the size of the queue and block your receiver loop when the limit is reached. This way, the queue acts as a cache in front of the parser.
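
A minimal sketch of this (the capacity and port are assumptions; adapt it to your code): constructing the queue with a fixed capacity and using put() instead of offer() makes the receiver block instead of growing the heap:

import java.net.DatagramPacket;
import java.net.DatagramSocket;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BoundedReceiver {
    // heap use is now capped at roughly capacity * packet size (about 1 MB here)
    static final BlockingQueue<byte[]> queue = new LinkedBlockingQueue<byte[]>(1000);

    public static void main(String[] args) throws Exception {
        DatagramSocket socket = new DatagramSocket(9999); // hypothetical port
        byte[] buf = new byte[1024];
        DatagramPacket packet = new DatagramPacket(buf, buf.length);
        while (true) {
            packet.setLength(buf.length); // reset before each receive
            socket.receive(packet);
            // copy only the bytes actually received, not the whole 1024-byte buffer
            byte[] data = new byte[packet.getLength()];
            System.arraycopy(packet.getData(), 0, data, 0, data.length);
            queue.put(data); // blocks when the queue is full, throttling the receiver
        }
    }
}

Be aware that while put() is blocked, the OS socket buffer can overflow and UDP packets will simply be lost; whether that is acceptable depends on your data.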

bruno conde
But the problem now is that the queue only holds 2000 or so entries and I still get an OutOfMemoryError, so I guess there is a memory leak somewhere.
TiansHUo
Hmm, that really is a small amount of data... about 1.9 MB by my calculation (2000 entries × 1024 bytes)... that shouldn't be a problem.
bruno conde
@bruno conde: Are there any (preferably open-source) tools you could suggest to help me see where the real problem is?
TiansHUo
+1  A: 

"I guess the problem is that queue is made using a linked-list, and all the pointers must be saved in the heap."

I don't think so. I think the real problem is the mismatch between the rate at which your system receives input and the rate at which it can process it. Unless you can process input at least as fast as it arrives on average, you will eventually run out of memory, no matter how you represent the queue.

You must either improve the processing rate, reduce the input rate, or ... drop data.

Stephen C