views:

20

answers:

0

Our project (banking domain) is divided into 4 layers. One of them is a communication (COM) layer, which is developed in Java and uses MINA. Our host applications run on a UNIX machine and serve requests from users via socket communication. The COM layer has a Java program which acts as both a client and a server: it is a server to requests from other Java APIs and a client to the host (UNIX machine). Here are the programs.

1) TC4CommunicationGateway.java

package com.tc4.communication;


import java.io.IOException;

import java.net.InetSocketAddress;
import java.net.SocketAddress;
//import java.nio.charset.Charset;

import org.apache.mina.filter.codec.ProtocolCodecFilter;
//import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.apache.mina.filter.logging.LoggingFilter;
import org.apache.mina.transport.socket.nio.NioSocketAcceptor;
import org.apache.mina.transport.socket.nio.NioSocketConnector;
import org.apache.mina.core.service.IoAcceptor;
import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.session.IoSession;
import org.apache.mina.core.future.*;

import com.tc4.api.ApplicationTransactionHeader;
import com.tc4.api.CommunicationHeader;
import com.tc4.api.Message;
import com.tc4.api.RequestMessage;
import com.tc4.api.ResponseMessage;
import com.tc4.communication.codec.TC4CodecFactory;
import com.tc4.logging.LogTC4;
import com.tc4.system.CgsSignOn;


public class TC4CommunicationGateway
{
    private TC4GatewayConnectionHandler gatewayConnectHandler;
    private NioSocketConnector connector;
    private IoAcceptor acceptor;
    private IoSession hostSession;
    private TC4GatewayAcceptorHandler gatewayAcceptHandler;

    private static final int PORT = 9123;

public TC4CommunicationGateway() {
    this.gatewayConnectHandler = new TC4GatewayConnectionHandler();
    this.connector = new NioSocketConnector();
    this.acceptor = new NioSocketAcceptor();
    this.gatewayAcceptHandler = new TC4GatewayAcceptorHandler();
    gatewayAcceptHandler.setGatewayConnetHandler(this.gatewayConnectHandler);
    connectToHost();
    listenToClients();
}

public void connectToHost() {
    SocketAddress address = new InetSocketAddress("172.16.25.3", 8004);
    // comment by vskrao.... connector.getSessionConfig().setReadBufferSize(16384);

    connector.getFilterChain().addLast( "logger", new LoggingFilter() );
    connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TC4CodecFactory() ));

    connector.setHandler(gatewayConnectHandler);
    ConnectFuture future1 = connector.connect(address);

    future1.awaitUninterruptibly();

    if (!future1.isConnected()) {
        return ;
    }

    this.hostSession = future1.getSession(); //Factory should be built here
    gatewayAcceptHandler.setHostSession(this.hostSession);

    //CGS Sign On
    LogTC4.info(this, "CGS Signon");
    CgsSignOn cgsSignOn = new CgsSignOn();
    cgsSignOn.setCgsId("1");
    cgsSignOn.setIpAddress("172.16.25.122");
    cgsSignOn.setTc4InstanceId(2);

    RequestMessage rm;
    rm = new RequestMessage(Message.SYSTEM_REQUEST,ApplicationTransactionHeader.TCDS_EFTS_CODE);
    rm.putSystemDetails(cgsSignOn,6507);

    hostSession.write(rm.getMessage().substring(0,CommunicationHeader.COMMUNICATION_HEADER_LENS));
    hostSession.write(rm.getMessage().substring(CommunicationHeader.COMMUNICATION_HEADER_LENS));

    String messageFromHost = gatewayConnectHandler.getMessageFromHost();
    while (messageFromHost.length() == 0){
        messageFromHost = gatewayConnectHandler.getMessageFromHost();
    }

    ResponseMessage rem = new ResponseMessage(Message.SYSTEM_RESPONSE, messageFromHost);
    LogTC4.info(this,"CGS Sign On Response Code : " + rem.getSystemTransactionResponseCode());
}

