Ractor超入門

はじめに

この記事はRuby3で導入される並列・並行処理のための新機能Ractorに興味のある方向けの記事になります。 簡単なRactorを使ったサンプルコードを解説しつつ、理解を深めることができるように書いてみました(ほとんど未来の僕へ向けた記事になっている感じはありますが……)。

また、Ractor自体でどういったことができるのかを調べた結果をまとめた記事でもあります。 そのため後半はRactorで色々遊んだ時のコードをもとに挙動を解説しています。

環境

  • Windows10 2004
  • WSL2(Ubuntu 18.04)
  • ruby 3.0.0dev (2020-09-07T04:29:42Z master 17a27060a7) [x86_64-linux]

Ractorとは?

そもそもRactorについて知らない人もいると思いますので、簡単に紹介します。

RactorとはRuby3で導入される並列・並行処理のための新機能になります。機能自体の提案は数年前からあり、当時はGuildという名前で提案されていました。

しかし、ゲーム業界から「Guildという名前は使っているので、別の名前にしてほしい」という声があり、現在のRactorへと変わりました。

Actorモデルを参考にしているようで、そのためRactor(Ruby’s Actor)という名前に変更されたようです。

Ractorは並行実行の単位であり、それぞれが並行して実行されます。 たとえば以下のコードではputs :helloputs :helloはそれぞれ並行して実行されます。

Ractor.new do
  5.times do
    puts :hello
  end
end

5.times do
    puts :world
end

このコードを実行すると以下のような結果になります。

world
helloworld

hello
world
helloworld

helloworld

hello

このようにそれぞれの処理を並行して実行できます。

またRactorは別のRactorへとオブジェクトを送受信し、同期しながら実行することもできます。同期の方法としてはpush型pull型の二つがあります。

たとえばpush型の場合は以下のようなコードになります。

r1 = Ractor.new do
    :hoge
end

r2 = Ractor.new do
    puts :fuga, Ractor.recv
end

r2.send(r1.take)

r2.take
# => :fuga, :hoge

Ractorではsendメソッドを使い、別のRactorへとオブジェクトを送ることができます。上記のコードだと

r2.send(r1.take)

の部分でr2へと送信しています。

送信されたオブジェクトはRactor内でRactor.recvで受け取ることができ

r2 = Ractor.new do
    puts :fuga, Ractor.recv # 
end

r2.send(r1.take)で送られたオブジェクトを受け取ってputsメソッドに渡すことができます。

またRactorが実行された結果を受け取る際にtakeメソッドを使います。 ですのでr1.take:hogeを受け取っています。

つまり、r2.send(r1.take)ではr1の実行結果を受け取り、それをr2へと送信しています。 そして、r2内のputs :fuga, Ractor.recvputs :fuga, :hogeとなり、fugahogeがそれぞれ出力されるということです。

これがpush型でのオブジェクトをやり取りしている流れになります。

対してpull型は以下のようなコードになります。

r1 = Ractor.new 42 do |arg|
    Ractor.yield arg
end

r2 = Ractor.new r1 do |r1|
    r1.take
end

puts r2.take

Ractor.newで渡した引数は|arg|のようにブロック内で使用できる変数として受け取ることができます。

例えば以下のコードはr1takeメソッドが実行されるまで処理を待ちます。

r1 = Ractor.new 42 do |arg|
    Ractor.yield arg
end

またRactor.newには別のRactorを渡すことができるため以下のように書くことができます。

r2 = Ractor.new r1 do |r1|
    r1.take
end

これでr1が引数として受け取った42r2の中で受け取ることができます。

最後にputs r2.take42を受け取って出力しています。

pull型はこういった流れになります。

ざっくりと解説すると

  • push型: Ractor#send + Ractor.recv
  • pull型: Ractor.yield + Ractor#take

という感じです。

より詳細なRactorの解説に関しては下記のリンクを参照していただければと思います。

Ractorのコード

Ractorの生成

RactorRactor.newでブロックに実行したい処理を書きます。

Ractor.new do
  # このブロックが並行に実行される
end

このブロック内の処理が並行実行されます。

つまり、以下のようなコードの場合

Ractor.new do
    10.times do
        puts :hoge
    end
end

10.times do
    puts :fuga
end

:hoge:fugaがそれぞれ並行に出力されます。

またRactor.newに実行したい処理をブロックとして渡しますので以下のように書くこともできます。

Ractor.new{
    10.times{
        puts :hoge
    }
}

またキーワード引数nameを使って名前を付けることもでき、Ractor#nameで名前をを受け取ることもできます。

