I have run into a situation where a thread calling ThreadPoolExecutor.execute(Runnable) is parked while all of the pool's worker threads are waiting in getTask() and the workQueue is empty.

Does anybody have any ideas?

The ThreadPoolExecutor is created with an ArrayBlockingQueue, and corePoolSize == maximumPoolSize == 4.

[Edit] To be more precise, the calling thread is blocked in ThreadPoolExecutor.execute(Runnable command). It has a task to execute but does not proceed.

[Edit2] The executor is blocked somewhere inside the work queue (ArrayBlockingQueue).

[Edit3] The callstack:

thread = front_end(224)
at sun.misc.Unsafe.park(Native Method)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114)
at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)
at net.listenThread.WorkersPool.execute(WorkersPool.java:45)

At the same time, the workQueue is empty (checked with a remote debugger).

[Edit4] Code working with ThreadPoolExecutor:

public WorkersPool(int size) {
  pool = new ThreadPoolExecutor(size, size, IDLE_WORKER_THREAD_TIMEOUT, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(WORK_QUEUE_CAPACITY),
    new ThreadFactory() {
      @NotNull
      private final AtomicInteger threadsCount = new AtomicInteger(0);

      @NotNull
      public Thread newThread(@NotNull Runnable r) {
        final Thread thread = new Thread(r);
        thread.setName("net_worker_" + threadsCount.incrementAndGet());
        return thread;
      }
    },

    new RejectedExecutionHandler() {
      public void rejectedExecution(@Nullable Runnable r, @Nullable ThreadPoolExecutor executor) {
        Verify.warning("new task " + r + " is discarded");
      }
    });
}

public void execute(@NotNull Runnable task) {
  pool.execute(task);
}

public void stopWorkers() throws WorkersTerminationFailedException {
  pool.shutdownNow();
  try {
    pool.awaitTermination(THREAD_TERMINATION_WAIT_TIME, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    throw new WorkersTerminationFailedException("Workers-pool termination failed", e);
  }
}

}

A: 

I don't see any locking in the code of ThreadPoolExecutor's execute(Runnable). The only variable there is the workQueue. What sort of BlockingQueue did you provide to your ThreadPoolExecutor?

On the topic of deadlocks:

You can confirm whether this is a deadlock by examining a full thread dump, as produced by Ctrl+Break on Windows or kill -QUIT on UNIX systems.

Once you have that data, you can examine the threads. Here is a pertinent excerpt from Sun's article on examining thread dumps (suggested reading):

For hanging, deadlocked or frozen programs: If you think your program is hanging, generate a stack trace and examine the threads in states MW or CW. If the program is deadlocked then some of the system threads will probably show up as the current threads, because there is nothing else for the JVM to do.
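A similar check can be made from inside the JVM. The following is a minimal sketch, assuming a Java 6 or later HotSpot JVM; the class name DeadlockCheck is made up for illustration. ThreadMXBean.findDeadlockedThreads() only reports threads that are stuck in a cycle of owned locks, so an empty result does not rule out other kinds of hang.

```java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of the code in the question.
class DeadlockCheck {

    /** Returns info on threads caught in a lock cycle, or an empty array if none is detected. */
    static ThreadInfo[] findDeadlocked() {
        ThreadMXBean mx = ManagementFactory.getThreadMXBean();
        // Covers both monitors and java.util.concurrent ownable synchronizers
        // such as ReentrantLock; returns null when no cycle is found.
        long[] ids = mx.findDeadlockedThreads();
        if (ids == null) {
            return new ThreadInfo[0];
        }
        // Full stack depth for each deadlocked thread.
        return mx.getThreadInfo(ids, Integer.MAX_VALUE);
    }

    public static void main(String[] args) {
        for (ThreadInfo info : findDeadlocked()) {
            System.out.println(info);
        }
    }
}
```

Calling findDeadlocked() periodically from a watchdog thread and logging the result is one way to capture the state without attaching a debugger.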

On a lighter note: if you are running in an IDE, can you make sure that no breakpoints are enabled in these methods?

akf
As I wrote in my question, ArrayBlockingQueue is used. And it's empty. Yes, the thread is blocking somewhere in the working queue.
Vitaly
I used remote debugging. Edited the question: call stack added.
Vitaly
You can also check for deadlocks using JConsole
pjp
ah, yes, I see the ArrayBlockingQueue now. Two points: 1. on the call stack, the line above the stack that identifies the thread would also be useful, as it gives an indication of the thread condition, and 2. some code would be good to see how you are accessing the TPE and if you are accessing the ArrayBlockingQueue outside of the TPE as well.
akf
Edited the call stack (but I'm not sure it's what you meant). Added the code; it's simple. By the way, it seems nobody else has encountered the problem. I'm already thinking about writing my own simple pool :((
Vitaly
A: 

As someone already mentioned, this sounds like normal behaviour: the ThreadPoolExecutor is just waiting for work to do. If you want to stop it, you need to call:

executor.shutdown()

to get it to terminate, usually followed by a call to executor.awaitTermination(...).
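The shutdown sequence described here can be sketched as follows. This is only an illustration: the class name ShutdownSketch, the method stop and the timeout parameter are assumptions, not code from the question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper illustrating shutdown() followed by awaitTermination().
class ShutdownSketch {

    /** Returns true if the executor terminated within the allowed time. */
    static boolean stop(ExecutorService executor, long timeoutSeconds) {
        executor.shutdown(); // stop accepting new tasks; queued tasks still run
        try {
            if (executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
                return true;
            }
            // Still running: interrupt the workers and wait once more.
            executor.shutdownNow();
            return executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        }
    }
}
```

Checking the boolean returned by awaitTermination matters, because a timeout is reported through the return value rather than through an exception.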

jamie mccrindle
Edited the question.
Vitaly
A: 

This looks like a nasty bug in the JDK (or Linux, or the SMP hardware, or something else). Here's my stack trace from kill -QUIT, grepped for the lock ID 0x00002aab58766f50:

"pool-1-thread-78303" prio=10 tid=0x00002aae5806f800 nid=0x68a5 waiting on condition [0x00002aad6ea26000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1971)
       at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
       at java.lang.Thread.run(Thread.java:619)

"pool-1-thread-78302" prio=10 tid=0x00002aae584de000 nid=0x68a4 waiting on condition [0x00002aae31797000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1971)
       at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
       at java.lang.Thread.run(Thread.java:619)

"pool-1-thread-78301" prio=10 tid=0x00002aae584dd800 nid=0x68a3 waiting on condition [0x00002aad5bf32000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1971)
       at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
       at java.lang.Thread.run(Thread.java:619)

"pool-1-thread-78299" prio=10 tid=0x00002aae58135000 nid=0x68a1 waiting on condition [0x00002aad35813000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1971)
       at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
       at java.lang.Thread.run(Thread.java:619)

"pool-1-thread-78296" prio=10 tid=0x00002aae584cb800 nid=0x689e waiting on condition [0x00002aae24301000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:1971)
       at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
       at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:907)
       at java.lang.Thread.run(Thread.java:619)

"Memcached IO over {MemcachedConnection to dc155.4shared.com/72.233.72.153:11211}" prio=10 tid=0x000000005a8ac000 nid=0x4587 waiting on condition [0x0000000040209000]
  java.lang.Thread.State: WAITING (parking)
       at sun.misc.Unsafe.park(Native Method)
       - parking to wait for  <0x00002aab58766f50> (a java.util.concurrent.locks.ReentrantLock$NonfairSync)
       at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:747)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(AbstractQueuedSynchronizer.java:778)
       at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(AbstractQueuedSynchronizer.java:1114)
       at java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(ReentrantLock.java:186)
       at java.util.concurrent.locks.ReentrantLock.lock(ReentrantLock.java:262)
       at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
       at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)
       at net.spy.memcached.transcoders.TranscodeService.decode(TranscodeService.java:41)
       at net.spy.memcached.MemcachedClient$4.gotData(MemcachedClient.java:722)
       at net.spy.memcached.protocol.GetCallbackWrapper.gotData(GetCallbackWrapper.java:29)
       at net.spy.memcached.protocol.ProxyCallback.gotData(ProxyCallback.java:42)
       at net.spy.memcached.protocol.ascii.BaseGetOpImpl.handleRead(BaseGetOpImpl.java:93)
       at net.spy.memcached.protocol.ascii.OperationImpl.readFromBuffer(OperationImpl.java:108)
       at net.spy.memcached.MemcachedConnection.handleReads(MemcachedConnection.java:362)
       at net.spy.memcached.MemcachedConnection.handleIO(MemcachedConnection.java:306)
       at net.spy.memcached.MemcachedConnection.handleIO(MemcachedConnection.java:193)
       at net.spy.memcached.MemcachedClient.run(MemcachedClient.java:1458)
Andriy
A: 
5 threads in: at java.util.concurrent.ArrayBlockingQueue.poll(ArrayBlockingQueue.java:342)
              at java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:945)

1 thread  in: at java.util.concurrent.ArrayBlockingQueue.offer(ArrayBlockingQueue.java:224)
              at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:653)

and everything's deadlocked! Isn't that a bug?

Andriy
A: 

hi there,

What is the nature of the task being passed to TPE.execute()? If the task itself has access to the TPE, that might be your problem.
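One common way a task with access to its own pool can get stuck is by submitting more work to that pool and waiting for the result. The sketch below assumes a single-threaded pool and uses a timeout so that it returns instead of hanging; the class and method names are hypothetical, and this is not necessarily what happened in the stack traces above.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Hypothetical demonstration of a task starving itself on its own pool.
class NestedSubmitSketch {

    /** Returns true if the inner task could not run while the outer task waited for it. */
    static boolean innerTimesOut() {
        final ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Future<Boolean> outer = pool.submit(new Callable<Boolean>() {
                public Boolean call() throws Exception {
                    // The only worker thread is busy running this outer task,
                    // so the inner task stays in the queue.
                    Future<String> inner = pool.submit(new Callable<String>() {
                        public String call() {
                            return "done";
                        }
                    });
                    try {
                        inner.get(200, TimeUnit.MILLISECONDS);
                        return false;
                    } catch (TimeoutException e) {
                        return true; // without the timeout this wait would never end
                    }
                }
            });
            return outer.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        } finally {
            pool.shutdownNow();
        }
    }
}
```

If the tasks in question never wait on work they hand back to the same pool, this failure mode can be ruled out.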

artemv
A: 

The library source is below (it is in fact a class from http://spymemcached.googlecode.com/files/memcached-2.4.2-sources.zip). It is a bit complicated, with added protection against repeated runs of the FutureTask if I'm not mistaken, but it doesn't look deadlock-prone; it is very simple ThreadPool usage:

package net.spy.memcached.transcoders;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;

import net.spy.memcached.CachedData;
import net.spy.memcached.compat.SpyObject;

/**
 * Asynchronous transcoder.
 */
public class TranscodeService extends SpyObject {

    private final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 10, 60L,
            TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(100),
            new ThreadPoolExecutor.DiscardPolicy());

    /**
     * Perform a decode.
     */
    public <T> Future<T> decode(final Transcoder<T> tc,
            final CachedData cachedData) {

        assert !pool.isShutdown() : "Pool has already shut down.";

        TranscodeService.Task<T> task = new TranscodeService.Task<T>(
                new Callable<T>() {
                    public T call() {
                        return tc.decode(cachedData);
                    }
                });

        if (tc.asyncDecode(cachedData)) {
            this.pool.execute(task);
        }
        return task;
    }

    /**
     * Shut down the pool.
     */
    public void shutdown() {
        pool.shutdown();
    }

    /**
     * Ask whether this service has been shut down.
     */
    public boolean isShutdown() {
        return pool.isShutdown();
    }

    private static class Task<T> extends FutureTask<T> {
        private final AtomicBoolean isRunning = new AtomicBoolean(false);

        public Task(Callable<T> callable) {
            super(callable);
        }

        @Override
        public T get() throws InterruptedException, ExecutionException {
            this.run();
            return super.get();
        }

        @Override
        public T get(long timeout, TimeUnit unit) throws InterruptedException,
                ExecutionException, TimeoutException {
            this.run();
            return super.get(timeout, unit);
        }

        @Override
        public void run() {
            if (this.isRunning.compareAndSet(false, true)) {
                super.run();
            }
        }
    }

}
Andriy
A: 

Definitely strange.

But before writing your own TPE try:

  • another BlockingQueue impl., e.g. LinkedBlockingQueue

  • specify fairness=true in ArrayBlockingQueue, i.e. use new ArrayBlockingQueue(n, true)

Of those two options I would choose the second, because it is very strange for offer() to be blocked; one reason that comes to mind is the thread scheduling policy on your Linux system. That is just an assumption.
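Both suggestions are small changes to how the pool is constructed. A minimal sketch, assuming a fixed-size pool like the one in the question; the class name QueueChoices, the method names and the 60-second keep-alive are made up for illustration, and whether either variant avoids the hang is untested.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical factory methods for the two queue options.
class QueueChoices {

    /** Option 2: ArrayBlockingQueue whose internal lock is fair (FIFO hand-off to waiting threads). */
    static ThreadPoolExecutor fairArrayPool(int size, int capacity) {
        BlockingQueue<Runnable> queue = new ArrayBlockingQueue<Runnable>(capacity, true);
        return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, queue);
    }

    /** Option 1: bounded LinkedBlockingQueue, which uses separate locks for put and take. */
    static ThreadPoolExecutor linkedPool(int size, int capacity) {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>(capacity);
        return new ThreadPoolExecutor(size, size, 60L, TimeUnit.SECONDS, queue);
    }

    /** Runs trivial tasks, shuts the pool down, and returns how many tasks completed. */
    static int runAndCount(ThreadPoolExecutor pool, int tasks) {
        final AtomicInteger done = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            // With the default handler, execute() throws RejectedExecutionException
            // if the queue is full and all threads are busy.
            pool.execute(new Runnable() {
                public void run() {
                    done.incrementAndGet();
                }
            });
        }
        pool.shutdown(); // tasks already queued still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return done.get();
    }
}
```

A fair lock typically lowers throughput under contention, so it is worth measuring before keeping it in production code.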

artemv