UP | HOME

完整实例

Table of Contents

接下来这个示例是一个简单的消息传递者(messager)示例。Messager 是一个允许用登录到不同的结点并向彼此发送消息的应用程序

开始之前,请注意以下几点:

1. 这个示例只演示了消息传递的逻辑,没有提供用户友好的界面(虽然这在 Erlang 是可以做到的)
2. 这类的问题使用 OTP 的工具可以非常方便的实现,还能同时提供线上更新的方法等
3. 这个示例程序并不完整,它没有考虑到结点离开等情况。这个问题在后面的版本会得到修复

Messager 允许 “客户端” 连接到集中的服务器并表明其身份。也就是说,用户并不需要知道另外一个用户所在 Erlang 结点的名称就可以发送消息

-module(messenger).
-export([start_server/0, server/1, logon/1, logoff/0, message/2, client/2]).

%%% Change the function below to return the name of the node where the
%%% messenger server runs
server_node() ->
    messenger@bill.

%%% This is the server process for the "messenger"
%%% the user list has the format [{ClientPid1, Name1},{ClientPid22, Name2},...]
server(User_List) ->
    receive
        {From, logon, Name} ->
            New_User_List = server_logon(From, Name, User_List),
            server(New_User_List);
        {From, logoff} ->
            New_User_List = server_logoff(From, User_List),
            server(New_User_List);
        {From, message_to, To, Message} ->
            server_transfer(From, To, Message, User_List),
            io:format("list is now: ~p~n", [User_List]),
            server(User_List)
    end.

%%% Start the server
start_server() ->
    register(messenger, spawn(messenger, server, [[]])).

%%% Server adds a new user to the user list
server_logon(From, Name, User_List) ->
    %% check if logged on anywhere else
    case lists:keymember(Name, 2, User_List) of
        true ->
            From ! {messenger, stop, user_exists_at_other_node},  %reject logon
            User_List;
        false ->
            From ! {messenger, logged_on},
            [{From, Name} | User_List]        %add user to the list
    end.

%%% Server deletes a user from the user list
server_logoff(From, User_List) ->
    lists:keydelete(From, 1, User_List).

%%% Server transfers a message between user
server_transfer(From, To, Message, User_List) ->
    %% check that the user is logged on and who he is
    case lists:keysearch(From, 1, User_List) of
        false ->
            From ! {messenger, stop, you_are_not_logged_on};
        {value, {From, Name}} ->
            server_transfer(From, Name, To, Message, User_List)
    end.
%%% If the user exists, send the message
server_transfer(From, Name, To, Message, User_List) ->
    %% Find the receiver and send the message
    case lists:keysearch(To, 2, User_List) of
        false ->
            From ! {messenger, receiver_not_found};
        {value, {ToPid, To}} ->
            ToPid ! {message_from, Name, Message}, 
            From ! {messenger, sent} 
    end.

%%% User Commands
logon(Name) ->
    case whereis(mess_client) of 
        undefined ->
            register(mess_client, 
                     spawn(messenger, client, [server_node(), Name]));
        _ -> already_logged_on
    end.

logoff() ->
    mess_client ! logoff.

message(ToName, Message) ->
    case whereis(mess_client) of % Test if the client is running
        undefined ->
            not_logged_on;
        _ -> mess_client ! {message_to, ToName, Message},
             ok
    end.

%%% The client process which runs on each server node
client(Server_Node, Name) ->
    {messenger, Server_Node} ! {self(), logon, Name},
    await_result(),
    client(Server_Node).

client(Server_Node) ->
    receive
        logoff ->
            {messenger, Server_Node} ! {self(), logoff},
            exit(normal);
        {message_to, ToName, Message} ->
            {messenger, Server_Node} ! {self(), message_to, ToName, Message},
            await_result();
        {message_from, FromName, Message} ->
            io:format("Message from ~p: ~p~n", [FromName, Message])
    end,
    client(Server_Node).

%%% wait for a response from the server
await_result() ->
    receive
        {messenger, stop, Why} -> % Stop the client 
            io:format("~p~n", [Why]),
            exit(normal);
        {messenger, What} ->  % Normal response
            io:format("~p~n", [What])
    end.
先来看看这个例子引入的一些新的概念

尾递归

这里有两个版本的 server_transfer 函数:其中一个有四个参数(server_transfer/4)另外一个有五个参数(server_transfer/5)

%%% Server transfers a message between user
server_transfer(From, To, Message, User_List) ->
    %% check that the user is logged on and who he is
    case lists:keysearch(From, 1, User_List) of
        false ->
            From ! {messenger, stop, you_are_not_logged_on};
        {value, {From, Name}} ->
            server_transfer(From, Name, To, Message, User_List)
    end.
