Code Reading (Perl): Starman (PSGI Web Server)

Starman のコード読んでみたのでまとめ。

Starmanとは

超簡単な使い方

とりあえず動かしてみるには
まずcpanmでインストールし

$ cpanm Starman

雑なPlackアプリケーションを用意する。ファイル名は app.psgi とする。

use strict;
use warnings;

# 渡ってきた環境変数を表示するひと
my $app = sub {
    my $env = shift;
    my @output;
    for my $k (sort keys %$env) {
        push @output, "$k = " . ($env->{$k} || '') . " \n";
    }
    [ 200, [ "Content-Type" => "text/plain" ], [@output] ];
};

$app;

starmanコマンドで実行すると

$ starman app.psgi

うごいた!

ちなみに普通にWebアプリを書くときは、Catalyst, Mojolicious などの Web Application Framework を使いましょう。

実装について

ここからコードについてのメモ。

まずは継承ツリーはこう

  • Net::Server
    • Net::Server::PreForkSimple
  • Net::Serverは Extensible, general Perl server engine ということでPerlでサーバーを作るための定型処理をやってくれるモジュール。
  • Net::Server::PreForkSimpleNet::Server::PreForkは prefork な処理を追加するためのモジュール達。
  • Starmanの特徴は、性能について等いくつか載っているけどメインは PSGI compatible だよって事。

というわけで主役の process_request() メソッドを見ていく。

sub process_request {
    my $self = shift;
    my $conn = $self->{server}->{client};

$self->{server}->{client} にはクライアントのソケットが入ってくる。

    while ( $self->{client}->{keepalive} ) {
        last if !$conn->connected;

        # Read until we see all headers
        last if !$self->_read_headers;

        my $env = {
            REMOTE_ADDR     => $self->{server}->{peeraddr},
            REMOTE_HOST     => $self->{server}->{peerhost} || $self->{server}->{peeraddr},
            SERVER_NAME     => $self->{server}->{sockaddr}, # XXX: needs to be resolved?
            SERVER_PORT     => $self->{server}->{sockport},
            SCRIPT_NAME     => '',
            'psgi.version'      => [ 1, 1 ],
            'psgi.errors'       => *STDERR,
            'psgi.url_scheme'   => 'http',
            'psgi.nonblocking'  => Plack::Util::FALSE,
            'psgi.streaming'    => Plack::Util::TRUE,
            'psgi.run_once'     => Plack::Util::FALSE,
            'psgi.multithread'  => Plack::Util::FALSE,
            'psgi.multiprocess' => Plack::Util::TRUE,
            'psgix.io'          => $conn,
            'psgix.input.buffered' => Plack::Util::TRUE,
        };

Plack アプリに渡ってくる env を作るところ。

        # Parse headers
        my $reqlen = parse_http_request(delete $self->{client}->{headerbuf}, $env);
        if ( $reqlen == -1 ) {
            # Bad request
            DEBUG && warn "[$$] Bad request\n";
            $self->_http_error(400, { SERVER_PROTOCOL => "HTTP/1.0" });
            last;
        }

HTTPヘッダーのパース。XSで書いたモジュール(HTTP::Parser::XS)を使っている。
DEBUGはデバッグのとき真になる定数。DEBUG && warn "..." ていうのは見やすいから真似しよう。

        # Initialize PSGI environment
        # Determine whether we will keep the connection open after the request
        my $connection = delete $env->{HTTP_CONNECTION};
        my $proto = $env->{SERVER_PROTOCOL};
        if ( $proto && $proto eq 'HTTP/1.0' ) {
            if ( $connection && $connection =~ /^keep-alive$/i ) {
                # Keep-alive only with explicit header in HTTP/1.0
                $self->{client}->{keepalive} = 1;
            }
            else {
                $self->{client}->{keepalive} = 0;
            }
        }

keepalive にするかどうかの処理がいろいろ。細かいところはパス。HTTP/1.1でかつ指定がなければ真にする。

        $self->_prepare_env($env);

        # Run PSGI apps
        my $res = Plack::Util::run_app($self->{app}, $env);

        if (ref $res eq 'CODE') {
            $res->(sub { $self->_finalize_response($env, $_[0]) });
        } else {
            $self->_finalize_response($env, $res);
        }

        DEBUG && warn "[$$] Request done\n";

ここが Plack アプリを実行するところ。_prepare_env() と _finalize_response() は後で読む。

        if ( $self->{client}->{keepalive} ) {
            # If we still have data in the input buffer it may be a pipelined request
            if ( $self->{client}->{inputbuf} ) {
                if ( $self->{client}->{inputbuf} =~ /^(?:GET|HEAD)/ ) {
                    if ( DEBUG ) {
                        warn "Pipelined GET/HEAD request in input buffer: " 
                            . dump( $self->{client}->{inputbuf} ) . "\n";
                    }

                    # Continue processing the input buffer
                    next;
                }
                else {
                    # Input buffer just has junk, clear it
                    if ( DEBUG ) {
                        warn "Clearing junk from input buffer: "
                            . dump( $self->{client}->{inputbuf} ) . "\n";
                    }

                    $self->{client}->{inputbuf} = '';
                }
            }

            DEBUG && warn "[$$] Waiting on previous connection for keep-alive request...\n";

            my $sel = IO::Select->new($conn);
            last unless $sel->can_read($self->{options}->{keepalive_timeout});
        }
    }

    DEBUG && warn "[$$] Closing connection\n";
}

