Ractor超入門
はじめに
この記事はRuby3で導入される並列・並行処理のための新機能Ractorに興味のある方向けの記事になります。
簡単なRactorを使ったサンプルコードを解説しつつ、理解を深めることができるように書いてみました(ほとんど未来の僕へ向けた記事になっている感じはありますが……)。
また、Ractor自体でどういったことができるのかを調べた結果をまとめた記事でもあります。
そのため後半はRactorで色々遊んだ時のコードをもとに挙動を解説しています。
環境
- Windows10 2004
- WSL2(Ubuntu 18.04)
- ruby 3.0.0dev (2020-09-07T04:29:42Z master 17a27060a7) [x86_64-linux]
Ractorとは?
そもそもRactorについて知らない人もいると思いますので、簡単に紹介します。
RactorとはRuby3で導入される並列・並行処理のための新機能になります。機能自体の提案は数年前からあり、当時はGuildという名前で提案されていました。
しかし、ゲーム業界から「Guildという名前は使っているので、別の名前にしてほしい」という声があり、現在のRactorへと変わりました。
Actorモデルを参考にしているようで、そのためRactor(Ruby’s Actor)という名前に変更されたようです。
Ractorは並行実行の単位であり、それぞれが並行して実行されます。
たとえば以下のコードではputs :helloとputs :helloはそれぞれ並行して実行されます。
Ractor.new do 5.times do puts :hello end end 5.times do puts :world end
このコードを実行すると以下のような結果になります。
world helloworld hello world helloworld helloworld hello
このようにそれぞれの処理を並行して実行できます。
またRactorは別のRactorへとオブジェクトを送受信し、同期しながら実行することもできます。同期の方法としてはpush型とpull型の二つがあります。
たとえばpush型の場合は以下のようなコードになります。
r1 = Ractor.new do :hoge end r2 = Ractor.new do puts :fuga, Ractor.recv end r2.send(r1.take) r2.take # => :fuga, :hoge
Ractorではsendメソッドを使い、別のRactorへとオブジェクトを送ることができます。上記のコードだと
r2.send(r1.take)
の部分でr2へと送信しています。
送信されたオブジェクトはRactor内でRactor.recvで受け取ることができ
r2 = Ractor.new do puts :fuga, Ractor.recv # end
r2.send(r1.take)で送られたオブジェクトを受け取ってputsメソッドに渡すことができます。
またRactorが実行された結果を受け取る際にtakeメソッドを使います。
ですのでr1.takeは:hogeを受け取っています。
つまり、r2.send(r1.take)ではr1の実行結果を受け取り、それをr2へと送信しています。
そして、r2内のputs :fuga, Ractor.recvはputs :fuga, :hogeとなり、fugaとhogeがそれぞれ出力されるということです。
これがpush型でのオブジェクトをやり取りしている流れになります。
対してpull型は以下のようなコードになります。
r1 = Ractor.new 42 do |arg| Ractor.yield arg end r2 = Ractor.new r1 do |r1| r1.take end puts r2.take
Ractor.newで渡した引数は|arg|のようにブロック内で使用できる変数として受け取ることができます。
例えば以下のコードはr1でtakeメソッドが実行されるまで処理を待ちます。
r1 = Ractor.new 42 do |arg| Ractor.yield arg end
またRactor.newには別のRactorを渡すことができるため以下のように書くことができます。
r2 = Ractor.new r1 do |r1| r1.take end
これでr1が引数として受け取った42をr2の中で受け取ることができます。
最後にputs r2.takeで42を受け取って出力しています。
pull型はこういった流れになります。
ざっくりと解説すると
push型:Ractor#send+Ractor.recvpull型:Ractor.yield+Ractor#take
という感じです。
より詳細なRactorの解説に関しては下記のリンクを参照していただければと思います。
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues
Ractorのコード
Ractorの生成
RactorはRactor.newでブロックに実行したい処理を書きます。
Ractor.new do # このブロックが並行に実行される end
このブロック内の処理が並行実行されます。
つまり、以下のようなコードの場合
Ractor.new do 10.times do puts :hoge end end 10.times do puts :fuga end
:hogeと:fugaがそれぞれ並行に出力されます。
またRactor.newに実行したい処理をブロックとして渡しますので以下のように書くこともできます。
Ractor.new{ 10.times{ puts :hoge } }
またキーワード引数nameを使って名前を付けることもでき、Ractor#nameで名前をを受け取ることもできます。
r = Ractor.new name: 'r1' do puts :hoge end p r.name # => "r1"
これにより、どのRactorで処理が実行されているかを確認することもできそうです。
Ractorへ引数を渡す
Ractor.newに引数を渡すことでブロック内にオブジェクトを渡すことができます。
r = Ractor.new :hoge do |a| p a end r.take # => :hoge
このように引数を経由してオブジェクトを渡すことができます。
また複数の引数を渡すこともできます
r = Ractor.new :hoge, :fuga do |a, b| p a p b end r.take # => fuga # => hoge
このようにArrayを渡すこともできます。
r = Ractor.new [:hoge, :fuga] do |a| p a.inspect end r.take # => "[:hoge, :fuga]"
ちなみに、|a|を|a, b|に変更すると
r = Ractor.new [:hoge, :fuga] do |a, b| p a p b end r.take # => :hoge # => :fuga
という出力結果になります。これはa, b = [:hoge, :fuga]と同じ挙動と解釈されているようです。
またHashの場合は
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a| p a p a[:hoge] end r.take # => {:hoge=>42, :fuga=>21} # => 42
と出力されます。
ちなみに、Ractor.newの後に()で括っていないとSyntaxErrorになるので注意が必要です。
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a| p a p a[:hoge] end r.take # => SyntaxError
Ractorでの返り値
Ractorでは実行されるブロック内の返り値をtakeメソッドで受け取ることができます。
r = Ractor.new do :hoge end p r.take # => :hoge
ちなみに、ブロック内でreturnをするとLocalJumpErrorとなるようです。
r = Ractor.new do return :fuga :hoge end p r.take # => LocalJumpError
Ractor内での例外
Ractor内での例外は以下のようにして受け取ることができます。
r = Ractor.new do raise 'error' end begin r.take rescue Ractor::RemoteError => e p e.message end
ちなみに、Ractor内でも例外処理を書くことはできます。
r = Ractor.new name: 'r1' do begin raise 'error' rescue => e p e.message end end r.take
またドキュメントによるとRactorのブロック内から返ってきた値を受け取る領域で例外をキャッチできるようです。 つまり以下のようなコードも書くことができます。
r1 = Ractor.new do raise 'error' end r2 = Ractor.new r1 do |r1| begin r1.take rescue Ractor::RemoteError => e p e.message end end r2.take # => "thrown by remote Ractor."
Ractorでの並行実行
簡単な例
こんな感じでRactorで並行実行を行うことができます。
Ractor.new do 3.times do puts 42 end end 3.times do puts 21 end
実行すると42と21という出力がそれぞればらばらに表示されます。
ちょっとした例
以下のような感じで複数のworkerをRactorで生成し、それをpipeで経由して値を渡し、結果をまとめることができます。
require 'prime' pipe = Ractor.new do loop do Ractor.yield Ractor.recv end end N = 1000 RN = 10 workers = (1..RN).map do Ractor.new pipe do |pipe| while n = pipe.take Ractor.yield [n, n.prime?] end end end (1..N).each{|i| pipe << i } pp (1..N).map{ r, (n, b) = Ractor.select(*workers) [n, b] }.sort_by{|(n, b)| n} # => 0 ~ 999 までの数値が素数かどうかの結果を出力
このコードでは10個のworkerを生成し、pipeを経由してそれぞれのworkerにオブジェクトを渡しています。
また、受け取ったオブジェクトをRactor.yield [n, n.prime?]で返しています。
こんな感じでworkerを複数作り、pipe経由で処理させたり、結果を受け取ることができます。
よしなにworkerなどを生成して処理させるクラスを書いてみる
先ほどのコードだとworker内の処理が後々で大きくなることもありそうだったので、以下のようにworkerをよしなに生成してくれるクラスを書いてみました。
class Ninsoku def initialize(task, worker_count: 10) @task = task @pipe = create_pipe @workers = create_workers(worker_count) end def send(arg) @pipe.send arg end def run yield Ractor.select(*@workers) end def create_pipe Ractor.new do loop do Ractor.yield Ractor.recv end end end def create_workers(worker_count) (1..worker_count).map do Ractor.new @pipe, @task do |pipe, task| loop do arg = pipe.take task.send arg Ractor.yield task.take end end end end end
Ninsoku.newでpipeとworkerを生成しています。またtaskは処理させたい内容をRactorで渡し、workerで実行させています。
実際に使うケースとしてはこんな感じです。
task = Ractor.new do func = lambda{|n| n.downcase } loop do Ractor.yield func.call(Ractor.recv) end end ninsoku = Ninsoku.new(task) ('A'..'Z').each{|i| ninsoku.send i } ('A'..'Z').map{ ninsoku.run{|r, n| puts n } } # => a ~ z までが並行して出力される
このクラスはあとでgemにでもしてみようかと思います。
gemにしてみました(rubygemsにはpushしていませんが……)
例えば、僕の住んでいる島根県浜田市のAED位置情報をRactorを使って処理してみます。 なお、島根県のAED位置情報は島根県が公開しているオープンデータを使用させていただきました。この場を借りて感謝を申し上げます。
require "rorker" require "csv" task = Ractor.new do func = lambda{|row| row.map{|value| if value =~ /浜田市/ row end }.compact } loop do Ractor.yield func.call(Ractor.recv) end end rorker = Rorker.new(task) csv = CSV.read "a.csv" csv.each do |row| rorker.send row end n = 0 while n < csv.count rorker.run{|worker, result| if !result.empty? puts result end } n += 1 end
こんな感じで読み取ったCSVを一行づつworkerに渡して並行処理で必要なデータをとってくることもできます。
RactorでNumbered Parameterを使う
Ractorでは処理をブロックで渡しますので、Numbered Parameterを使って引数を受け取ることもできます。
r = Ractor.new :hoge do puts _1 end r.take # => hoge
ちなみに、複数の引数でも動作します。
r = Ractor.new :hoge, :hoge do puts _1 puts _2 end r.take # => hoge # => fuga
複数渡した場合は渡された順番通りに_1から_9に渡されるみたいです。
ちなみに、Hashを渡した場合はこんな感じになります。
r = Ractor.new ({hoge: 1, fuga: 2}) do _1.map do |key, value| p ":#{key} => #{value}" end end r.take # => ":hoge => 1" # => ":fuga => 2"
=>を使ったHashも同様の結果になりました
r = Ractor.new({:hoge => 1, :fuga => 2}) do _1.map do |key, value| p ":#{key} => #{value}" end end r.take # => ":hoge => 1" # => ":fuga => 2"
ただし、Arrayの場合は少し挙動が異なります
r = Ractor.new [1, 2, 3] do puts _1 puts _1.class puts _2 puts _2.class puts _3 puts _3.class end r.take #=> 1 #=> Integer #=> 2 #=> Integer #=> 3 #=> Integer
どうやら通常通り複数の引数を渡した時のようにArrayの先頭から順番に渡されるようです。 おそらくは以下のように解釈されているのではないかと思います。
_1, _2, _3 = [1, 2, 3]
ちなみに、Numbered Parameterで受け取れる数より大きなArrayを渡すと
r = Ractor.new [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] do puts _1 puts _2 puts _3 puts _4 puts _5 puts _6 puts _7 puts _8 puts _9 end r.take #=> 1 #=> 2 #=> 3 #=> 4 #=> 5 #=> 6 #=> 7 #=> 8 #=> 9
このように取得できるのはNumbered Parameterが受け取れる範囲までのようです
Ractor内でNumbered Parameterを使う場合はHashを引数に渡した際か
r = Ractor.new ({hoge: 1, fuga: 2}) do |hash| hash.map do p ":#{_1} => #{_2}" end end r.take ":hoge => 1" ":fuga => 2"
またはいくつかの引数を渡した時に省略して書きたいときに使うことになりそうです
r = Ractor.new :hoge, :fuga do p _1 p _2 end r.take # => :hoge # => :fuga
おわりに
この記事を読んでRactorについて興味を持って頂ければ幸いです。
今後もRactorを使って試したコードを追加していこうと思います
参考
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- https://gist.github.com/niku/c19da11edf0b97470af27844b44d12fa
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues