Hatena::Grouperlang

lnzntの Erlang 日記 このページをアンテナに追加 RSSフィード

|

2011年08月13日

いろはメモ - Erlang 編 - 「ろ」 : 並行プログラミング

| 07:26 | いろはメモ - Erlang 編 - 「ろ」 : 並行プログラミング - lnzntの Erlang 日記 を含むブックマーク はてなブックマーク - いろはメモ - Erlang 編 - 「ろ」 : 並行プログラミング - lnzntの Erlang 日記 いろはメモ - Erlang 編 - 「ろ」 : 並行プログラミング - lnzntの Erlang 日記 のブックマークコメント

いろはメモ - 初心者の書いたメモです。間違いは随時直していきます。

----

並行プログラミング

プロセス
Erlang における LWP*1、あるいはスレッドのようなもの。メッセージパッシングにより相互通信する。

並行プログラミングで使うプリミティブは以下。

@spec spawn(Fun) -> Pid
Funを評価する新しい並行プロセスを作り、そのPID*2を返す。
@spec spawn(Mod, FuncName, Args) -> Pid
上と同じ。Fun の代わりに Mod:FuncName(Args) *3を評価する。
@spec Pid ! Message -> Message
PID が Pid のプロセスに Message を非同期に送信する。Message を返す。
receive...end
メールボックスの(他プロセスから送信された)メッセージを受信する。
  receive
    Pattern1 [ when Gaurd1 ] -> Expr1;
        :
    PatternN [ when GaurdN ] -> ExprN
  after Time ->             %% after はタイムアウト時の指定。省略可。   
    Expr                    %% Time はタイムアウト値(ミリ秒単位)。
  end                       %%        0 の場合、即時。infinity の場合、無限。
クライアントサーバアーキテクチャ

次のようなモデル。

                              1:要求(request)
                          ---------------------->
 [クライアント(client)]                            [サーバ(server)]
                         <----------------------
                              2:応答(response)

                                ※1:要求、2:応答、の繰り返し
                                ※応答は無い場合もある
簡単な例

サーバクライアントが送ったリクエスト(リスト)を逆転して返信する例。

-module(server).            %% サーバ (server.erl)
-compile(export_all).

start() ->
    spawn(fun() -> loop([]) end).

loop(X) ->
    receive                   
        {Client, Request} ->  %% リクエストは {Pid, List} を期待
            io:format("Request Received:~p from: ~w~n", [Request, Client]),
                              %% lists:reverse() はリストを逆転させる BIF
            Client ! {self(), lists:reverse(Request)},   
            loop(X)           %% (末尾再帰です)
    end.
-module(client).       %% クライアント (clinet.erl)
-compile(export_all).

request(Server, Request) ->
    Server ! {self(), Request},
    receive {Server, Response} -> %% レスポンスは {Pid, Any} を期待
        Response
    after 1000 ->         %% 1000ミリ秒(=1秒)後までに期待する応答が無ければ
        throw('Time out') %% 例外を投げる
    end.

実行例。

1> c(server).
{ok,server}
2> c(client).
{ok,client}
3> Server = server:start().                %% サーバ起動
<0.47.0>
4> Response = client:request(Server, "Hello"). %% リクエスト送信
Request Received:"Hello" from: <0.35.0>    %% これはサーバの出力
"olleH"
5> Response.                               %% "Hello" が逆転されている
"olleH"

今度は、サーバの応答を返信する部分を以下のようにコメントアウト

                              :
     %      Client ! {self(), lists:reverse(Request)},
                              :

で、実行してみるとクライアントは要求送信後タイムアウトになる。

1> c(server).
{ok,server}
2> c(client).
{ok,client}
3> Server = server:start().
<0.51.0>
4> client:request(Server, "Hello").
Request Received:"Hello" from: <0.35.0>
** exception throw: 'Time out'             %% 1秒後、タイムアウト!
     in function  client:request/2
登録済みプロセス

PID に名前を付けて登録するプリミティブがある。

@spec register(AnAtom, Pid)
Pid を AnAtom という名前で登録する。その名前が既に登録済みなら失敗する。
@spec unregister(AnAtom)
AnAtom に対応する登録を削除する。名前が未登録の場合失敗する。
@spec whereis(AnAtom) -> Pid | undefined
AnAtom に対応する PID を返す。未登録の場合は undefined を返す。
@spec registered() -> [AnAtom::atom()]
システムに登録されているすべてのプロセスのリストを返す。

server.erl と client.erl を修正してプロセスの登録に対応する。

-module(server).
-compile(export_all).

