Hello friends, I have a messaging system that uses P2P. Each peer has an incoming message list and an outgoing message list. Whenever a new peer joins the mesh, it should get all the incoming messages from the other peers and add them to its own incoming message list. I know that once I have the other peers' info I can ask them to send me their lists, but I can't find a way to do it. Any suggestions or help would be highly appreciated. My code is below.

Thanks in advance, Johnny

    #region Instance Fields

    private string strOrigin = "";
    //the chat member name
    private string m_Member;
    //the channel instance where we execute our service methods against
    private IServerChannel m_participant;
    //the instance context which in this case is our window since it is the service host
    private InstanceContext m_site;
    //our binding transport for the p2p mesh
    private NetPeerTcpBinding m_binding;
    //the factory to create our chat channel
    private ChannelFactory<IServerChannel> m_channelFactory;
    //an interface provided by the channel exposing events to indicate
    //when we have connected or disconnected from the mesh
    private IOnlineStatus o_statusHandler;
    //a generic delegate to execute a thread against that accepts no args
    private delegate void NoArgDelegate();
    //an object to hold user details
    private IUserService userService;        
    //an Observable Collection of objects holding all the Application Instance Details in the database
    ObservableCollection<AppLoginInstance> appLoginInstances;
    // an Observable Collection of object to get all Incoming Messages types
    ObservableCollection<MessageType> inComingMessageTypes;
    // an Observable Collection of object to get all Outgoing Messages
    ObservableCollection<PDCL.ERP.DataModels.Message> outGoingMessages;
    // an Observable Collection of object to get all Incoming Messages
    ObservableCollection<PDCL.ERP.DataModels.Message> inComingMessages;
    //an Event Aggregator to publish event for other modules to subscribe
    private readonly IEventAggregator eventAggregator;   

    /// <summary>
    /// an IUnityContainer to get the container
    /// </summary>
    private IUnityContainer container;
    private RefreshConnectionStatus refreshConnectionStatus;
    private RefreshConnectionStatusEventArgs args;
    private ReplyRequestMessage replyMessageRequest;
    private ReplyRequestMessageEventArgs eventsArgs;

    #endregion

    public P2pMessageService(IUserService UserService, IEventAggregator EventAggregator, IUnityContainer container)
    {
        userService = UserService;
        this.container = container;
        appLoginInstances = new ObservableCollection<AppLoginInstance>();
        inComingMessageTypes = new ObservableCollection<MessageType>();
        inComingMessages = new ObservableCollection<PDCL.ERP.DataModels.Message>();
        outGoingMessages = new ObservableCollection<PDCL.ERP.DataModels.Message>();
        this.args = new RefreshConnectionStatusEventArgs();
        this.eventsArgs = new ReplyRequestMessageEventArgs();
        this.eventAggregator = EventAggregator;            
        this.refreshConnectionStatus = this.eventAggregator.GetEvent<RefreshConnectionStatus>();
        this.replyMessageRequest = this.eventAggregator.GetEvent<ReplyRequestMessage>();
    }    


    #region IOnlineStatus Event Handlers
    void ostat_Offline(object sender, EventArgs e)
    {
        // we could update a status bar or animate an icon to 
        //indicate to the user they have disconnected from the mesh

        //currently i don't have a "disconnect" button but adding it
        //should be trivial if you understand the rest of this code
    }

    void ostat_Online(object sender, EventArgs e)
    {
        try
        {                
            m_participant.Join(userService.AppInstance);
        }
        catch (Exception Ex)
        {
            Logger.Exception(Ex, Ex.TargetSite.Name + ": " + Ex.TargetSite + ": " + Ex.Message);
        }
    }

    #endregion        

    #region IServer Members

    //this method gets called from a background thread to 
    //connect the service client to the p2p mesh specified
    //by the binding info in the app.config
    public void ConnectToMesh()
    {
        try
        {               

            m_site = new InstanceContext(this);

            //use the binding from the app.config with default settings
            m_binding = new NetPeerTcpBinding("P2PMessageBinding");

            m_channelFactory = new DuplexChannelFactory<IServerChannel>(m_site, "P2PMessageEndPoint");
            m_participant = m_channelFactory.CreateChannel();
            o_statusHandler = m_participant.GetProperty<IOnlineStatus>();
            o_statusHandler.Online += new EventHandler(ostat_Online);
            o_statusHandler.Offline += new EventHandler(ostat_Offline);
            //m_participant.InitializeMesh();
            //this.appLoginInstances.Add(this.userService.AppInstance);

            BackgroundWorkerHelper.DoWork<object>(() =>
            {
                //this is an empty unhandled method on the service interface.
                //why? because for some reason p2p clients don't try to connect to the mesh
                //until the first service method call.  so to facilitate connecting i call this method
                //to get the ball rolling.
                m_participant.InitializeMesh();
                //SynchronizeMessage(this.inComingMessages);    
                return new object();
            }, arg =>
            {

            });

            this.appLoginInstances.Add(this.userService.AppInstance);

        }
        catch (Exception Ex)
        {
            Logger.Exception(Ex, Ex.TargetSite.Name + ": " + Ex.TargetSite + ": " + Ex.Message);
        }
    }


    public void Join(AppLoginInstance obj)
    {
        try
        {
            // Adding Instance to the PeerList

            if (appLoginInstances.SingleOrDefault(a => a.InstanceId == obj.InstanceId)==null)
            {
                appLoginInstances.Add(obj);                    
                this.refreshConnectionStatus.Publish(new RefreshConnectionStatusEventArgs() { Status = m_channelFactory.State });                                        
            }                    

            //this will retrieve any new members that have joined before the current user
            m_participant.SynchronizeMemberList(userService.AppInstance);

        }
        catch(Exception Ex)
        {
            Logger.Exception(Ex,Ex.TargetSite.Name + ": " + Ex.TargetSite + ": " + Ex.Message);
        }
    }

    /// <summary>
    /// Synchronizes member list
    /// </summary>
    /// <param name="obj">The AppLoginInstance Param</param>
    public void SynchronizeMemberList(AppLoginInstance obj)
    {

        //as member names come in we simply disregard duplicates and 
        //add them to the member list, this way we can retrieve a list
        //of members already in the chatroom when we enter at any time.

        //again, since this is just an example this is the simplified
        //way to do things.  the correct way would be to retrieve a list
        //of peernames and retrieve the metadata from each one which would
        //tell us what the member name is and add it.  we would want to check
        //this list when we join the mesh to make sure our member name doesn't 
        //conflict with someone else
        try
        {
            if (appLoginInstances.SingleOrDefault(a => a.InstanceId == obj.InstanceId) == null)
            {
                appLoginInstances.Add(obj);                    
            }
        }
        catch (Exception Ex)
        {
            Logger.Exception(Ex, Ex.TargetSite.Name + ": " + Ex.TargetSite + ": " + Ex.Message);
        }
    }      

    /// <summary>
    /// This method broadcasts the message to all peers.
    /// </summary>
    /// <param name="msg">The whole message which is to be broadcasted</param>
    /// <param name="securityLevels"> Level of security</param>
    public void BroadCastMsg(PDCL.ERP.DataModels.Message msg, List<string> securityLevels)
    {
        try
        {
            foreach (string s in securityLevels)
            {
                if (this.userService.IsInRole(s))
                {
                    // SingleOrDefault returns null on an empty list, so one check covers both cases
                    if (msg.CreatedByApp != this.userService.AppInstanceId &&
                        this.inComingMessages.SingleOrDefault(a => a.MessageId == msg.MessageId) == null)
                    {
                        this.inComingMessages.Add(msg);
                    }
                }
            }
        }
        catch (Exception Ex)
        {
            Logger.Exception(Ex, Ex.TargetSite.Name + ": " + Ex.TargetSite + ": " + Ex.Message);
        }

    }

    /// <summary>
    /// Publishes the reply and removes the matching message from the incoming list.
    /// </summary>
    /// <param name="msg">The message to be denied</param>
    public void BroadCastReplyMsg(PDCL.ERP.DataModels.Message msg)
    {
        try
        {
            //if (this.inComingMessages.SingleOrDefault(a => a.MessageId == msg.MessageId) != null)
            //{
                this.replyMessageRequest.Publish(new ReplyRequestMessageEventArgs() { Message = msg });
                this.inComingMessages.Remove(this.inComingMessages.SingleOrDefault(o => o.MessageId == msg.MessageId));
            //}

        }
        catch (Exception ex)
        {
            Logger.Exception(ex, ex.TargetSite.Name + ": " + ex.TargetSite + ": " + ex.Message);
        }
    }

    //again we need to sync the worker thread with the UI thread via Dispatcher
    public void Whisper(string Member, string MemberTo, string Message)
    {          


    }

    public void InitializeMesh()
    {
        //do nothing
    }

    public void Leave(AppLoginInstance obj)
    {
        if (this.appLoginInstances.SingleOrDefault(a => a.InstanceId == obj.InstanceId) != null)
        {
            this.appLoginInstances.Remove(this.appLoginInstances.Single(a => a.InstanceId == obj.InstanceId));
        }            
    }        

    //public void SynchronizeRemoveMemberList(AppLoginInstance obj)
    //{
    //    if (appLoginInstances.SingleOrDefault(a => a.InstanceId == obj.InstanceId) != null)
    //    {
    //        appLoginInstances.Remove(obj);
    //    }
    //}       

    #endregion
A: 

The p2p protocol in WCF is inherently a broadcast protocol. The only way to send a message to a particular peer is to get its address and talk to it over a different binding. The problem with this is that when a peer joins, all of the current nodes will want to connect to it to update its list.

You'll have to make a separate method on a separate binding so that the client can retrieve the list.


More information can be found on the PeerChannel Blog and information specific to synchronization can be found here. The method for this mostly depends on the size of your mesh, the size of the messages, and the content of the messages.

For example, if your messages are relatively small and have a unique identifier, then you could re-transmit the messages on the mesh with a hop count of one. Some nodes will receive messages they already have, and the identifier lets them discard those duplicates.
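
As a rough sketch of the receiving side of that idea, the class below drops anything it has already seen, so re-transmitted messages are harmless. The `Message` class is a hypothetical stand-in for `PDCL.ERP.DataModels.Message`, `IncomingMessageStore` is a made-up helper, and using `Guid` for the ids is an assumption. Limiting the hop count itself is not shown; in WCF that would normally be done with the `PeerHopCount` attribute on a message contract member.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for PDCL.ERP.DataModels.Message. Only the two
// properties used by the question's code are modelled; Guid is an assumed type.
public class Message
{
    public Guid MessageId { get; set; }
    public Guid CreatedByApp { get; set; }
}

// Hypothetical helper that keeps an incoming list free of duplicates.
public class IncomingMessageStore
{
    private readonly object gate = new object();
    private readonly Guid localAppId;
    private readonly HashSet<Guid> seenIds = new HashSet<Guid>();
    private readonly List<Message> messages = new List<Message>();

    public IncomingMessageStore(Guid localAppId)
    {
        this.localAppId = localAppId;
    }

    public int Count
    {
        get { lock (gate) { return messages.Count; } }
    }

    // Returns true if the message was new and has been stored,
    // false if it is our own message or one we have already seen.
    public bool TryAdd(Message msg)
    {
        if (msg == null || msg.CreatedByApp == localAppId)
        {
            return false;
        }

        // Mesh callbacks may arrive on different threads, so guard the state.
        lock (gate)
        {
            // HashSet.Add returns false when the id is already present.
            if (!seenIds.Add(msg.MessageId))
            {
                return false;
            }
            messages.Add(msg);
            return true;
        }
    }
}
```

An existing peer would rebroadcast each message it holds, and every receiver, including the newly joined one, would pass it through `TryAdd` instead of adding it to the list directly.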

Alternatively, you could broadcast a message on the mesh containing the IP address of a node that holds lots of messages, and expose those messages over another WCF connection (such as basicHttpBinding). Any computer that needs the messages can then connect to one of those nodes.
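
A minimal sketch of that second approach, assuming .NET Framework WCF, might look like the following. The names `IMessageSync`, `MessageSyncService`, `MessageDto` and `SyncDemo` are all hypothetical, and the address the new peer connects to is assumed to have been announced over the mesh beforehand.

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;

// Hypothetical wire type; the real message class would need to be serializable too.
[DataContract]
public class MessageDto
{
    [DataMember] public string MessageId { get; set; }
    [DataMember] public string Body { get; set; }
}

// Hypothetical request/reply contract, separate from the broadcast mesh contract.
[ServiceContract]
public interface IMessageSync
{
    [OperationContract]
    List<MessageDto> GetIncomingMessages();
}

// Single instance so the host can be handed an object that wraps the peer's list.
[ServiceBehavior(InstanceContextMode = InstanceContextMode.Single)]
public class MessageSyncService : IMessageSync
{
    private readonly List<MessageDto> incoming;

    public MessageSyncService(List<MessageDto> incoming)
    {
        this.incoming = incoming;
    }

    public List<MessageDto> GetIncomingMessages()
    {
        // Return a snapshot; real code must also synchronize with whoever writes the list.
        return new List<MessageDto>(incoming);
    }
}

public static class SyncDemo
{
    // Runs on an existing peer: expose its list over basicHttpBinding.
    public static ServiceHost StartHost(MessageSyncService service, Uri address)
    {
        var host = new ServiceHost(service, address);
        host.AddServiceEndpoint(typeof(IMessageSync), new BasicHttpBinding(), "");
        host.Open();
        return host;
    }

    // Runs on the new peer: pull the list from the announced address.
    public static List<MessageDto> Fetch(Uri address)
    {
        var factory = new ChannelFactory<IMessageSync>(
            new BasicHttpBinding(), new EndpointAddress(address));
        try
        {
            IMessageSync proxy = factory.CreateChannel();
            return proxy.GetIncomingMessages();
        }
        finally
        {
            // Close throws on a faulted factory, so abort in that case.
            if (factory.State == CommunicationState.Faulted) factory.Abort();
            else factory.Close();
        }
    }
}
```

The new peer would call `SyncDemo.Fetch` once after joining and merge the result into its own incoming list, skipping any ids it already has. Hosting over HTTP may need a URL reservation or elevated rights on Windows, and large lists may exceed the binding's default message size limits.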

Matthew Steeples
Would you kindly explain a little more, please?
Johnny
I have added some more explanation.
Matthew Steeples