public void listenToClients(){
    try{
        /* commented by vskrao on 24/May/2010
        TextLineCodecFactory tlcf = new TextLineCodecFactory( Charset.forName( "UTF-8" )); 
        tlcf.setEncoderMaxLineLength(16384);
        tlcf.setDecoderMaxLineLength(16384);
        */

        acceptor.getFilterChain().addLast( "logger", new LoggingFilter() );
        acceptor.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TC4CodecFactory()));

        acceptor.setHandler( this.gatewayAcceptHandler);
        //comment by vskrao.... acceptor.getSessionConfig().setReadBufferSize( 2048 );
        acceptor.getSessionConfig().setIdleTime( IdleStatus.BOTH_IDLE, 10 );
        acceptor.bind( new InetSocketAddress(PORT));

        SocketAddress localAddr = acceptor.getLocalAddress();

        LogTC4.info(this,"***CGS Local Address     :" + localAddr.toString());
        LogTC4.info(this,"***CGS Listening on port :" + PORT);

    }
    catch(Exception e){
        e.printStackTrace();
    }
}

public void finalize(){
    //Shutdown CGS gracefully here
    //send CGS signoff here
    hostSession.getCloseFuture().awaitUninterruptibly();
    connector.dispose();
    acceptor.dispose();
}

public TC4GatewayConnectionHandler getGatewayConnectHandler() {
    return gatewayConnectHandler;
}

public void setGatewayConnectHandler(TC4GatewayConnectionHandler gatewayConnectHandler) {
    this.gatewayConnectHandler = gatewayConnectHandler;
}

public NioSocketConnector getConnector() {
    return connector;
}

public void setConnector(NioSocketConnector connector) {
    this.connector = connector;
}

public IoSession getHostSession() {
    return hostSession;
}

public void setHostSession(IoSession hostSession) {
    this.hostSession = hostSession;
}

public TC4GatewayAcceptorHandler getGatewayAcceptHandler() {
    return gatewayAcceptHandler;
}

public void setGatewayAcceptHandler(TC4GatewayAcceptorHandler gatewayAcceptHandler) {
    this.gatewayAcceptHandler = gatewayAcceptHandler;
}

public IoAcceptor getAcceptor() {
    return acceptor;
}

public void setAcceptor(IoAcceptor acceptor) {
    this.acceptor = acceptor;
}

}

2) TC4GatewayAcceptorHandler.java

package com.tc4.communication;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

import com.tc4.api.CommunicationHeader;
import com.tc4.logging.LogTC4;

public class TC4GatewayAcceptorHandler extends IoHandlerAdapter
{
private boolean readyForNextRead = false;
private IoSession hostSession;
private TC4GatewayConnectionHandler gatewayConnectHandler;


public void exceptionCaught( IoSession session, Throwable cause ) throws Exception
{
    cause.printStackTrace();
}

public void messageSent( IoSession session, Object message ) throws Exception
{
    //LogTC4.info(this,"**Message To JTC4");
    LogTC4.debug(this, "**CGS RF:" + message.toString());
    readyForNextRead = false;
}

public void messageReceived( IoSession session, Object message ) throws Exception
{
    //LogTC4.info(this,"**Message From JTC4");
    LogTC4.debug(this, "**CGS TR:" + message.toString());
    Thread.sleep(100);
    //Forward this to hostSession
    hostSession.write(message.toString().substring(0,CommunicationHeader.COMMUNICATION_HEADER_LENS));
    hostSession.write(message.toString().substring(CommunicationHeader.COMMUNICATION_HEADER_LENS));
    Thread.sleep(100);
}

public void sessionIdle( IoSession session, IdleStatus status ) throws Exception
{
    //LogTC4.info(this, "CGS Acceptor IDLE " + session.getIdleCount( status ));
}

public void sessionClosed(IoSession session){
    LogTC4.info(this, "CGS: Closed Client Connection");
    gatewayConnectHandler.setClientSession(null);
}

public void sessionOpened(IoSession session){
    LogTC4.info(this, "CGS: Accepted Client Connection");
    gatewayConnectHandler.setClientSession(session);
}

public boolean isReadyForNextRead() {
    return readyForNextRead;
}

public void setReadyForNextRead(boolean readyForNextRead) {
    this.readyForNextRead = readyForNextRead;
}

public IoSession getHostSession() {
    return hostSession;
}

public void setHostSession(IoSession hostSession) {
    this.hostSession = hostSession;
}

public TC4GatewayConnectionHandler getGatewayConnectHandler() {
    return gatewayConnectHandler;
}

public void setGatewayConnetHandler(TC4GatewayConnectionHandler gatewayConnectHandler) {
    this.gatewayConnectHandler = gatewayConnectHandler;
}

}

3) TC4GatewayConnectionHandler.java

package com.tc4.communication;

import org.apache.mina.core.session.IdleStatus;
import org.apache.mina.core.service.IoHandlerAdapter;
import org.apache.mina.core.session.IoSession;

import com.tc4.api.CommunicationHeader;
import com.tc4.logging.LogTC4;

