I have a problem, and I wonder whether it can be implemented efficiently in Erlang. I have a number of nodes communicating with each other using a protocol that attaches priority information to each message.

I would like to be able to send the highest-priority messages first (at the sender) and handle them first at the receiver, before the lower-priority messages. For example, at the receiver, I would have a thread receiving the messages and placing them in a heap of some sort, and then another thread would read them from there. However, as I understand it, Erlang was built with a "shared-nothing" architecture in mind, so I don't really know if this is doable.

Messages are generated continuously, so whenever the sender generates a high-priority message, it should send it first even if there are lower-priority messages in the queue.

I've read some discussions related to priority receive that are rather old [1][2][3] and I wanted to know if the situation has improved since then.

[1] http://www.duomark.com/erlang/briefings/euc2006/pK.html

[2] http://erlang.org/pipermail/erlang-questions/2007-August/028769.html

[3] http://www.erlang.org/pipermail/erlang-questions/2007-July/027726.html

+6  A: 

If your sender and receiver are on the same node, the problem is reasonably simple (process-to-process bandwidth is practically unlimited) and you can use the nested receive approach:

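receive
    {high_prio, Msg} -> process(Msg)
after 0 ->
    %% No high-priority message is waiting, so block until either kind
    %% arrives (a late high-priority message is not stuck behind this wait).
    receive
        {high_prio, Msg} -> process(Msg);
        {low_prio, Msg} -> process(Msg)
    end
end.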
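As a self-contained illustration of the same idea, here is a minimal sketch of a non-blocking variant. The module name prio_recv and the functions next/0 and drain/0 are made up for this example; the message shapes {high_prio, Msg} and {low_prio, Msg} are the ones used above.

```erlang
-module(prio_recv).
-export([next/0, drain/0]).

%% Take the next message from the caller's mailbox, preferring
%% high priority. Returns {Prio, Msg}, or empty if nothing is queued.
next() ->
    receive
        {high_prio, Msg} -> {high_prio, Msg}
    after 0 ->
        receive
            {low_prio, Msg} -> {low_prio, Msg}
        after 0 ->
            empty
        end
    end.

%% Collect everything currently queued, in the order next/0 yields it.
drain() ->
    case next() of
        empty -> [];
        Item -> [Item | drain()]
    end.
```

Sending {low_prio, a}, {high_prio, b} and {low_prio, c} to yourself and then calling prio_recv:drain() should give b first, followed by a and c in arrival order. Keep in mind that a selective receive scans the mailbox, so this can get slow if a large backlog of low-priority messages builds up.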

When the sender and receiver processes are on different nodes, you may simply be stuck if your node-to-node link capacity is not large enough for your message volume (i.e. the link starts buffering messages).

Distributed Erlang doesn't make many guarantees, but one it does make is that messages between a given pair of processes will be delivered in order (barring a link disconnect/reconnect). This means that there is no way to have a high-priority message overtake a low-priority one that is already queued if the node-to-node link is backed up.

To get around this you would need to implement your own node-to-node protocol that would allow you to control the buffering behaviour. The mailing list threads you cite provide some ideas for a solution - keep a priority queue of node-to-node messages (an ordered_set ets table), and actually send the messages via gen_tcp sockets. Something like this perhaps:

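-module(custom_dist).
-export([send_msg/3, start_link/1, init/1]).

%% Port the remote listener accepts connections on (placeholder value).
-define(CUSTOM_DIST_PORT, 4711).
-define(TABLE, custom_dist_table).

%% Queue Msg for Target at priority Prio (the smallest key is sent first).
send_msg(Target, Prio, Msg) ->
    R = make_ref(),
    ets:insert(?TABLE, {{Prio, R}, Target, Msg}),
    custom_dist_queue ! msg_send_request,
    R.

%% RemoteHost is the hostname or IP address of the remote machine.
start_link(RemoteHost) ->
    proc_lib:start_link(?MODULE, init, [RemoteHost]).

init(RemoteHost) ->
    true = erlang:register(custom_dist_queue, self()),
    %% Public, so that any sender process can insert into the queue.
    ?TABLE = ets:new(?TABLE, [ordered_set, public, named_table]),
    {ok, S} = gen_tcp:connect(RemoteHost, ?CUSTOM_DIST_PORT,
                              [binary, {packet, 4}]),
    proc_lib:init_ack({ok, self()}),
    loop(S).

loop(S) ->
    receive
        msg_send_request -> send_one_msg(S)
    end,
    loop(S).

send_one_msg(S) ->
    {{_Prio, R}, Target, Msg} = get_msg_from_queue(),
    %% gen_tcp sends bytes, so encode the term first.
    ok = gen_tcp:send(S, term_to_binary({R, Target, Msg})).

%% Remove the highest-priority message (smallest key) from the queue.
get_msg_from_queue() ->
    Key = ets:first(?TABLE),
    [Obj] = ets:lookup(?TABLE, Key),
    ets:delete(?TABLE, Key),
    Obj.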

In this solution, we send a small msg_send_request message to the custom_dist_queue registered process. It is a notification that there's a message to be sent, and that the queue server should take the highest-priority message and send it over the gen_tcp link. The listener on the remote node would need to gen_tcp:recv the {Ref, Target, Msg} messages, decode them from whatever wire format was used, and do Target ! Msg to deliver them.
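The remote side could look roughly like the sketch below. The module name custom_dist_listener and its functions are hypothetical. It assumes the sender frames each message with {packet, 4} and encodes it with term_to_binary, and that Target is something that can be sent to on the receiving node (typically a registered name there). It handles one connection at a time and does no validation of the incoming data.

```erlang
-module(custom_dist_listener).
-export([start/1, deliver/1]).

%% Listen on Port and serve one sender connection at a time.
start(Port) ->
    {ok, L} = gen_tcp:listen(Port, [binary, {packet, 4},
                                    {active, false}, {reuseaddr, true}]),
    accept_loop(L).

accept_loop(L) ->
    {ok, S} = gen_tcp:accept(L),
    recv_loop(S),
    accept_loop(L).

recv_loop(S) ->
    %% With {packet, 4}, each recv returns exactly one framed message.
    case gen_tcp:recv(S, 0) of
        {ok, Bin} ->
            deliver(Bin),
            recv_loop(S);
        {error, _Reason} ->
            gen_tcp:close(S)
    end.

%% Decode one frame and forward the payload to its target process.
deliver(Bin) ->
    {_Ref, Target, Msg} = binary_to_term(Bin),
    Target ! Msg,
    ok.
```

Since binary_to_term is applied to whatever arrives on the socket, this is only reasonable on a trusted network. Note also that once messages are on the wire, TCP delivers them in order, so the prioritising has to happen before the send, as in the queue above.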

I'm undecided whether to use Refs or erlang:now() values as the second part of the ets table key (the part that orders messages within one priority level). I can't remember how refs compare, or how that would affect the delivery order of messages at the same priority.
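One possible tie-breaker, if you are on a runtime that has it, is erlang:unique_integer([monotonic]). This is an assumption about your environment: the function appeared in OTP 18, where erlang:now() was also deprecated. The module name prio_key below is made up for the example.

```erlang
-module(prio_key).
-export([new/1]).

%% Build an ordered_set key: a lower Prio sorts first, and within one
%% priority the strictly increasing sequence number keeps the keys in
%% the order in which they were created on this node.
new(Prio) ->
    {Prio, erlang:unique_integer([monotonic])}.
```

With keys like this, ets:first/1 on the ordered_set table returns the oldest message among those with the smallest Prio value, so messages at the same priority go out first-in, first-out.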


Long answer short: message priority isn't a common pattern in Erlang, so you've got some work to do if you want to implement it. To get it you need to control the link buffering behaviour, and Erlang distribution doesn't allow that, so you need to implement a custom distribution protocol.

archaelus