The problem is fairly odd to me although I'm a newbie.
Whats going on is that if you force the server under heavy load of connections and keep sending a invalid packet that doesn't represent POLICY_XML packet.
Pretty much what I'm trying to say is that if you connect it goes into socket READ OPERATION. Then you never go into send() which changes SelectionKey into WRITE OPERATION. Somehow the read operations stack up and after 2000 or so connection requests the server will stop accepting connections, no matter what.. I've tried to connect with telnet and always fail to make a connection.. But after around 5 minutes it starts accepting connections again and becomes fully functional.
Very strange problem but note that if I remove the packet matching statement and it will act similarly to a echo server then it will run endlessly without experiencing any connection accepting issues pretty much turns stable.
I've attached the whole server source code on the bottom can someone who has extensive knowledge with NIO please check it out and let me know if there is a way to fix it.
All that really catches my eye is the selector wakeup in send() which may fix everything after putting the line below into the read() it seems to do absolutely nothing and the problem remains.
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
Here is the source for the simple server.
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
public class PolicyServer implements Runnable {
public static final String POLICY_REQUEST = "<policy-file-request/>";
public static final String POLICY_XML =
"<?xml version=\"1.0\"?>"
+ "<cross-domain-policy>"
+ "<allow-access-from domain=\"*\" to-ports=\"*\" />"
+ "</cross-domain-policy>"
+ (char)0;
// The host:port combination to listen on
private InetAddress hostAddress;
private int port;
// The channel on which we'll accept connections
private ServerSocketChannel serverChannel;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(255);
// This decodes raw bytes into ascii data.
private CharsetDecoder asciiDecoder;
// A list of PendingChange instances
private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();
public PolicyServer(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
this.asciiDecoder = Charset.forName("US-ASCII").newDecoder().onMalformedInput(
CodingErrorAction.REPLACE).onUnmappableCharacter(
CodingErrorAction.REPLACE);
}
public void send(SocketChannel socket, byte[] data) {
synchronized (this.pendingChanges) {
// Indicate we want the interest ops set changed
this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
// And queue the data we want written
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (this.pendingChanges) {
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
changes.remove();
if(change == null) continue;
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
try {
if(key!=null)
key.interestOps(change.ops);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
key.cancel();
continue;
}
// Check what event is available and deal with it
try {
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
} catch(IOException io) {
this.pendingData.remove(key.channel());
try {
((SocketChannel)key.channel()).socket().close();
} catch (IOException e) {}
key.channel().close();
key.cancel();
key.attach(null);
key = null;
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
// also contains a attachment of a new StringBuffer (for storing imcomplete/multi packets)
socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// Attempt to read off the channel
int numRead = socketChannel.read(this.readBuffer);
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
throw new IOException("");
}
// Grab the StringBuffer we stored as the attachment
StringBuffer sb = (StringBuffer)key.attachment();
// Flips the readBuffer by setting the current position of
// packet stream to beginning.
// Append the data to the attachment StringBuffer
this.readBuffer.flip();
sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
this.readBuffer.clear();
// Get the policy request as complete packet
if(sb.indexOf("\0") != -1) {
String packets = new String(sb.substring(0, sb.lastIndexOf("\0")+1));
sb.delete(0, sb.lastIndexOf("\0")+1);
if(packets.indexOf(POLICY_REQUEST) != -1)
send(socketChannel, POLICY_XML.getBytes());
} else if(sb.length() > 8192) {
sb.setLength(0);
//Force disconnect.
throw new IOException("");
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socketChannel);
if(queue == null || queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
socketChannel.write(buf);
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
break;
}
queue.remove(0);
}
}
}
private Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) {
try {
new Thread(new PolicyServer(null, 5556)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
import java.nio.channels.SocketChannel;
public class ChangeRequest {
public static final int CHANGEOPS = 1;
public SocketChannel socket;
public int type;
public int ops;
public ChangeRequest(SocketChannel socket, int type, int ops) {
this.socket = socket;
this.type = type;
this.ops = ops;
}
}
Thanks in advanced.