r = Ractor.new name: 'r1' do
    puts :hoge
end

p r.name
# => "r1"

これにより、どのRactorで処理が実行されているかを確認することもできそうです。

Ractorへ引数を渡す

Ractor.newに引数を渡すことでブロック内にオブジェクトを渡すことができます。

r = Ractor.new :hoge do |a|
    p a
end

r.take
# => :hoge

このように引数を経由してオブジェクトを渡すことができます。

また複数の引数を渡すこともできます

r = Ractor.new :hoge, :fuga do |a, b|
    p a
    p b
end

r.take
# => fuga
# => hoge

このようにArrayを渡すこともできます。

r = Ractor.new [:hoge, :fuga] do |a|
    p a.inspect
end

r.take
# => "[:hoge, :fuga]"

ちなみに、|a||a, b|に変更すると

r = Ractor.new [:hoge, :fuga] do |a, b|
    p a
    p b
end

r.take
# => :hoge
# => :fuga

という出力結果になります。これはa, b = [:hoge, :fuga]と同じ挙動と解釈されているようです。

またHashの場合は

r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
    p a
    p a[:hoge]
end

r.take
# => {:hoge=>42, :fuga=>21}
# => 42

と出力されます。 ちなみに、Ractor.newの後に()で括っていないとSyntaxErrorになるので注意が必要です。

r = Ractor.new({:hoge => 42, :fuga => 21}) do |a|
    p a
    p a[:hoge]
end

r.take
# => SyntaxError

Ractorでの返り値

Ractorでは実行されるブロック内の返り値をtakeメソッドで受け取ることができます。

r = Ractor.new do
    :hoge
end

p r.take
# => :hoge

ちなみに、ブロック内でreturnをするとLocalJumpErrorとなるようです。

r = Ractor.new do
    return :fuga
    :hoge
end

p r.take
# => LocalJumpError

Ractor内での例外

Ractor内での例外は以下のようにして受け取ることができます。

r = Ractor.new do
    raise 'error'
end

begin
    r.take
rescue Ractor::RemoteError => e
    p e.message
end

ちなみに、Ractor内でも例外処理を書くことはできます。

r = Ractor.new name: 'r1' do
    begin
        raise 'error'
    rescue => e
        p e.message
    end
end

r.take

またドキュメントによるとRactorのブロック内から返ってきた値を受け取る領域で例外をキャッチできるようです。 つまり以下のようなコードも書くことができます。

r1 = Ractor.new do
    raise 'error'
end

r2 = Ractor.new r1 do |r1|
    begin
        r1.take
    rescue Ractor::RemoteError => e
        p e.message
    end
end 

r2.take
# => "thrown by remote Ractor."

Ractorでの並行実行

簡単な例

こんな感じでRactorで並行実行を行うことができます。

Ractor.new do
    3.times do
        puts 42
    end
end

3.times do
    puts 21
end

実行すると4221という出力がそれぞればらばらに表示されます。

ちょっとした例

以下のような感じで複数のworkerRactorで生成し、それをpipeで経由して値を渡し、結果をまとめることができます。

require 'prime'

pipe = Ractor.new do
  loop do
    Ractor.yield Ractor.recv
  end
end

N = 1000
RN = 10
workers = (1..RN).map do
  Ractor.new pipe do |pipe|
    while n = pipe.take
      Ractor.yield [n, n.prime?]
    end
  end
end

(1..N).each{|i|
  pipe << i
}

pp (1..N).map{
  r, (n, b) = Ractor.select(*workers)
  [n, b]
}.sort_by{|(n, b)| n}
# => 0 ~ 999 までの数値が素数かどうかの結果を出力

このコードでは10個のworkerを生成し、pipeを経由してそれぞれのworkerにオブジェクトを渡しています。 また、受け取ったオブジェクトをRactor.yield [n, n.prime?]で返しています。

こんな感じでworkerを複数作り、pipe経由で処理させたり、結果を受け取ることができます。

よしなにworkerなどを生成して処理させるクラスを書いてみる

先ほどのコードだとworker内の処理が後々で大きくなることもありそうだったので、以下のようにworkerをよしなに生成してくれるクラスを書いてみました。

class Ninsoku
    def initialize(task, worker_count: 10)
      @task = task
      @pipe = create_pipe
      @workers = create_workers(worker_count)
    end

    def send(arg)
        @pipe.send arg
    end

    def run
        yield Ractor.select(*@workers)
    end

    def create_pipe
        Ractor.new do
            loop do
                Ractor.yield Ractor.recv
            end
        end
    end

    def create_workers(worker_count)
        (1..worker_count).map do
            Ractor.new @pipe, @task do |pipe, task|
                loop do 
                  arg = pipe.take
                  task.send arg
                  Ractor.yield task.take
                end
            end
        end
    end