最後にまた keepalive の処理。ここもパス。keepalive の仕様や処理については後でちゃんと調べよう。

次は _prepare_env()

sub _prepare_env {
    my($self, $env) = @_;

    my $get_chunk = sub {
        if ($self->{client}->{inputbuf}) {
            my $chunk = delete $self->{client}->{inputbuf};
            return ($chunk, length $chunk);
        }
        my $read = sysread $self->{server}->{client}, my($chunk), CHUNKSIZE;
        return ($chunk, $read);
    };

CHUNKSIZE は 64 * 1024。クライアントソケットからの読み込み関数。

    my $chunked = do { no warnings; lc delete $env->{HTTP_TRANSFER_ENCODING} eq 'chunked' };

HTTP/1.1 Chunked transfer encoding かどうか。CONTENT_LENGTH を指定しないレスポンスらしい。この場合の処理が後に出てくるけど長くなるのでパス。

    if (my $cl = $env->{CONTENT_LENGTH}) {
        my $buf = Plack::TempBuffer->new($cl);
        while ($cl > 0) {
            my($chunk, $read) = $get_chunk->();

            if ( !defined $read || $read == 0 ) {
                die "Read error: $!\n";
            }

            $cl -= $read;
            $buf->print($chunk);
        }
        $env->{'psgi.input'} = $buf->rewind;
    } elsif ($chunked) {

# 略

    } else {
        $env->{'psgi.input'} = $null_io;
    }
}
  1. CONTENT_LENGTHがある場合、そのバイト数分読んで psgi.input に保存する。psgi.inputはPSGIによる拡張で、POSTやPUTの時にその内容が入る。IO::Handle-like object.
  2. CONTENT_LENGTHがない場合はHTTP/1.1 Chunked transfer encodingだけどその処理はパス。


最後 _finalize_response() 。もうちょっと。

