views:

314

answers:

5

I'd like to write some code that runs a sequence of F# scripts (.fsx). The thing is that I could have literally hundreds of scripts and if I do that:

let shellExecute program args =
    let startInfo = new ProcessStartInfo()
    do startInfo.FileName        <- program
    do startInfo.Arguments       <- args
    do startInfo.UseShellExecute <- true
    do startInfo.WindowStyle     <- ProcessWindowStyle.Hidden

    //do printfn "%s" startInfo.Arguments 
    let proc = Process.Start(startInfo)
    ()

scripts
|> Seq.iter (shellExecute "fsi")

it could stress too much my 2GB system. Anyway, I'd like to run scripts by batch of n, which seems also a good exercise for learning Async (I guess it's the way to go).

I have started to write some code for that but unfortunately it doesn't work:

open System.Diagnostics

let p = shellExecute "fsi" @"C:\Users\Stringer\foo.fsx"

async {
    let! exit = Async.AwaitEvent p.Exited
    do printfn "process has exited"
}
|> Async.StartImmediate

foo.fsx is just a hello world script. What would be the most idiomatic way of solving this problem?

I'd like also to figure out if it's doable to retrieve a return code for each executing script and if not, find another way. Thanks!

EDIT:

Thanks a lot for your insights and links! I've learned a lot. I just want to add some code for running batchs in parallel using Async.Parallel as Tomas suggested it. Please comment if there is a better implementation for my cut function.

module Seq =
  /// Returns a sequence of sequences of N elements from the source sequence.
  /// If the length of the source sequence is not a multiple
  /// of N, last element of the returned sequence will have a length
  /// included between 1 and N-1.
  let cut (count : int) (source : seq<´T>) = 
    let rec aux s length = seq {
      if (length < count) then yield s
      else
        yield Seq.take count s
        if (length <> count) then
          yield! aux (Seq.skip count s) (length - count)
      }
    aux source (Seq.length source)

let batchCount = 2
let filesPerBatch =
  let q = (scripts.Length / batchCount)
  q + if scripts.Length % batchCount = 0 then 0 else 1

let batchs =
  scripts
  |> Seq.cut filesPerBatch
  |> Seq.map Seq.toList
  |> Seq.map loop

Async.RunSynchronously (Async.Parallel batchs) |> ignore

EDIT2:

So I had some troubles to get Tomas's guard code working. I guess the f function had to be called in AddHandler method, otherwise we loose the event for ever... Here's the code:

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member this.AddHandler(d) = e.AddHandler(d); f() //must call f here!
        member this.RemoveHandler(d) = e.RemoveHandler(d); f()
        member this.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

The interesting thing (as mentioned by Tomas) is that it looks like the Exited event is stored somewhere when the process terminates, even though the process has not started with EnableRaisingEvents set to true. When this property is finally set to true, the event is fired up.

Since I'm not sure that this is the official specification (and also a bit paranoid), I found another solution that consists in starting the process in the guard function, so we ensure that the code will work on whichever situation:

let createStartInfo program args =
  new ProcessStartInfo
    (FileName = program, Arguments = args, UseShellExecute = false,
     WindowStyle = ProcessWindowStyle.Normal, 
     RedirectStandardOutput = true)

let createProcess info =
  let p = new Process()
  do p.StartInfo           <- info
  do p.EnableRaisingEvents <- true
  p

let rec loop scripts = async { 
  match scripts with 
  | [] -> printfn "FINISHED"
  | script::scripts ->
    let args = sprintf "\"%s\"" script
    let p = createStartInfo "notepad" args |> createProcess
    let! exit =
      p.Exited 
      |> Event.guard (fun () -> p.Start() |> ignore)
      |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    do printfn "\nPROCESSED: %s, CODE: %d, OUTPUT: %A"script p.ExitCode output
    return! loop scripts 
  }

Notice I've replaced fsi.exe by notepad.exe so I can replay different scenarios step by step in the debugger and control explicitly the exit of the process myself.

+5  A: 

Your approach looks great to me, I really like the idea of embedding process execution into asynchronous workflows using AwaitEvent!

The likely reason why it didn't work is that you need to set EnableRisingEvents property of the Process to true if you want it to ever trigger the Exited event (don't ask my why you have to do that, it sounds pretty silly to me!) Anyway, I did a couple of other changes to your code when testing it, so here is a version that worked for me:

