UP | HOME

并发原语

Table of Contents

可以联想一下房间里的人:

人就是进程。房间里的人都有他们的私人记忆,进程也是如此

要改变你的记忆,就需要我说给你听。这就是发送和接收消息

我们有了小孩,这就是分裂(spawn)

我们死了,就是进程退出

了解顺序Erlang后,编写并发程序就很简单了。只需要三个新的基本函数: spawn+sendreceive

Erlang的并发是基于 进程 ( process )的。进程是一些独立的小型虚拟机,可以执行Erlang函数

你肯定曾经接触过进程,但仅仅是在操作系统的上下文环境里

在Erlang里,进程隶属于编程语言,而非操作系统

这就意味着Erlang的进程在任何操作系统上都会具有相同的逻辑行为,这样就能编写可移植的并发代码,让它在任何支持Erlang的操作系统上运行 

在Erlang里:

出于这些原因,Erlang有时会被称为是一种 纯消息传递式 语言

如果你没有进程编程的经验,可能听说过它很有难度的传言。你多半听过一些恐怖故事,涉及内存冲突、竞争状况、共享内存破坏等等

但在Erlang里,进程编程是很简单的

基本并发函数

在顺序编程里学到的知识同样适用于并发编程。要做的只是加上下面这几个基本函数

创建进程

Pid = spawn(Mod, Func, Args) 

创建 一个新的 并发进程 来执行 apply(Mod, Func, Args)

  • 这个新进程和调用进程 并发 运行
  • spawn 返回一个 进程标识符 Pid (process identifier的简称)
    • 可以用 Pid 来给此进程发送消息
  • 元数为 length(Args) 的 Func 函数 必须从 Mod 模块导出
注意: 当一个新进程被创建后,会使用最新版的代码定义模块 

另外一种形式的spawn调用:

Pid = spawn(Func)

创建一个新的并发进程来执行 Fun() :这种形式的 spawn 总是使用 被执行fun的当前值 ,而且这个fun 无需从模块里导出

这两种 spawn 形式的本质区别与动态代码升级有关

发送消息

Pid ! Message

向标识符为 Pid 的进程 发送 消息 Message

  • 消息发送是 异步 的。 发送方并不等待 ,而是会继续之前的工作
  • ! 被称为 发送操作符
  • Pid ! M 的返回值是 M
Pid1 ! Pid2 !...! Msg 的意思是把消息 Msg 发送给 Pid1 、Pid2 等所有进程

接收消息

接收发送给某个进程的消息。它的语法如下:

receive 
    Pattern1 [when Guard1] ->
        Expression1;
    Pattern2 [when Guard2] ->
        Expression2;
    ...
end 

当某个消息到达进程后:

  • 系统会尝试将它与 Pattern1 (以及可选的关卡 Guard1 )匹配,如果成功就执行 Expressions1
  • 如果第一个模式不匹配,就会尝试 Pattern2
  • 以此类推
  • 如果没有匹配的模式,消息就会被保存起来供以后处理,进程则会开始等待下一条消息

实例

首先来看一个的计算面积的代码:

-module(geometry).  
-export([area/1]). 

area({rectangle, Width, Height}) -> Width * Height;
area({square, Side})             -> Side * Side.

现在把这个函数改写成一个进程。为此从 area 函数的参数里取了两个模式,然后把它们重置为接收语句里的模式:

-module(area_server0).  
-export([loop/0]). 

loop() ->
    receive
        {rectangle, Width, Ht} -> 
            io:format("Area of rectangle is ~p~n",[Width * Ht]),
            loop();
        {square, Side} -> 
            io:format("Area of square is ~p~n", [Side * Side]),
            loop()
    end.

测试

在eshell 里创建一个 area_server0 模块 loop 函数的并发进程,然后返回 Pid ,也就是打印出来的 <0.88.0>

1> c (area_server0) . 
{ok,area_server0}
2> 
2> Pid = spawn(area_server0, loop, []) . 
<0.88.0>

向这个进程发送了一个消息

3> Pid ! {rectangle, 6, 10} . 
Area of rectangle is 60
{rectangle,6,10}

这个消息匹配 loop/0 接收语句里的第一个模式:

receive
    {rectangle, Width, Ht} -> 
        io:format("Area of rectangle is ~p~n",[Width * Ht]),
        loop();

收到消息之后,这个进程打印出矩形的面积。最后,shell打印出 {rectangle, 6, 10}

