Страницы

среда, 1 февраля 2012 г.

Обмен сообщениями между нодами.

Одной из ключевых особенностей Erlang-а является встроенное межпроцессорное взаимодействие.
Снова используем обычный ноутбук:
Linux user-Vostro-1015 2.6.35-22-generic #35-Ubuntu SMP Sat Oct 16 20:36:48 UTC 2010 i686 GNU/Linux
Памяти 2 GB Intel Core 2 Duo.





Сразу скажу, что модель обмена сообщениями между нодами просто мегоудобное. Что естественно
рождает соблазн использовать этот функционал по полной программе. Реализуется он просто :


-module(test).
-export([rpc/1,start/0]).
start()->
register(n,self()),
io:format("start ~p ~n",[self()]),
loop().

loop()->
receive
{Fun,Param,func}->
Fun(Param),
loop();
Something ->
io:format("P1 ~p ~p ~n",[Something,self()]),
loop()
end
.



Дальше в консоли

erl -name bogdan@localhost.localdomain -s test


и у нас есть процесс который в бесконечном цикле слушает входящие сообщение.
Откроем вторую консоль.

erl -name bogdan2@localhost.localdomain


И выполним что-то типа
rpc:call('bogdan1@localhost.localdomain',test,rpc,[ "test" ])


И вуаля в первой консоли увидим что-то типа :
erl>
P1 test <0 .38.0=".38.0">

А теперь при помощи вот этого простенького модуля посмотрим сколько может принять сообщения
через сетевой интерфейс ( ноды зарегистрированы глобально )

-module(meash_loop).
-export([loop/2,loop/3]).

loop(Fun,List) when is_list(List) ->
First=now(),
lists:map(Fun,List),
Second=now(),
timer:now_diff(Second,First)
;

loop(Fun,Num)->
First=now(),
lists:map(Fun,lists:seq(1, Num) ),
Second=now(),
timer:now_diff(Second,First)
.

loop({Module,Fun},Num,Params)->
First=now(),
lists:map(fun(_Elem)-> Res=erlang:apply(Module,Fun,Params), Res end ,lists:seq(1, Num) ),
Second=now(),
timer:now_diff(Second,First)
;

loop(Fun,Num,Params)->
First=now(),
lists:map(fun(_Elem)-> Res=erlang:apply(Fun,Params), Res end ,lists:seq(1, Num) ),
Second=now(),
timer:now_diff(Second,First)
.



Функция loop запускает цикл выполняющий заданное кол-во раз функцию переданную в качестве аргумента. Итак в первой консоли :

(bogdan@localhost.localdomain)5> F=fun(Elem)-> rpc:call('bogdan1@localhost.localdomain',test,rpc,[ Elem ]) end.
#Fun
(bogdan@localhost.localdomain)6> meash_loop:loop(F,100).
41530
(bogdan@localhost.localdomain)7> meash_loop:loop(F,10000).
2238777
(bogdan@localhost.localdomain)8> meash_loop:loop(F,5000).
1055952
(bogdan@localhost.localdomain)9> meash_loop:loop(F,5000).
965800
(bogdan@localhost.localdomain)10> meash_loop:loop(F,5000).
965607
(bogdan@localhost.localdomain)11> meash_loop:loop(F,5000).
962122
(bogdan@localhost.localdomain)12> meash_loop:loop(F,5000).
961181


Результаты в микросекундах. Видим что ноды обмениваются сообщениями примерно со скоростью в 5000 сообщений в секунду. Конечно для полноценной сети результаты будут другие.
Напоследок еще протестирую на миллионе сообщений.

(bogdan@localhost.localdomain)13> meash_loop:loop(F,1000000).
220595303


Стоит заметить что ноды равномерно распределили между со
бой процессорное время, что не может не радовать.
А вот скорость средняя упала.

8765 bogdan 20 0 95348 42m 2260 S 50 1.4 1:34.63 beam.smp
8729 bogdan 20 0 47000 7376 2220 S 50 0.2 1:34.03 beam.smp