start(Name) ->                      %% 追加
    register(Name, start()).        %% 追加

start() ->
    spawn(fun() -> loop([]) end).

loop(X) ->
    receive
        {Client, Request} ->
            io:format("Request Received:~p from: ~w~n", [Request, Client]),
            Client ! {self(), lists:reverse(Request)},
            loop(X)
    end.
-module(client).
-compile(export_all).

request(Server, Request) ->
    Server ! {self(), Request},
    receive {Server, Response} ->
        Response
    after 1000 ->
        throw('Time out')
    end.

request_by_name(Name, Request) ->     %% 追加
    request(whereis(Name), Request).  %% 追加

実行。

1> c(server).
{ok,server}
2> c(client).
{ok,client}
3> server:start(myserver).   %% myserver という名前でサーバを登録、起動
true
4> client:request_by_name(myserver, "hello").  %% myserver へリクエスト
Request Received:"hello" from: <0.35.0>
"olleh"

リンクとエラー処理

リンク
プロセス間のエラー伝播経路の定義。リンクはBIF のコールにより明示的に作られる
リンクセット
あるプロセスに現在リンクされているプロセスの集合。
終了シグナル
プロセスの終了時にリンクセットのプロセスブロードキャストされる。
システムプロセス
trap_exit を true にしたプロセス。終了シグナル受信時の動作が異なる(下表を参照)
プロセスの種類終了シグナルシグナル受信時の動作
一般のプロセスnormal終了シグナルは無視
Why死ぬ。終了シグナル Why をリンクセットにブロードキャスト
killed死ぬ。終了シグナル killed をリンクセットにブロードキャスト
kill死ぬ。終了シグナル killed をリンクセットにブロードキャスト
システムプロセスnormal{'EXIT', Pid, normal} をメールボックスに追加
Why{'EXIT', Pid, Why} をメールボックスに追加
killed{'EXIT', Pid, killed} をメールボックスに追加
kill死ぬ。終了シグナル killed をリンクセットにブロードキャスト
 (一般プロセス同士のリンク)
      +--------------+                  +--------------+    
      |  プロセス A  |-----(リンク)-----|  プロセス B  |
      +--------------+                  +--------------+
        1:正常終了
        2:終了シグナル送信 ----------->  3:プロセスはそのまま
          (normal)

      +--------------+                  +--------------+    
      |  プロセス A  |-----(リンク)-----|  プロセス B  |
      +--------------+                  +--------------+
        1:異常終了
        2:終了シグナル送信 ----------->  3:このプロセスも異常終了
          (normal以外)                   4:終了シグナル送信 -------> (リンクセットのプロセス)                                   


  ---------------------------- 
 (一般プロセスとシステムプロセスのリンク)

      +--------------+                  *==============*    
      |  プロセス A  |-----(リンク)-----|  プロセス B  | (システムプロセス)
      +--------------+                  *==============*
        1:正常or異常終了
        2:終了シグナル送信 ----------->  3:終了シグナル捕捉 (パターンは、{'EXIT', Pid, Why})        
          (Why)
                                         ※ただし、終了シグナル kill は捕捉できない

テスト用ソース(linktest.erl)。システムプロセスのひ孫プロセスを異常終了させてみる。

-module(linktest).
-compile(export_all).

sleep(MilliSeconds) ->
    receive
    after MilliSeconds -> true
    end.

start() ->
    spawn(fun() ->
            process_flag(trap_exit, true),  %% この BIF コールでシステムプロセスになる

            Child = spawn_link(fun() ->     %% spawn_link() でリンクしたプロセスを spawn
                GrandChild = spawn_link(fun() ->
                    GrandGrandChild = spawn_link(fun() ->
                        sleep(1000),
                        1/0                %% ゼロ除算エラー !! (badarith)
                        end),
                    io:format("grandgrandchild: ~w~n", [GrandGrandChild]),
                    sleep(3600000)
                    end),
                io:format("grandchild: ~w~n", [GrandChild]),
                sleep(3600000)
                end),
            io:format("child: ~w~n", [Child]),
            loop([])
        end).

loop(X) ->
    receive
        {'EXIT', Pid, Why} ->      %% 終了シグナルを捕捉
            io:format("signal from [~w]: ~p~n", [Pid, Why]),
            loop(X)
    end.

実行結果。

1> c(linktest).   %% 「1/0」の部分の Warning は無視
./linktest.erl:22: Warning: this expression will fail with a 'badarith' exception
{ok,linktest}
2> linktest:start().
child: <0.43.0>
grandchild: <0.44.0>
grandgrandchild: <0.45.0>
<0.42.0>
   