%%% If the user exists, send the message
server_transfer(From, Name, To, Message, User_List) ->
    %% Find the receiver and send the message
    case lists:keysearch(To, 2, User_List) of
        false ->
            From ! {messenger, receiver_not_found};
        {value, {ToPid, To}} ->
            ToPid ! {message_from, Name, Message}, 
            From ! {messenger, sent} 
    end.
请注意:如何让 server_transfer 函数通过 server(User_List) 调用其自身的,这里形成了一个循环

Erlang 编译器非常的聪明,它会将上面的代码优化为一个循环而不是一个非法的递规函数调用

但是它只能是在函数调用后面没有别的代码的情况下才能工作(即尾递规)

list 模块

上面的函数里用到了 lists 模块中 的函数 lists:keymemeber(Key,Position,Lists) 函数遍历列表中的元组:

  • 查看每个元组的指定位置 (Position)处的数据并判断元组该位置是否与 Key 相等
    • 元组中的第一个元素的位置为 1,依次类推
  • 如果发现某个元组的 Position 位置处的元素与 Key 相同,则返回 true,否则返回 false

    3> lists:keymember(a, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]).
    true
    
    4> lists:keymember(p, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]).
    false
    

    lists:keydelete 与 lists:keymember 非常相似,只不过它将删除列表中找到的第一个元组(如果存在),并返回剩余的列表:

    3>  lists:keydelete(a, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). 
    [{x,y,z},{b,b,b},{q,r,s}]
    

    lists:keysearch 与 lists:keymember 类似,但是它将返回 {value,Tuple_Found} 或者原子值 false。

    4>  lists:keysearch(a, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). 
    {value,{b,a,c}}
    
    5>  lists:keysearch(p, 2, [{x,y,z},{b,b,b},{b,a,c},{q,r,s}]). 
    false
    
    lists 模块是一个非常有用的模块,推荐通过用户手册仔细研究一下 erl -man lists 
    

退出进程

Erlang 进程(概念上地)会一直运行直到它执行 receive 命令,而此时消息队列中又没有它想接收的消息为止

“概念上地” 是因为 Erlang 系统活跃的进程实际上是共享 CPU 处理时间的

当进程无事可做时,即一个函数调用 return 返回而没有调用另外一个函数时,进程就结束,另外一种终止进程的方式是调用 exit/1 函数

await_result() ->
    receive
        {messenger, stop, Why} -> % Stop the client 
            io:format("~p~n", [Why]),
            exit(normal);
        {messenger, What} ->  % Normal response
            io:format("~p~n", [What])
    end.
exit/1 函数的参数是有特殊含义的,实际上是进程结束的状态值

在这个例子中使用 exit(normal) 结束进程,它与程序因没有再调用函数而终止的效果是一样的

内置函数 whereis(RegisteredName) 用于检查是否已有一个进程注册了进程名称 RegisteredName:

  • 如果已经存在,则返回进程的 进程标识符
  • 如果不存在,则返回原子值 undefined
logon(Name) ->
    case whereis(mess_client) of 
        undefined ->
            register(mess_client, 
                     spawn(messenger, client, [server_node(), Name]));
        _ -> already_logged_on
    end.

logoff() ->
    mess_client ! logoff.

message(ToName, Message) ->
    case whereis(mess_client) of % Test if the client is running
        undefined ->
            not_logged_on;
        _ -> mess_client ! {message_to, ToName, Message},
             ok
end.
到这儿,应该已经可以看懂 messager 模块的大部分代码了

测试

在使用本示例程序之前,需要:

  1. 配置 server_node() 函数
  2. 将编译后的代码(messager.beam)拷贝到每一个你启动了 Erlang 的计算机上。
这接下来的例子中,将在四台不同的计算上启动了 Erlang 结点

如果网络没有那么多的计算机,也可以在同一台计算机上启动多个结点

我在本地测试时启动的四个结点分别为:messager@gentoo,c1@raspberrypi,c2@thinkpad,c3@gentoo 

首先在 meesager@gentoo 上启动服务器程序:

$ erl -sname messenger  
Erlang/OTP 23 [erts-11.1.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]

Eshell V11.1.5  (abort with ^G)
(messenger@gentoo)1> messenger:start_server(). 
true
(messenger@gentoo)2> 

接下来用 peter 是在 c1@raspberrypi 登录:

$ erl -sname c1 
Erlang/OTP 21 [erts-10.2.4] [source] [smp:4:4] [ds:4:4:10] [async-threads:1]

Eshell V10.2.4  (abort with ^G)
(c1@raspberrypi)1>  messenger:logon(peter). 
true
logged_on          

然后 fred 在 c2@thinkpad 上登录:

$ erl -sname c2 
Erlang/OTP 22 [erts-10.6.4] [source] [64-bit] [smp:12:12] [ds:12:12:10] [async-threads:1]