这是因为 Pid ! Msg 的值被定义为 Msg 

同样也可以用它来计算正方形:

4> Pid ! {square, 12} .       
Area of square is 144
{square,12}

总结

到目前为止粗略介绍了 spawn 、 send 和 receive 的工作方式:

  • 当 spawn 命令被执行时,系统会创建一个新的进程,每个进程都带有一个邮箱,这个邮箱是和进程同步创建的
  • 给某个进程发送消息后,消息会被放入该进程的邮箱
  • 只有当程序执行一条接收语句时才会读取邮箱
好了,就是这些。不需要线程、锁、信号和人工控制

客户端-服务器

传统的客户端-服务器架构是指一个分隔客户端与服务器的网络:

  • 大多数情况下客户端会有多个实例,而服务器只有一个
  • 服务器这个词经常会让人联想到专业机器上运行重量级软件的画面

    客户端-服务器架构是Erlang的中心
    

Erlang 的实现机制则要轻量得多。客户端-服务器架构里的 客户端服务器 是不同的 进程 ,它们之间的 通信 使用普通的Erlang 消息传递 机制

客户端和服务器可以运行在同一台机器上,也可以运行在不同的机器上

客户端服务器 这两个词是指这两种进程所扮演的角色:

  • 客户端总是通过向服务器发送一个请求来发起计算
  • 服务器计算后生成回复,然后发送一个响应给客户端

实例

下面来编写第一个客户端-服务器应用程序

-module(area_server1).  
-export([loop/0, rpc/2]). 
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        Response ->
            Response
    end.
loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! Width * Ht,
            loop();
        {From, {circle, R}} -> 
            From !  3.14159 * R * R,
            loop();
        {From, Other} ->
            From ! {error,Other},
            loop()
    end.

首先,对上一节里编写的程序做一些小的修改

在上一个程序里,我们只需要向某个进程发送请求,然后接收它并打印出来,现在要做的是向发送原请求的进程发送一个响应

问题是,我们不知道该把响应发给谁。要发送一个响应,客户端必须加入一个服务器可以回复的地址

这就像是给某人写信,如果你想得到回复,最好把你的地址写在信中!

因此,发送方必须加入一个回复地址。要做到这一点,可以把:

Pid ! {rectangle, 6, 10} . 

修改成下面这样, self()客户端进程标识符

Pid ! {self(), {rectangle, 6, 10}} . 

因为消息发送格式变了,必须把接收请求的代码从:

loop() ->
    receive
        {rectangle, Width, Ht} -> 
            io:format("Area of rectangle is ~p~n",[Width * Ht]),
            loop();
         % ......

改成为:

loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! Width * Ht,
            loop();
        % .....

注意:如何把计算结果发回由 From 参数指定的进程的

因为客户端把这个参数设置成它自己的ID,所以能收到结果
  • 发送请求的进程 通常称为 客户端
  • 接收请求并回复客户端的进程 称为 服务器
另外,最佳实践是确认发送给进程的每一个消息都已收到

如果发送给进程的消息不匹配原始接收语句里的任何一个模式,这条消息就会遗留在进程邮箱里,永远无法接收

为了解决这个问题,在接收语句的最后加了一个子句,让它能匹配所有发送给此进程的消息

最后添加一个名为 rpc (即远程过程调用)的实用小函数, 它封装了向服务器发送请求和等待响应的代码

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        Response ->
            Response
    end.

测试

可以在shell里试验一下它:

1> c (area_server1) . 
{ok,area_server1}
2> 
2> Pid = spawn(area_server1, loop, []) . 
<0.88.0>
3> area_server1:rpc(Pid, {rectangle, 6, 8}) .  
48
4>  
4> area_server1:rpc(Pid, {circle, 6}) .        
113.09723999999999
5>   
5> area_server1:rpc(Pid, socks) .         
{error,socks}
这段代码有个小问题:

在 rpc/2 函数里,我们向服务器发送请求然后等待响应,但我们并不是等待来自服务器的响应,而是在等待任意消息

如果其他某个进程在客户端等待来自服务器的消息时向它发送了一个消息,客户端就会将此消息错误解读为来自服务器的响应

改进

要纠正这个问题,可以把接收语句的形式修改如下:

loop() ->
    receive 
        {From, ...} ->
            From ! {self(), ...},
            loop();
        %... ->
    end .  

