[Python] マルチプロセスな処理を実装して、処理を高速化する
こんにちは、@yoheiMuneです。
Pythonでは、multiprocessingモジュールを利用して、簡単にマルチプロセスプログラミングを行うことができます。今回はプロセスを複数立ち上げる実装方法を、ブログに書きたいと思います。
Pythonの場合、グローバルインタプリタロック(GIL)という仕組みがあり、同一プロセス上では(例えマルチスレッドだったとしても)同時に1つの処理しか並列して行われません(その恩恵として、変数のスレッドセーフが保たれる)。ただこの仕組みのため、マルチコアなPCなどでは思うようにパフォーマンスが出ない場合があります(特にCPUバウンドな処理の場合)。
プロセスを分けるとGILの仕組みから解放されるため、CPUがマルチコアな場合にマシン性能を生かすことができます。今日はその、マルチプロセスなお話です。
17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.1 ドキュメント
最後になりますが本ブログでは、Python・Go言語・Linux・Node.js・フロントエンド・インフラ・開発関連・Swift・Java・機械学習など雑多に情報発信をしていきます。自分の第2の脳にすべく、情報をブログに貯めています。気になった方は、本ブログのRSSやTwitterをフォローして頂けると幸いです ^ ^。
最後までご覧頂きましてありがとうございました!
Pythonでは、multiprocessingモジュールを利用して、簡単にマルチプロセスプログラミングを行うことができます。今回はプロセスを複数立ち上げる実装方法を、ブログに書きたいと思います。
目次
Pythonで並列処理を実装するには
多くのプログラミング言語では並列処理の実装方法として、マルチスレッドとマルチプロセスの2つが提供されています。マルチスレッドはプロセス1つでスレッドを複数にする方法で、マルチプロセスはプロセスを複数立ち上げる(ps auxなどで複数見える)方法です。Pythonの場合、グローバルインタプリタロック(GIL)という仕組みがあり、同一プロセス上では(例えマルチスレッドだったとしても)同時に1つの処理しか並列して行われません(その恩恵として、変数のスレッドセーフが保たれる)。ただこの仕組みのため、マルチコアなPCなどでは思うようにパフォーマンスが出ない場合があります(特にCPUバウンドな処理の場合)。
プロセスを分けるとGILの仕組みから解放されるため、CPUがマルチコアな場合にマシン性能を生かすことができます。今日はその、マルチプロセスなお話です。
準備
Pythonにおけるマルチプロセスな実装はmultiprocessingモジュールを用いますが、これは標準モジュールなので、以下のインポートするだけで利用できます。import multiprocessing
サブプロセスを作成して、マルチプロセスに処理を実行する
multiprocessing.Processを用いると、指定した関数をマルチプロセスで処理することができます。
from multiprocessing import Process
# 呼び出したい関数
def f1(name):
print("Hello", name)
print("Sleeping... 3s")
time.sleep(3)
print("Good morning", name)
if __name__ == "__main__":
# サブプロセスを作成します
p = Process(target=f1, args=("Bob",))
# 開始します
p.start()
print("Process started.")
# サブプロセス終了まで待ちます
p.join()
print("Process joined.")
上記を実行すると以下のような出力となり、f1の中でsleepする処理も含め、メインプロセスがそれを待つようになっています。Process started. Hello Bob Sleeping... 3s Good morning Bob Process joined.
キューを用いて、プロセス間でデータのやり取りを行う
キュー(Queue)を用いることで、安全に(プロセスセーフでかつスレッドセーフに)値の受け渡しを行うことができます。
from multiprocessing import Queue
def f2(q):
time.sleep(3)
# 3秒後に、キューに値を渡します.
q.put([42, None, "Hello"])
if __name__ == "__main__":
# スレッド間でやり取りするためのキューを作成します.
q = Queue()
# キューを引数に渡して、サブプロセスを作成します.
p = Process(target=f2, args=(q,))
# サブプロセスを開始します.
p.start()
# q.get()できるまで待ちます.
print(q.get())
# サブプロセス完了を待ちます.
p.join()
上記を実行すると以下のような出力となります。[42, None, 'Hello']
パイプを用いて、プロセス間でデータのやり取りを行う
キューと同じように、パイプ(Pipe)を使うことでもプロセス間で値のやり取りをすることができます。ただし、パイプの両端で自由に読み書きをすると、データが破損する可能性があるので、基本的にはどちらかを読み取り専用、もう一方を書き込み専用に使うと良いです(異なるプロセス/スレッドが同じ端で同時に読み書きすると、データ破損する可能性があり、このバグは毎回再現するわけではないので、デバッグが非常に大変です・・)。
from multiprocessing import Pipe
def f3(conn):
time.sleep(3)
# パイプにデータを送信します.
conn.send({ "age" : 30, "name" : "Yohei" })
# パイプをクローズします.
conn.close()
# クローズ後に書き込もうとすると、エラーになります(OSError: handle is closed).
# conn.send([42, None, "Hello"])
if __name__ == "__main__":
# Pipeを生成します(デフォルトでは双方向にやり取りできるパイプ)
# 双方向にやり取りできますが、両端で自由に読み書きしているとデータが壊れる可能性があるので、
# 基本的にはどちらかを書き込み専用、どちらかを読み込み専用に扱います.
parent_conn, child_conn = Pipe()
# Pipeの片方の端を、サブプロセスに渡します.
p = Process(target=f3, args=(child_conn,))
# サブプロセスを開始します.
p.start()
# Pipeから値が取得できるまで待ちます.
print(parent_conn.recv())
# サブプロセスの終了を待ちます.
p.join()
上記を実行すると、以下のように出力されます。
{'age': 30, 'name': 'Yohei'}
共有メモリを用いて、プロセス間でデータを共有する
今まではキューやパイプという仕組みを通してデータをやり取りする方法でしたが、共有メモリ(Shared memory)を使うことでプロセスセーフ(スレッドセーフ)に変数自体を共有することができます。
from multiprocessing import Value, Array
def f5(n, a):
n.value = 3.1415926
for i in range(len(a)):
a[i] *= -1
if __name__ == "__main__":
# 共有メモリ(Value)を作成します.
num = Value('d', 0.0)
# 共有メモリ(Array)を作成します.
arr = Array('i', range(10))
# サブプロセスを作り、実行します.
p = Process(target=f5, args=(num, arr))
p.start()
p.join()
# 共有メモリ(Value)から値を取り出します
print(num.value)
# 共有メモリ(Array)から値を取り出します
print(arr[:])
上記を実行すると以下のような出力にされます。3.1415926 [0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
マネージャーを用いて、プロセス間でデータを共有する
共有メモリと似ていますが、マネージャーを用いることでもプロセス間の状態の共有を行うことができます。共有メモリよりも扱えるデータ型が多いです(辞書型なども扱える)。
from multiprocessing import Manager
def f6(d, l):
# 辞書型に値を詰め込みます.
d[1] = '1'
d["2"] = 2
d[0.25] = None
# 配列を操作します(ここでは逆順に).
l.reverse()
if __name__ == "__main__":
# マネージャーを生成します.
with Manager() as manager:
# マネージャーから辞書型を生成します.
d = manager.dict()
# マネージャーから配列を生成します.
l = manager.list(range(10))
# サブプロセスを作り実行します.
p = Process(target=f6, args=(d,l))
p.start()
p.join()
# 辞書からデータを取り出します.
print(d)
# 配列からデータを取り出します.
print(l)
上記を実行すると以下の通りです。
{1: '1', '2': 2, 0.25: None}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
Lockを用いて、実行順を制御する
ロック(Lock)が提供するロックの取得と解放の機能を使うことで、処理順を制御することができます。
from multiprocessing import Lock
def f4(lock, i):
# ロックを取得します.
lock.acquire()
# ロック中は、他のプロセスやスレッドがロックを取得できません(ロックが解放されるまで待つ)
try:
print('Hello', i)
finally:
# ロックを解放します.
lock.release()
if __name__ == "__main__":
# ロックを作成します.
lock = Lock()
for num in range(10):
Process(target=f4, args=(lock, num)).start()
上記を実行すると、以下のように出力されます。Hello 0 Hello 1 Hello 2 Hello 3 Hello 4 Hello 5 Hello 6 Hello 7 Hello 8 Hello 9
プロセスプールを使って、サブプロセスを使い回す
プール(Pool)を用いることで、指定した個数のサブプロセスを作り、それらで処理をマルチプロセスに実行することができます。
from multiprocessing import Pool, TimeoutError
def f7(x):
return x*x
if __name__ == "__main__":
# 4つのプロセスを開始します.
with Pool(processes=4) as pool:
# 0〜9の10個の値を、4つのプロセスで処理します.
print(pool.map(f7, range(10)))
# print: [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
# ひとつ上と似ていますが、プロセスの実行順が不定になります.
for i in pool.imap_unordered(f7, range(10)):
print(i)
# print: 0, 1, 4, 16, 9, 25, 36, 49, 64, 81
# "f7(20)"という処理を非同期に実行します(利用するプロセスは1つのみ)
res = pool.apply_async(f7, (20,))
# 処理結果が返ってくるまで、最大1秒間待機します
print(res.get(timeout=1))
# print: 400
# 上記の仕組みを連続して呼ぶと、複数のプロセスで処理を行うことができます.
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(10)]
results = [res.get(timeout=1) for res in multiple_results]
# 10個出力されます
print(results)
# print : [37604, 37607, 37606, 37605, 37604, 37607, 37606, 37605, 37604, 37607]
# ただしユニークにすると4つのみ(=プールしたプロセス数)であることがわかります
print(set(results), len(set(results)))
# {37604, 37605, 37606, 37607} 4
# タイムアウトが発生する場合のサンプルです
res = pool.apply_async(time.sleep, (10,))
try:
res.get(timeout=1)
except TimeoutError as e:
print("Timeout.....!!!", e)
この仕組みを用いると、最大同時実行数などを制御しながら処理を行うことができます。参考資料
Pythonのマルチプロセスは、以下のページを参考にしました。ありがとうございます。17.2. multiprocessing — プロセスベースの並列処理 — Python 3.6.1 ドキュメント
最後に
Pythonで処理の並列化を行いたい案件があり色々と調べていて、その結果をブログに残しました。Pythonは面白いですね。今後もブログを書いていきたいと思います。最後になりますが本ブログでは、Python・Go言語・Linux・Node.js・フロントエンド・インフラ・開発関連・Swift・Java・機械学習など雑多に情報発信をしていきます。自分の第2の脳にすべく、情報をブログに貯めています。気になった方は、本ブログのRSSやTwitterをフォローして頂けると幸いです ^ ^。
最後までご覧頂きましてありがとうございました!






