views:

90

answers:

1

Hello,

From Don Syme blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) I tried to implement a twitter stream listener. My goal is to follow the guidance of the twitter api documentation which says "that tweets should often be saved or queued before processing when building a high-reliability system".

So my code needs to have two components:

  • A queue that piles up and processes each status/tweet json
  • Something to read the twitter stream that dumps to the queue the tweet in json strings

I choose the following:

  • An agent to which I post each tweet, that decodes the json, and dumps it to database
  • A simple http webrequest

I also would like to dump into a text file any error from inserting in the database. ( I will probably switch to a supervisor agent for all the errors).

Two problems:

  • is my strategy here any good ? If I understand correctly, the agent behaves like a smart queue and processes its messages asynchronously ( if it has 10 guys on its queue it will process a bunch of them at time, instead of waiting for the 1 st one to finish then the 2nd etc...), correct ?
  • According to Don Syme's post everything before the while is Isolated so the StreamWriter and the database dump are Isolated. But because I need this, I never close my database connection... ?

The code looks something like:

let dumpToDatabase databaseName = 
   //opens databse connection 
   fun tweet -> inserts tweet in database

type Agent<'T> = MailboxProcessor<'T>



 let agentDump =
            Agent.Start(fun (inbox: MailboxProcessor<string>) ->
               async{
                   use w2 = new StreamWriter(@"\Errors.txt")
                   let dumpError  =fun (error:string) -> w2.WriteLine( error )
                   let dumpTweet =  dumpToDatabase "stream"
                   while true do 
                       let! msg = inbox.Receive()
                       try 
                           let tw = decode msg
                           dumpTweet tw
                       with 
                       | :? MySql.Data.MySqlClient.MySqlException as ex -> 
    dumpError (msg+ex.ToString() ) 
                        | _ as ex -> () 



                             }
                             )

    let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
    let parameters = "track=RT&"
    let stream_url = filter_url

    let stream = twitterStream MyCredentials stream_url parameters


    while true do 
        agentDump.Post(stream.ReadLine())

Thanks a lot !

Edit of code with processor agent:

let dumpToDatabase (tweets:tweet list)= 
    bulk insert of tweets in database    

let agentProcessor = 
        Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
           async{
               while true do 
                       let! msg = inbox.Receive()
                       try
                          msg
                          |> List.map(decode)
                          |> dumpToDatabase 
                        with
                        | _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
                 }
                 )



let agentDump =
        Agent.Start(fun (inbox: MailboxProcessor<string>) ->
                  let rec loop messageList count = async{
                      try
                          let! newMsg = inbox.Receive()
                          let newMsgList = newMsg::messageList
                          if count = 10 then 
                               agentProcessor.Post( newMsgList )
                               return! loop [] 0
                          else                    
                               return! loop newMsgList (count+1)
                      with
                      | _ as ex -> Console.WriteLine("Dump "+ex.ToString())

                  }
                  loop [] 0)

let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url

let stream = twitterStream MyCredentials stream_url parameters


while true do 
    agentDump.Post(stream.ReadLine())
+2  A: 

I think that the best way to describe agent is that it is is a running process that keeps some state and can communicate with other agents (or web pages or database). When writing agent-based application, you can often use multiple agents that send messages to each other.

I think that the idea to create an agent that reads tweets from the web and stores them in a database is a good choice (though you could also keep the tweets in memory as the state of the agent).

  • I wouldn't keep the database connection open all the time - MSSQL (and MySQL likely too) implements connection pooling, so it will not close the connection automatically when you release it. This means that it is safer and similarly efficient to reopen the connection each time you need to write data to the dataabase.

  • Unless you expect to receive a large number of error messages, I would probably do the same for file stream as well (when writing, you can open it, so that new content is added to the end).

The way queue of F# agents work is that it processes messages one by one (in your example, you're waiting for a message using inbox.Receive(). When the queue contains multiple messagaes, you'll get them one by one (in a loop).

  • If you wanted to process multiple messages at once, you could write an agent that waits for, say, 10 messages and then sends them as a list to another agent (which would then perform bulk-processing).

  • You can also specify timeout parameter to the Receive method, so you could wait for at most 10 messages as long as they all arrive within one second - this way, you can quite elegantly implement bulk processing that doesn't hold messages for a long time.

Tomas Petricek
thank you for your reply tomas. When work is over I ll try writing the bulk processing, looks like fun (probably will require lots more studying though ...) !
jlezard
@Tomas Petricek, finally got some time to try adding another agent, seems to work fine. I am a bit confused by your last bullet point: unless I am mistaken Receive kills the agent after the time specified, so I don't see how to use it ?Thanks !
jlezard
@jlezard: I think the exception only notifies you that no message was received, but it keeps the agent in a usable state. However, it is a better idea to use `TryReceive` which returns `None` when it times out (I didn't realize that it exists when I wrote the answer!)
Tomas Petricek