再把 rpc 改成:

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.

调用 rpc 函数时, Pid 会被绑定为某个值,因此 {Pid, Response} 这个模式里的 Pid 已绑定,而 Response 未绑定这个模式只会匹配包含一个双元素元组(第一个元素是 Pid )的消息。所有别的消息都会进入队列

receive 提供了选择性接收的功能,会在后面介绍

改进后的代码:

-module(area_server1).  
-export([loop/0, rpc/2]). 
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        Response ->
            Response
    end.

loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! Width * Ht,
            loop();
        {From, {circle, R}} -> 
            From !  3.14159 * R * R,
            loop();
        {From, Other} ->
            From ! {error,Other},
            loop()
    end.

测试下:

1> Pid = spawn(area_server2, loop, []) . 
<0.83.0>
2> 
2> area_server2:rpc(Pid, {circle, 5}) . 
78.53975

封装

最后一点可改进的地方。可以把 rpc 和 spawn 隐藏在模块内

这是一种好的做法,因为它能让我们在不改变客户端代码的情况下修改服务器的内部细节

最终的代码如下:

-module(area_server_final).  
-export([start/0, area/2, loop/0]). 

start() -> spawn(area_server_final, loop, []).

area(Pid, What) ->
    rpc(Pid, What).
rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.
loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! {self(), Width * Ht},
            loop();
        {From, {circle, R}} -> 
            From !  {self(), 3.14159 * R * R},
            loop();
        {From, Other} ->
            From ! {self(), {error,Other}},
            loop()
    end.
注意:还需要把 spawn的参数(也就是 loop/0 )从模块中 导出 

调用函数 start/0area/2 (之前称为 spawn 和 rpc )来运行它:

1> Pid = area_server_final:start() . 
<0.83.0>
2> 
2> area_server_final:area(Pid, {rectangle, 10, 8}) .  
80
3>  
3> area_server_final:area(Pid, {circle, 4}) .         
50.26544
4>   
4> area_server_final:area(Pid, socks) .         
{error,socks}
这些新名称更好一些,因为它们能更准确地描述服务器的行为

总结

这样就完成了一个简单的客户端-服务器模块。所需要的就是三个基本函数: spawn 、 send 和receive

这种模式会以各类变种的形式不断重复出现,变化虽然可大可小,但基本的概念是不变的

进程很轻巧

在这个阶段,你可能会担心性能问题。毕竟,如果创建数百或者数千个Erlang进程,就必须付出一定的代价。让我们来看看代价有多大

现在将执行一些分裂操作,创建大量的进程,并计算要花费多长时间。下面是一个程序:

-module(processes).

-export([max/1]).

%% max(N) 

%%   Create N processes then destroy them
%%   See how much time this takes

max(N) ->
    Max = erlang:system_info(process_limit),
    io:format("Maximum allowed processes:~p~n",[Max]),
    statistics(runtime),
    statistics(wall_clock),
    L = for(1, N, fun() -> spawn(fun() -> wait() end) end),
    {_, Time1} = statistics(runtime),
    {_, Time2} = statistics(wall_clock),
    lists:foreach(fun(Pid) -> Pid ! die end, L),
    U1 = Time1 * 1000 / N,
    U2 = Time2 * 1000 / N,
    io:format("Process spawn time=~p (~p) microseconds~n",
              [U1, U2]).

wait() ->
    receive
        die -> void
    end.

for(N, N, F) -> [F()];
for(I, N, F) -> [F()|for(I+1, N, F)].
请注意在这里用的是 spawn(Fun) ,并且被创建的函数并不需要从模块里导出

下面的结果源于我现在所用的计算机:

2> processes:max(20000) . 
Maximum allowed processes:262144
Process spawn time=0.0 (1.95) microseconds
ok
3> processes:max(300000) . 
Maximum allowed processes:262144