sub _finalize_response {
    my($self, $env, $res) = @_;

$env は Plack アプリに渡ってくるものと同じ環境変数の HashRef。$res は Plack アプリの返した値。

    my $protocol = $env->{SERVER_PROTOCOL};
    my $status   = $res->[0];
    my $message  = status_message($status);

    my(@headers, %headers);
    push @headers, "$protocol $status $message";

    # Switch on Transfer-Encoding: chunked if we don't know Content-Length.
    my $chunked;
    while (my($k, $v) = splice @{$res->[1]}, 0, 2) {
        next if $k eq 'Connection';
        push @headers, "$k: $v";
        $headers{lc $k} = $v;
    }

@headers と %headers にレスポンスヘッダーをためていく。@headersが実際の出力用のテキストで、%headersは内部処理用にキーを小文字に合わせたもの。

    if ( $protocol eq 'HTTP/1.1' ) {
        if ( !exists $headers{'content-length'} ) {
            if ( $status !~ /^1\d\d|[23]04$/ ) {
                DEBUG && warn "[$$] Using chunked transfer-encoding to send unknown length body\n";
                push @headers, 'Transfer-Encoding: chunked';
                $chunked = 1;
            }
        }
        elsif ( my $te = $headers{'transfer-encoding'} ) {
            if ( $te eq 'chunked' ) {
                DEBUG && warn "[$$] Chunked transfer-encoding set for response\n";
                $chunked = 1;
            }
        }
    } else {
        if ( !exists $headers{'content-length'} ) {
            DEBUG && warn "[$$] Disabling keep-alive after sending unknown length body on $protocol\n";
            $self->{client}->{keepalive} = 0;
        }
    }

transfer encoding 絡みの処理は軽く流して...

    if ( ! $headers{date} ) {
        push @headers, "Date: " . time2str( time() );
    }

    # Should we keep the connection open?
    if ( $self->{client}->{keepalive} ) {
        push @headers, 'Connection: keep-alive';
    } else {
        push @headers, 'Connection: close';
    }

    my $conn = $self->{server}->{client};

    # Buffer the headers so they are sent with the first write() call
    # This reduces the number of TCP packets we are sending
    syswrite $conn, join( $CRLF, @headers, '' ) . $CRLF;

ここの最後の行でやっとHTTPヘッダーの出力。

    if (defined $res->[2]) {
        Plack::Util::foreach($res->[2], sub {
            my $buffer = $_[0];
            if ($chunked) {
                my $len = length $buffer;
                $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
            }
            syswrite $conn, $buffer;
            DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n";
        });

        syswrite $conn, "0$CRLF$CRLF" if $chunked;

$res->[2] は Plack アプリが返す3つめの値なので、テキストの配列など。
Plack::Util::foreach は第一引数の配列の要素ごとに、第二引数のコールバックを呼ぶだけ。なので $chunked を無視すればただ全部の行をクライアントソケットに syswrite するだけ。

    } else {
        return Plack::Util::inline_object
            write => sub {
                my $buffer = $_[0];
                if ($chunked) {
                    my $len = length $buffer;
                    $buffer = sprintf( "%x", $len ) . $CRLF . $buffer . $CRLF;
                }
                syswrite $conn, $buffer;
                DEBUG && warn "[$$] Wrote " . length($buffer) . " bytes\n";
            },
            close => sub {
                syswrite $conn, "0$CRLF$CRLF" if $chunked;
            };
    }
}

そして最後がよく分からないが、Plack::Util::inline_object は、その引数を使って Plack::Util::Prototype のオブジェクトを作る。そんな使い方もあるらしい。

まとめ

  • StarmanPerl製の Web server。PSGI 対応で Prefork型。
  • 実は Prefork な処理は親クラス達がやってくれる。
  • StarmanPSGI なアプリの処理をしている。その処理自体は意外と簡単。HTTP/1.1 の処理が結構いろいろ。

変なところあったら突っ込みお願いします。

Perlのprintfは引数を好きな順番で扱うことが出来る

何これ便利

1$ とか 2$ とかで文字列に続く引数の1番目、2番目を指定できる。perldoc -f sprintf より:

printf '%2$d %1$d', 12, 34;      # prints "34 12"

1個の引数を2回使うこともできる。CPAN本にあった例(P.155):

use Text::Xslate qw(html_escape);
# ...
sprintf '<a href="%1$s">%1$s</a>', html_escape($original);

ちなみに本ではこの例の主役は Xslate ではなく、もちろん sprintf でもなく、 URI::Find なわけだけど。どこまで載っけていいのかわからないから省略してある。


Perl CPANモジュールガイド

Perl CPANモジュールガイド

今日覚えたPerlの一行野郎

  • インストールされてるCPANモジュールの一覧を出す
% perl -MExtUtils::Installed -e 'print "$_\n" for ExtUtils::Installed->new->modules' > modules.txt

cpanmに食わせてインストールできる

% cpanm < modules.txt
% perl -MEncode=encode_utf8 -E 'say encode_utf8("\x{3042}\x{3044}\x{3046}\x{3048}\x{304a}")'
あいうえお

ただのメモ: Web Application Framework とか PSGI 関連とかのモジュール

たくさんあって楽しすぎるからメモっといて後々調べていく。

あ。。。

書いてる途中でPSGIのページにほとんど載ってる事に気づいたが気にしてはいけない。

ここから取った: Tatsuhiko Miyagawa - The Tale of Plack 1/2 - YouTube

ただのメモ: Perl5開発環境構築

% curl -LO http://xrl.us/perlbrew
% perl perlbrew install
% rm perlbrew
% ~/perl5/perlbrew/bin/perlbrew init

(add this in .zshrc: 'source ~/perl5/perlbrew/etc/bashrc')
% perlbrew install perl-5.12.2
% perlbrew switch perl-5.12.2

