views:

408

answers:

2

Edit

This question has gone through a few iterations by now, so feel free to look through the revisions to see some background information on the history and things tried.


I'm using a CompletionService together with an ExecutorService and a Callable, to concurrently call the a number of functions on a few different webservices through CXF generated code.. These services all contribute different information towards a single set of information I'm using for my project. The services however can fail to respond for a prolonged period of time without throwing an exception, prolonging the wait for the combined set of information.

To counter this I'm running all the service calls concurrently, and after a few minutes would like to terminate any of the calls that have not yet finished, and preferably log which ones weren't done yet either from within the callable or by throwing an detailed Exception.

Here's some highly simplified code to illustrate what I'm doing already:

private Callable<List<Feature>> getXXXFeatures(final WiwsPortType port, 
final String accessionCode) {
    return new Callable<List<Feature>>() {
        @Override
        public List<Feature> call() throws Exception {
            List<Feature> features = new ArrayList<Feature>();
            //getXXXFeatures are methods of the WS Proxy
            //that can take anywhere from second to never to return
            for (RawFeature raw : port.getXXXFeatures(accessionCode)) {
                Feature ft = convertFeature(raw);
                features.add(ft);
            }
            if (Thread.currentThread().isInterrupted())
                log.error("XXX was interrupted");
            return features;
        }
    };
}

And the code that concurrently starts the WS calls:

WiwsPortType port = new Wiws().getWiws();
List<Future<List<Feature>>> ftList = new ArrayList<Future<List<Feature>>>();
//Counting wrapper around CompletionService, 
    //so I could implement ccs.hasRemaining()
CountingCompletionService<List<Feature>> ccs = 
        new CountingCompletionService<List<Feature>>(threadpool);
ftList.add(ccs.submit(getXXXFeatures(port, accessionCode)));
ftList.add(ccs.submit(getYYYFeatures(port accessionCode)));
ftList.add(ccs.submit(getZZZFeatures(port, accessionCode)));

List<Feature> allFeatures = new ArrayList<Feature>();
while (ccs.hasRemaining()) {
            //Low for testing, eventually a little more lenient
 Future<List<Feature>> polled = ccs.poll(5, TimeUnit.SECONDS);
 if (polled != null)
  allFeatures.addAll(polled.get());
 else {
  //Still jobs remaining, but unresponsive: Cancel them all
  int jobsCanceled = 0;
  for (Future<List<Feature>> job : ftList)
   if (job.cancel(true))
    jobsCanceled++;
  log.error("Canceled {} feature jobs because they took too long",
                        jobsCanceled);
  break;
 }
}

The problem I'm having with this code is that the Callables aren't actually canceled when waiting for port.getXXXFeatures(...) to return, but somehow keep running. As you can see from the if (Thread.currentThread().isInterrupted()) log.error("XXX was interrupted"); statements the interrupted flag is set after port.getFeatures returns, this is only available after the Webservice call completes normally, instead of it having been interrupted when I called Cancel.

Can anyone tell me what I am doing wrong and how I can stop the running CXF Webservice call after a given time period, and register this information in my application?

Best regards, Tim

+2  A: 

Edit 3 New answer.

I see these options:

  • Post your problem on the Apache CXF as feature request
  • Fix ACXF yourself and expose some features.
  • Look for options for asynchronous WS call support within the Apache CXF
  • Consider switching to a different WS provider (JAX-WS?)
  • Do your WS call yourself using RESTful API if the service supports it (e.g. plain HTTP request with parameters)
  • For über experts only: use true threads/thread group and kill the threads with unorthodox methods.
kd304
Sorry but how would this work? You can't simply 'cancel' the Callable.. You can only cancel the *Future* returned when you submit the Callable.. I can't see how I would be able to get the url from the Callable once it is lost after submitting it to the Executor..
Tim
Oh, sorry, you are right. Will think about it again.
kd304
Rephrased the question as the logging isn't the problem anymore: The problem is the webservice calls not being canceled in the first place! Thanks for the help so far, lets see what new answers this rephrasing brings..
Tim
I will adjust my answer too, so it doesn't attract downvotes.
kd304
+1  A: 

The CXF docs have some instructions for setting the read timeout on the HTTPURLConnection: http://cwiki.apache.org/CXF20DOC/client-http-transport-including-ssl-support.html

That would probably meet your needs. If the server doesn't respond in time, an exception is raised and the callable would get the exception. (except there is a bug where is MAY hang instead. I cannot remember if that was fixed for 2.2.2 or if it's just in the SNAPSHOTS right now.)

Daniel Kulp