I need to gather some statistics in my software, and I am trying to make it fast and correct, which is not easy (for me!).

First, my code so far, with two classes: a StatsService and a StatsHarvester.

public class StatsService
{
private Map<String, Long>   stats   = new HashMap<String, Long>(1000);

public void notify ( String key )
{
    Long value = 1L;
    synchronized (stats)
    {
        if (stats.containsKey(key))
        {
            value = stats.get(key) + 1;
        }
        stats.put(key, value);
    }
}

public Map<String, Long> getStats ( )
{
    Map<String, Long> copy;
    synchronized (stats)
    {
        copy = new HashMap<String, Long>(stats);
        stats.clear();
    }
    return copy;
}
}

This is my second class, a harvester that collects the stats from time to time and writes them to a database.

public class StatsHarvester implements Runnable
{
private StatsService    statsService;
private Thread          t;

public void init ( )
{
    t = new Thread(this);
    t.start();
}

public synchronized void run ( )
{
    while (true)
    {
        try
        {
            wait(5 * 60 * 1000); // 5 minutes
            collectAndSave();
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
    }
}

private void collectAndSave ( )
{
    Map<String, Long> stats = statsService.getStats();
    // do something like:
    // saveRecords(stats);
}
}

At runtime there will be about 30 concurrently running threads, each calling notify(key) about 100 times. Only one StatsHarvester calls statsService.getStats().

So I have many writers and only one reader. It would be nice to have accurate stats, but I don't care if some records are lost under high concurrency.

The reader should run every 5 minutes or whatever is reasonable.

Writing should be as fast as possible. Reading should be fast, but if it locks for about 300 ms every 5 minutes, that's fine.

I've read many docs (Java Concurrency in Practice, Effective Java and so on), but I have the strong feeling that I need your advice to get it right.

I hope I stated my problem clearly and briefly enough to get valuable help.


EDIT

Thanks to all for your detailed and helpful answers. As I expected, there is more than one way to do it.

I tested most of your proposals (those I understood) and uploaded a test project to Google Code for further reference (Maven project):

http://code.google.com/p/javastats/

I have tested different implementations of my StatsService:

  • HashMapStatsService (HMSS)
  • ConcurrentHashMapStatsService (CHMSS)
  • LinkedQueueStatsService (LQSS)
  • GoogleStatsService (GSS)
  • ExecutorConcurrentHashMapStatsService (ECHMSS)
  • ExecutorHashMapStatsService (EHMSS)

I tested them with x threads, each calling notify y times (the column headers are x,y). Results are in ms:

         10,100   10,1000  10,5000  50,100   50,1000  50,5000  100,100  100,1000 100,5000 
GSS       1        5        17       7        21       117      7        37       254       Total: 466
ECHMSS    1        6        21       5        32       132      8        54       249       Total: 508
HMSS      1        8        45       8        52       233      11       103      449       Total: 910
EHMSS     1        5        24       7        31       113      8        67       235       Total: 491
CHMSS     1        2        9        3        11       40       7        26       72        Total: 171
LQSS      0        3        11       3        16       56       6        27       144       Total: 266

At this moment I think I will use ConcurrentHashMap, as it offers good performance and is quite easy to understand.

Thanks for all your input! Janning

+5  A: 

Why don't you use java.util.concurrent.ConcurrentHashMap<K, V>? It handles everything internally, avoiding useless locks on the map and saving you a lot of work: you won't have to care about synchronization on get and put.

From the documentation:

A hash table supporting full concurrency of retrievals and adjustable expected concurrency for updates. This class obeys the same functional specification as Hashtable, and includes versions of methods corresponding to each method of Hashtable. However, even though all operations are thread-safe, retrieval operations do not entail locking, and there is not any support for locking the entire table in a way that prevents all access.

You can specify its concurrency level:

The allowed concurrency among update operations is guided by the optional concurrencyLevel constructor argument (default 16), which is used as a hint for internal sizing. The table is internally partitioned to try to permit the indicated number of concurrent updates without contention. Because placement in hash tables is essentially random, the actual concurrency will vary. Ideally, you should choose a value to accommodate as many threads as will ever concurrently modify the table. Using a significantly higher value than you need can waste space and time, and a significantly lower value can lead to thread contention. But overestimates and underestimates within an order of magnitude do not usually have much noticeable impact. A value of one is appropriate when it is known that only one thread will modify and all others will only read. Also, resizing this or any other kind of hash table is a relatively slow operation, so, when possible, it is a good idea to provide estimates of expected table sizes in constructors.
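As a small illustration of the constructor arguments the quoted documentation talks about, a map for this question might be created roughly like this. The numbers come from the question (1000 initial capacity, about 30 writer threads); the class name StatsMapFactory is made up for this sketch.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical helper, only here to show the three-argument constructor.
class StatsMapFactory {
    static ConcurrentMap<String, Long> newStatsMap() {
        // initialCapacity  1000  : same size estimate the question's HashMap uses
        // loadFactor       0.75f : the usual default
        // concurrencyLevel 30    : roughly the number of writer threads in the question
        return new ConcurrentHashMap<String, Long>(1000, 0.75f, 30);
    }
}
```

Per the quoted text, being off by less than an order of magnitude on the concurrency level should not matter much, so the default of 16 would probably do as well.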

As suggested in the comments, read the documentation of ConcurrentHashMap carefully, especially where it states which operations are atomic and which are not.

To get a guarantee of atomicity you have to stick to the atomic operations. From the ConcurrentMap interface you will see that:

V putIfAbsent(K key, V value)
V replace(K key, V value)
boolean replace(K key,V oldValue, V newValue)
boolean remove(Object key, Object value)

can be used safely.
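A minimal sketch of how these atomic operations could be combined into a counter that does not lose updates, following the retry idea from the comments below. The class name ReplaceLoopStats and the get helper are made up for this example; only putIfAbsent and the three-argument replace from ConcurrentMap are relied on.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical class name; a sketch, not a tuned implementation.
class ReplaceLoopStats {
    private final ConcurrentMap<String, Long> stats = new ConcurrentHashMap<String, Long>();

    public void notify(String key) {
        while (true) {
            Long current = stats.get(key);
            if (current == null) {
                // No mapping yet: try to install 1. A null result means we won the race.
                if (stats.putIfAbsent(key, 1L) == null) {
                    return;
                }
            } else if (stats.replace(key, current, current + 1)) {
                // The entry still held 'current', so the increment was applied atomically.
                return;
            }
            // Another thread changed the entry in between: read again and retry.
        }
    }

    // Helper for reading a single counter; returns 0 for unknown keys.
    public long get(String key) {
        Long value = stats.get(key);
        return value == null ? 0L : value;
    }
}
```

A plain get followed by put would not be safe here, because two threads could read the same old value and both write old + 1.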

Jack
It's worth noting that this approach could very easily lead to lost updates.
Ben Lings
According to the Map contract it shouldn't allow lost updates, assuming also that putIfAbsent is executed atomically.
Jack
To ensure that, you'll need to loop until `replace(key, currentValue, currentValue+1)` returns true.
Ben Lings
@Jack you might want to consider mentioning `putIfAbsent` in your main post. While the individual operations of ConcurrentHashMap are thread-safe, compound operations naturally are not.
matt b
In addition AtomicLong / AtomicInteger should be considered as the map values, this would eliminate the issue of lost increments (except potentially on the initial populations, unless the keys are all pre-populated or a call to putIfAbsent is included as pointed out by matt b.).
M. Jessup
A: 

If we ignore the harvesting part and focus on the writing, the main bottleneck of the program is that the stats are locked at a very coarse level of granularity. If two threads want to update different keys, they must wait.

If you know the set of keys in advance, and can preinitialize the map so that by the time an update thread arrives the key is guaranteed to exist, you would be able to do locking on the accumulator variable instead of the whole map, or use a thread-safe accumulator object.

Instead of implementing this yourself, there are map implementations that are designed specifically for concurrency and do this more fine-grained locking for you.

One caveat, though, is reading the stats, since you would need to get locks on all the accumulators at roughly the same time. If you use an existing concurrency-friendly map, there might be a construct for getting a snapshot.
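To make the idea of locking per accumulator more concrete, here is a rough sketch that assumes the full set of keys is known up front. FixedKeyStats and Counter are names invented for this example. Note that the snapshot drains one counter at a time, so it is not a single point-in-time view across all keys.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical class; assumes all keys are known when the service is created.
class FixedKeyStats {
    /** Mutable counter guarded by its own monitor, so threads only contend per key. */
    private static final class Counter {
        private long count;
        synchronized void increment() { count++; }
        synchronized long drain() { long c = count; count = 0; return c; }
    }

    // Filled once in the constructor and never structurally modified afterwards,
    // so concurrent lookups need no lock on the map itself.
    private final Map<String, Counter> counters;

    FixedKeyStats(Collection<String> knownKeys) {
        Map<String, Counter> m = new HashMap<String, Counter>();
        for (String key : knownKeys) {
            m.put(key, new Counter());
        }
        counters = Collections.unmodifiableMap(m);
    }

    public void notify(String key) {
        Counter c = counters.get(key);
        if (c == null) {
            throw new IllegalArgumentException("unknown key: " + key);
        }
        c.increment();
    }

    /** Reads and resets every counter, one lock at a time. */
    public Map<String, Long> getStats() {
        Map<String, Long> copy = new HashMap<String, Long>();
        for (Map.Entry<String, Counter> e : counters.entrySet()) {
            copy.put(e.getKey(), e.getValue().drain());
        }
        return copy;
    }
}
```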

Uri
+3  A: 

I would suggest taking a look at Java's java.util.concurrent library. I think you can implement this solution a lot more cleanly. I don't think you need a map here at all. I would recommend implementing this using a ConcurrentLinkedQueue. Each 'producer' can freely write to this queue without worrying about the others. It can put an object on the queue with the data for its statistics.

The harvester can consume the queue, continually pulling data off and processing it. It can then store it however it needs.
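A minimal sketch of what this could look like, assuming the "object with the data" is simply the key itself. QueueStats is a made-up name, and the counting on the harvester side uses a plain HashMap because only that one thread touches it.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical class name; writers only enqueue, the harvester does the counting.
class QueueStats {
    private final Queue<String> events = new ConcurrentLinkedQueue<String>();

    /** Called by many writer threads; never blocks. */
    public void notify(String key) {
        events.offer(key);
    }

    /** Called by the single harvester thread: drains the queue and counts per key. */
    public Map<String, Long> getStats() {
        Map<String, Long> counts = new HashMap<String, Long>();
        String key;
        while ((key = events.poll()) != null) {
            Long old = counts.get(key);
            counts.put(key, old == null ? 1L : old + 1);
        }
        return counts;
    }
}
```

The queue is unbounded, so its size between two harvests grows with the number of notify calls (about 3000 entries with the numbers from the question).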

Chris Dail
+5  A: 

As Jack was alluding to, you can use the java.util.concurrent library, which includes ConcurrentHashMap and AtomicLong. You can put the AtomicLong in if absent; otherwise you increment the existing value. Since AtomicLong is thread safe, you will be able to increment the variable without worrying about concurrency issues.

ConcurrentMap<String, AtomicLong> stats = new ConcurrentHashMap<String, AtomicLong>();

public void notify(String key) {
    AtomicLong value = stats.get(key);
    if (value == null) {
        // putIfAbsent returns null if our AtomicLong(1) was inserted,
        // otherwise the counter that another thread inserted first
        value = stats.putIfAbsent(key, new AtomicLong(1));
    }
    if (value != null) {
        value.incrementAndGet();
    }
}

This should be both fast and thread safe.

Edit: Refactored slightly so there are at most two lookups.
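The snippet above only covers the write side. One possible read side, which is my assumption and not part of the original answer, is to reset each counter with getAndSet(0) so that no increment is lost between reading and clearing. AtomicStats is a made-up class name, and entries are never removed from the map in this sketch.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical class combining the notify above with a harvest method.
class AtomicStats {
    private final ConcurrentMap<String, AtomicLong> stats =
            new ConcurrentHashMap<String, AtomicLong>();

    public void notify(String key) {
        AtomicLong value = stats.get(key);
        if (value == null) {
            // null means our AtomicLong(1) went in and already counts this call
            value = stats.putIfAbsent(key, new AtomicLong(1));
        }
        if (value != null) {
            value.incrementAndGet();
        }
    }

    /** Harvest: atomically read and zero each counter; keys with no hits are skipped. */
    public Map<String, Long> getStats() {
        Map<String, Long> copy = new HashMap<String, Long>();
        for (Map.Entry<String, AtomicLong> e : stats.entrySet()) {
            long n = e.getValue().getAndSet(0);
            if (n > 0) {
                copy.put(e.getKey(), n);
            }
        }
        return copy;
    }
}
```

Keeping the zeroed counters in the map avoids the race where a writer increments a counter that has just been removed, at the cost of the map never shrinking.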

John V.
@BalusC: The docs say it returns "the previous value associated with the specified key, or null if there was no mapping for the key". Your one liner will throw a NullPointerException the first time a particular key is notified.
Ben Lings
@Ben: Yes, you're right (removed wrong comment). Still, this approach is a bit inefficient. I'd rewrite it as follows: `AtomicLong value = stats.get(key); if (value == null) { value = new AtomicLong(0); stats.put(key, value); } value.incrementAndGet();`. Now the map is usually scanned only once instead of at least two times.
BalusC
@BalusC That's not atomic, though. What if stats.put replaces a put by another thread in between your if and stats.put?
John V.
Interesting, thanks for the link. Using `putIfAbsent()` is more a matter of threadsafety, not efficiency.
BalusC
@Mattias you are doing a global lock on all writes then, you will punish the ConcurrentHashMap's throughput. The ConcurrentHashMap was written specifically to allow for concurrent writes on non colliding entries. In best case scenario my solution will only block a single entry once per key.
John V.
According to javadoc, you don't need the first call to stats.get or the first null check. putIfAbsent will either return the previous mapped value (in which case you should increment), or null if there was no previous value, in which case the provided AtomicLong(1) was inserted.
Sean Reilly
Yes, however, continuously calling putIfAbsent is not the correct way of using ConcurrentHashMap. For every single putIfAbsent you will lock a single entry that is associated with the hashCode. You are doing a needless lock, whereas if you succeed with the initial get you will never block.
John V.
+1  A: 

Have you looked into ScheduledThreadPoolExecutor? You could use that to schedule your writers, which could all write to a concurrent collection, such as the ConcurrentLinkedQueue mentioned by @Chris Dail. You can have a separately scheduled job to read from the queue as necessary, and the Java SDK should handle pretty much all your concurrency concerns, no manual locking needed.
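As a sketch of the reader side only, the wait loop in the question's StatsHarvester could be replaced by a scheduled task along these lines. ScheduledHarvester is a made-up name, and the Runnable passed in stands for whatever collectAndSave does.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around a single-threaded scheduler.
class ScheduledHarvester {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    /**
     * Runs the given task repeatedly, first after one period and then once per period.
     * If the task throws, later runs are suppressed, so the task should catch
     * and log its own exceptions.
     */
    public void start(Runnable collectAndSave, long period, TimeUnit unit) {
        scheduler.scheduleAtFixedRate(collectAndSave, period, period, unit);
    }

    /** Stops scheduling new runs; a run already in progress is allowed to finish. */
    public void stop() {
        scheduler.shutdown();
    }
}
```

For the interval in the question this would be called as start(task, 5, TimeUnit.MINUTES).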

Hank Gay
+2  A: 

Chris Dail's answer looks like a good approach.

Another alternative would be to use a concurrent Multiset. There is one in the Google Collections library. You could use this as follows:

private Multiset<String> stats = ConcurrentHashMultiset.create();

public void notify ( String key )
{
    stats.add(key, 1);
}

Looking at the source, this is implemented using a ConcurrentHashMap and using putIfAbsent and the three-argument version of replace to detect concurrent modifications and retry.

Ben Lings
+1  A: 

A different approach to the problem is to exploit the (trivial) thread safety you get from thread confinement. Basically, create a single background thread that takes care of both reading and writing. It has pretty good characteristics in terms of scalability and simplicity.

The idea is that instead of all the threads trying to update the data directly, they produce an "update" task for the background thread to process. The same thread can also do the read task, assuming some lag in processing updates is tolerable.

This design is pretty nice because the threads no longer have to compete for a lock to update data, and since the map is confined to a single thread you can simply use a plain HashMap to do get/put, etc. In terms of implementation, it would mean creating a single-threaded executor and submitting write tasks, which may also perform the optional "collectAndSave" operation.

A sketch of code may look like the following:

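import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class StatsService {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // only ever touched by the executor's single thread
    private final Map<String,Long> stats = new HashMap<String,Long>();

    public void notify(final String key) {
        Runnable r = new Runnable() {
            public void run() {
                Long value = stats.get(key);
                if (value == null) {
                    value = 1L;
                } else {
                    value++;
                }
                stats.put(key, value);
                // do the optional collectAndSave periodically
                // (timeToDoCollectAndSave() and collectAndSave() are not shown here)
                if (timeToDoCollectAndSave()) {
                    collectAndSave();
                }
            }
        };
        executor.execute(r);
    }
}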

The executor has a BlockingQueue behind it, and each thread that produces a task for the StatsService puts the task on that queue. The key point is this: the locking duration for this operation should be much shorter than the locking duration in the original code, so there should be much less contention. Overall it should result in much better throughput and latency.

Another benefit is that since only one thread reads and writes to the map, plain HashMap and primitive long type can be used (no ConcurrentHashMap or atomic types involved). This also simplifies the code that actually processes it a great deal.
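If the harvester should stay a separate caller, as in the question, one way to keep the map confined is to submit the read as a task to the same executor and hand the copy back through a Future. This is a sketch under that assumption; ConfinedStats and its shutdown method are names made up here.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical variant of the sketch above with the read done as a task too.
class ConfinedStats {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Only ever touched by the executor's single thread.
    private final Map<String, Long> stats = new HashMap<String, Long>();

    public void notify(final String key) {
        executor.execute(new Runnable() {
            public void run() {
                Long value = stats.get(key);
                stats.put(key, value == null ? 1L : value + 1);
            }
        });
    }

    /** Copy-and-clear runs on the executor thread; the caller waits on the Future. */
    public Future<Map<String, Long>> getStats() {
        return executor.submit(new Callable<Map<String, Long>>() {
            public Map<String, Long> call() {
                Map<String, Long> copy = new HashMap<String, Long>(stats);
                stats.clear();
                return copy;
            }
        });
    }

    public void shutdown() {
        executor.shutdown();
    }
}
```

Because the single-threaded executor runs tasks in submission order, the snapshot contains every update that was queued before the getStats call.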

Hope it helps.

sjlee
I do not understand your post. Sorry, but aren't you just shifting the problem to another place? When you say "produce an update task", you have to put that in a map or queue or whatever data structure you like. And you have to synchronize this.
Janning
Hope my edits and code samples made some of the points clearer...
sjlee
Yes, much clearer. Thanks a lot. I upvoted your answer. I am going to test your approach, but I think the BlockingQueue must be synchronized in some way. So you move the synchronization from StatsService to the ExecutorService. But I will check it and edit my post to make my results available.
Janning
-1. This is a very inefficient solution. Also you are missing a `put` after the increment.
finnw
@finnw Thanks for pointing out the error in the code. I corrected it. As for the efficacy of the solution, (along with most things in life) it depends. When compared against using a ConcurrentHashMap (thus no synchronization being done), it does not perform better. However, if the original solution involves synchronization, this is a very useful alternative. Essentially, it's an asynchronous solution to the problem. First, it minimizes the latency from the viewpoint of the threads that generate these tasks. Second, it eliminates the contention on the map's lock.
sjlee