/**
 * MINA handler for the gateway's connection to the host.
 *
 * <p>Each message received from the host is stored as the latest host message
 * and, when a client session is registered, relayed to that client as two
 * writes (communication header, then the remainder).
 *
 * <p>{@code clientSession} and {@code messageFromHost} are written by one
 * thread (acceptor I/O thread / host I/O thread) and read by another, so both
 * are declared {@code volatile}.
 */
public class TC4GatewayConnectionHandler extends IoHandlerAdapter
{
    private boolean readyForNextRead = false;
    private String messageFromCgs = "";
    private String messageToCgs = "";
    private volatile IoSession clientSession;
    // Kept static as in the original; volatile so a thread polling getMessageFromHost() sees the update.
    private static volatile String messageFromHost = "";

    @Override
    public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
        cause.printStackTrace();
    }

    @Override
    public void messageSent(IoSession session, Object message) throws Exception {
        //LogTC4.info(this, "**Message To Host");
        LogTC4.debug(this, "**CGS TF:" + message.toString());
        //readyForNextRead = false;
    }

    /**
     * Records the host message and relays it to the registered client session,
     * if there is one.
     */
    @Override
    public void messageReceived(IoSession session, Object message) throws Exception {
        String text = message.toString();
        //LogTC4.info(this, "**Message From Host");
        LogTC4.debug(this, "**CGS RR:" + text);

        messageFromHost = text;
        //readyForNextRead = true;

        // Read the field once: the acceptor handler may clear it while this method sleeps,
        // which previously could turn the later write into a NullPointerException.
        IoSession client = clientSession;
        LogTC4.debug(this, "Message Session ID " + session);
        LogTC4.debug(this, "CGS Session ID " + client);
        if (client != null) {
            Thread.sleep(100);

            // Since we get entire message from Host at one go due to TC4CommunicationDecoder
            // We will relay the same way to the client as well

            // First write the communication header
            client.write(text.substring(0, CommunicationHeader.COMMUNICATION_HEADER_LENS));
            Thread.sleep(100);

            // Secondly, write the actual message
            client.write(text.substring(CommunicationHeader.COMMUNICATION_HEADER_LENS));
        }
    }

    @Override
    public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
        //System.out.println( "Connection To Host IDLE " + session.getIdleCount( status ));
    }

    @Override
    public void sessionClosed(IoSession session) {
        LogTC4.info(this, "Connection To Host Closed");
    }

    @Override
    public void sessionOpened(IoSession session) throws Exception {
        LogTC4.info(this, "Connection To Host Opened");
    }

    public boolean isReadyForNextRead() {
        return readyForNextRead;
    }

    public void setReadyForNextRead(boolean readyForNextRead) {
        this.readyForNextRead = readyForNextRead;
    }

    public String getMessageFromCgs() {
        return messageFromCgs;
    }

    public void setMessageFromCgs(String messageFromCgs) {
        this.messageFromCgs = messageFromCgs;
    }

    public String getMessageToCgs() {
        return messageToCgs;
    }

    public void setMessageToCgs(String messageToCgs) {
        this.messageToCgs = messageToCgs;
    }

    public IoSession getClientSession() {
        return clientSession;
    }

    public void setClientSession(IoSession clientSession) {
        this.clientSession = clientSession;
    }

    /** Returns the most recent message received from the host (empty until one arrives). */
    public String getMessageFromHost() {
        return messageFromHost;
    }

    public void setMessageFromHost(String messageFromHosti) {
        //this.messageFromHost = messageFromHost;
        messageFromHost = messageFromHosti;
    }

}

The server program

1) JTC4Communication.java

package com.tc4.communication;

import java.net.InetSocketAddress; import java.net.SocketAddress;

import javax.faces.context.FacesContext;

import org.apache.mina.core.future.ConnectFuture; import org.apache.mina.core.session.IoSession; import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.logging.LoggingFilter; import org.apache.mina.transport.socket.nio.NioSocketConnector;

import com.tc4.api.CommunicationHeader; import com.tc4.communication.codec.TC4CodecFactory; import com.tc4.logging.LogTC4; import com.tc4.utilities.TC4Properties;