(installed at: ~/perl5/perlbrew/perls/perl-5.12.2/bin/perl)

% cpanm install
% curl -L http://cpanmin.us | perl - App::cpanminus
% cpanm App::cpanoutdated
% cpanm App::pmuninstall

MySQL INDEX 実験してみた

ソーシャルゲーム開発者なら知っておきたい MySQL INDEX + EXPLAIN入門 | 株式会社インフィニットループ技術ブログを読んで自分で実験してみたくなって書いた。コードぐっちゃぐちゃだけど飽きてきたから忘れないうちに貼っちゃう。

ダミーデータ生成とか、結果をCSVファイルに吐き出すとか、ただそれだけ。

それっぽいデータが取れたので満足。

USE test;

CREATE TABLE IF NOT EXISTS test_records (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  col_1 int(11) NOT NULL,
  col_2 varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  col_3 int(11) NOT NULL,
  col_4 int(11) NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

CREATE TABLE IF NOT EXISTS test_indexed_records (
  id bigint(20) unsigned NOT NULL AUTO_INCREMENT,
  col_1 int(11) NOT NULL,
  col_2 varchar(20) COLLATE utf8_unicode_ci NOT NULL,
  col_3 int(11) NOT NULL,
  col_4 int(11) NOT NULL,
  PRIMARY KEY (id)
) ENGINE=InnoDB  DEFAULT CHARSET=utf8 COLLATE=utf8_unicode_ci;

CREATE INDEX col_1_index ON test_indexed_records(col_1);
CREATE INDEX col_2_index ON test_indexed_records(col_2);

なぜかRakefile

# -*- coding: utf-8 -*-

require 'pp'
require 'mysql2'
require 'active_record'
require 'benchmark'

ActiveRecord::Base.configurations = {
  'development' => {
    :adapter => 'mysql2',
    :database => 'test'
  }
}
ActiveRecord::Base.establish_connection('development')
class TestRecords < ActiveRecord::Base; end
class TestIndexedRecords < ActiveRecord::Base; end

TABLES = [TestRecords, TestIndexedRecords]

namespace :db do
  desc "create table"
  task :create do
  end

  def db_seed(count = 1000, data_max = 3000)
    count.times do
      TABLES.each do |t|
        t.create(:col_1 => rand(data_max),
                 :col_2 => rand(data_max),
                 :col_3 => rand(data_max),
                 :col_4 => rand(data_max))
      end
    end
  end

  desc "insert seed data"
  task :seed do
    count = (ENV["COUNT"] || 1000).to_i
    puts "inserting #{count} records"
    db_seed(count)
  end


  def db_count(table)
    table.count
  end
  desc "show record count"
  task :count do
    puts db_count(TestRecords)
    puts db_count(TestIndexedRecords)
  end

  desc "clean up data"
  task :clean do
    TestRecords.delete_all
    TestIndexedRecords.delete_all
  end
end

namespace :bench do
  desc "set up and run benchmarks"
  task :all => "db:clean" do
    upto = (ENV["upto"] || 10).to_i
    by   = (ENV["by"]   || upto / 10).to_i

    #upto, by = 100_000, 10_000
    #upto, by = 10_000, 2_000
    #upto, by = 10, 2

    result = {}
    Benchmark.bm do |x|
      by.step(upto, by) do |n|
        db_seed(by)

        c = TestRecords.count
        count = c.to_s.rjust(upto.to_s.length)
        t = x.report("count(#{count}) w/o index:") do
          sql = "select * from test_records where col_1 = 2000"
          10.times { TestRecords.find_by_sql sql }
        end
        result[c] ||= {}
        result[c][:unindexed] = t

        c = TestIndexedRecords.count
        count = c.to_s.rjust(upto.to_s.length)
        t = x.report("count(#{count}) w/  index:") do
          sql = "select * from test_indexed_records where col_1 = 2000"
          10.times { TestIndexedRecords.find_by_sql sql }
        end
        result[c] ||= {}
        result[c][:indexed] = t
      end
    end

    def output(filename, results)
      File.open(filename, 'w') do |f|
        f.puts "count,unindexed,indexed"
        results.each do |k, rs|
          r1 = rs[:unindexed]
          r2 = rs[:indexed]
          f.puts "#{k},#{r1.real},#{r2.real}"
        end
      end
    end

    output("result.csv", result)
  end
end