Hello,
From Don Syme blog (http://blogs.msdn.com/b/dsyme/archive/2010/01/10/async-and-parallel-design-patterns-in-f-reporting-progress-with-events-plus-twitter-sample.aspx) I tried to implement a twitter stream listener. My goal is to follow the guidance of the twitter api documentation which says "that tweets should often be saved or queued before processing when building a high-reliability system".
So my code needs to have two components:
- A queue that piles up and processes each status/tweet json
- Something to read the twitter stream that dumps to the queue the tweet in json strings
I choose the following:
- An agent to which I post each tweet, that decodes the json, and dumps it to database
- A simple http webrequest
I also would like to dump into a text file any error from inserting in the database. ( I will probably switch to a supervisor agent for all the errors).
Two problems:
- is my strategy here any good ? If I understand correctly, the agent behaves like a smart queue and processes its messages asynchronously ( if it has 10 guys on its queue it will process a bunch of them at time, instead of waiting for the 1 st one to finish then the 2nd etc...), correct ?
- According to Don Syme's post everything before the while is Isolated so the StreamWriter and the database dump are Isolated. But because I need this, I never close my database connection... ?
The code looks something like:
let dumpToDatabase databaseName =
//opens databse connection
fun tweet -> inserts tweet in database
type Agent<'T> = MailboxProcessor<'T>
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
async{
use w2 = new StreamWriter(@"\Errors.txt")
let dumpError =fun (error:string) -> w2.WriteLine( error )
let dumpTweet = dumpToDatabase "stream"
while true do
let! msg = inbox.Receive()
try
let tw = decode msg
dumpTweet tw
with
| :? MySql.Data.MySqlClient.MySqlException as ex ->
dumpError (msg+ex.ToString() )
| _ as ex -> ()
}
)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())
Thanks a lot !
Edit of code with processor agent:
let dumpToDatabase (tweets:tweet list)=
bulk insert of tweets in database
let agentProcessor =
Agent.Start(fun (inbox: MailboxProcessor<string list>) ->
async{
while true do
let! msg = inbox.Receive()
try
msg
|> List.map(decode)
|> dumpToDatabase
with
| _ as ex -> Console.WriteLine("Processor "+ex.ToString()))
}
)
let agentDump =
Agent.Start(fun (inbox: MailboxProcessor<string>) ->
let rec loop messageList count = async{
try
let! newMsg = inbox.Receive()
let newMsgList = newMsg::messageList
if count = 10 then
agentProcessor.Post( newMsgList )
return! loop [] 0
else
return! loop newMsgList (count+1)
with
| _ as ex -> Console.WriteLine("Dump "+ex.ToString())
}
loop [] 0)
let filter_url = "http://stream.twitter.com/1/statuses/filter.json"
let parameters = "track=RT&"
let stream_url = filter_url
let stream = twitterStream MyCredentials stream_url parameters
while true do
agentDump.Post(stream.ReadLine())