Hi
I have a F# application that communicates with a java application via named pipe. Where F# acts as server and java acts as client. The application works for the most part except that the F# runes in to “System.IO.IOException: All pipe instances are busy” error occasionally. Below is the full stack trace of the exception and code snip of both F# and Java. Any help is appreciated in resolving this issue
Thanks, Sudaly
full stack trace:
Unhandled Exception: System.IO.IOException: All pipe instances are busy.
at Microsoft.FSharp.Control.CancellationTokenOps.Start@1143-1.Invoke(Exception e)
at <StartupCode$FSharp-Core>.$Control.loop@413-38(Trampoline this, FSharpFunc`2 action)
at Microsoft.FSharp.Control.Trampoline.ExecuteAction(FSharpFunc`2 firstAction)
at Microsoft.FSharp.Control.TrampolineHolder.Protect(FSharpFunc`2 firstAction)
at <StartupCode$FSharp-Core>[email protected](Object state)
at System.Threading.QueueUserWorkItemCallback.WaitCallback_Context(Object state)
at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, ContextCallback callback, Object state, Boolean ignoreSyncCtx)
at System.Threading.QueueUserWorkItemCallback.System.Threading.IThreadPoolWorkItem.ExecuteWorkItem()
at System.Threading.ThreadPoolWorkQueue.Dispatch()
at System.Threading._ThreadPoolWaitCallback.PerformWaitCallback()
F# Code:
[<DataContract>]
type Quote = {
[<field: DataMember(Name="securityIdentifier") >]
RicCode:string
[<field: DataMember(Name="madeOn") >]
MadeOn:DateTime
[<field: DataMember(Name="closePrice") >]
Price:int
}
let globalPriceCache = new Dictionary<string, Quote>()
let ParseQuoteString (quoteString:string) =
let data = Encoding.Unicode.GetBytes(quoteString)
let stream = new MemoryStream()
stream.Write(data, 0, data.Length);
stream.Position <- 0L
let ser = Json.DataContractJsonSerializer(typeof<Quote array>)
let results:Quote array = ser.ReadObject(stream) :?> Quote array
results
let RefreshCache quoteList =
globalPriceCache.Clear()
quoteList
|> Array.iter(fun result->globalPriceCache.Add(result.RicCode, result))
let EstablishConnection() =
let pipeServer = new NamedPipeServerStream("testpipe", PipeDirection.InOut, 4)
pipeServer.WaitForConnection()
try
Some(new StreamReader(pipeServer))
with e ->
None
let rec MarketPriceCache() =
match EstablishConnection() with
|Some(sr) ->
// Read request from the stream.
let m_cache =
sr.ReadLine()
|> ParseQuoteString
|> RefreshCache
MarketPriceCache()
| _ -> ()
[<EntryPoint>]
let main args=
try
async {
MarketPriceCache()
} |> Async.Start
while true do
if globalPriceCache.Count > 0 then
//Business logic
System.Threading.Thread.Sleep(1000 * 50)
else
ignore(logInfo(sprintf "%s" "Price Cache is empty"))
System.Threading.Thread.Sleep(1000 * 65)
with e ->
ignore(logError e.Message)
ignore(logError e.StackTrace)
0
Java Code:
public void WatchForPrice()
{
while (true)
{
try
{
Map<String, SecurityQuoteCacheEntry> priceMap = getPriceCache().getCacheMap();
List<LocalSecurityQuote> localSecurityQuotes = new ArrayList<LocalSecurityQuote>();
if(priceMap != null)
{
Set<String> keySet = priceMap.keySet();
System.out.println("Key Size: " + keySet.size());
for(String key : keySet)
{
SecurityQuote quote = priceMap.get(key).getQuote();
if(quote != null)
{
LocalSecurityQuote localSecurityQuote = new LocalSecurityQuote();
localSecurityQuote.setClosePrice(quote.getClosePrice());
localSecurityQuote.setMadeOn(quote.getMadeOn());
localSecurityQuote.setSecurityIdentifier(key);
localSecurityQuotes.add(localSecurityQuote);
}
}
JSONSerializer serializer = new JSONSerializer();
String jsonString = serializer.serialize(localSecurityQuotes);
// Connect to the pipe
RandomAccessFile pipe = new RandomAccessFile("\\\\.\\pipe\\testpipe", "rw");
if (pipe != null )
{
// write to pipe
pipe.write(jsonString.getBytes());
pipe.close();
}
else
System.out.println("Pipe not found");
doPeriodicWait();
}
else
System.out.println("No Price data found");
}
catch (Exception e)
{
e.printStackTrace();
System.out.println(e.getMessage());
doPeriodicWait();
}
}
}