Code Reading (Perl): Starman (PSGI Web Server)
Starman のコード読んでみたのでまとめ。
Starmanとは
- Plack/PSGIに対応したPerl製のWeb Server。
- Perlと言えばこの人、miyagawa さん作。
- Plack/PSGI に関してはこのへんとか。
- Apacheやnginxをリバースプロキシとして、バックエンドにStarmanを使うなどするらしい。(e.g. ゆーすけべー日記)
- サーバーのタイプは、起動ログにあるように、Net::Server::PreFork 型。Apacheと同じ。
- prefork型というのは、予め一定数の子プロセスをforkしておき、並列に処理を行うタイプのサーバー。
- min_servers, min_spare_servers, max_spare_servers, max_serversなどの設定項目があり、子プロセスの数は、設定の範囲内で、接続数などに応じて動的に変動する。
- Apacheの場合も同じような設定項目がある。
超簡単な使い方
とりあえず動かしてみるには
まずcpanmでインストールし
$ cpanm Starman
雑なPlackアプリケーションを用意する。ファイル名は app.psgi とする。
use strict; use warnings; # 渡ってきた環境変数を表示するひと my $app = sub { my $env = shift; my @output; for my $k (sort keys %$env) { push @output, "$k = " . ($env->{$k} || '') . " \n"; } [ 200, [ "Content-Type" => "text/plain" ], [@output] ]; }; $app;
starmanコマンドで実行すると
$ starman app.psgi
ちなみに普通にWebアプリを書くときは、Catalyst, Mojolicious などの Web Application Framework を使いましょう。
実装について
ここからコードについてのメモ。
まずは継承ツリーはこう
- Net::Server
- Net::Server::PreForkSimple
- Net::Server::PreFork
- Net::Server::PreForkSimple
- Net::Serverは Extensible, general Perl server engine ということでPerlでサーバーを作るための定型処理をやってくれるモジュール。
- Net::Server::PreForkSimpleとNet::Server::PreForkは prefork な処理を追加するためのモジュール達。
- Starmanの特徴は、性能について等いくつか載っているけどメインは PSGI compatible だよって事。
というわけで主役の process_request() メソッドを見ていく。
sub process_request { my $self = shift; my $conn = $self->{server}->{client};
$self->{server}->{client} にはクライアントのソケットが入ってくる。
while ( $self->{client}->{keepalive} ) { last if !$conn->connected; # Read until we see all headers last if !$self->_read_headers; my $env = { REMOTE_ADDR => $self->{server}->{peeraddr}, REMOTE_HOST => $self->{server}->{peerhost} || $self->{server}->{peeraddr}, SERVER_NAME => $self->{server}->{sockaddr}, # XXX: needs to be resolved? SERVER_PORT => $self->{server}->{sockport}, SCRIPT_NAME => '', 'psgi.version' => [ 1, 1 ], 'psgi.errors' => *STDERR, 'psgi.url_scheme' => 'http', 'psgi.nonblocking' => Plack::Util::FALSE, 'psgi.streaming' => Plack::Util::TRUE, 'psgi.run_once' => Plack::Util::FALSE, 'psgi.multithread' => Plack::Util::FALSE, 'psgi.multiprocess' => Plack::Util::TRUE, 'psgix.io' => $conn, 'psgix.input.buffered' => Plack::Util::TRUE, };
Plack アプリに渡ってくる env を作るところ。
# Parse headers my $reqlen = parse_http_request(delete $self->{client}->{headerbuf}, $env); if ( $reqlen == -1 ) { # Bad request DEBUG && warn "[$$] Bad request\n"; $self->_http_error(400, { SERVER_PROTOCOL => "HTTP/1.0" }); last; }
HTTPヘッダーのパース。XSで書いたモジュール(HTTP::Parser::XS)を使っている。
DEBUGはデバッグのとき真になる定数。DEBUG && warn "..." ていうのは見やすいから真似しよう。
# Initialize PSGI environment # Determine whether we will keep the connection open after the request my $connection = delete $env->{HTTP_CONNECTION}; my $proto = $env->{SERVER_PROTOCOL}; if ( $proto && $proto eq 'HTTP/1.0' ) { if ( $connection && $connection =~ /^keep-alive$/i ) { # Keep-alive only with explicit header in HTTP/1.0 $self->{client}->{keepalive} = 1; } else { $self->{client}->{keepalive} = 0; } }
keepalive にするかどうかの処理がいろいろ。細かいところはパス。HTTP/1.1でかつ指定がなければ真にする。
$self->_prepare_env($env); # Run PSGI apps my $res = Plack::Util::run_app($self->{app}, $env); if (ref $res eq 'CODE') { $res->(sub { $self->_finalize_response($env, $_[0]) }); } else { $self->_finalize_response($env, $res); } DEBUG && warn "[$$] Request done\n";
ここが Plack アプリを実行するところ。_prepare_env() と _finalize_response() は後で読む。
if ( $self->{client}->{keepalive} ) { # If we still have data in the input buffer it may be a pipelined request if ( $self->{client}->{inputbuf} ) { if ( $self->{client}->{inputbuf} =~ /^(?:GET|HEAD)/ ) { if ( DEBUG ) { warn "Pipelined GET/HEAD request in input buffer: " . dump( $self->{client}->{inputbuf} ) . "\n"; } # Continue processing the input buffer next; } else { # Input buffer just has junk, clear it if ( DEBUG ) { warn "Clearing junk from input buffer: " . dump( $self->{client}->{inputbuf} ) . "\n"; } $self->{client}->{inputbuf} = ''; } } DEBUG && warn "[$$] Waiting on previous connection for keep-alive request...\n"; my $sel = IO::Select->new($conn); last unless $sel->can_read($self->{options}->{keepalive_timeout}); } } DEBUG && warn "[$$] Closing connection\n"; }
最後にまた keepalive の処理。ここもパス。keepalive の仕様や処理については後でちゃんと調べよう。
次は _prepare_env()
sub _prepare_env { my($self, $env) = @_; my $get_chunk = sub { if ($self->{client}->{inputbuf}) { my $chunk = delete $self->{client}->{inputbuf}; return ($chunk, length $chunk); } my $read = sysread $self->{server}->{client}, my($chunk), CHUNKSIZE; return ($chunk, $read); };
CHUNKSIZE は 64 * 1024。クライアントソケットからの読み込み関数。
my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };
HTTP/1.1 Chunked transfer encoding かどうか。CONTENT_LENGTH を指定しないレスポンスらしい。この場合の処理が後に出てくるけど長くなるのでパス。
if (my $cl = $env->{CONTENT_LENGTH}) { my $buf = Plack::TempBuffer->new($cl); while ($cl > 0) { my($chunk, $read) = $get_chunk->(); if ( !defined $read || $read == 0 ) { die "Read error: $!\n"; } $cl -= $read; $buf->print($chunk); } $env->{'psgi.input'} = $buf->rewind; } elsif ($chunked) { # 略 } else { $env->{'psgi.input'} = $null_io; } }
- CONTENT_LENGTHがある場合、そのバイト数分読んで psgi.input に保存する。psgi.inputはPSGIによる拡張で、POSTやPUTの時にその内容が入る。IO::Handle-like object.
- CONTENT_LENGTHがない場合はHTTP/1.1 Chunked transfer encodingだけどその処理はパス。
最後 _finalize_response() 。もうちょっと。
sub _finalize_response { my($self, $env, $res) = @_;
$env は Plack アプリに渡ってくるものと同じ環境変数の HashRef。$res は Plack アプリの返した値。
my $protocol = $env->{SERVER_PROTOCOL}; my $status = $res->[0]; my $message = status_message($status); my(@headers, %headers); push @headers, "$protocol $status $message"; # Switch on Transfer-Encoding: chunked if we don't know Content-Length. my $chunked; while (my($k, $v) = splice @{$res->[1]}, 0, 2) { next if $k eq 'Connection'; push @headers, "$k: $v"; $headers{lc $k} = $v; }
@headers と %headers にレスポンスヘッダーをためていく。@headersが実際の出力用のテキストで、%headersは内部処理用にキーを小文字に合わせたもの。
if ( $protocol eq 'HTTP/1.1' ) { if ( !exists $headers{'content-length'} ) { if ( $status !~ /^1\d\d|[23]04$/ ) { DEBUG && warn "[$$] Using chunked transfer-encoding to send unknown length body\n"; push @headers, 'Transfer-Encoding: chunked'; $chunked = 1; } } elsif ( my $te = $headers{'transfer-encoding'} ) { if ( $te eq 'chunked' ) { DEBUG && warn "[$$] Chunked transfer-encoding set for response\n"; $chunked = 1; } } } else { if ( !exists $headers{'content-length'} ) { DEBUG && warn "[$$] Disabling keep-alive after sending unknown length body on $protocol\n"; $self->{client}->{keepalive} = 0; } }
transfer encoding 絡みの処理は軽く流して...
if ( ! $headers{date} ) { push @headers, "Date: " . time2str( time() ); } # Should we keep the connection open? if ( $self->{client}->{keepalive} ) { push @headers, 'Connection: keep-alive'; } else { push @headers, 'Connection: close'; } my $conn = $self->{server}->{client}; # Buffer the headers so they are sent with the first write() call # This reduces the number of TCP packets we are sending syswrite $conn, join( $CRLF, @headers, '' ) . $CRLF;
ここの最後の行でやっとHTTPヘッダーの出力。
if (defined $res->[2]) { Plack::Util::foreach($res->[2], sub { my $buffer = $_[0]; if ($chunked) { my $len = length $buffer; $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF; } syswrite $conn, $buffer; DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n"; }); syswrite $conn, "0$CRLF$CRLF" if $chunked;
$res->[2] は Plack アプリが返す3つめの値なので、テキストの配列など。
Plack::Util::foreach は第一引数の配列の要素ごとに、第二引数のコールバックを呼ぶだけ。なので $chunked を無視すればただ全部の行をクライアントソケットに syswrite するだけ。
} else { return Plack::Util::inline_object write => sub { my $buffer = $_[0]; if ($chunked) { my $len = length $buffer; $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF; } syswrite $conn, $buffer; DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n"; }, close => sub { syswrite $conn, "0$CRLF$CRLF" if $chunked; }; } }
そして最後がよく分からないが、Plack::Util::inline_object は、その引数を使って Plack::Util::Prototype のオブジェクトを作る。そんな使い方もあるらしい。