open System
open System.Diagnostics

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo
      (FileName = program, Arguments = args, UseShellExecute = false,
       WindowStyle = ProcessWindowStyle.Hidden, 
       RedirectStandardOutput = true)

  // Start the process
  // Note: We must enable rising events explicitly here!
  Process.Start(startInfo, EnableRaisingEvents = true)

Most importantly, the code now sets EnableRaisingEvents to true. I also changed the code to use a syntax where you specify properties of an object when constructing it (to make the code a bit more succinct) and I changed a few properties, so that I can read the output (RedirectStandardOutput).

Now, we can use the AwaitEvent method to wait until a process completes. I'll assume that fsi contains the path to fsi.exe and that scripts is a list of FSX scripts. If you want to run them sequentially, you could use a loop implemented using recursion:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    // Start the proces in background
    let p = shellExecute fsi script 
    // Wait until the process completes
    let! exit = Async.AwaitEvent p.Exited 
    // Read the output produced by the process, the exit code
    // is available in the `ExitCode` property of `Process`
    let output = p.StandardOutput.ReadToEnd()
    printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
    // Process the rest of the scripts
    return! loop scripts  } 

// This starts the workflow on background thread, so that we can
// do other things in the meantime. You need to add `ReadLine`, so that
// the console application doesn't quit immedeiately
loop scripts |> Async.Start
Console.ReadLine() |> ignore    

Of course, you could also run the processes in parallel (or for example run 2 groups of them in parallel etc.) To do that you would use Async.Parallel (in the usual way).

Anyway, this is a really nice example of using asynchronous workflows in an area where I haven't seen them used so far. Very interesting :-)

