In my previous question, someone commented that it is possible to write a .NET Stream to two other Streams simultaneously. The only way I could find uses a temporary variable (a buffer) that stores the contents of the input Stream, either in full or a part at a time.

  1. Is there any way to do that without using buffers?
  2. Is there any way to do it concurrently?
+4  A: 

Using a buffer you can asynchronously write to both streams, assuming the streams support proper asynchronous access. That's effectively concurrent - although you'll have a tricky job keeping all the buffers in sync given that one could finish writing the first buffer a long time before the other has done so.

As far as I'm aware there's no way to copy from one (general) stream to another without using a buffer of at least one byte. (And using just a single byte will be painfully inefficient, of course.)
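To illustrate the buffered approach, here is a minimal sketch that uses the Task-based `ReadAsync`/`WriteAsync` methods (available from .NET 4.5 onwards) rather than the older Begin/End pattern. The names `StreamTee` and `CopyToBothAsync` and the default buffer size are made up for the example. It sidesteps the sync problem by waiting for both writes before the single buffer is reused, so it runs at the speed of the slower destination.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

public static class StreamTee
{
    // Hypothetical helper: copies source to both destinations, one chunk at a time.
    public static async Task CopyToBothAsync(
        Stream source, Stream first, Stream second, int bufferSize = 81920)
    {
        byte[] buffer = new byte[bufferSize];
        int read;
        while ((read = await source.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Start both writes, then wait for both to finish
            // before the buffer is overwritten by the next read.
            Task writeFirst = first.WriteAsync(buffer, 0, read);
            Task writeSecond = second.WriteAsync(buffer, 0, read);
            await Task.WhenAll(writeFirst, writeSecond);
        }
    }
}
```

Letting the faster destination run ahead would need a queue of buffers per destination, which is where the bookkeeping mentioned above comes in.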

Jon Skeet
+3  A: 

If you are happy to be limited to the rate of the slowest stream, this is pretty simple and requires no additional buffer. I have written it from memory, so I might have messed it up a little.

It also doesn't implement all the overrides needed. Things like CanRead, CanSeek and the like are skipped for simplicity; only the core concurrent writing is done.

using System;
using System.IO;
using System.Threading;

namespace MultiCastStream
{   
  public class MultiWriteStream : Stream
  {
    private readonly Stream[] streams;   
    private AutoResetEvent[] waits;
    private readonly IAsyncResult[] results;

    public MultiWriteStream(params Stream[] streams)
    {
      this.streams = (Stream[])streams.Clone();
      this.results = new IAsyncResult[this.streams.Length];
    }

    private void prepWaits()
    {
      if (waits == null)
      {
        this.waits = new AutoResetEvent[this.streams.Length];
        for(int i= 0; i < this.waits.Length; i++)
        {
          this.waits[i] = new AutoResetEvent(false);
        }
      }
      else
      {
        for(int i= 0; i < this.waits.Length; i++)
        {
          this.waits[i].Reset();
        }
      }
    }

    public override void Write (byte[] buffer, int offset, int count)
    {
      prepWaits();
      for( int i = 0; i < this.streams.Length; i++)
      {
        this.results[i] = streams[i].BeginWrite(
          buffer, offset, count, this.Release, waits[i]); 
      }
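      // Block until every stream has signalled completion.
      // Note: WaitAll accepts at most 64 handles and is not supported
      // for multiple handles on an STA thread.
      WaitHandle.WaitAll(waits);
      for( int i = 0; i < this.streams.Length; i++)
      {
        // EndWrite returns void; it rethrows any error from the write.
        this.streams[i].EndWrite(results[i]); 
      }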
    }

    private void Release(IAsyncResult result)
    {
      ((AutoResetEvent)result.AsyncState).Set();
    }

    public override void WriteByte (byte value)
    {
      // no point doing this asynchronously
      foreach (Stream s in this.streams) 
        s.WriteByte (value);
    }

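    protected override void Dispose (bool disposing)
    {
      // Only touch managed objects when called from Dispose(),
      // not when called from a finalizer.
      if (disposing)
      {
        if (this.waits != null)
        {
          foreach (AutoResetEvent w in this.waits)
            w.Close();
        }
        foreach (Stream s in this.streams)
          s.Dispose();
      }
      base.Dispose (disposing);
    }  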
  }
}
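For comparison, here is a rough sketch of the same idea with the skipped overrides filled in, so that it compiles on its own. It is not the class above: the name `TeeWriteStream` is hypothetical, and it assumes .NET 4.5 or later (and C# 7 syntax) so that `WriteAsync` and `Task.WaitAll` can stand in for the BeginWrite/EndWrite and wait-handle plumbing.

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// Hypothetical name; a write-only, non-seekable stream that fans out writes.
public sealed class TeeWriteStream : Stream
{
    private readonly Stream[] targets;

    public TeeWriteStream(params Stream[] targets)
    {
        this.targets = (Stream[])targets.Clone();
    }

    public override bool CanRead => false;
    public override bool CanSeek => false;
    public override bool CanWrite => true;
    public override long Length => throw new NotSupportedException();
    public override long Position
    {
        get => throw new NotSupportedException();
        set => throw new NotSupportedException();
    }

    public override void Write(byte[] buffer, int offset, int count)
    {
        // Start one write per target, then block until all of them finish,
        // so the caller may safely reuse the buffer once Write returns.
        // Failures surface as an AggregateException from Task.WaitAll.
        Task[] writes = new Task[targets.Length];
        for (int i = 0; i < targets.Length; i++)
        {
            writes[i] = targets[i].WriteAsync(buffer, offset, count);
        }
        Task.WaitAll(writes);
    }

    public override void Flush()
    {
        foreach (Stream s in targets) s.Flush();
    }

    public override int Read(byte[] buffer, int offset, int count) =>
        throw new NotSupportedException();

    public override long Seek(long offset, SeekOrigin origin) =>
        throw new NotSupportedException();

    public override void SetLength(long value) =>
        throw new NotSupportedException();

    protected override void Dispose(bool disposing)
    {
        if (disposing)
        {
            foreach (Stream s in targets) s.Dispose();
        }
        base.Dispose(disposing);
    }
}
```

Something like `new TeeWriteStream(fileStream, networkStream)` can then be passed anywhere a writable Stream is expected, for example as the destination of `Stream.CopyTo`. As with the original, each Write is only as fast as the slowest target.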
ShuggyCoUk