=ERROR REPORT==== 13-Aug-2011::08:02:13 ===
Error in process <0.45.0> with exit value: {badarith,[{linktest,'-start/0-fun-0-',0}]}

signal from [<0.43.0>]: {badarith,[{linktest,'-start/0-fun-0-',0}]} %% 捕捉した「子」からのシグナル
(システムプロセス)    (子)            (孫)           (ひ孫)
  *==========*    +----------+    +----------+    +----------+
  | <0.42.0> | -> | <0.43.0> | -> | <0.44.0> | -> | <0.45.0> | 
  *==========*    +----------+    +----------+    +----------+            
                                                    1:エラー!!(異常終了)
                                                    2:終了シグナル送信(badarith)
                                                      |
                                   3:異常終了 <-------+
                                   4:終了シグナル送信(badarith)
                                     |
                  5:異常終了 <-------+
                  6:終了シグナル送信(badarith)
                    |
 7:シグナル捕捉 <---+
kill

linktest.erl の「ひ孫」プロセスを、自身を kill するように、変更してテストしてみる。

(変更前)
                       1/0                %% ゼロ除算エラー !! (badarith)
(変更後)
                       exit(kill)         %% 自身に kill を送信
1> c(linktest).
{ok,linktest}
2> li
linktest    lists       
2> linktest:start().
child: <0.43.0>
grandchild: <0.44.0>
grandgrandchild: <0.45.0>
<0.42.0>
signal from [<0.43.0>]: killed  %% 「子」からの killed を捕捉
モニタ

リンクでなくモニタというエラー伝播経路の定義もある。

モニタは erlang:monitor() で設定する(らしい)。

[リンク] 対称の関係
  +-----------+               +-----------+
  | プロセスA |---(リンク)--- | プロセスB | 
  +-----------+               +-----------+
  case 1)
    終了すると...
    終了シグナル送信   --->   終了シグナル受信

  case 2)
                              終了すると...
    終了シグナル受信  <---    終了シグナル送信

ーーーーーーーーーーーーーーーーーーーーーー
[モニタ] 非対称の関係
  +-----------+               +-----------+
  | プロセスA |<<==(モニタ)== | プロセスB | 
  +-----------+               +-----------+
  case 1)
    終了すると...
    終了シグナル送信   --->   終了シグナル受信

  case 2)
                              終了しても...
                              終了シグナルは送信されない!
リンクに関するプリミティブ

spawn_link(), link(), unlink()、process_flag(), exit(), erlang:monitor() など。

以下が参考になる。

分散 Erlang

Erlang クラスタ

Erlang クラスタで分散 Erlang が構築できる。LAN 上の HPC クラスタなどに向く(ようである)。

 +-------- Erlang クラスタ (マジッククッキーで一意識別) ---------+
 |                                                               |
 |     (node011@host01)            (node012@host01)              |
 |     +----------------+   rpc   +----------------+             |
 |     | Erlang ノード  |<------->| Erlang ノード  |             |
 |     +----------------+         +----------------+             |
 |            A                          A                       |
 |        rpc |                          |                       |
 |            V                          |                       |
 |     (node021@host02)                  |                       |
 |     +----------------+                |                       |
 |     | Erlang ノード  |<---------------+                       |
 |     +----------------+               rpc                      |
 |                                                               |
 +---------------------------------------------------------------+
Erlangクラスタ
分散Erlangを実現するErlangノードの集合。マジッククッキーで一意識別される。
Erlangノード
Erlangプロセスに対する仮想的なホスト。Erlangクラスタを構成する要素。各ノードネットワーク上に繋がれた別のコンピュータにあってもよい。
  Erlang ノードの名前  =   識別名 @ FQDN   (FQDN は名前解決可能であること)
注意事項

ノード間の通信を実現するのに epmd*4というデーモンが走る。epmd は以下のポートを使用する*5

トランスポートプロトコルポート
TCP4369

ノードネットワークを越えて通信する場合、このポートを(ファイアウォールなどで)ブロックしてはいけない。

また、分散 Erlang で使用するポートがあれば、それらのポートもブロックしてはいけない。

マジッククッキーは、デフォルトで以下のファイルの内容が使用される。

 ~/.erlang.cookie

また、各ノードリソースは同じものに揃えておく必要がある。

ノードの起動方法

Erlang ノードの一般的な起動方法は以下である。(-name 以外は省略可能)

 $ erl -name ノードの名前 -setcookie マジッククッキー -kernel inet_dist_listen_min 使用するポートの最小値 \
                                                              inet_dist_listen_max 使用するポートの最大値