Tomas Petricek
Nice answer. Is there any danger that the process will exit before the call to `Async.AwaitEvent` (meaning that the event isn't raised after the listener is added)?
kvb
Yes, it is a race, see e.g. http://v2matveev.blogspot.com/2010/02/event-based-async-pattern-in-f.html
Brian
Tomas Petricek
@Tomas - would my answer solve the race condition, or am I missing something?
Joel Mueller
@Joel: I'm afraid that your approach won't work (details below your post)
Tomas Petricek
+1  A: 

What about a mailboxprocessor?

Bohdan Szymanik
Sounds not bad but I prefer sticking with asynchronous workflows first since I think they could be used more widely than MailboxProcessor in my F# code. Plus, in MailboxProcessor F# code I've seen there are often async code.
Stringer Bell
+3  A: 

In response to Tomas's answer, would this be a workable solution to the race condition involved in starting the process, and then subscribing to its Exited event?

type Process with
    static member AsyncStart psi =
        let proc = new Process(StartInfo = psi, EnableRaisingEvents = true)
        let asyncExit = Async.AwaitEvent proc.Exited
        async {
            proc.Start() |> ignore
            let! args = asyncExit
            return proc
        }

Unless I'm mistaken, this would subscribe to the event prior to starting the process, and package it all up as an Async<Process> result.

This would allow you to rewrite the rest of the code like this:

let shellExecute program args = 
  // Configure process to redirect output (so that we can read it)
  let startInfo = 
    new ProcessStartInfo(FileName = program, Arguments = args, 
        UseShellExecute = false,
        WindowStyle = ProcessWindowStyle.Hidden, 
        RedirectStandardOutput = true)

  // Start the process
  Process.AsyncStart(startInfo)

let fsi = "PATH TO FSI.EXE"

let rec loop scripts = async { 
    match scripts with 
    | [] -> printf "FINISHED"
    | script::scripts ->
        // Start the proces in background
        use! p = shellExecute fsi script 
        // Read the output produced by the process, the exit code
        // is available in the `ExitCode` property of `Process`
        let output = p.StandardOutput.ReadToEnd()
        printfn "\nPROCESSED: %s, CODE: %d\n%A" script p.ExitCode output
        // Process the rest of the scripts
        return! loop scripts 
} 

If that does the job, it's certainly a lot less code to worry about than Vladimir's Async.GetSubject.

Joel Mueller
I think this won't work, but the approach looks promising. The code that you run outside of the workflow creates only an async workflow (`asyncExit`) that will attach the handler to the event once the workflow is started (on `let!`), so this behaves same as my original version. To fix it, you'd need to attach handler to the `proc.Exited` event (before returning the workflow) and handle the case when the event fires before `AwaitEvent` is called using some mutable variable... I think I found another way to deal with this, so I'll post it.
Tomas Petricek
Oh, that's disappointing. I was hoping that obtaining an async workflow based on AwaitEvent was the same thing as subscribing to it.
Joel Mueller
+3  A: 

I did some experiments and here is one way to deal with the problem discussed in the comments below my post and in the answer from Joel (which I think doesn't work currently, but could be fixed).

I think the specification of Process is that it can trigger the Exited event after we set the EnableRaisingEvents property to true (and will trigger the event even if the process has already completed before we set the property). To handle this case correctly, we need to enable raising of events after we attach handler to the Exited event.

This is a problme, because if we use AwaitEvent it will block the workflow until the event fires. We cannot do anything after calling AwaitEvent from the workflow (and if we set the property before calling AwaitEvent, then we get a race....). Vladimir's approach is correct, but I think there is a simpler way to deal with this.

I'll create a function Event.guard taking an event and returning an event, which allows us to specify some function that will be executed after a handler is attached to the event. This means that if we do some operation (which in turn triggers the event) inside this function, the event will be handled.

To use it for the problem discussed here, we need to change my original solution as follows. Firstly, the shellExecute function must not set the EnableRaisingEvents property (otherwise, we could lose the event!). Secondly, the waiting code should look like this:

let rec loop scripts = async { 
  match scripts with 
  | [] -> printf "FINISHED"
  | script::scripts ->
    let p = shellExecute fsi script 
    let! exit = 
      p.Exited 
        |> Event.guard (fun () -> p.EnableRaisingEvents <- true)
        |> Async.AwaitEvent
    let output = p.StandardOutput.ReadToEnd()
    return! loop scripts  } 

Note the use of the Event.guard function. Roughly, it says that after the workflow attaches handler to the p.Exited event, the provided lambda function will run (and will enable raising of events). However, we already attached the handler to the event, so if this causes the event immediately, we're fine!

The implementation (for both Event and Observable) looks like this (using ´ instead of ' to avoid colorization screw-up):

module Event =
  let guard f (e:IEvent<´Del, ´Args>) = 
    let e = Event.map id e
    { new IEvent<´Args> with 
        member x.AddHandler(d) = e.AddHandler(d)
        member x.RemoveHandler(d) = e.RemoveHandler(d); f()
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

module Observable =
  let guard f (e:IObservable<´Args>) = 
    { new IObservable<´Args> with 
        member x.Subscribe(observer) = 
          let rm = e.Subscribe(observer) in f(); rm }

Nice thing is that this code is very straightforward.

Tomas Petricek
Very nice! Would you mind describing the significance of the `in` keyword in `e.Subscribe(observer) in f()`? It looks like it has something to do with verbose syntax, but I'm not familiar with that "function call in function call" usage.
Joel Mueller
Sure. In the verbose syntax, you'd always write `in` after the initialization expression of `let` (e.g. `let a = 1 in printf "..."`). If you use the _light_ syntax, the compiler _thinks_ it is there if you end `let` with a newline. If you want to write `let` followed by something else on a single-line (in my case, to make the code more succinct), you need to add `in` after `let` (and `;` after normal expression).
Tomas Petricek
+2  A: 

It is possible to simplify version of Subject from blogpost. instead of returning imitation of event, getSubject can return workflow.

Result workflow itself is state machine with two states 1. Event wasn't triggered yet: all pending listeners should be registered 2. Value is already set, listener is served immediately In code it will appear like this:

type SubjectState<'T> = Listen of ('T -> unit) list | Value of 'T

getSubject implementation is trivial

let getSubject (e : IEvent<_, _>) = 
    let state = ref (Listen [])
    let switchState v = 
        let listeners = 
            lock state (fun () ->
                match !state with
                | Listen ls -> 
                    state := Value v 
                    ls
                | _ -> failwith "Value is set twice"
            )
        for l in listeners do l v

    Async.StartWithContinuations(
        Async.AwaitEvent e,
        switchState,
        ignore,
        ignore
    )

Async.FromContinuations(fun (cont, _, _) ->
    let ok, v = lock state (fun () ->
        match !state with
        | Listen ls ->
            state := Listen (cont::ls)
            false, Unchecked.defaultof<_>
        | Value v ->
            true, v
        )
    if ok then cont v
    )
desco