public class JTC4Communication { private IoSession clientSession; private String messageFromClient; // This will be called by controlling programs using set method which will write to CGS private String messageToClient; //this will be called by controlling programs to check whether the response have been received or not from Host/CGS private TC4CommunicationGateway cgs; //private JTC4CommunicationHandler jtch; private NioSocketConnector connector; int used=0;

/* valid=0  sock not created.
 *      =1  socket created.
 *      =-1 socket invalid.
*/
boolean valid=false;

public JTC4Communication(){

    FacesContext fc = FacesContext.getCurrentInstance();
    //Since CGS Signon should happen only once we have configured cgs as application scope
    //and to get access of cgs object already (if present) we have to make this call 
    cgs = (TC4CommunicationGateway) fc.getApplication().getVariableResolver().resolveVariable(fc, "cgs");

    //this.jtch = new JTC4CommunicationHandler();

    messageFromClient = "";
    messageToClient = "";
}

//Establish a connection Will be called from TC4Login
public void connectToCGS(JTC4CommunicationHandler communicationHandler){
    connector = new NioSocketConnector();
    String cgsaddress = TC4Properties.getProperty("cgsaddress");
    Integer cgsport = new Integer(TC4Properties.getProperty("cgsport"));
    SocketAddress address = new InetSocketAddress(cgsaddress, cgsport.intValue()); //new Integer(cgsport)); //"172.16.25.122", 9123

    connector.getFilterChain().addLast( "logger", new LoggingFilter() );
    connector.getFilterChain().addLast( "codec", new ProtocolCodecFilter( new TC4CodecFactory() ));

    connector.setHandler(communicationHandler);
    ConnectFuture future1 = connector.connect(address);

    future1.awaitUninterruptibly();

    if (!future1.isConnected()) {
        return ;
    }
    clientSession = future1.getSession();
    LogTC4.debug(this, "Client Session ID " + clientSession);
    valid = true;

}

public void finalize(){
    clientSession.getCloseFuture().awaitUninterruptibly();

    connector.dispose();
}


public IoSession getClientSession() {
    return clientSession;
}
public void setClientSession(IoSession clientSession) {
    this.clientSession = clientSession;
}
public String getMessageFromClient() {
    return messageFromClient;
}


public void setMessageFromClient(String messageFromClient){
    try{
        this.messageFromClient = messageFromClient;
        Thread.sleep(100);
        // If the client lost connection due to some reason; reconnect
        if(clientSession == null) {
            connectToCGS(new JTC4CommunicationHandler());
        }

        // modified by Kamesh 24/May/10: split communication header and message then send it.
        clientSession.write(messageFromClient.substring(0,CommunicationHeader.COMMUNICATION_HEADER_LENS));
        clientSession.write(messageFromClient.substring(CommunicationHeader.COMMUNICATION_HEADER_LENS));

        LogTC4.debug(this, "CGS Session ID " + clientSession);
        //cgs.getGatewayConnectHandler().setClientSession(clientSession);

        messageToClient = "";
    }catch(Exception e){
        e.printStackTrace();
    }
}


public String getMessageToClient() {
    JTC4CommunicationHandler jtch;
    jtch = (JTC4CommunicationHandler)clientSession.getHandler();
    if (jtch.isMessageReceivedFromCgs()){
        messageToClient = jtch.getMessageReceivedFromCgs();
        return messageToClient;
    }
    else {
        return "";
    }
}
public void setMessageToClient(String messageToClient) {
    this.messageToClient = messageToClient;
}
public TC4CommunicationGateway getCgs() {
    return cgs;
}
public void setCgs(TC4CommunicationGateway cgs) {
    this.cgs = cgs;
}



public void release() throws Exception
{
    try
    {
        LogTC4.error(this,"releasing socket"+clientSession.toString() );
        if (clientSession.isConnected()){
            clientSession.close(true);
        }
    }
    catch(Exception e)
    {
        LogTC4.error(this,"Exception in release",e);
    } finally {

        if (clientSession!=null) {clientSession.close(true);}
    }
}

public boolean isValid()
{
    return valid;
}

public boolean isAlive()
{
    boolean isAlive = false;
    try {
        if (clientSession!=null)
            isAlive = clientSession.isConnected();
        else
            isAlive =false;

    } catch (Exception e) {
        e.printStackTrace();
    }
    return isAlive;
}
public boolean isClosed(){
    if (clientSession!=null)
        return clientSession.isClosing();
    else
        return false;
}

}

The program which acts as both server and client (program 1) accepts multiple connections (each port accepts 10 connections from user programs). At the same time, user programs can fire different transaction requests, which are sent to the host for processing; after the processing is over, the response is written back to program 1. The problem now is to keep track of which client each response received from the host belongs to. How can I achieve this? Any help will be greatly appreciated.