tags:

views:

86

answers:

3

Hi

I have a F# application that communicates with a java application via named pipe. Where F# acts as server and java acts as client. The application works for the most part except that the F# runes in to “System.IO.IOException: All pipe instances are busy” error occasionally. Below is the full stack trace of the exception and code snip of both F# and Java. Any help is appreciated in resolving this issue

Thanks, Sudaly

full stack trace:

Unhandled Exception: System.IO.IOException: All pipe instances are busy.
   at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
   at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
   at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
   at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
   at <StartupCode$FSharp-Core>[email protected](Object state)
   at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
   at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
   at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
   at System.Threading.ThreadPoolWorkQueue.Dispatch()
   at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()

F# Code:

[<DataContract>] 
type Quote = { 
    [<field: DataMember(Name="securityIdentifier") >] 
    RicCode:string
    [<field: DataMember(Name="madeOn") >] 
    MadeOn:DateTime
    [<field: DataMember(Name="closePrice") >] 
    Price:int 
    }

let globalPriceCache = new Dictionary<string, Quote>()

let ParseQuoteString (quoteString:string) = 
    let data = Encoding.Unicode.GetBytes(quoteString)
    let stream = new MemoryStream() 
    stream.Write(data, 0, data.Length); 
    stream.Position <- 0L 
    let ser = Json.DataContractJsonSerializer(typeof<Quote array>) 
    let results:Quote array = ser.ReadObject(stream) :?> Quote array
    results

let RefreshCache quoteList =
    globalPriceCache.Clear() 
    quoteList 
    |> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result)) 

let EstablishConnection() =
    let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
    pipeServer.WaitForConnection()
    try
        Some(new StreamReader(pipeServer))
    with e -> 
        None

let rec MarketPriceCache() =
    match EstablishConnection() with
    |Some(sr) ->
        // Read request from the stream.
        let m_cache = 
            sr.ReadLine()  
            |>  ParseQuoteString  
            |>  RefreshCache

        MarketPriceCache()
    | _ -> () 


[<EntryPoint>]
let main args=
    try
        async { 
            MarketPriceCache() 
        } |> Async.Start

        while true do
            if globalPriceCache.Count > 0 then
    //Business logic
                System.Threading.Thread.Sleep(1000 * 50)
            else

                ignore(logInfo(sprintf "%s" "Price Cache is empty"))
                System.Threading.Thread.Sleep(1000 * 65)

    with e ->
        ignore(logError e.Message)
        ignore(logError e.StackTrace)    
    0

Java Code:

public void WatchForPrice()
 {
  while (true)
  {
   try 
   {
    Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
    List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
    if(priceMap != null)
    {

     Set<String> keySet = priceMap.keySet();
     System.out.println("Key Size: " + keySet.size());
     for(String key : keySet)
     {
      SecurityQuote quote =  priceMap.get(key).getQuote();
      if(quote != null)
      {
       LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
       localSecurityQuote.setClosePrice(quote.getClosePrice());
       localSecurityQuote.setMadeOn(quote.getMadeOn());     
       localSecurityQuote.setSecurityIdentifier(key);
       localSecurityQuotes.add(localSecurityQuote);
      }

     }

     JSONSerializer serializer = new JSONSerializer();
     String jsonString = serializer.serialize(localSecurityQuotes);

     // Connect to the pipe
     RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
     if (pipe != null )
     {
      // write to pipe
      pipe.write(jsonString.getBytes());
      pipe.close();

     }
     else
      System.out.println("Pipe not found");
     doPeriodicWait();
    }
    else 
     System.out.println("No Price data found");
   }
   catch (Exception e) 
   {
    e.printStackTrace();
    System.out.println(e.getMessage());
    doPeriodicWait();
   }
  }
 }
+4  A: 

This is a hunch, but maybe the issue is that you are not closing your pipe stream reader?

let rec MarketPriceCache() =
match EstablishConnection() with
|Some(sr) ->
    // Read request from the stream.        
    try        
        sr.ReadLine()  
        |>  ParseQuoteString  
        |>  RefreshCache   
    finally
        sr.Close()
    MarketPriceCache()
| _ -> () 

(m_cache variable is not needed - you are not using it anywhere)

Mitya
Thank you Mitya, i'll give your solution a try and let you know the result
sudaly
+3  A: 

You have to dispose of the NamedPipeServerStream every time you create one. The easiest way to do this in your code is to dispose of the StreamReader inside MarketPriceCache by putting a use statement around it:

let rec MarketPriceCache() =
    match EstablishConnection() with
    | Some(sr) ->
        // Read request from the stream.
        use reader = sr in
        (
            let m_cache = 
                reader.ReadLine()  
                |>  ParseQuoteString  
                |>  RefreshCache
        )
        MarketPriceCache()
    | _ -> ()

The syntax with using ... in is there to prevent that the scope of the reader ends after the recursive call to MarketPriceCache.

Ronald Wildenberg
Thank you Ronald, i'll give your solution a try and let you know the result
sudaly
Actually, both my solution and the one by Mitya are correct. The `use` binding is translated by the compiler to a try/finally statement that calls `Dispose` in the finally branch. A `use` binding is considered a more high-level construct.
Ronald Wildenberg
A: 

Thanks to both Ronald and Mitya. you solution worked. the application been running for almost 24 hrs now and there was not a single exception. Thank you.

sudaly