Original author: lzy.je   Updated: 2011-06-01

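          A few days ago I wrote "gen_server tasting: a super-simple name service" and got a first-hand taste of how powerful Erlang/OTP is. As "super-simple" suggests, that version was still rudimentary, so over the past two days I have kept studying and coding to build the next iteration of the name service.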

 

This version needs to provide the following features:

 

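  1. Use an OTP supervisor tree to keep the service reliable.
  2. Add logging by customizing the SASL alarm_handler to record alarm events.
  3. Package the name service as an application, tentatively called vsns: very stabilization name server.
  4. Expose a socket service (in the semi-blocking hybrid mode, that is, {active, once}) and offer access through a custom vsns://verb/param protocol.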

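The final functional test case used for verification is shown below. The main test code is in test/0; the functions above it handle the socket communication: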

 

-module(vsns_tcp_client).

-author(lzy).
-email("lzy.dev@gmail.com").
-date("2009.02.06").
-vsn(0.11).

-compile(export_all).

conn() ->
	{ok, Socket} = gen_tcp:connect("localhost", 8304,
		[binary, {packet, 2}, {reuseaddr, true}, {active, once}]),
	Socket.

eval(Socket, Args, AssertVal) ->
	ok = gen_tcp:send(Socket, Args),
	receive
		{tcp, _, AssertVal} ->
			io:format("Ok. ~p = ~p.~n", [Args, AssertVal]);
		{tcp_closed, _} ->
			case Args of
				<<"vsns://kernel_oops">> ->
					io:format("Ok. kernel_oops = tcp_closed.~n");
				_Other ->
					io:format("Connection aborted by server.~n")
			end;
		Other  ->
			io:format("Assert failed. ~p != ~p.~n", [Other, AssertVal])
	end,
	inet:setopts(Socket, [{active, once}]).

close(Socket) ->
	gen_tcp:close(Socket).

test() ->
	S = conn(),

	eval(S, <<"vsns://remove_all">>, <<"ack">>),

	eval(S, <<"vsns://save/abc/123">>, <<"">>),
	eval(S, <<"vsns://save/abc/456">>, <<"123">>),
	eval(S, <<"vsns://save/abc/789">>, <<"456">>),

	eval(S, <<"vsns://load_all">>, <<"ack">>),

	eval(S, <<"vsns://remove/abc">>, <<"789">>),
	eval(S, <<"vsns://remove/not_value">>, <<"">>),

	eval(S, <<"foo">>, <<"unknow">>),

	eval(S, <<"vsns://kernel_oops">>, <<"">>),

	ok = close(S),

	pass.

%% File end.
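
The {packet, 2} option passed to gen_tcp:connect/3 above makes the runtime prefix every outgoing message with a 2-byte big-endian length and strip that header again on receipt, which is why each eval/3 call sees exactly one whole reply. The sketch below does the same framing by hand to show what travels on the wire. The module and function names (packet2_sketch, frame/1, unframe/1) are made up for illustration and are not part of vsns.

```erlang
-module(packet2_sketch).
-export([frame/1, unframe/1]).

%% Prepend a 2-byte big-endian length header, as {packet, 2} does on send.
frame(Payload) when is_binary(Payload), byte_size(Payload) =< 16#FFFF ->
    <<(byte_size(Payload)):16/big, Payload/binary>>.

%% Split one complete message off the front of a buffer, or report that
%% more bytes are needed, as {packet, 2} does on receive.
unframe(<<Len:16/big, Payload:Len/binary, Rest/binary>>) ->
    {ok, Payload, Rest};
unframe(Buffer) when is_binary(Buffer) ->
    more.
```

With {active, once} the socket delivers one such message to the owning process and then turns passive until inet:setopts(Socket, [{active, once}]) re-arms it, which is what the last line of eval/3 does.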

 

          Implementing the supervisor tree, logging, and alarm events was also a way of working through the book Programming Erlang (《Erlang 程序设计》).

 

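          First, add a supervisor process for the name service. An Erlang/OTP supervision tree is simple: implement a supervisor behaviour callback module and hand it to the OTP supervisor module. The previous version of the name service was started from the Erlang shell; from now on this supervisor process starts it. The main startup code is in init/1. The supervisor module is as follows: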

 

-module(name_server_sup).

-author(lzy).
-email("lzy.dev@gmail.com").
-date("2009.02.04").
-vsn(0.1).

-behaviour(supervisor).

%% supervisor behaviour callback functions.
-export([init/1]).

%% Interface functions.
-export([start/0, start_in_shell/0, start_link/1]).

start() ->
	spawn(fun() -> supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []) end).

start_in_shell() ->
	{ok, Pid} = supervisor:start_link({local, ?MODULE}, ?MODULE, _Arg = []),
	unlink(Pid).

start_link(Args) ->
	supervisor:start_link({local, ?MODULE}, ?MODULE, Args).

init([]) ->
	gen_event:swap_handler(alarm_handler, {alarm_handler, swap}, {vsns_alarm_handler, foo}),

	{ok, {
			{one_for_one, 3, 10},
			[{
				vsns_name_server,
				{name_server, start_link, []},
				permanent,
				1,
				worker,
				[name_server]
			}]		
	}}.

%% File end.
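
The tuple returned by init/1 above packs two things together: the restart strategy with its restart-intensity limit, and the list of child specifications. {one_for_one, 3, 10} means only the crashed child is restarted, and the supervisor itself gives up if more than 3 restarts happen within 10 seconds. In the child spec, permanent means the child is always restarted, and 1 is the number of milliseconds the child gets to shut down before it is killed. The sketch below only unpacks such a tuple to label its fields; sup_spec_sketch, describe/1 and example/0 are hypothetical names, not part of vsns or OTP.

```erlang
-module(sup_spec_sketch).
-export([describe/1, example/0]).

%% Pull the strategy, the restart-intensity limit, and the per-child
%% restart settings out of a tuple-style supervisor specification.
describe({ok, {{Strategy, MaxRestarts, MaxSeconds}, ChildSpecs}}) ->
    Children = [{Id, Restart, Shutdown, Type}
                || {Id, _StartMFA, Restart, Shutdown, Type, _Modules} <- ChildSpecs],
    {Strategy, {MaxRestarts, MaxSeconds}, Children}.

%% The same specification that name_server_sup:init/1 returns.
example() ->
    describe({ok, {{one_for_one, 3, 10},
                   [{vsns_name_server,
                     {name_server, start_link, []},
                     permanent, 1, worker, [name_server]}]}}).
```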

 

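          With name_server_sup in place, a name_server crash is no longer a worry: the supervisor process restarts it. For the data structure that describes the supervision strategy, see the Erlang documentation. The vsns_alarm_handler referenced there is a custom alarm event handler module that records the service's alarms in the Erlang SASL log, where they can later be inspected with the rb tool. The alarm log handler module comes next: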

 

-module(vsns_alarm_handler).

-author(lzy).
-email("lzy.dev@gmail.com").
-date("2009.02.04").
-vsn(0.11).

-behaviour(gen_event).

%% gen_event behaviour callback functions.
-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).

init(Args) ->
	io:format("vsns_alarm_handler init : ~p.~n", [Args]),
	{ok, Args}.

handle_event({set_alarm, {remove_all, From}}, State) ->
	error_logger:error_msg("vsns depot clear by ~p started.~n", [From]),
	{ok, State};

handle_event({clear_alarm, {remove_all, From}}, State) ->
	error_logger:error_msg("vsns depot clear by ~p done.~n", [From]),
	{ok, State};

handle_event(Event, State) ->
	%% Log both the unmatched event and the handler state.
	error_logger:error_msg("unmatched event: ~p, state: ~p.~n", [Event, State]),
	{ok, State}.

handle_call(_Req, State) ->
	{ok, State, State}.
	
handle_info(_Info, State) ->
	{ok, State}.

terminate(_Reason, _State) ->
	ok.

code_change(_OldVsn, State, _Extra) ->
	{ok, State}.

%% File end.
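
The handler above reacts to {set_alarm, {remove_all, From}} and {clear_alarm, {remove_all, From}} events. Those events are what SASL's alarm_handler:set_alarm/1 and alarm_handler:clear_alarm/1 deliver to the installed handlers. The name_server source is not shown in this article, so the following is only a guess at how the caller side might raise them around a clear operation; alarm_sketch and remove_all_with_alarm/1 are hypothetical names, and the sketch assumes the sasl application is already running.

```erlang
-module(alarm_sketch).
-export([remove_all_with_alarm/1]).

%% Raise an alarm, run the (caller-supplied) clearing function, then
%% clear the alarm again. Requires the sasl application to be started,
%% because alarm_handler is a registered SASL process.
remove_all_with_alarm(ClearFun) when is_function(ClearFun, 0) ->
    Alarm = {remove_all, self()},
    ok = alarm_handler:set_alarm(Alarm),
    Result = ClearFun(),
    ok = alarm_handler:clear_alarm(Alarm),
    Result.
```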

 

          In the end, logging boils down to calls to error_logger:error_msg. The Erlang SASL configuration is involved as well:

 

%% file name: sasl_log.config
%% author: lzy
%% email: lzy.dev@gmail.com
%% date: 2009.02.04
%% version: 0.1

[{sasl, [
	{sasl_error_logger, false}, 
	{errlog_type, error},
	{error_logger_mf_dir, "./logs"},
	%% 10 MB per log file (10 * 1024 * 1024 bytes).
	{error_logger_mf_maxbytes, 10485760},
	{error_logger_mf_maxfiles, 5}
]}].

%% File end.
 

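          This configuration file is passed in through erl startup arguments: -boot start_sasl -config .\sasl_log. Next comes packaging the vsns application, which needs an application resource file and an application behaviour module. Both are simple; for the meaning of each parameter, see the Erlang documentation.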

 

%% file name: vsns.app
%% author: lzy
%% email: lzy.dev@gmail.com
%% date: 2009.02.05
%% version: 0.1

{
	application, vsns,
	[
		{description, "very stabilization name service."},
		{vsn, "1.0a"},
		{modules, [vsns_app, name_server_sup, name_server, vsns_alarm_handler]},
		{registered, [name_server_sup, name_server]},
		{applications, [kernel, stdlib]},
		{mod, {vsns_app, []}},
		{start_phases, []}
	]
}.

%% File end.

 
-module(vsns_app).

-author(lzy).
-email("lzy.dev@gmail.com").
-date("2009.02.05").
-vsn(0.1).

-behavior(application).

-export([start/2, stop/1]).

start(_Type, Args) ->
	name_server_sup:start_link(Args).

stop(_State) ->
	void.

%% File end.

 

          With this packaging, the vsns service can be started by calling application:start(vsns). The appmon tool shows the following process tree:

 

[Figure: vsns process tree]

 

At this point, vsns can be used from Erlang.

 

C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe -sname vsns +P 102400 -smp enable +S 1 -boot start_sasl -config sasl_log
Eshell V5.6.4  (abort with ^G)
(vsns@srclzy)1> application:start(vsns).
vsns_alarm_handler init : {foo,{alarm_handler,[]}}.
name_server starting.
ok
(vsns@srclzy)2> name_server:save(abc, 123).
undefined
(vsns@srclzy)3> name_server:load_all().
[{abc,123}]

 

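          Finally, a TCP socket server is needed to expose vsns so that other clients can use the service. OTP has no socket-server behaviour, but one can be built on gen_server, or even written as a socket server that does not use OTP at all. Serge Aleynikov has implemented a good TCP server that handles requests with a finite state machine and explains it well in "Building a Non-blocking TCP server using OTP principles" (you may need a proxy to open the link). I added a few lines to his code so that the service provided by the socket server is configurable: the gen_fsm behaviour module the socket server uses is read from the application environment. The changes are at roughly lines 15 and 27 of the tcp_server_app module.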

 

-module(tcp_server_app).

... ...

-define(DEF_SERVICE, tcp_echo_fsm).

... ...

start(_Type,  _Args) ->
    ListenPort = get_app_env(listen_port, ?DEF_PORT), 
    ServiceMod = get_app_env(service_mod, ?DEF_SERVICE), 
    supervisor:start_link({local, ?MODULE}, ?MODULE, [ListenPort, ServiceMod]).

... ...
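
The get_app_env/2 helper is not shown in the excerpt above. A minimal stand-in, assuming it does nothing more than read a key from the application environment and fall back to a default, could look like the sketch below. The module name env_sketch and the function service_mod/0 are hypothetical; application:get_env/3 is the standard OTP call.

```erlang
-module(env_sketch).
-export([service_mod/0]).

%% Return the FSM module configured for saleyn_tcp_server, or the
%% echo FSM when nothing has been configured.
service_mod() ->
    application:get_env(saleyn_tcp_server, service_mod, tcp_echo_fsm).
```

Calling application:set_env(saleyn_tcp_server, service_mod, vsns_tcp_fsm) beforehand makes service_mod/0 return vsns_tcp_fsm instead of the default.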

 

          Out of the box, saleyn_tcp_server provides an echo service. To make it serve vsns instead, besides the change above, all that remains is a gen_fsm behaviour module that calls vsns. The code is simple and was adapted from tcp_echo_fsm.

 

-module(vsns_tcp_fsm).

-author(lzy).
-email("lzy.dev@gmail.com").
-date("2009.02.06").
-vsn(0.1).
-remark("vsns_tcp_fsm used by saleyn_tcp_server application to support vsns socket server.").
-remark("It referenced from saleyn_tcp_server/tcp_echo_fsm module.").

-behaviour(gen_fsm).

-export([start_link/0, set_socket/2]).

%% gen_fsm callbacks
-export([init/1, handle_event/3,
         handle_sync_event/4, handle_info/3, terminate/3, code_change/4]).

%% FSM States
-export([
    'WAIT_FOR_SOCKET'/2,
    'WAIT_FOR_DATA'/2
]).

-record(state, {
                socket,    % client socket
                addr       % client address
               }).

-define(TIMEOUT, 120000).

%%%------------------------------------------------------------------------
%%% API
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% @spec (Socket) -> {ok,Pid} | ignore | {error,Error}
%% @doc To be called by the supervisor in order to start the server.
%%      If init/1 fails with Reason, the function returns {error,Reason}.
%%      If init/1 returns {stop,Reason} or ignore, the process is
%%      terminated and the function returns {error,Reason} or ignore,
%%      respectively.
%% @end
%%-------------------------------------------------------------------------
start_link() ->
    gen_fsm:start_link(?MODULE, [], []).

set_socket(Pid, Socket) when is_pid(Pid), is_port(Socket) ->
    gen_fsm:send_event(Pid, {socket_ready, Socket}).

%%%------------------------------------------------------------------------
%%% Callback functions from gen_fsm
%%%------------------------------------------------------------------------

%%-------------------------------------------------------------------------
%% Func: init/1
%% Returns: {ok, StateName, StateData}          |
%%          {ok, StateName, StateData, Timeout} |
%%          ignore                              |
%%          {stop, StopReason}
%% @private
%%-------------------------------------------------------------------------
init([]) ->
    process_flag(trap_exit, true),
    {ok, 'WAIT_FOR_SOCKET', #state{}}.

%%-------------------------------------------------------------------------
%% Func: StateName/2
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
'WAIT_FOR_SOCKET'({socket_ready, Socket}, State) when is_port(Socket) ->
    % Now we own the socket
    inet:setopts(Socket, [binary, {packet, 2}, {reuseaddr, true}, {active, once}]),
    {ok, {IP, _Port}} = inet:peername(Socket),
    {next_state, 'WAIT_FOR_DATA', State#state{socket=Socket, addr=IP}, ?TIMEOUT};

'WAIT_FOR_SOCKET'(Other, State) ->
    error_logger:error_msg("State: 'WAIT_FOR_SOCKET'. Unexpected message: ~p\n", [Other]),
    %% Allow to receive async messages
    {next_state, 'WAIT_FOR_SOCKET', State}.

%% Notification event coming from client
'WAIT_FOR_DATA'({data, Data}, #state{socket=S} = State) ->
    ok = handle_data(S, string:tokens(binary_to_list(Data), "/")),
    inet:setopts(S, [{active, once}]),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT};

'WAIT_FOR_DATA'(timeout, State) ->
    error_logger:error_msg("~p Client connection timeout - closing.\n", [self()]),
    {stop, normal, State};

'WAIT_FOR_DATA'(Data, State) ->
    io:format("~p Ignoring data: ~p\n", [self(), Data]),
    {next_state, 'WAIT_FOR_DATA', State, ?TIMEOUT}.

%%-------------------------------------------------------------------------
%% Func: handle_event/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_event(Event, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_sync_event/4
%% Returns: {next_state, NextStateName, NextStateData}            |
%%          {next_state, NextStateName, NextStateData, Timeout}   |
%%          {reply, Reply, NextStateName, NextStateData}          |
%%          {reply, Reply, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}                          |
%%          {stop, Reason, Reply, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_sync_event(Event, _From, StateName, StateData) ->
    {stop, {StateName, undefined_event, Event}, StateData}.

%%-------------------------------------------------------------------------
%% Func: handle_info/3
%% Returns: {next_state, NextStateName, NextStateData}          |
%%          {next_state, NextStateName, NextStateData, Timeout} |
%%          {stop, Reason, NewStateData}
%% @private
%%-------------------------------------------------------------------------
handle_info({tcp, Socket, Bin}, StateName, #state{socket=Socket} = StateData) ->
    % Flow control: enable forwarding of next TCP message
    inet:setopts(Socket, [{active, once}]),
    ?MODULE:StateName({data, Bin}, StateData);

handle_info({tcp_closed, Socket}, _StateName,
            #state{socket=Socket, addr=Addr} = StateData) ->
    error_logger:info_msg("~p Client ~p disconnected.\n", [self(), Addr]),
    {stop, normal, StateData};

handle_info(_Info, StateName, StateData) ->
    %% gen_fsm expects next_state here; noreply is a gen_server return value.
    {next_state, StateName, StateData}.

%%-------------------------------------------------------------------------
%% Func: terminate/3
%% Purpose: Shutdown the fsm
%% Returns: any
%% @private
%%-------------------------------------------------------------------------
terminate(_Reason, _StateName, #state{socket=Socket}) ->
    (catch gen_tcp:close(Socket)),
    ok.

%%-------------------------------------------------------------------------
%% Func: code_change/4
%% Purpose: Convert process state when code is changed
%% Returns: {ok, NewState, NewStateData}
%% @private
%%-------------------------------------------------------------------------
code_change(_OldVsn, StateName, StateData, _Extra) ->
    {ok, StateName, StateData}.

handle_data(S, ["vsns:", "save", Key, Value]) ->
    gen_tcp:send(S, list_to_binary(swap_undefined(name_server:save(Key, Value))));

handle_data(S, ["vsns:", "load", Key]) ->
    gen_tcp:send(S, list_to_binary(swap_undefined(name_server:load(Key))));

handle_data(S, ["vsns:", "load_all"]) ->
    name_server:load_all(),
    gen_tcp:send(S, <<"ack">>);	% list_to_binary(name_server:load_all())

handle_data(S, ["vsns:", "remove", Key]) ->
    gen_tcp:send(S, list_to_binary(swap_undefined(name_server:remove(Key))));

handle_data(S, ["vsns:", "remove_all"]) ->
    name_server:remove_all(),
    gen_tcp:send(S, <<"ack">>);	% list_to_binary(name_server:remove_all())

handle_data(S, ["vsns:", "kernel_oops"]) ->
    gen_tcp:send(S, list_to_binary(name_server:kernel_oops()));

handle_data(S, _Data) ->
    gen_tcp:send(S, <<"unknow">>).

swap_undefined(undefined) ->
    "";

swap_undefined(Other) ->
    Other.

%	File end.

 

          The main processing logic all lives in handle_data. String handling is not the point here, so it is kept as simple as possible.
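
To see why the handle_data clauses match on lists such as ["vsns:", "save", Key, Value], it helps to look at what string:tokens/2 does to a request: it splits on "/" and drops empty tokens, so the double slash after "vsns:" disappears. The sketch below isolates that step; vsns_parse_sketch and parse/1 are illustrative names and not part of the vsns code.

```erlang
-module(vsns_parse_sketch).
-export([parse/1]).

%% Tokenize a request the same way 'WAIT_FOR_DATA' does and classify it.
parse(Request) when is_binary(Request) ->
    case string:tokens(binary_to_list(Request), "/") of
        ["vsns:", Verb | Params] -> {vsns, Verb, Params};
        _Other -> unknown
    end.
```

For example, parse(<<"vsns://save/abc/123">>) yields {vsns, "save", ["abc", "123"]}, while parse(<<"foo">>) yields unknown, which corresponds to the <<"unknow">> reply of the catch-all handle_data clause.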

 

With that, all the features planned for this version of vsns are in place. But is something still missing? Right: the startup code and script.

 

-module(vsns_startup).

-author(lzy).
-email(lzy.dev@gmail.com).
-date("2009.02.06").
-vsn(0.1).

-export([start/0]).

start() ->
	application:set_env(saleyn_tcp_server, listen_port, 8304),
	application:set_env(saleyn_tcp_server, service_mod, vsns_tcp_fsm),
	application:start(saleyn_tcp_server),
	application:start(vsns),
	ok.

%% File end.

 

There is also an MS-DOS batch file, which makes starting the service much more convenient:

 

@echo off

C:
cd C:\Program Files\erl5.6.4\usr\lzy_app\saleyn_tcp_server
for %%c in (*.erl) do ..\..\..\bin\erlc %%c

cd C:\Program Files\erl5.6.4\usr\lzy_app\vsns
for %%c in (*.erl) do ..\..\..\bin\erlc %%c

cd C:\Program Files\erl5.6.4\usr\lzy_app

set ERL_MAX_PORTS=102400

..\..\bin\erl.exe -sname vsns -boot start_sasl +P 102400 -smp enable +S 1 -pa .\vsns -pa .\saleyn_tcp_server -config .\vsns\sasl_log -s vsns_startup start

 

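          Many of the erl arguments are there to tune the performance of an Erlang server application: +P 102400 raises the maximum number of processes and +S 1 runs a single scheduler thread. Once the batch script above has started vsns (and saleyn_tcp_server), the test code defined at the beginning can be used to verify it. The results look good.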

 

C:\Program Files\erl5.6.4\usr\lzy_app\vsns>..\..\..\bin\erl.exe
Eshell V5.6.4  (abort with ^G)
1> vsns_tcp_client:test().
Ok. <<"vsns://remove_all">> = <<"ack">>.
Ok. <<"vsns://save/abc/123">> = <<>>.
Ok. <<"vsns://save/abc/456">> = <<"123">>.
Ok. <<"vsns://save/abc/789">> = <<"456">>.
Ok. <<"vsns://load_all">> = <<"ack">>.
Ok. <<"vsns://remove/abc">> = <<"789">>.
Ok. <<"vsns://remove/not_value">> = <<>>.
Ok. <<"foo">> = <<"unknow">>.
Ok. kernel_oops = tcp_closed.
pass

 

A single client is not much fun, so let's apply some load with LoadRunner and let vsns stretch its legs.

 

[Figure: LoadRunner scenario]

 

          LoadRunner simulates 50 concurrent users calling different vsns functions. The appmon tool shows the following saleyn_tcp_server process trees before and during processing:

 

[Figure: saleyn_tcp_server process tree while listening]

 

[Figure: saleyn_tcp_server process tree while handling 50 concurrent requests]

 

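          Now look at throughput and response time. Erlang is impressive: CPU and memory consumption are very low, and keep in mind that both Erlang and LoadRunner are running on my old laptop.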

 

[Figure: vsns throughput]

 

[Figure: vsns average response time]

 

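          OK, that's it for now. Next week, if time permits, I plan to run a performance test on vsns/Erlang to check how strong the fabled Erlang really is. I will bring fault tolerance into it as well, to see how supervisor restarts after a service oops affect throughput under heavy concurrent load.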

 

          It suddenly occurs to me that it might be worth taking a good look at the Apache CouchDB source code.

 

The code and scripts for vsns, saleyn_tcp_server, and this article are in the lzy_app.zip attachment. Good luck.

 

// Added 2009.02.08 13:23 ////

 

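          I forgot to mention that the name_server module in this version differs from the previous one: the new version no longer lets you specify the dictionary process in which the name data is stored.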

 

Article information

  • Created by lzy.je on 2009-02-10
  • Updated by lzy.je on 2011-06-01
  • Tags: otp gen_serve, vsns, saleyn_tcp_server