views:

54

answers:

4

Hey,

I'm writing some client-server-application where I have to deal with multiple threads. I've got some servers, that send alive-packets every few seconds. Those servers are maintained in a ConcurrentHashMap, that contains their EndPoints paired with the time the last alive-package arrived of the respective server.

Now I've got a thread, that has to "sort out" all the servers that haven't sent alive-packets for a specific amount of time.

I guess I can't just do it like that, can I?

for( IPEndPoint server : this.fileservers.keySet() )
{
    Long time = this.fileservers.get( server );

    //If server's time is updated here, I got a problem

    if( time > fileserverTimeout )
        this.fileservers.remove( server );
}

Is there a way I can get around that without aquiring a lock for the whole loop (that I then have to respect in the other threads as well)?

A: 

Firstly make map synchronized

this.fileservers = Collections.synchronizedMap(Map)

then use the strategy which is used in Singleton classes

if( time > fileserverTimeout )
    {
        synchronized(this.fileservers)
        {
             if( time > fileserverTimeout )
                  this.fileservers.remove( server );
        }
    }

Now this makes sure that once you inside the synchronized block, no updates can occur. This is so because once the lock on the map is taken, map(synchronized wrapper) will not have itself available to provide a thread lock on it for update, remove etc.

Checking for time twice makes sure that synchronization is used only when there is a genuine case of delete

Gaurav Saxena
Synchronizing a concurrent hashmap is at least questionable.
Willi
Agreed @Willi, and FindBugs reliably reminds one of that, lest one forget...
andersoj
A: 

(See Roland's answer, which takes the ideas here and fleshes them out into a fuller example, with some great additional insights.)

Since it's a concurrent hash map, you can do the following. Note that CHM's iterators all implement the optional methods, including remove(), which you want. See CHM API docs, which states:

This class and its views and iterators implement all of the optional methods of the Map and Iterator interfaces.

This code should work (I don't know the type of the Key in your CHM):

ConcurrentHashMap<K,Long> fileservers = ...;

for(Iterator<Map.Entry<K,Long>> fsIter = fileservers.entrySet().iterator(); fileservers.hasNext(); )
{
    Map.Entry<K,Long> thisEntry = fsIter.next();
    Long time = thisEntry.getValue();

    if( time > fileserverTimeout )
        fsIter.remove( server );
}

But note that there may be race conditions elsewhere... You need to make sure that other bits of code accessing the map can cope with this kind of spontaneous removal -- i.e., probably whereever you touch fileservers.put() you'll need a bit of logic involving fileservers.putIfAbsent(). This solution is less likely to create bottlenecks than using synchronized, but it also requires a bit more thought.

Where you wrote "If server's time is updated here, I got a problem" is exactly where putIfAbsent() comes in. If the entry is absent, either you hadn't seen it before, or you just recently dropped it from the table. If the two sides of this need to be coordinated, then you may instead want to introduce a lockable record for the entry, and carry out the synchronization at that level (i.e., sync on the record while doing remove(), rather than on the whole table). Then the put() end of things can also sync on the same record, eliminating a potential race.

andersoj
+1  A: 

I don't see how another thread can update the server's time at that point in your code. Once you've retrieved the time of a server from the map using this.fileservers.get( server ), another thread cannot change its value as Long objects are immutable. Yes, another thread can put a new Long object for that server into the map, but that doesn't affect this thread, because it has already retrieved the time of the server.

So as it stands I can't see anything wrong with your code. The iterators in a ConcurrentHashMap are weakly consistent which means they can tolerate concurrent modification, so there is no risk of a ConcurrentModificationException being thrown either.

dogbane
+1  A: 

There is probably no problem here, depending on what exactly you store in the map. Your code looks a little weird to me, since you seem to save "the duration for which the server hasn't been active".

My first idea for recording that data was to store "the latest timestamp at which the server has been active". Then your code would look like this:

package so3950354;

import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ServerManager {

  private final ConcurrentMap<Server, Long> lastActive = new ConcurrentHashMap<Server, Long>();

  /** May be overridden by a special method for testing. */
  protected long now() {
    return System.currentTimeMillis();
  }

  public void markActive(Server server) {
    lastActive.put(server, Long.valueOf(now()));
  }

  public void removeInactive(long timeoutMillis) {
    final long now = now();

    Iterator<Map.Entry<Server, Long>> it = lastActive.entrySet().iterator();
    while (it.hasNext()) {
      final Map.Entry<Server, Long> entry = it.next();
      final long backThen = entry.getValue().longValue();
      /*
       * Even if some other code updates the timestamp of this server now,
       * the server had timed out at some point in time, so it may be
       * removed. It's bad luck, but impossible to avoid.
       */
      if (now - backThen >= timeoutMillis) {
        it.remove();
      }
    }
  }

  static class Server {

  }
}

If you really want to avoid that no code ever calls markActive during a call to removeInactive, there is no way around explicit locking. What you probably want is:

  • concurrent calls to markActive are allowed.
  • during markActive no calls to removeInactive are allowed.
  • during removeInactive no calls to markActive are allowed.

This looks like a typical scenario for a ReadWriteLock, where markActive is the "reading" operation and removeInactive is the "writing" Operation.

Roland Illig
Great! Fuller development than I put together... +1
andersoj