As part of the requirement we need to process nearly 3 million records and associate each of them with a bucket. The association is decided by a set of rules (comprising 5-15 attributes, with single values or ranges, and precedence) which derive the bucket for a record. Sequential processing of such a large number of records is clearly not feasible. Can someone guide us on how to design an effective solution?
I'm not quite sure what you're after, but here's a blog post about how the New York Times used the Apache Hadoop project to process a large volume of data.
3 million records isn't really that much from a volume-of-data point of view (depending on record size, obviously), so I'd suggest that the easiest thing to try is parallelising the processing across multiple threads (using the java.util.concurrent.Executor framework). As long as you have multiple CPU cores available, you should be able to get near-linear performance increases.
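A minimal sketch of that approach (Record, deriveBucket() and the pool sizing here are placeholders for your actual record type and rule engine):

    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.TimeUnit;

    public class ParallelBucketing {

        /** Placeholder record: a couple of attributes plus the derived bucket. */
        static class Record {
            int attrib1;
            volatile int bucket;
        }

        /** Stand-in for the real 5-15 attribute rule set. */
        static int deriveBucket(Record r) {
            return r.attrib1 % 4; // trivial rule, purely illustrative
        }

        public static void assignBuckets(List<Record> records) throws InterruptedException {
            int threads = Runtime.getRuntime().availableProcessors();
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            for (final Record r : records) {
                pool.submit(new Runnable() {
                    public void run() {
                        r.bucket = deriveBucket(r); // records are independent, so no locking needed
                    }
                });
            }
            pool.shutdown();                          // accept no new tasks
            pool.awaitTermination(1, TimeUnit.HOURS); // wait for the backlog to drain
        }
    }

In practice you would submit chunks of records rather than one task per record to keep the task overhead down, but the structure stays the same.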
It depends on the data source. If it is a single database, you will spend most of the time retrieving the data anyway. If it is in a local file, then you can partition the data into smaller files or you can pad the records to have equal size - this allows random access to a batch of records.
If you have a multi-core machine, the partitioned data can be processed in parallel. Once you have determined the record-bucket assignments, you can write the information back to the database using PreparedStatement's batch capability.
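For the writeback, something along these lines (the table and column names are invented for the sake of the example):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.SQLException;
    import java.util.List;

    public class BucketWriter {

        /** Each long[] holds {recordId, bucketId}; the schema here is illustrative only. */
        public static void writeAssignments(Connection con, List<long[]> assignments)
                throws SQLException {
            PreparedStatement ps =
                    con.prepareStatement("UPDATE record SET bucket_id = ? WHERE record_id = ?");
            try {
                int pending = 0;
                for (long[] a : assignments) {
                    ps.setLong(1, a[1]); // bucketId
                    ps.setLong(2, a[0]); // recordId
                    ps.addBatch();
                    if (++pending == 1000) { // flush in chunks to keep memory bounded
                        ps.executeBatch();
                        pending = 0;
                    }
                }
                if (pending > 0) {
                    ps.executeBatch();
                }
            } finally {
                ps.close();
            }
        }
    }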
If you have only a single-core machine, you can still get some performance improvement by separating data retrieval, data processing and batch writeback, so the CPU has work to do during the pauses of the I/O operations.
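A rough sketch of that separation, with the reader and the processing running in different threads so the CPU has work while the next batch is being fetched (fetchNextBatch(), deriveBuckets() and writeBack() are stubs standing in for your actual I/O and rule code):

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class PipelineSketch {

        private static final Object EOF = new Object(); // poison pill marking the end of input

        public static void run() throws InterruptedException {
            final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(10);

            Thread reader = new Thread(new Runnable() {
                public void run() {
                    try {
                        Object batch;
                        while ((batch = fetchNextBatch()) != null) {
                            queue.put(batch); // blocks if the processing falls behind
                        }
                        queue.put(EOF);
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            });
            reader.start();

            Object batch;
            while ((batch = queue.take()) != EOF) {
                writeBack(deriveBuckets(batch)); // CPU work overlaps with the reader's I/O waits
            }
            reader.join();
        }

        private static Object fetchNextBatch() { return null; }             // stub: read a chunk from the DB or file
        private static Object deriveBuckets(Object batch) { return batch; } // stub: apply the rules
        private static void writeBack(Object batch) { }                     // stub: batched update
    }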
Is there a reason that you have to use Java to process the data? Couldn't you use SQL queries to write to intermediate fields? You could build upon each field -- attributes -- until you have everything in the bucket you need.
Or you could use a hybrid of SQL and java... Use different procedures to get different "buckets" of information and then send that down one thread path for more detailed processing and another query to get another set of data and send that down a different thread path...
The same applies to most projects where you need to process large amounts of information. I am going to assume that each record is handled the same way, i.e. you process it identically each time, which is what lets you spawn separate threads to do the processing.
The second obvious point is where you are fetching your information from; in this case you mentioned a database, but really that is pretty irrelevant. You want to separate the I/O and processing elements of your code into separate threads (or, more likely, a pool of executors for the processing).
Try to make each as independent as possible, and remember to use locking when necessary. Here are some links that you may want to read up on.
http://www.ibm.com/developerworks/library/j-thread.html
http://www.ibm.com/developerworks/java/library/j-threads1.html
http://www.devarticles.com/c/a/Java/Multithreading-in-Java/
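On the locking point above, a tiny illustration: if the worker threads really do have to share a structure (say, per-bucket counters), prefer the concurrent collections over a plain HashMap plus ad-hoc synchronization. The class and names below are made up for the example:

    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.ConcurrentMap;
    import java.util.concurrent.atomic.AtomicInteger;

    public class SharedCounters {

        private final ConcurrentMap<Integer, AtomicInteger> countsByBucket =
                new ConcurrentHashMap<Integer, AtomicInteger>();

        /** Safe to call from many worker threads at once. */
        public void recordAssignment(int bucketId) {
            AtomicInteger counter = countsByBucket.get(bucketId);
            if (counter == null) {
                AtomicInteger fresh = new AtomicInteger();
                counter = countsByBucket.putIfAbsent(bucketId, fresh);
                if (counter == null) {
                    counter = fresh; // we won the race, use our own instance
                }
            }
            counter.incrementAndGet();
        }
    }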
Effective design for this scenario starts with determining any and all places where you can partition the records to be processed, to allow full-engine parallelization (i.e., four units running against 750k records each is comparatively cheap). Then, depending upon the cost of the rules that summarize each record (I am viewing assignment of a bucket as a summarization operation), determine whether your operation is going to be CPU bound or record-retrieval bound.
If you're CPU bound, increasing the partitioning is your best performance gain. If you're IO bound, rule processing worker threads that can work in parallel in response to chunked data retrieval is a better-performing design.
All of this assumes that your rules will not result in state which needs to be tracked between records. Such a scenario deeply threatens the parallelization approach. If parallelization is not a tractable solution because of cumulative state being a component of the rule set, then your best solution may in fact be sequential processing of individual records.
Sequential processing of such a large number of records is clearly not feasible.
I don't think you know that. How long does it take to process 1,000 records in this way? 10,000? 100,000? 1,000,000? If the answer is really "too long," then fine: start to look for optimizations. But you might find the answer is "insignificant," and then you're done.
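A minimal measure-first harness along those lines, assuming a hypothetical processSequentially() entry point for the existing single-threaded code:

    import java.util.ArrayList;
    import java.util.List;

    public class MeasureFirst {

        public static void main(String[] args) {
            List<String> records = loadRecords();
            int[] sampleSizes = {1000, 10000, 100000};
            for (int n : sampleSizes) {
                List<String> sample = records.subList(0, Math.min(n, records.size()));
                long start = System.nanoTime();
                processSequentially(sample);
                long millis = (System.nanoTime() - start) / 1000000L;
                System.out.println(n + " records: " + millis + " ms");
            }
        }

        static List<String> loadRecords() { return new ArrayList<String>(); } // stub: fetch the real data
        static void processSequentially(List<String> sample) { }              // stub: the existing rule engine
    }

If the 100,000-record figure extrapolates to something acceptable for 3 million, you can stop right there.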
Other answers have alluded to this, but it's my entire answer. Prove that you have a problem before you start optimizing. Then you've at least got a simple, correct system to profile and against which to compare optimized answers.
As a meaningless benchmark, we have a system that has an internal cache. We're currently loading 500K rows. For each row we generate statistics, place keys in different caches, etc. Currently this takes < 20s for us to process.
It's a meaningless benchmark, but it does show that, depending on the circumstances, 3M rows is not a lot of rows on today's hardware.
That said.
As others have suggested, break the job up into pieces and parallelize the runs, 1-2 threads per core. Each thread maintains its own local data structures and state, and at the end the master process consolidates the results. This is a crude "map/reduce" algorithm. The key here is to ensure that the threads aren't fighting over global resources like global counters, etc. Let the final processing of the thread results deal with those serially.
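A sketch of that shape, with each worker returning its own partial result and the master doing the serial merge at the end (deriveBucket() is again just a stand-in for the real rules):

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;

    public class ChunkedBucketCount {

        static int deriveBucket(int record) { return record % 4; } // illustrative rule only

        public static Map<Integer, Integer> countBuckets(List<Integer> records, int threads)
                throws Exception {
            ExecutorService pool = Executors.newFixedThreadPool(threads);
            int chunkSize = (records.size() + threads - 1) / threads;
            List<Future<Map<Integer, Integer>>> partials = new ArrayList<Future<Map<Integer, Integer>>>();

            for (int start = 0; start < records.size(); start += chunkSize) {
                final List<Integer> chunk =
                        records.subList(start, Math.min(start + chunkSize, records.size()));
                partials.add(pool.submit(new Callable<Map<Integer, Integer>>() {
                    public Map<Integer, Integer> call() {
                        Map<Integer, Integer> local = new HashMap<Integer, Integer>(); // thread-local state
                        for (int r : chunk) {
                            int bucket = deriveBucket(r);
                            Integer c = local.get(bucket);
                            local.put(bucket, c == null ? 1 : c + 1);
                        }
                        return local;
                    }
                }));
            }

            Map<Integer, Integer> merged = new HashMap<Integer, Integer>();
            for (Future<Map<Integer, Integer>> f : partials) { // serial "reduce" step
                for (Map.Entry<Integer, Integer> e : f.get().entrySet()) {
                    Integer c = merged.get(e.getKey());
                    merged.put(e.getKey(), c == null ? e.getValue() : c + e.getValue());
                }
            }
            pool.shutdown();
            return merged;
        }
    }

This counts records per bucket rather than doing the real assignment, but the pattern (thread-local maps merged serially at the end) is the point.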
You can use more than one thread per core if each thread is doing DB IO, since no single thread will be purely CPU bound. Simply run the process several times with different thread counts until it comes out fastest.
We've seen 50% speed ups even when we run batches through a persistent queueing system like JMS to distribute the work vs linear processing, and I've seen these gains on 2 core laptop computers, so there is definite room for progress here.
Another thing if possible is don't do ANY disk IO (save reading the data from the DB) until the very end. At that point you have a lot more opportunity to batch any updates that need to be made so you can, at least, cut down on network round trip times. Even if you had to update every single row, large batches of SQL will still show net gains in performance. Obviously this can be memory intensive. Thankfully, most modern systems have a lot of memory.
Thank you so much for the splendid responses. Sorry for my late intervention. I will try to reply to every one of you, so stay tuned...
@yx: Can you please explain what you are looking for a bit more? I don't see how you can possibly process the records without iterating through them in some way. Or are you looking for some multi-threaded solution?
Yes, you are right: I am looking for some kind of multi-threaded approach for processing such a large number of records. We have two heterogeneous data sets, say A and B. Objects of type A and B have some attributes in common. In order to associate an object of type B with an object of type A, we compare some attributes of A with those of B and decide. The pseudocode looks like this:
for each objTypeA in A do
    for each objTypeB in B do
        if (objTypeA.attrib1 == objTypeB.attrib1)
        {
            // rule on attrib1 matched
        }
        if (objTypeA.attrib2 == objTypeB.attrib2)
        {
            // rule on attrib2 matched
        }
        ...
We step through this process for each objTypeB in B until we find the right B for the given A. So to derive a bucket we need to analyze each of the objects in B, and this happens for every object in A.
I did a small POC in which A (3M records) and B (3000 records) are processed in threads, with A split into equal-sized chunks of 5000 records each, i.e. nearly 600 threads. The process takes nearly 15 minutes to complete. As of now we are thinking of making a single database call to get the data for A and B in two cursors and then mapping them to Lists in Java. Once the bucket derivation process is over, the A-type records are persisted back to the database.
I hope this makes the problem clearer. I need to reduce this 15 minutes to, say, 5 minutes at most.
@Dave Webb: Are you talking about processing 3 million records once or on a daily/weekly/monthly basis?
As per the business, we will be getting data to process on a daily basis, so we need to run this engine daily.
@skaffman: As long as you have multiple CPU cores available, you should be able to get near-linear performance increases.
We are going to have just one CPU core. We basically want to optimize the design so as to increase the throughput.
@kd304: If it is a single database, you will spend most of the time retrieving the data anyway. If it is in a local file, then you can partition the data into smaller files or you can pad the records to have equal size - this allows random access to a batch of records.
Correct. It's a batch process only, and the processing will happen on the server side. There is a proprietary framework on top of it within which this small application is going to run; volume-based chunking of records and so forth happens automatically :)
@Frank: Is there a reason that you have to use Java to process the data? Couldn't you use SQL queries to write to intermediate fields? You could build upon each field -- attributes -- until you have everything in the bucket you need.
Or you could use a hybrid of SQL and java... Use different procedures to get different "buckets" of information and then send that down one thread path for more detailed processing and another query to get another set of data and send that down a different thread path...
Yes, there is a reason why we want to do this processing on the Java side. This whole system depends on feeds coming from two external systems. These feeds arrive on different days, and due to business drivers we cannot rely completely on database processing using Informatica or PL/SQL.
@Tetsujin no Oni: If you're CPU bound, increasing the partitioning is your best performance gain. If you're IO bound, rule processing worker threads that can work in parallel in response to chunked data retrieval is a better-performing design.
All this has been taken care of. Thanks!
@Will Hartung: You can use more than one thread per core if each thread is doing DB IO, since no single thread will be purely CPU bound. Simply run the process several times with different thread counts until it comes out fastest.
We've seen 50% speed ups even when we run batches through a persistent queueing system like JMS to distribute the work vs linear processing, and I've seen these gains on 2 core laptop computers, so there is definite room for progress here.
Perfect match. We have a framework which addresses this too. But the processing is O(n^2), i.e. 3M x 3K comparisons, which is huge, right? How can we optimize the volume of comparisons?
Based on the revised description, I think I'd try and look at sorting the data.
Sorting can be an n*log(n) process; and if most of the comparisons are for direct equality on sortable fields, this should yield a total complexity of ~O(n*log(n)). Theoretically. If after assigning an item to a bucket it's no longer needed, just remove it from the list of data.
Even if the data needed to be re-sorted a few times for various steps in the logic, it should still be a bit faster than the n^2 approach.
Basically, this would involve preprocessing the data to make it easier for actual processing.
This makes certain assumptions about the bucket-assignment logic (namely that it's not too far from the pseudocode provided), and would be invalid if you needed to extract data from every pair of A and B.
Hope this helps.
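One way to realize the preprocessing idea for the equality-based rules is a hash index over B rather than a sort; the effect is the same in that the inner scan of all 3000 B records disappears. TypeA, TypeB and attrib1 are placeholders for the real entities, and range-based rules would still need interval checks (or a sorted structure) over the short-listed candidates:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class IndexedMatching {

        static class TypeA { int attrib1; TypeB bucket; }
        static class TypeB { int attrib1; }

        public static void match(List<TypeA> as, List<TypeB> bs) {
            // Build the index once: O(|B|)
            Map<Integer, List<TypeB>> index = new HashMap<Integer, List<TypeB>>();
            for (TypeB b : bs) {
                List<TypeB> same = index.get(b.attrib1);
                if (same == null) {
                    same = new ArrayList<TypeB>();
                    index.put(b.attrib1, same);
                }
                same.add(b);
            }
            // Each A now only examines the handful of candidate Bs: roughly O(|A|)
            for (TypeA a : as) {
                List<TypeB> candidates = index.get(a.attrib1);
                if (candidates != null && !candidates.isEmpty()) {
                    a.bucket = candidates.get(0); // apply the remaining rules/precedence to the candidates here
                }
            }
        }
    }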
Edit: I would comment if I could; but, alas, I am too new. Preprocessing applies as much to the data as it does to the individual categories. Ultimately, all you need to do to go from a 15-minute compute time to a 5-minute compute time is to be able to programmatically rule out two thirds or more of the categories that cannot and will never match... in less than O(n) amortized time. Which might not be applicable to your specific situation, I admit.
@CoderTao: Sorting can be an n*log(n) process; and if most of the comparisons are for direct equality on sortable fields, this should yield a total complexity of O(n*log(n)+n). Theoretically. If after assigning an item to a bucket it's no longer needed, just remove it from the list of data
No, we can't delete the items after assigning them to a bucket. It's real data and has to be stored back in the database after processing. Also, we can't sort the data on some parameter because the two are different entities; we are just trying to relate them through some parameters they have in common. They may be compared for equality or need to fall within some range of values.
Basically, this would involve preprocessing the data to make it easier for actual processing.
You have a point, but we could not categorize the data before any processing.
@CoderTao: No problem. Thanks!!! I am wondering if the rules could be optimized in some way to minimize the processing.
I would push back with the specification author to focus more on 'what' needs to be done, rather than how. I can't imagine why a specification would push 'Java' for a data-intensive operation. If it has to do with data, do it with SQL. If you're using Oracle, there is a function called ntile, so creating a fixed set of buckets is as trivial as:
select ntile(4) over (order by empno) grp, empno, ename from emp
Which results in:
GRP EMPNO ENAME
--- ----- ------
  1  7369 SMITH
  1  7499 ALLEN
  1  7521 WARD
  1  7566 JONES
  2  7654 MARTIN
  2  7698 BLAKE
  2  7782 CLARK
  2  7788 SCOTT
  3  7839 KING
  3  7844 TURNER
  3  7876 ADAMS
  4  7900 JAMES
  4  7902 FORD
  4  7934 MILLER
At a minimum you could establish your 'buckets' in SQL; your Java code would then just need to process a given bucket:
Worker worker = new Worker(bucketID);
worker.doWork();
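A hypothetical fleshing-out of that Worker, where each instance pulls only the rows ntile() placed in its bucket (the table, columns and processRow() hook are invented for the example):

    import java.sql.Connection;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;

    public class Worker {

        private final Connection con;
        private final int bucketId;

        public Worker(Connection con, int bucketId) {
            this.con = con;
            this.bucketId = bucketId;
        }

        public void doWork() throws SQLException {
            String sql = "select empno, ename from "
                       + "(select ntile(4) over (order by empno) grp, empno, ename from emp) "
                       + "where grp = ?";
            PreparedStatement ps = con.prepareStatement(sql);
            try {
                ps.setInt(1, bucketId);
                ResultSet rs = ps.executeQuery();
                while (rs.next()) {
                    processRow(rs.getInt("empno"), rs.getString("ename"));
                }
            } finally {
                ps.close();
            }
        }

        private void processRow(int empno, String ename) {
            // detailed per-bucket processing goes here
        }
    }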
If you don't care about the number of buckets (the example above asked for 4 buckets) but rather want a fixed size for each bucket (5 records per bucket), then the SQL is:
select ceil(row_number() over (order by empno) / 5.0) grp,
       empno,
       ename
  from emp
Output:
GRP EMPNO ENAME
--- ----- ------
  1  7369 SMITH
  1  7499 ALLEN
  1  7521 WARD
  1  7566 JONES
  1  7654 MARTIN
  2  7698 BLAKE
  2  7782 CLARK
  2  7788 SCOTT
  2  7839 KING
  2  7844 TURNER
  3  7876 ADAMS
  3  7900 JAMES
  3  7902 FORD
  3  7934 MILLER
Both examples above come from the terrific book: SQL Cookbook, 1st Edition by Anthony Molinaro
@Brian: I agree, but we cannot do so. What you demonstrated is absolutely correct, but let me add one more layer of complexity. In my situation the feed for A-type data is received on day T+1 and the B-type data on day T+2. The database is already doing some processing to consolidate and decorate the data for further processing. Since deriving the buckets involves too many rules, we cannot delegate that responsibility to the database.
Again, I would like to reiterate: I just want to refactor the rules part, where the bucket is derived. Please keep this in mind.