Eshell V10.6.4  (abort with ^G)
(c2@thinkpad)1> messenger:logon(fred). 
true
logged_on       

最后 james 在 c3@gentoo 上登录:

$ erl -sname c3 
Erlang/OTP 23 [erts-11.1.5] [source] [64-bit] [smp:4:4] [ds:4:4:10] [async-threads:1] [hipe]

Eshell V11.1.5  (abort with ^G)
(c3@gentoo)1> messenger:logon(james). 
true
logged_on     

现在,Peter 就可以向 Fred 发送消息了:

(c1@raspberrypi)2> messenger:message(fred, "hello"). 
ok
(c1@raspberrypi)3> 
sent  

Fred 收到消息:

Message from peter: "hello"
(c2@thinkpad)2> 

在 messenger@gentoo 主服务器上可以看到:

(messenger@gentoo)2>
list is now: [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}]
这说明消息是先发送给 messenger进程,然后再转发给 Fred 的 

Fred 回复一个消息给 Peter :

(c2@thinkpad)2> messenger:message(peter, "go away, I'm busy"). 
ok
(c2@thinkpad)3> 
sent            

Peter 结点上显示:

Message from fred: "go away, I'm busy"
(c1@raspberrypi)3> 

同样在服务器节点上再次打印了一遍用户列表:

(messenger@gentoo)2>
list is now: [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}]
list is now: [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}]

现在 Fred 登出服务器:

(c2@thinkpad)3>  messenger:logoff(). 
logoff

随后,James 再向 Fred 发送消息时,则出现下面的情况:

(c1@raspberrypi)3> messenger:message(fred, "peter doesn't like you"). 
ok
(c1@raspberrypi)4> 
receiver_not_found 
因为 Fred 已经离开,所以发送消息失败

messenger@gentoo结点上再次打印的用户列表上也可以看到:

list is now: [{<9162.89.0>,james},{<9158.85.0>,peter}]

详细分析

在上面的例子里有4个Erlang 结点: messenger@gentoo, c1@raspberrypi, c2@thinkpad, c3@gentoo

其中 messenger@gentoo 上手动启动了进程 messenger

而peter 在 c1@raspberrypi,fred 在 c2@thinkpad,james 在 c3@gentoo 上分别登录

每次登录都会使得在 c1@raspberrypi, c2@thinkpad, c3@gentoo 结点上各自运行一个进程 mess_client 

当Peter 在调用 “message” 发送消息时:

messenger:message(fred, "hello")

首先检查用户自身是否在系统中运行(是否可以查找到 mess_client 进程):

whereis(mess_client) 

如果用户存在则将消息发送给 mess_client:

mess_client ! {message_to, fred, "hello"}

客户端通过下面的代码将消息发送到服务器:

{messenger, messenger@gentoo} ! {self(), message_to, fred, "hello"},

然后等待服务器的回复。 服务器收到消息后将调用:

server_transfer(<9158.85.0>, fred, "hello", [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}]), 

接下来,用下面的代码检查进程标识符 From 是否在 User_Lists 列表中:

lists:keysearch(<9158.85.0>, 1, [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}])   
如果 keysearch 返回原子值 false ,则出现的某种错误,服务将返回如下消息:

From ! {messenger, stop, you_are_not_logged_on}

client 收到这个消息后,则执行 exit(normal) 然后终止程序

这里 keysearch 返回的是 {value,{<9158.85.0>, peter}} ,则可以确定该用户已经登录,并其名字(peter)存储在变量 Name 中

server_transfer(<9158.85.0>, peter, fred, "hello", [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}])
注意:这次是函数 server_transfer/5,与刚才的 server_transfer/4 不是同一个函数

还会再次调用 keysearch 函数用于在 User_List 中查找与 fred 对应的进程标识符:

lists:keysearch(fred, 2, [{<9162.89.0>,james},{<9160.86.0>,fred},{<9158.85.0>,peter}])

这里用到了参数 2,因为要查找的是元组中的第二个元素

如果返回的是原子值 false,则说明 fred 已经登出,服务器将向发送消息的进程发送如下消息:

{<9158.85.0>,peter} ! {messenger, receiver_not_found};

peter 就会收到该消息 receiver_not_found 

这次 keysearch 返回值为: {value, {<9160.86.0>, fred}} 。所以将下面的消息发送给 fred 客户端:

<9160.86.0> ! {message_from, peter, "hello"}, 

而如下的消息会发送给 peter 的客户端:

<9158.85.0> ! {messenger, sent} 

fred 客户端收到消息后将其输出:

{message_from, peter, "hello"} ->
    io:format("Message from ~p: ~p~n", [peter, "hello"])

peter 客户端在 await_result 函数中收到回复的消息:

{messenger, sent} ->  % Normal response
    io:format("~p~n", [sent])

Next:容错编程

Previous:分布式编程

Home:并发编程