はじめに
この記事はRuby3で導入される並列・並行処理のための新機能Ractor
に興味のある方向けの記事になります。
簡単なRactor
を使ったサンプルコードを解説しつつ、理解を深めることができるように書いてみました(ほとんど未来の僕へ向けた記事になっている感じはありますが……)。
また、Ractor
自体でどういったことができるのかを調べた結果をまとめた記事でもあります。
そのため後半はRactor
で色々遊んだ時のコードをもとに挙動を解説しています。
環境
- Windows10 2004
- WSL2(Ubuntu 18.04)
- ruby 3.0.0dev (2020-09-07T04:29:42Z master 17a27060a7) [x86_64-linux]
Ractorとは?
そもそもRactor
について知らない人もいると思いますので、簡単に紹介します。
Ractor
とはRuby3で導入される並列・並行処理のための新機能になります。機能自体の提案は数年前からあり、当時はGuild
という名前で提案されていました。
しかし、ゲーム業界から「Guildという名前は使っているので、別の名前にしてほしい」という声があり、現在のRactor
へと変わりました。
Actor
モデルを参考にしているようで、そのためRactor(Ruby’s Actor)
という名前に変更されたようです。
Ractor
は並行実行の単位であり、それぞれが並行して実行されます。
たとえば以下のコードではputs :hello
とputs :hello
はそれぞれ並行して実行されます。
Ractor.new do 5.times do puts :hello end end 5.times do puts :world end
このコードを実行すると以下のような結果になります。
world helloworld hello world helloworld helloworld hello
このようにそれぞれの処理を並行して実行できます。
またRactor
は別のRactor
へとオブジェクトを送受信し、同期しながら実行することもできます。同期の方法としてはpush型
とpull型
の二つがあります。
たとえばpush型
の場合は以下のようなコードになります。
r1 = Ractor.new do :hoge end r2 = Ractor.new do puts :fuga, Ractor.recv end r2.send(r1.take) r2.take # => :fuga, :hoge
Ractor
ではsend
メソッドを使い、別のRactor
へとオブジェクトを送ることができます。上記のコードだと
r2.send(r1.take)
の部分でr2
へと送信しています。
送信されたオブジェクトはRactor
内でRactor.recv
で受け取ることができ
r2 = Ractor.new do puts :fuga, Ractor.recv # end
r2.send(r1.take)
で送られたオブジェクトを受け取ってputs
メソッドに渡すことができます。
またRactor
が実行された結果を受け取る際にtake
メソッドを使います。
ですのでr1.take
は:hoge
を受け取っています。
つまり、r2.send(r1.take)
ではr1
の実行結果を受け取り、それをr2
へと送信しています。
そして、r2
内のputs :fuga, Ractor.recv
はputs :fuga, :hoge
となり、fuga
とhoge
がそれぞれ出力されるということです。
これがpush型
でのオブジェクトをやり取りしている流れになります。
対してpull型
は以下のようなコードになります。
r1 = Ractor.new 42 do |arg| Ractor.yield arg end r2 = Ractor.new r1 do |r1| r1.take end puts r2.take
Ractor.new
で渡した引数は|arg|
のようにブロック内で使用できる変数として受け取ることができます。
例えば以下のコードはr1
でtake
メソッドが実行されるまで処理を待ちます。
r1 = Ractor.new 42 do |arg| Ractor.yield arg end
またRactor.new
には別のRactor
を渡すことができるため以下のように書くことができます。
r2 = Ractor.new r1 do |r1| r1.take end
これでr1
が引数として受け取った42
をr2
の中で受け取ることができます。
最後にputs r2.take
で42
を受け取って出力しています。
pull型
はこういった流れになります。
ざっくりと解説すると
push型
:Ractor#send
+Ractor.recv
pull型
:Ractor.yield
+Ractor#take
という感じです。
より詳細なRactor
の解説に関しては下記のリンクを参照していただければと思います。
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues
Ractorのコード
Ractorの生成
Ractor
はRactor.new
でブロックに実行したい処理を書きます。
Ractor.new do # このブロックが並行に実行される end
このブロック内の処理が並行実行されます。
つまり、以下のようなコードの場合
Ractor.new do 10.times do puts :hoge end end 10.times do puts :fuga end
:hoge
と:fuga
がそれぞれ並行に出力されます。
またRactor.new
に実行したい処理をブロックとして渡しますので以下のように書くこともできます。
Ractor.new{ 10.times{ puts :hoge } }
またキーワード引数name
を使って名前を付けることもでき、Ractor#name
で名前をを受け取ることもできます。
r = Ractor.new name: 'r1' do puts :hoge end p r.name # => "r1"
これにより、どのRactor
で処理が実行されているかを確認することもできそうです。
Ractorへ引数を渡す
Ractor.new
に引数を渡すことでブロック内にオブジェクトを渡すことができます。
r = Ractor.new :hoge do |a| p a end r.take # => :hoge
このように引数を経由してオブジェクトを渡すことができます。
また複数の引数を渡すこともできます
r = Ractor.new :hoge, :fuga do |a, b| p a p b end r.take # => fuga # => hoge
このようにArray
を渡すこともできます。
r = Ractor.new [:hoge, :fuga] do |a| p a.inspect end r.take # => "[:hoge, :fuga]"
ちなみに、|a|
を|a, b|
に変更すると
r = Ractor.new [:hoge, :fuga] do |a, b| p a p b end r.take # => :hoge # => :fuga
という出力結果になります。これはa, b = [:hoge, :fuga]
と同じ挙動と解釈されているようです。
またHash
の場合は
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a| p a p a[:hoge] end r.take # => {:hoge=>42, :fuga=>21} # => 42
と出力されます。
ちなみに、Ractor.new
の後に()
で括っていないとSyntaxError
になるので注意が必要です。
r = Ractor.new({:hoge => 42, :fuga => 21}) do |a| p a p a[:hoge] end r.take # => SyntaxError
Ractorでの返り値
Ractorでは実行されるブロック内の返り値をtake
メソッドで受け取ることができます。
r = Ractor.new do :hoge end p r.take # => :hoge
ちなみに、ブロック内でreturn
をするとLocalJumpError
となるようです。
r = Ractor.new do return :fuga :hoge end p r.take # => LocalJumpError
Ractor内での例外
Ractor内での例外は以下のようにして受け取ることができます。
r = Ractor.new do raise 'error' end begin r.take rescue Ractor::RemoteError => e p e.message end
ちなみに、Ractor内でも例外処理を書くことはできます。
r = Ractor.new name: 'r1' do begin raise 'error' rescue => e p e.message end end r.take
またドキュメントによるとRactorのブロック内から返ってきた値を受け取る領域で例外をキャッチできるようです。 つまり以下のようなコードも書くことができます。
r1 = Ractor.new do raise 'error' end r2 = Ractor.new r1 do |r1| begin r1.take rescue Ractor::RemoteError => e p e.message end end r2.take # => "thrown by remote Ractor."
Ractorでの並行実行
簡単な例
こんな感じでRactorで並行実行を行うことができます。
Ractor.new do 3.times do puts 42 end end 3.times do puts 21 end
実行すると42
と21
という出力がそれぞればらばらに表示されます。
ちょっとした例
以下のような感じで複数のworker
をRactor
で生成し、それをpipe
で経由して値を渡し、結果をまとめることができます。
require 'prime' pipe = Ractor.new do loop do Ractor.yield Ractor.recv end end N = 1000 RN = 10 workers = (1..RN).map do Ractor.new pipe do |pipe| while n = pipe.take Ractor.yield [n, n.prime?] end end end (1..N).each{|i| pipe << i } pp (1..N).map{ r, (n, b) = Ractor.select(*workers) [n, b] }.sort_by{|(n, b)| n} # => 0 ~ 999 までの数値が素数かどうかの結果を出力
このコードでは10個のworker
を生成し、pipe
を経由してそれぞれのworker
にオブジェクトを渡しています。
また、受け取ったオブジェクトをRactor.yield [n, n.prime?]
で返しています。
こんな感じでworker
を複数作り、pipe
経由で処理させたり、結果を受け取ることができます。
よしなにworkerなどを生成して処理させるクラスを書いてみる
先ほどのコードだとworker
内の処理が後々で大きくなることもありそうだったので、以下のようにworker
をよしなに生成してくれるクラスを書いてみました。
class Ninsoku def initialize(task, worker_count: 10) @task = task @pipe = create_pipe @workers = create_workers(worker_count) end def send(arg) @pipe.send arg end def run yield Ractor.select(*@workers) end def create_pipe Ractor.new do loop do Ractor.yield Ractor.recv end end end def create_workers(worker_count) (1..worker_count).map do Ractor.new @pipe, @task do |pipe, task| loop do arg = pipe.take task.send arg Ractor.yield task.take end end end end end
Ninsoku.new
でpipe
とworker
を生成しています。またtask
は処理させたい内容をRactor
で渡し、worker
で実行させています。
実際に使うケースとしてはこんな感じです。
task = Ractor.new do func = lambda{|n| n.downcase } loop do Ractor.yield func.call(Ractor.recv) end end ninsoku = Ninsoku.new(task) ('A'..'Z').each{|i| ninsoku.send i } ('A'..'Z').map{ ninsoku.run{|r, n| puts n } } # => a ~ z までが並行して出力される
このクラスはあとでgem
にでもしてみようかと思います。
gem
にしてみました(rubygemsにはpushしていませんが……)
例えば、僕の住んでいる島根県浜田市のAED位置情報をRactorを使って処理してみます。 なお、島根県のAED位置情報は島根県が公開しているオープンデータを使用させていただきました。この場を借りて感謝を申し上げます。
require "rorker" require "csv" task = Ractor.new do func = lambda{|row| row.map{|value| if value =~ /浜田市/ row end }.compact } loop do Ractor.yield func.call(Ractor.recv) end end rorker = Rorker.new(task) csv = CSV.read "a.csv" csv.each do |row| rorker.send row end n = 0 while n < csv.count rorker.run{|worker, result| if !result.empty? puts result end } n += 1 end
こんな感じで読み取ったCSVを一行づつworkerに渡して並行処理で必要なデータをとってくることもできます。
RactorでNumbered Parameterを使う
Ractor
では処理をブロックで渡しますので、Numbered Parameter
を使って引数を受け取ることもできます。
r = Ractor.new :hoge do puts _1 end r.take # => hoge
ちなみに、複数の引数でも動作します。
r = Ractor.new :hoge, :hoge do puts _1 puts _2 end r.take # => hoge # => fuga
複数渡した場合は渡された順番通りに_1
から_9
に渡されるみたいです。
ちなみに、Hashを渡した場合はこんな感じになります。
r = Ractor.new ({hoge: 1, fuga: 2}) do _1.map do |key, value| p ":#{key} => #{value}" end end r.take # => ":hoge => 1" # => ":fuga => 2"
=>
を使ったHashも同様の結果になりました
r = Ractor.new({:hoge => 1, :fuga => 2}) do _1.map do |key, value| p ":#{key} => #{value}" end end r.take # => ":hoge => 1" # => ":fuga => 2"
ただし、Arrayの場合は少し挙動が異なります
r = Ractor.new [1, 2, 3] do puts _1 puts _1.class puts _2 puts _2.class puts _3 puts _3.class end r.take #=> 1 #=> Integer #=> 2 #=> Integer #=> 3 #=> Integer
どうやら通常通り複数の引数を渡した時のようにArrayの先頭から順番に渡されるようです。 おそらくは以下のように解釈されているのではないかと思います。
_1, _2, _3 = [1, 2, 3]
ちなみに、Numbered Parameter
で受け取れる数より大きなArray
を渡すと
r = Ractor.new [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] do puts _1 puts _2 puts _3 puts _4 puts _5 puts _6 puts _7 puts _8 puts _9 end r.take #=> 1 #=> 2 #=> 3 #=> 4 #=> 5 #=> 6 #=> 7 #=> 8 #=> 9
このように取得できるのはNumbered Parameter
が受け取れる範囲までのようです
Ractor
内でNumbered Parameter
を使う場合はHash
を引数に渡した際か
r = Ractor.new ({hoge: 1, fuga: 2}) do |hash| hash.map do p ":#{_1} => #{_2}" end end r.take ":hoge => 1" ":fuga => 2"
またはいくつかの引数を渡した時に省略して書きたいときに使うことになりそうです
r = Ractor.new :hoge, :fuga do p _1 p _2 end r.take # => :hoge # => :fuga
おわりに
この記事を読んでRactor
について興味を持って頂ければ幸いです。
今後もRactor
を使って試したコードを追加していこうと思います
参考
- A proposal of new concurrency model for Ruby 3
- Guild Prototype
- Ruby向け並列化機構Guildの試作
- Guild → Ractor
- [JA] Ractor report / Koichi Sasada ko1
- https://github.com/ko1/ruby/blob/ractor/ractor.ja.md
- Ractor - Ruby's Actor-like concurrent abstraction
- https://gist.github.com/niku/c19da11edf0b97470af27844b44d12fa
- Ractor: a proposal for a new concurrent abstraction without thread-safety issues