end

Ninsoku.newpipeworkerを生成しています。またtaskは処理させたい内容をRactorで渡し、workerで実行させています。

実際に使うケースとしてはこんな感じです。

task = Ractor.new do
  func = lambda{|n| n.downcase }
  loop do
    Ractor.yield func.call(Ractor.recv)
  end
end

ninsoku = Ninsoku.new(task)

('A'..'Z').each{|i|
  ninsoku.send i
}

('A'..'Z').map{
    ninsoku.run{|r, n|
        puts n
    }
}
# => a ~ z までが並行して出力される

このクラスはあとでgemにでもしてみようかと思います。

gemにしてみました(rubygemsにはpushしていませんが……)

S-H-GAMELINKS/rorker

例えば、僕の住んでいる島根県浜田市AED位置情報をRactorを使って処理してみます。 なお、島根県AED位置情報は島根県が公開しているオープンデータを使用させていただきました。この場を借りて感謝を申し上げます。

島根県 オープンデータカタログサイト

require "rorker"
require "csv"

task = Ractor.new do
  func = lambda{|row| 
    row.map{|value|
      if value =~ /浜田市/
        row
      end  
    }.compact
  }
  loop do
    Ractor.yield func.call(Ractor.recv)
  end
end

rorker = Rorker.new(task)

csv = CSV.read "a.csv"

csv.each do |row|
  rorker.send row
end

n = 0

while n < csv.count
  rorker.run{|worker, result|
    if !result.empty?
      puts result
    end
  }
  n += 1
end

こんな感じで読み取ったCSVを一行づつworkerに渡して並行処理で必要なデータをとってくることもできます。

RactorでNumbered Parameterを使う

Ractorでは処理をブロックで渡しますので、Numbered Parameterを使って引数を受け取ることもできます。

r = Ractor.new :hoge do
    puts _1
end

r.take
# => hoge

ちなみに、複数の引数でも動作します。

r = Ractor.new :hoge, :hoge do
    puts _1
    puts _2
end

r.take
# => hoge
# => fuga

複数渡した場合は渡された順番通りに_1から_9に渡されるみたいです。

ちなみに、Hashを渡した場合はこんな感じになります。

r = Ractor.new ({hoge: 1, fuga: 2}) do
    _1.map do |key, value|
        p ":#{key} => #{value}"
    end
end

r.take
# => ":hoge => 1"
# => ":fuga => 2"

=>を使ったHashも同様の結果になりました

r = Ractor.new({:hoge => 1, :fuga => 2}) do
    _1.map do |key, value|
        p ":#{key} => #{value}"
    end
end

r.take
# => ":hoge => 1"
# => ":fuga => 2"

ただし、Arrayの場合は少し挙動が異なります

r = Ractor.new [1, 2, 3] do
    puts _1
    puts _1.class
    puts _2
    puts _2.class
    puts _3
    puts _3.class    
end

r.take
#=> 1
#=> Integer
#=> 2
#=> Integer
#=> 3
#=> Integer

どうやら通常通り複数の引数を渡した時のようにArrayの先頭から順番に渡されるようです。 おそらくは以下のように解釈されているのではないかと思います。

_1, _2, _3 = [1, 2, 3]

ちなみに、Numbered Parameterで受け取れる数より大きなArrayを渡すと

r = Ractor.new [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] do
    puts _1
    puts _2
    puts _3
    puts _4
    puts _5
    puts _6
    puts _7
    puts _8
    puts _9
end

r.take
#=> 1
#=> 2
#=> 3
#=> 4
#=> 5
#=> 6
#=> 7
#=> 8
#=> 9

このように取得できるのはNumbered Parameterが受け取れる範囲までのようです

Ractor内でNumbered Parameterを使う場合はHashを引数に渡した際か

r = Ractor.new ({hoge: 1, fuga: 2}) do |hash|
    hash.map do
        p ":#{_1} => #{_2}"
    end
end

r.take
":hoge => 1"
":fuga => 2"

またはいくつかの引数を渡した時に省略して書きたいときに使うことになりそうです

r = Ractor.new :hoge, :fuga do
    p _1
    p _2
end

r.take
# => :hoge
# => :fuga

おわりに

この記事を読んでRactorについて興味を持って頂ければ幸いです。 今後もRactorを使って試したコードを追加していこうと思います

参考