=ERROR REPORT==== 8-Mar-2021::10:50:30 ===
Error in process <0.25.4648> with exit value:
{system_limit,[{erlang,spawn_link,
                       [erlang,apply,[#Fun<shell.1.83096281>,[]]],
                       []},
               {erlang,spawn_link,1,[]},
               {shell,get_command,5,[{file,"shell.erl"},{line,299}]},
               {shell,server_loop,7,[{file,"shell.erl"},{line,230}]}]}
Eshell V7.3  (abort with ^G)
*** ERROR: Shell process terminated! ***

=ERROR REPORT==== 8-Mar-2021::10:50:30 ===
Too many processes

创建 20000个进程平均消耗了0.0微秒/进程的CPU时间和1.95微秒/进程的实际运行时间

请注意使用了内置函数 erlang:system_info(process_limit) 来找出所允许的最大进程数量

其中有一些是系统保留的进程,所以你的程序实际上不能用那么多

当超出限制值时,系统会拒绝启动更多的进程并生成一个错误报告(见第2个命令)

系统内设的限制值是262 144个进程。要超越这一限制,必须用 +P 标识启动Erlang仿真器如下:

$ erl +P 3000000 
Erlang/OTP 18 [erts-7.3] [source] [64-bit] [smp:6:6] [async-threads:10] [kernel-poll:false]

Eshell V7.3  (abort with ^G)
1> processes:max(500000) .  
Maximum allowed processes:4194304
Process spawn time=1.36 (4.388) microseconds
ok
2>  
2> processes:max(1000000) .  
Maximum allowed processes:4194304
Process spawn time=1.53 (4.811) microseconds
ok
3>  
3> processes:max(2000000) .  
Maximum allowed processes:4194304
Process spawn time=1.945 (3.928) microseconds
ok
4>  
4> processes:max(3000000) .  
Maximum allowed processes:4194304
Process spawn time=2.3833333333333333 (4.996666666666667) microseconds
ok

在前面的例子里,系统实际选择的值是恰好 大于参数的2的幂 。这个实际值可以通过调用 erlang:system_info(process_limit) 获得。可以看到,随着进程数量的增加,进程创建的时间也在增加。如果继续增加进程的数量,最终会耗尽物理内存,导致系统开始把物理内存交换到硬盘上,运行速度明显变慢

如果编写的程序需要使用大量进程,最好先搞清楚物理内存在交换到硬盘之前能容纳多少进程,并且确保程序运行在物理内存中

如你所见,创建大量进程的速度是很快的

如果你是一名C或Java程序员,也许会不敢使用大量的进程,而且必须负责管理它们

而在Erlang里,创建进程让编程变得更简单,而不是更复杂

带超时的接收

有时候一条接收语句会因为消息迟迟不来而一直等下去

发生这种情况的原因有很多,比如程序里可能有一处逻辑错误,或者准备发送消息的进程在消息发出前就崩溃了

要避免这个问题,可以给接收语句增加一个超时设置,设定进程 等待 接收消息的最长时间 。它的语法如下:

receive 
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
after Time ->
        Expressions;
end .

如果在进入接收表达式的 Time 毫秒后还没有收到匹配的消息,进程就会停止等待消息,转而执行 Expressions

只带超时的接收

可以编写一个只有超时部分的 receive 。通过这种方法,可以定义一个 sleep(T) 函数, 它会让当前的进程挂起 T 毫秒:

sleep(T) ->
    receive
    after T ->
       true
    end.

超时值为 0 的接收

超时值为0会让 超时的主体部分 立即发生

但在这之前,系统会尝试对邮箱里的消息进行匹配

可以用它来定义一个 flush_buffer 函数,它会清空进程邮箱里的所有消息:

flush_buffer() ->
    receive
        _Any ->
            flush_buffer()
    after 0 ->
        true
    end.
注意:如果没有超时子句, flush_buffer 就会在邮箱为空时永远挂起且不返回

还可以使用零超时来实现某种形式的 优先接收 ,就像下面这样:

priority_receive() ->
    receive
        {alarm, X} ->
            {alarm, X}
    after 0 ->
        receive
            Any ->
                Any
        end
    end.
  • 如果邮箱里不存在匹配 {alarm, X} 的消息, priority_receive 就会接收邮箱里的第一个消息
    • 如果没有任何消息,它就会在最里面的接收语句处挂起,并返回它收到的第一个消息
  • 如果存在匹配 {alarm, X} 的消息,这个消息就会被立即返回
请记住,只有当邮箱里的所有条目都进行过模式匹配后,才会检查 after 部分。如果没有 after 0 语句,警告(alarm)消息就不会被首先匹配

因此对大的邮箱使用优先接收是相当低效的,所以如果打算使用这一技巧,请确保邮箱不要太满

超时值为无穷大的接收

如果接收语句里的超时值是原子 infinity (无穷大),就 永远不会触发超时

这对那些在接收语句之外计算超时值的程序可能很有用

有时候计算的结果是返回一个实际的超时值,其他的时候则是让接收语句永远等待下去

实现一个定时器

可以用接收超时来实现一个简单的定时器

函数 stimer:start(Time, Fun) 会在 Time 毫秒之后执行 Fun (一个不带参数的函数)。它返回一个句柄(是一个PID),可以在需要时用来关闭定时器:

-module(stimer).
-export([start/2, cancel/1]).

start(Time, Fun) -> spawn(fun() -> timer(Time, Fun) end).
cancel(Pid) -> Pid ! cancel.
timer(Time, Fun) ->
    receive
        cancel ->
            void
    after Time ->
            Fun()
    end.

启动一个超时器,等待的时间超过了5秒钟,定时器就会触发:

2> Pid = stimer:start(5000, fun() -> io:format("time event~n") end) . 
<0.39.0>
3> 
time event

启动一个定时器,然后在到期前关闭它:

3> Pid1 = stimer:start(50000, fun() -> io:format("time event~n") end) .  
<0.41.0>
4>  
4> stimer:cancel(Pid1) . 
cancel
超时和定时器是实现许多通信协议的关键

等待某个消息时并不想永远等下去,所以会像例子里那样增加一个超时设置

选择性接收

基本函数 receive 用来从进程邮箱里提取消息,但它所做的不仅仅是简单的模式匹配。它还会把未匹配的消息加入队列供以后处理,并管理超时。下面这个语句:

receive 
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
after Time ->
        ExpressionsTimeout;
end .

它的工作方式如下:

  1. 进入 receive 语句时会 启动 一个 定时器 (但只有当表达式包含 after 部分时才会如此)
  2. 取出 邮箱里的 第一个消息 ,尝试将它与 Pattern1Pattern2模式匹配
    • 如果匹配成功,系统就会从 邮箱移除 这个 消息 ,并 执行 模式后面的 表达式
  3. 如果 receive 语句里的 所有模式不匹配 邮箱的 第一个消息 ,系统就会从 邮箱移除 这个 消息 并把它放入一个 保存队列 ,然后继续尝试邮箱里的 第二个消息
    • 这一过程会不断重复,直到发现匹配的消息或者邮箱里的 所有消息 都被 检查 过了为止
  4. 如果邮箱里的 所有消息不匹配 , 进程 就会被 挂起重新调度 ,直到 新的消息 进入邮箱才会继续执行
    • 新消息到达后, 保存队列里的消息 不会重新匹配 ,只有 新消息 才会进行匹配
  5. 一旦 某个消息 匹配成功 , 保存队列里的所有消息 就会按照 到达进程的顺序 重新 进入 邮箱
    • 但如果 设置定时器 ,就会 清除 这些消息
  6. 如果 定时器 在我们 等待消息时 到期 了,系统就会 执行 表达式 ExpressionsTimeout ,并把 所有保存的消息 按照它们 到达进程的顺序 重新 放回 邮箱

注册进程

如果想给一个进程发送消息,就需要知道它的PID,但是当进程创建时,只有父进程才知道它的PID。系统里没有其他进程知道它的存在

这通常很不方便,因为必须把PID发送给系统里所有想要和它通信的进程

另一方面,这也很安全。如果不透露某个进程的PID,其他进程就无法以任何方式与其交互

Erlang有一种 公布 进程标识符 的方法,它让系统里的任何进程都能与该进程通信。这样的进程被称为 注册进程 ( registered process )。管理注册进程的内置函数有四个:

  • 用 AnAtom (一个原子)作为名称来注册进程 Pid

    register(AnAtom, Pid)
    
    如果 AnAtom 已被用于注册某个进程,这次注册就会失败
    
  • 移除与 AnAtom 关联的所有注册信息

    unregister(AnAtom) 
    
    注意:如果某个注册进程崩溃了,就会自动取消注册
    
  • 检查 AnAtom 是否已被注册:
    • 如果是就返回进程标识符 Pid
    • 如果没有找到与 AnAtom 关联的进程就返回原子 undefined

      whereis(AnAtom) -> Pid | undefined 
      
  • 返回一个包含系统里所有注册进程的列表

    registered() ->
         [AnAtom::atom()]
    

测试

可以用 register 来改写前面的代码示例,并尝试用创建的进程名称进行注册:

3> Pid = spawn(area_server0, loop, []) . 
<0.41.0>
4> 
4> register(area, Pid) . 
true

一旦名称注册完成,就可以像这样给它发送消息:

5> area ! {rectangle, 4, 5} .  
Area of rectangle is 20
{rectangle,4,5}

实例

可以用 register 来制作一个模拟时钟的注册进程:

-module(clock).
-export([start/2, stop/0]).

start(Time, Fun) -> 
    register(clock, spawn(fun() -> tick(Time, Fun) end)).
stop() -> clock ! stop.
tick(Time, Fun) ->
    receive
        stop ->
            void
    after Time ->
            Fun(),
            tick(Time, Fun)
    end.

这个时钟会不断滴答作响,直到你停止它

1> clock:start(5000, fun() -> io:format("Tick~p~n",[erlang:now()]) end ) .  
true
2>  
Tick{1615,369367,86685}
Tick{1615,369372,90751}
Tick{1615,369377,94390}
Tick{1615,369382,98343}
Tick{1615,369387,102537}
2> clock:stop() . 
stop

尾递归

再来看一下之前编写的面积计算服务器,它的接收循环如下:

loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! {self(), Width * Ht},
            loop();
        {From, {circle, R}} -> 
            From !  {self(), 3.14159 * R * R},
            loop();
        {From, Other} ->
            From ! {self(), {error,Other}},
            loop()
    end.
仔细观察,就会发现每当收到消息时就会处理它并立即再次调用 loop() 

这一过程被称为 尾递归 ( tail-recursive ):对一个尾递归的函数可以进行特别编译,把语句序列里的 最后一次函数调用 替换成 跳至被调用函数的开头

这就意味着尾递归的函数无需消耗栈空间也能一直循环下去

假设编写了以下(不正确的)代码:

loop() ->
    receive
        {From, {rectangle, Width, Ht}} -> 
            From ! {self(), Width * Ht},
            loop(),
            someOtherFunc();
        {From, {circle, R}} -> 
            From !  {self(), 3.14159 * R * R},
            loop();
        %......
    end.

在第5行里调用了 loop() ,但是编译器必然推断出“当调用 loop() 后必须返回这里,因为得调用第6行里的 someOtherFunc() ”。于是它把 someOtherFunc 的地址推入栈,然后跳到 loop 的开头

这么做的问题在于 loop() 是永不返回的,它会一直循环下去

所以,每次经过第5行,就会有一个返回地址被推入控制栈,最终系统的空间会消耗殆尽

避免这个问题的方法很简单,如果编写的函数 F 是 永不返回 的(就像 loop() 一样),就要确保:

  • 在调用 F 之后 不再调用其他任何东西
  • 别把 F 用在 列表元组 构造器

模板

编写并发程序时,几乎总是从下面这样的代码起步:

-module(ctemplate).
-compile(export_all).

start() ->
    spawn(?MODULE, loop, []).

rpc(Pid, Request) ->
    Pid ! {self(), Request},
    receive
        {Pid, Response} ->
            Response
    end.

loop(X) ->
    receive
        Any ->
            io:format("Received:~p~n",[Any]),
            loop(X)
    end.
  1. 接收循环仅仅是一个空循环,它会接收并打印出任何发给它的消息
  2. 在开发程序的过程中,我会开始向一些进程发送消息
    • 因为一开始没有给接收循环添加能匹配这些消息的模式,所以接收语句底部的代码就会把它们打印出来
  3. 每到这个时候,我就会给接收循环添加一个匹配模式并重新运行程序
这一技巧在相当程度上决定了编写程序的顺序

从一个小程序开始,逐渐扩展它,并在开发过程中不断进行测试

用 MFA 或 Fun 进行 spawn

用显式的模块、函数名和参数列表(称为MFA)来创建一个进程是确保运行进程能够正确升级为新版模块代码(即使用中被再次编译)的恰当方式

动态代码升级机制不适用于fun的创建,只能用于带有显式名称的MFA上
  • 如果不关心动态代码升级,或者确定程序不会在未来进行修改,就可以使用 spawn 的spawn(Fun) 形式
  • 如果有疑问,就使用 spawn(MFA)
接下来我们将关注错误恢复,了解如何运用三个新的概念(连接、信号和捕捉进程退出)来编写容错的并发程序

Next:错误处理

Previous:并发介绍

Home:目录