Using Scala 2.8 RC1 or newer, what is the best (easiest and/or most direct) way to "peek" at the messages waiting in an actor's mailbox, from within that actor's own act() method, without having to react/receive them or disturb the current contents of the mailbox in any way?

The purpose of this is so that an actor may determine if it is safe to process a request to exit by first determining if any of the remaining mailbox messages are ones that must be processed, instead of just dropped by stopping the actor immediately.

+3  A: 

This sounds like a dangerous operation in general, since if there are critical messages, the actor processing them might check and find none, but then before exiting might be given another from some other thread.

If you know for certain that this can't happen, and you don't need lots of incredibly fast message switches, I'd probably write a guard-actor that counts and keeps track of critical messages but otherwise just passes them on to another actor for handling.
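
A minimal sketch of the guard-actor idea, assuming the Scala 2.8 scala.actors library. The message types (Critical, Done, ExitRequest) and the Worker class are hypothetical names made up for illustration, and the sketch assumes the worker acknowledges every critical message back to the guard. Treat it as a starting point rather than a finished design:

```scala
import scala.actors.Actor
import scala.actors.Actor._

// Hypothetical message types for this sketch; none of them come from the question.
case class Critical(payload: Any)   // a message that must not be dropped
case class Done(payload: Any)       // worker -> guard: a Critical message was handled
case object ExitRequest             // ask the guard to shut the pair down

// Sits in front of `worker`, forwards everything, and counts the
// Critical messages that have been forwarded but not yet acknowledged.
class GuardActor(worker: Actor) extends Actor {
  private var pending = 0        // Critical messages still in flight
  private var exiting = false    // an ExitRequest has been seen

  // Pass the exit on and stop, but only once nothing critical is outstanding.
  private def stopIfIdle() {
    if (exiting && pending == 0) {
      worker ! ExitRequest
      exit()
    }
  }

  def act() {
    loop {
      react {
        case m: Critical =>
          pending += 1
          worker ! m             // the worker sees the guard as `sender`
        case Done(_) =>
          pending -= 1
          stopIfIdle()
        case ExitRequest =>
          exiting = true
          stopIfIdle()
        case other =>
          worker ! other         // non-critical traffic passes straight through
      }
    }
  }
}

// Stand-in worker: acknowledges each Critical message to whoever forwarded it.
class Worker extends Actor {
  def act() {
    loop {
      react {
        case Critical(p) =>
          println("handled critical " + p)
          sender ! Done(p)
        case ExitRequest =>
          exit()
        case other =>
          println("handled " + other)
      }
    }
  }
}
```

All senders talk to the guard instead of the worker. The guard only forwards the exit once every critical message it has passed on has been acknowledged, so no mailbox ever has to be inspected. Anything sent to the guard after it has stopped is still lost, which is the race described above.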

If you don't want to do that, then keep in mind that the details of the internals may change, and you may have to go through the source code for Actor, Reactor, MessageQueue, etc. in order to get what you want. For now, something like this should work (warning, untested):

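package scala.actors
package myveryownstuff
trait CriticalActor extends Actor {
  // True if some message matching p is currently waiting in the mailbox.
  def criticalAwaits(p: Any => Boolean) = {
    synchronized {
      // Move recently sent messages into the mailbox so they are visible.
      drainSendBuffer(mailbox)
      // get(0)(p) looks up the first message satisfying p without removing it.
      mailbox.get(0)(p).isDefined
    }
  }
}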

Note that we have to place the extended trait in the scala.actors package, because all the mailbox internals are declared private to the scala.actors package. (This is a good warning that you should be careful before messing with the internals.) We then add a new method that takes a function that can test for a critical message and looks for one using the built-in mailbox.get(n) method, which returns the nth message fulfilling some predicate.

Rex Kerr
I definitely do not want to manipulate the internals of the scala.actors package. If it turns out that there is no way to view the messages in the mailbox without removing or processing them, then I'll have to accept that and design accordingly. Currently, I ask the actor for the count of messages remaining in the mailbox while processing my own ExitMsg. There should be none if the rest of the application is shutting down the actors (some of which generate messages for the actor in question) in the correct order. So I want a non-destructive read just to println the message out.
scaling_out
@scaling: `mailbox.get(0)` is a non-destructive read. Also, in Scala 2.8, you can no longer get the mailbox size (it was judged an implementation detail that is not fully safe to expose). The guard-actor idea might work better for you.
Rex Kerr
So, the bottom line is that there is no way to peek into the mailbox using the public API? The current mailbox val is private to the actors package.
scaling_out
Yup, that's the bottom line.
Rex Kerr
+3  A: 

You don't need to peek ahead. Just keep track of the fact that an exit has been requested and use a reactWithin(0) to determine when the queue is empty after an exit has been requested.

import scala.actors._

sealed abstract class Message // abstract base; a case class with no parameter list is deprecated
case object Exit extends Message
case class Unimportant(n:Int) extends Message
case class Important(n:Int) extends Message

class SafeExitingActor extends Actor {
  def act : Nothing = react {
      case Exit => {
           println("exit requested, clearing the queue")
           exitRequested
      }
      case message => {
           processMessage(message, false)
           act
      }
  }

  // reactWithin(0) gives a TIMEOUT as soon as the mailbox is empty
  def exitRequested : Nothing = reactWithin(0) {
     case Exit => {
         println("extra exit requested, ignoring")
         exitRequested // already know about the exit, keep processing
     }
     case TIMEOUT => {
         println("timeout, queue is empty, shutting down")
         exit // TIMEOUT so nothing more to process, we can shut down
     }
     case message => {
         processMessage(message, true)
         exitRequested
     }
  }

  // processMessage is a separate method to avoid duplicating the match in act and exitRequested
  def processMessage(message : Any, importantOnly : Boolean) = {
     message match {
       case Unimportant(n) if !importantOnly => println("Unimportant " + n)
       case Unimportant(n) => () // do nothing
       case Important(n) => println("Important! " + n)
     }
     Thread sleep 100 // sleep a little to ensure mailbox backlog
  }
}

object TestProcessing {
  def main(args : Array[String]) {
    val actor = new SafeExitingActor()
    actor.start()
    for (i <- 1 to 10) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    for (i <- 11 to 20) {
        actor ! Unimportant(i)
        actor ! Important(i)
    }
    actor ! Exit
    actor ! Important(100)
  }
}

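That should output the following (Unimportant 11 through 20 are received but silently skipped, since they arrive after the first Exit):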

Unimportant 1
Important! 1
Unimportant 2
Important! 2
Unimportant 3
Important! 3
Unimportant 4
Important! 4
Unimportant 5
Important! 5
Unimportant 6
Important! 6
Unimportant 7
Important! 7
Unimportant 8
Important! 8
Unimportant 9
Important! 9
Unimportant 10
Important! 10
exit requested, clearing the queue
Important! 11
Important! 12
Important! 13
Important! 14
Important! 15
Important! 16
Important! 17
Important! 18
Important! 19
Important! 20
extra exit requested, ignoring
Important! 100
timeout, queue is empty, shutting down
James Iry
Very interesting approach
scaling_out