Hello I been trying to find this memory leak for a while now and no luck.
What I have is a server that serves requests for a flash client security acceptance. It only sends out one packet nothing more but has the power to hold over 10,000 concurrent connections? maybe 65,534 if it has too.
Anyways after serving about 210,000+ users in 24 hours the memory usage went up to 74,000 MB from 12,000 MB. I thought it would stabilize at that and stay thinking maybe its some stuff that got unpacked registered and will be re-used. But after 48 hours it's up to 90 MB and at this rate i'll have to restart the server every 2 days to prevent OutOfMemoryException (Haven't got it yet but I know i'll get it with only 300 MB of ram to spare).
Anyways if anyone has any time and would like to help me out i'd really appreciate it, here is the source.
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.CharsetDecoder;
import java.nio.charset.CodingErrorAction;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.channels.spi.SelectorProvider;
import java.util.*;
public class PolicyServer implements Runnable {
public static final String POLICY_REQUEST = "<policy-file-request/>";
public static final String POLICY_XML =
"<?xml version=\"1.0\"?>"
+ "<cross-domain-policy>"
+ "<allow-access-from domain=\"*\" to-ports=\"*\" />"
+ "</cross-domain-policy>"
+ (char)0;
// The host:port combination to listen on
private InetAddress hostAddress;
private int port;
// The channel on which we'll accept connections
private ServerSocketChannel serverChannel;
// The selector we'll be monitoring
private Selector selector;
// The buffer into which we'll read data when it's available
private ByteBuffer readBuffer = ByteBuffer.allocate(255);
// This decodes raw bytes into ascii data.
private CharsetDecoder asciiDecoder;
// A list of PendingChange instances
private List<ChangeRequest> pendingChanges = new LinkedList<ChangeRequest>();
// Maps a SocketChannel to a list of ByteBuffer instances
private Map<SocketChannel, List<ByteBuffer>> pendingData = new HashMap<SocketChannel, List<ByteBuffer>>();
public PolicyServer(InetAddress hostAddress, int port) throws IOException {
this.hostAddress = hostAddress;
this.port = port;
this.selector = this.initSelector();
this.asciiDecoder = Charset.forName("US-ASCII").newDecoder().onMalformedInput(
CodingErrorAction.REPLACE).onUnmappableCharacter(
CodingErrorAction.REPLACE);
}
public void send(SocketChannel socket, byte[] data) {
synchronized (this.pendingChanges) {
// Indicate we want the interest ops set changed
this.pendingChanges.add(new ChangeRequest(socket, ChangeRequest.CHANGEOPS, SelectionKey.OP_WRITE));
// And queue the data we want written
synchronized (this.pendingData) {
List<ByteBuffer> queue = (List<ByteBuffer>) this.pendingData.get(socket);
if (queue == null) {
queue = new ArrayList<ByteBuffer>();
this.pendingData.put(socket, queue);
}
queue.add(ByteBuffer.wrap(data));
}
}
// Finally, wake up our selecting thread so it can make the required changes
this.selector.wakeup();
}
public void run() {
while (true) {
try {
// Process any pending changes
synchronized (this.pendingChanges) {
Iterator changes = this.pendingChanges.iterator();
while (changes.hasNext()) {
ChangeRequest change = (ChangeRequest) changes.next();
if(change == null) continue;
switch (change.type) {
case ChangeRequest.CHANGEOPS:
SelectionKey key = change.socket.keyFor(this.selector);
try {
if(key!=null)
key.interestOps(change.ops);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
}
this.pendingChanges.clear();
}
// Wait for an event one of the registered channels
this.selector.select();
// Iterate over the set of keys for which events are available
Iterator selectedKeys = this.selector.selectedKeys().iterator();
while (selectedKeys.hasNext()) {
SelectionKey key = (SelectionKey) selectedKeys.next();
selectedKeys.remove();
if (!key.isValid()) {
continue;
}
// Check what event is available and deal with it
if (key.isAcceptable()) {
this.accept(key);
} else if (key.isReadable()) {
this.read(key);
} else if (key.isWritable()) {
this.write(key);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
private void accept(SelectionKey key) throws IOException {
// For an accept to be pending the channel must be a server socket channel.
ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
// Accept the connection and make it non-blocking
SocketChannel socketChannel = serverSocketChannel.accept();
Socket socket = socketChannel.socket();
socketChannel.configureBlocking(false);
// Register the new SocketChannel with our Selector, indicating
// we'd like to be notified when there's data waiting to be read
// also contains a attachment of a new StringBuffer (for storing imcomplete/multi packets)
socketChannel.register(this.selector, SelectionKey.OP_READ, new StringBuffer());
}
private void read(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
// Clear out our read buffer so it's ready for new data
this.readBuffer.clear();
// Attempt to read off the channel
int numRead;
try {
numRead = socketChannel.read(this.readBuffer);
} catch (IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
if (numRead == -1) {
// Remote entity shut the socket down cleanly. Do the
// same from our end and cancel the channel.
key.channel().close();
key.cancel();
return;
}
// Grab the StringBuffer we stored as the attachment
StringBuffer sb = (StringBuffer)key.attachment();
// Flips the readBuffer by setting the current position of
// packet stream to beginning.
// Append the data to the attachment StringBuffer
this.readBuffer.flip();
sb.append(this.asciiDecoder.decode(this.readBuffer).toString());
this.readBuffer.clear();
// Get the policy request as complete packet
if(sb.indexOf("\0") != -1) {
String packets = sb.substring(0, sb.lastIndexOf("\0")+1);
sb.delete(0, sb.lastIndexOf("\0")+1);
if(packets.indexOf(POLICY_REQUEST) != -1)
send(socketChannel, POLICY_XML.getBytes());
}
}
private void write(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
synchronized (this.pendingData) {
List queue = (List) this.pendingData.get(socketChannel);
// Write until there's not more data ...
while (!queue.isEmpty()) {
ByteBuffer buf = (ByteBuffer) queue.get(0);
try {
socketChannel.write(buf);
if (buf.remaining() > 0) {
// ... or the socket's buffer fills up
break;
}
} catch(IOException e) {
// The remote forcibly closed the connection, cancel
// the selection key and close the channel.
key.cancel();
socketChannel.close();
return;
}
queue.remove(0);
}
if (queue.isEmpty()) {
// We wrote away all data, so we're no longer interested
// in writing on this socket. Switch back to waiting for
// data.
try {
if (key!=null)
key.interestOps(SelectionKey.OP_READ);
} catch(Exception ex) {
if (key!=null)
key.cancel();
}
}
}
}
private Selector initSelector() throws IOException {
// Create a new selector
Selector socketSelector = SelectorProvider.provider().openSelector();
// Create a new non-blocking server socket channel
this.serverChannel = ServerSocketChannel.open();
serverChannel.configureBlocking(false);
// Bind the server socket to the specified address and port
InetSocketAddress isa = new InetSocketAddress(this.hostAddress, this.port);
serverChannel.socket().bind(isa);
// Register the server socket channel, indicating an interest in
// accepting new connections
serverChannel.register(socketSelector, SelectionKey.OP_ACCEPT);
return socketSelector;
}
public static void main(String[] args) {
try {
new Thread(new PolicyServer(null, 5556)).start();
} catch (IOException e) {
e.printStackTrace();
}
}
}
Here is the ChangeRequest class
import java.nio.channels.SocketChannel;
public class ChangeRequest {
public static final int REGISTER = 1;
public static final int CHANGEOPS = 2;
public SocketChannel socket;
public int type;
public int ops;
public ChangeRequest(SocketChannel socket, int type, int ops) {
this.socket = socket;
this.type = type;
this.ops = ops;
}
}