views:

461

answers:

2

I have a little sample application I was working on trying to get some of the new .Net 4.0 Parallel Extensions going (they are very nice). I'm running into a (probably really stupid) problem with an OutOfMemoryException. My main app that I'm looking to plug this sample into reads some data and lots of files, does some processing on them, and then writes them out somewhere. I was running into some issues with the files getting bigger (possibly GB's) and was concerned about memory so I wanted to parallelize things which led me down this path.

Now the below code gets an OOME on smaller files and I think I'm just missing something. It will read in 10-15 files and write them out in parellel nicely, but then it chokes on the next one. It looks like it's read and written about 650MB. A second set of eyes would be appreciated.

I'm reading into a MemorySteam from the FileStream because that is what is needed for the main application and I'm just trying to replicate that to some degree. It reads data and files from all types of places and works on them as MemoryStreams.

This is using .Net 4.0 Beta 2, VS 2010.

namespace ParellelJob
{
class Program
{
    BlockingCollection<FileHolder> serviceToSolutionShare;
    static void Main(string[] args)
    {
        Program p = new Program();
        p.serviceToSolutionShare = new BlockingCollection<FileHolder>();
        ServiceStage svc = new ServiceStage(ref p.serviceToSolutionShare);
        SolutionStage sol = new SolutionStage(ref p.serviceToSolutionShare);

        var svcTask = Task.Factory.StartNew(() => svc.Execute());
        var solTask = Task.Factory.StartNew(() => sol.Execute());

        while (!solTask.IsCompleted)
        {

        }

    }
}

class ServiceStage
{
    BlockingCollection<FileHolder> outputCollection;
    public ServiceStage(ref BlockingCollection<FileHolder> output)
    {
        outputCollection = output;
    }

    public void Execute()
    {
        var di = new DirectoryInfo(@"C:\temp\testfiles");
        var files = di.GetFiles();
        foreach (FileInfo fi in files)
        {
            using (var fs = new FileStream(fi.FullName, FileMode.Open, FileAccess.Read))
            {
                int b;
                var ms = new MemoryStream();
                while ((b = fs.ReadByte()) != -1)
                {
                    ms.WriteByte((byte)b); //OutOfMemoryException Occurs Here
                }
                var f = new FileHolder();
                f.filename = fi.Name;
                f.contents = ms;

                outputCollection.TryAdd(f);
            }
        }
        outputCollection.CompleteAdding();

    }
}

class SolutionStage
{
    BlockingCollection<FileHolder> inputCollection;
    public SolutionStage(ref BlockingCollection<FileHolder> input)
    {
        inputCollection = input;
    }
    public void Execute()
    {
        FileHolder current;
        while (!inputCollection.IsCompleted)
        {
            if (inputCollection.TryTake(out current))
            {
                using (var fs = new FileStream(String.Format(@"c:\temp\parellel\{0}", current.filename), FileMode.OpenOrCreate, FileAccess.Write))
                {
                    using (MemoryStream ms = (MemoryStream)current.contents)
                    {
                        ms.WriteTo(fs);
                        current.contents.Close();
                    }
                }
            }
        }
    }
}

class FileHolder
{
    public string filename { get; set; }
    public Stream contents { get; set; }
}
}
A: 

Just looking through quickly, inside your ServiceStage.Execute method you have

var ms = new MemoryStream();

I don't see where you are closing ms out or have it in a using. You do have the using in the other class. That's one thing to check out.

Jim Leonardo
I am trying to share the MemoryStream between classes by putting it in the shared BlockingCollection. The stream gets closed out after the SolutionStage gets done with it.
MikeD
No, the MemStreams are used to transfer the data and are closed on the receiving end. And being totally managed, you don't really need to Dispose() them.
Henk Holterman
+1  A: 

The main logic seems OK, but if that empty while-loop in main is literal then you are burning unnecesary CPU cycles. Better use solTask.Wait() instead.

But if individual files can run in Gigabytes, you still have the problem of holding at least 1 completely in memory, and usually 2 (1 being read, i being processed/written.

PS1: I just realized you don't pre-allocate the MemStream. That's bad, it will have to re-size very often for a big file, and that costs a lot of memory. Better use something like:

var ms = new MemoryStream(fs.Length);

And then, for big files, you have to consider the Large Object Heap (LOH). Are you sure you can't break a file up in segments and process them?

PS2: And you don't need the ref's on the constructor parameters, but they're not the problem.

Henk Holterman
pre-allocating the MemoryStream solves my problem for all but the largest files. I will be breaking up larger files into chunks. Just didn't have that in this sample app. Thanks.
MikeD