ノードを同一のホストOS上で走らせる場合は、以下の方法もある。-name の代わりに -sname を使う。-sname に与えるのは FQDN でなくともよい。

 $ erl -sname ノードの名前(例えば、node01 とか。ノード名は node01@`hostname` と解釈される)
試す

上で作った server モジュールclient モジュールErlang クラスタで動かしてみる。(clientコメントアウトしたところは元に戻しておく)

最初は、同一ホストOS 上でのやり方を試す。

私の環境では hostname コマンドは mylinux01 を返す。確認してみる。

$ hostname
mylinux01     # (当然ながら、この名前は環境毎に何でもよい)

では、node01@mylinux01 を起動して myserver という名前でサーバを起動する。

$ erl -sname node01     %% -sname 以外は、先程と同じ手順
Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.4  (abort with ^G)
(node01@mylinux01)1> c(server).    %% プロンプトにノード名(node01@mylinux01)が表示されている
{ok,server}
(node01@mylinux01)2> c(client).
{ok,client}
(node01@mylinux01)3> server:start(myserver).
true

TTY で node02@mylinux01 を起動して、BIF の rpc:call/4 で client:request_by_name/2 を遠隔呼び出ししてみる。

$ erl -sname node02
Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.4  (abort with ^G)
(node02@mylinux01)1> rpc:call(node01@mylinux01, client, request_by_name, [myserver, "Hello"]).
"olleH"

次に、別ホストOS でのやり方を試。。したいが、環境がないので FQDNノードを起動するだけに留める。

私の環境では、NICIPアドレスに 'mylinux01.localdomain' という名前が /etc/hosts で設定してある。(nsswitch は files 優先)

(/etc/hosts の一部抜粋)
192.168.1.101 mylinux01.localdomain

まず、準備。

  • (厳密には FQDN でない?けれど、)FQDN として mylinux01.localdomain を使用すれば問題ない
  • TCP:4369 は(実際にはブロックしてあるが、ネットワーク通信しないので)問題ない
  • マジッククッキーは(ここでは省略しても問題ないはずだが、) ここでは、"abc" にする

では、別々の TTY で server の起動と client の遠隔呼び出しを行なう。-name には「識別子 @ FQDN」を使用する。

$ erl -name node01@mylinux01.localdomain -setcookie abc  %% -name を -setcookie を指定
Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.4  (abort with ^G)
(node01@mylinux01.localdomain)1> c(server).
{ok,server}
(node01@mylinux01.localdomain)2> c(client).
{ok,client}
(node01@mylinux01.localdomain)3> server:start(myserver).
true
$ erl -name node02@mylinux01.localdomain -setcookie abc %% -name を -setcookie を指定
Erlang R13B03 (erts-5.7.4) [source] [64-bit] [smp:2:2] [rq:2] [async-threads:0] [hipe] [kernel-poll:false]

Eshell V5.7.4  (abort with ^G)
(node02@mylinux01.localdomain)1> rpc:call(node01@mylinux01.localdomain, client, request_by_name, [myserver, "Hello"]).
"olleH"

稼動時の注意

epmd は一度ノードを起動すると、そのまま常駐する(ようである)。

ネットワーク設定や、(以前に間違えて指定した)ノード名なども記憶しているようであり、その後に設定や指定を変更しても、うまくいかない場合がある。

そんな時は(設定を再読み込みする方法とかあるもしれないが、)epmd を kill するのが手っ取り早い。

$ killall epmd

epmd が起動しているかどうかは psnetstat で確認できる。

$ ps aux | grep [e]pmd
                :
... (略) ... /usr/lib/erlang/erts-5.7.4/bin/epmd -daemon
netstat -tuanp | grep [e]pmd
      :
tcp     0      0 0.0.0.0:4369     0.0.0.0:*          LISTEN      2915/epmd       
tcp     0      0 127.0.0.1:4369   127.0.0.1:51477    ESTABLISHED 2915/epmd       
tcp     0      0 127.0.0.1:4369   127.0.0.1:36594    ESTABLISHED 2915/epmd   

----

参考書籍

プログラミングErlang

プログラミングErlang

オーム社のページ(サンプルソースダウンロードなど) : Ohmsha | 商品一覧

*1:Light Weight Process

*2プロセス識別子

*3:MFA と呼ばれる。

*4Erlang Port Mapper Daemon

*5:*参考書籍には UDP も、と書いてあるが何か使用している雰囲気がない。。。

|