I'm working with a messaging toolkit (it happens to be Spread but I don't know that the details matter). Receiving messages from this toolkit requires some boilerplate:

  1. Create a connection to the daemon.
  2. Join a group.
  3. Receive one or more messages.
  4. Leave the group.
  5. Disconnect from the daemon.

Following some idioms that I've seen used elsewhere, I was able to cook up some working functions using Spread's Java API and Clojure's interop forms:

(import '(java.net InetAddress)
        '(spread SpreadConnection SpreadGroup))

(defn connect-to-daemon
  "Open a connection"
  [daemon-spec]
  (let [connection (SpreadConnection.)
        {:keys [host port user]} daemon-spec]
    (doto connection
      (.connect (InetAddress/getByName host) port user false false))))

(defn join-group
  "Join a group on a connection"
  [cxn group-name]
  (doto (SpreadGroup.)
    (.join cxn group-name)))

(defn with-daemon*
  "Execute a function with a connection to the specified daemon"
  [daemon-spec func]
  (let [daemon (merge *spread-daemon* daemon-spec)
        cxn (connect-to-daemon daemon)] ; connect with the merged spec so defaults apply
    (try
     (binding [*spread-daemon* (assoc daemon :connection cxn)]
       (func))
     (finally
      (.disconnect cxn)))))

(defn with-group*
  "Execute a function while joined to a group"
  [group-name func]
  (let [cxn (:connection *spread-daemon*)
        grp (join-group cxn group-name)]
    (try
     (binding [*spread-group* grp]
       (func))
     (finally
      (.leave grp)))))

(defn receive-message
  "Receive a single message. If none are available, this will block indefinitely."
  []
  (let [cxn (:connection *spread-daemon*)]
    (.receive cxn)))

(Basically the same idiom as with-open, just that the SpreadConnection class uses disconnect instead of close. Grr. Also, I left out some macros that aren't relevant to the structural question here.)
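
The omitted macros are not shown here, but the usual shape of this idiom is a macro that packages its body as a function and hands it to the starred function. A minimal sketch, assuming that shape; `*conn*`, `with-conn*` and `with-conn` are hypothetical stand-ins so the example runs without Spread, and `^:dynamic` assumes Clojure 1.3 or later:

```clojure
;; Hypothetical stand-in for *spread-daemon*.
(def ^:dynamic *conn* nil)

(defn with-conn*
  "Open a resource, run func with *conn* bound to it, and always close it."
  [open-fn close-fn func]
  (let [conn (open-fn)]
    (try
      (binding [*conn* conn]
        (func))
      (finally
        (close-fn conn)))))

(defmacro with-conn
  "Wrap body in a zero-argument function and delegate to with-conn*."
  [open-fn close-fn & body]
  `(with-conn* ~open-fn ~close-fn (fn [] ~@body)))

;; Usage: the log records the order in which things happen.
(def log (atom []))

(def result
  (with-conn #(do (swap! log conj :open) :fake-connection)
             (fn [conn] (swap! log conj [:close conn]))
    (swap! log conj [:using *conn*])
    :done))
```

The close step runs in `finally`, so it happens even if the body throws. That is also why a lazy sequence returned from inside such a block is a problem: anything realized after the block exits sees a closed connection.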

This works well enough. I can call receive-message from inside of a structure like:

(with-daemon {:host "localhost" :port 4803}
  (with-group "aGroup"
    (... looping ...
      (let [msg (receive-message)] 
        ...))))

It occurs to me that receive-message would be cleaner to use if it were an infinite lazy sequence that produces messages. So, if I wanted to join a group and get messages, the calling code should look something like:

(def message-seq (messages-from {:host "localhost" :port 4803} "aGroup"))
(take 5 message-seq)

I've seen plenty of examples of lazy sequences without cleanup, so that part isn't too hard. The catch is steps 4 and 5 above: leaving the group and disconnecting from the daemon. How can I bind the state of the connection and group into the sequence and run the necessary cleanup code when the sequence is no longer needed?

+4  A: 

This article describes how to do exactly that using clojure-contrib's fill-queue. Regarding cleanup: the neat thing about fill-queue is that you supply a blocking function that can clean up after itself when an error occurs or some condition is reached. You can also hold a reference to the resource to control it externally. The sequence will just terminate. Which strategy fits depends on your semantic requirements.
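
To make the idea concrete without depending on clojure-contrib, here is a rough sketch of the same mechanism using only core Clojure and java.util.concurrent. It is not fill-queue's implementation: `message-source`, `receive-fn` and `cleanup-fn` are hypothetical names, the queue is unbounded for simplicity, and `when-some` assumes Clojure 1.6 or later. A background thread pushes messages onto a queue, a lazy sequence drains it, and cleanup runs in a `finally` when the source is exhausted, throws, or is stopped from outside:

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

(defn message-source
  "Returns {:messages lazy-seq, :stop! fn}. receive-fn is called repeatedly
  on a background thread; the seq ends when it returns nil, throws, or
  :stop! has been called. cleanup-fn runs once when the producer finishes."
  [receive-fn cleanup-fn]
  (let [q       (LinkedBlockingQueue.) ; unbounded, so .put never blocks
        eos     (Object.)              ; end-of-sequence sentinel
        stopped (atom false)]
    (future
      (try
        (loop []
          (when-not @stopped
            (when-some [msg (receive-fn)]
              (.put q msg)
              (recur))))
        (finally
          (try
            (cleanup-fn)
            (finally
              (.put q eos))))))       ; always unblock the consumer
    {:messages ((fn drain []
                  (lazy-seq
                    (let [x (.take q)]
                      (when-not (identical? x eos)
                        (cons x (drain)))))))
     :stop!    (fn [] (reset! stopped true) nil)}))

;; Usage with a fake message source that runs dry after three messages.
(def inbox (java.util.LinkedList. [:a :b :c]))
(def cleanups (atom 0))

(def source (message-source #(.poll inbox) #(swap! cleanups inc)))
(def received (doall (:messages source)))
```

With Spread, `receive-fn` would presumably be something like `#(.receive cxn)` and `cleanup-fn` would leave the group and disconnect. One caveat: the stop flag is only checked between calls, so a producer blocked inside a receive will not notice `:stop!` until the next message arrives.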

Timothy Pratley
+2  A: 

Try this:

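(ns your-namespace
  (:use clojure.contrib.seq-utils))

;; Assumed flag, not part of the original snippet: an atom that ends the list.
(def done? (atom false))

(defn messages-from [daemon-spec group-name]
  (let [cnx (connect-to-daemon daemon-spec)
        group (join-group cnx group-name)]
    ;; fill-queue calls the filler once on another thread, so loop inside it.
    (fill-queue (fn [fill]
                  (loop []
                    (if @done?
                      (do
                        (.leave group)
                        (.disconnect cnx)
                        (throw (RuntimeException. "Finished messages")))
                      (do
                        (fill (.receive cnx))
                        (recur))))))))

Set done? to true, e.g. with (reset! done? true), when you want to terminate the list. The flag is only checked between messages, so a blocked (.receive cnx) notices it after the next message arrives. Any exception thrown by (.receive cnx) also terminates the list.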

Eric Normand