Ну да ладно, а теперь посмотрим как он будет стрелять сообщение на одной ноде.
Eshell V5.8.1 (abort with ^G)
(bogdan@localhost.localdomain)1> S=spawn(test,start,[]).
start <0 .40.0=".40.0">
<0 .40.0=".40.0">
(bogdan@localhost.localdomain)2> F=fun(Elem)-> n ! Elem end.
#Fun
(bogdan@localhost.localdomain)3> meash_loop:loop(F,5000).
64171

(bogdan@localhost.localdomain)9> Mesh1=meash_loop:loop(F,10000).
128594

(bogdan@localhost.localdomain)9> Mesh2=meash_loop:loop(F,100000).
745208


Только это мы с такой скоростью послали сообщения, а вот с какой скоростью их обработали я так понял совсем другой вопрос. Попробуем. Перепишем модуль test.



-module(test).
-export([rpc/1,start/0]).
start()->
register(n,self()),
io:format("start ~p ~n",[self()]),
loop(0).


loop(Sum)->
receive
{Fun,Param,func}->
Fun(Param),
loop(Sum+1);
sum ->
io:format("Sum ~p ~p ~n",[Sum,self()]),
loop(Sum);
Something ->
io:format("P1 ~p ~p ~n",[Something,self()]),
loop(Sum)
end
.

rpc(Elem)->
n ! Elem
.


То есть добавим возможность проверить контрольную сумму. И в консольке сделаем примерно следующее.


(bogdan@localhost.localdomain)1>
(bogdan@localhost.localdomain)1> Self=spawn(test,start,[]).
start <0 .40.0=".40.0">
<0 .40.0=".40.0">
(bogdan@localhost.localdomain)2> F=fun(Elem)-> Elem+Elem end.
#Fun
(bogdan@localhost.localdomain)3>
(bogdan@localhost.localdomain)3> F2=fun(El)-> n! {F,El,func} end.
#Fun
(bogdan@localhost.localdomain)22> TestR=fun(Num)-> T1=now() ,meash_loop:loop(F2,Num), n!sum, T2=now(), timer:now_diff(T2,T1) end.

#Fun
(bogdan@localhost.localdomain)23> TestR(100).
Sum 1121200 <0 .40.0=".40.0">
2596
(bogdan@localhost.localdomain)24> TestR(1000).
Sum 1122200 <0 .40.0=".40.0">
23048
(bogdan@localhost.localdomain)25> TestR(10000).
Sum 1132200 <0 .40.0=".40.0">
138129
(bogdan@localhost.localdomain)26> TestR(100000).
Sum 1232200 <0 .40.0=".40.0">
1360131
(bogdan@localhost.localdomain)27> TestR(100000).
Sum 1332200 <0 .40.0=".40.0">
1445152
(bogdan@localhost.localdomain)28> TestR(100000).
Sum 1432200 <0 .40.0=".40.0">
1549407
(bogdan@localhost.localdomain)29> TestR(100000).
Sum 1532200 <0 .40.0=".40.0">
1261018
(bogdan@localhost.localdomain)30> TestR(100000).
Sum 1632200 <0 .40.0=".40.0">
1311819
(bogdan@localhost.localdomain)31> TestR(900000).
Sum 2532200 <0 .40.0=".40.0">
11922922
(bogdan@localhost.localdomain)32> TestR(90000.)
(bogdan@localhost.localdomain)32> TestR(90000).
* 1: syntax error before: ')'
(bogdan@localhost.localdomain)32> TestR(90000).
1003007
Sum 2622200 <0 .40.0=".40.0">
(bogdan@localhost.localdomain)33> TestR(90000).
Sum 2712200 <0 .40.0=".40.0">
1102958
(bogdan@localhost.localdomain)34> TestR(90000).
Sum 2802200 <0 .40.0=".40.0">
1174438
(bogdan@localhost.localdomain)35> TestR(90000).
Sum 2892200 <0 .40.0=".40.0">
1127670
(bogdan@localhost.localdomain)36>




То есть, на одной ноде скорость обмена сообщениями в секунду упирается в 80-100 тысячь.

Комментариев нет: