こんにちはヤク学長です。
データサイエンティスト兼ファーマシストで、アルゴリズムやBI開発を行っています。

本記事の目的は、「pythonの基本操作を知る」ことを目的としています。

【ステップアップ】「Pythonの実践」簡単速習‼【簡易テスト/応用④】

【本記事のもくじ】

まず、「Python」に真剣に取り組むための概要を解説します。
下記の方法で、簡単に概要を抑えることができます。

  • 1.並列化

それでは、上から順番に見ていきます。
なお、本上記の方法を順番に抑えれば成果が出ます。

記事の内容は「転載 & 引用OK」問題ありません。

1.並列化

マルチスレッドとマルチプロセス

マルチスレッドとマルチプロセスは、コンピュータの並列処理においてよく使われる技術で、並列処理を実現するために使用されます。

マルチスレッドは、1つのプロセス内で複数のスレッドを同時に動作させる技術です。スレッドとは、プログラムの実行単位であり、同じプロセス内で共有されたメモリ空間を使用します。複数のスレッドが同時に実行されることで、処理速度を向上させることができます。ただし、スレッド同士が共有するメモリ空間を操作する際に、同期やロックの問題が発生する場合があり、それによってプログラムのバグが発生することがあります。

一方、マルチプロセスは、複数のプロセスを同時に動作させる技術です。プロセスとは、プログラムの実行単位であり、それぞれが独自のメモリ空間を持ちます。複数のプロセスが同時に実行されることで、複数のCPUを使用し、処理速度を向上させることができます。また、プロセス同士が独立しているため、同期やロックの問題が発生することはありません。ただし、プロセス間の通信を行う場合には、明示的にデータを共有する必要があります。

マルチスレッドとマルチプロセスは、どちらが優れているというわけではなく、それぞれの技術には長所と短所があります。適切に選択することで、並列処理を効果的に行い、処理速度の向上やシステムの応答性の向上などのメリットを得ることができます。

スレッド

Pythonの標準ライブラリであるthreadingを使用することで、マルチスレッドプログラミングを簡単に実現することができます。threadingモジュールは、Pythonのスレッドを扱うためのクラスや関数を提供します。

以下に、threadingを使用してスレッドを作成する方法と、スレッド間のデータの共有方法について説明します。

スレッドの作成

スレッドを作成するには、threading.Threadクラスを使用します。Threadクラスには、実行する関数またはメソッドを指定するtarget引数があります。以下に例を示します。

import threading

def worker():
print('Working...')
# スレッドを作成して実行する
t = threading.Thread(target=worker)
t.start()

print('Done.')

この例では、worker関数を実行するスレッドを作成しています。Threadクラスのインスタンスを作成し、startメソッドを呼び出すことで、スレッドが実行されます。worker関数は、t.start()の呼び出しとは別に実行され、スレッドとして実行されます。Done.というメッセージは、スレッドの実行が終了した後に表示されます。

スレッド間のデータ共有

複数のスレッドが同じ変数やオブジェクトにアクセスする場合、同期が必要になります。Pythonのthreadingモジュールでは、Lockオブジェクトを使用することで、複数のスレッドが同じ変数やオブジェクトにアクセスする際に同期をとることができます。

以下に、Lockオブジェクトを使用して複数のスレッドが同じ変数を操作する例を示します。

import threading

# 共有変数
counter = 0
# Lockオブジェクト
lock = threading.Lock()

def worker():
global counter
# Lockを取得する
lock.acquire()

try:
# 共有変数を操作する
counter += 1
finally:
# Lockを解放する
lock.release()
# スレッドを作成して実行する
threads = []
for i in range(10):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
# すべてのスレッドの実行が終了するまで待機する
for t in threads:
t.join()

print('counter:', counter)

レッドは、worker関数を実行し、counter変数を1ずつ増加させます。複数のスレッドが同時にcounter変数を増加させようとすると、競合が発生し、予期せぬ結果が得られる可能性があります。そのため、スレッドがcounter変数を操作する前に、Lockオブジェクトを取得するようにしています。Lockオブジェクトを取得することで、他のスレッドがcounter変数にアクセスできなくなります。counter変数を操作した後は、Lockオブジェクトを解放することで、他のスレッドがcounter変数にアクセスできるようになります。

上記のコードでは、threadsというリストにThreadオブジェクトを追加し、それぞれのスレッドを開始しています。joinメソッドを使用して、すべてのスレッドの実行が終了するまで待機します。最後に、counter変数の値を表示しています。

スレッド間のデータ共有(Queue)

もうひとつのスレッド間のデータ共有の方法として、Queueを使用することができます。Queueは、スレッド間でデータを受け渡すためのデータ構造です。以下に、Queueを使用して複数のスレッドがデータを受け渡す例を示します。

import threading
import queue
import time

# Queueオブジェクト
q = queue.Queue()
def producer():
for i in range(10):
# Queueにデータを追加する
q.put(i)
time.sleep(1)
def consumer():
while True:
# Queueからデータを取得する
data = q.get()
if data is None:
break
print(data)
time.sleep(0.5)
# 生産者スレッドを作成して実行する
pt = threading.Thread(target=producer)
pt.start()
# 消費者スレッドを複数作成して実行する
threads = []
for i in range(3):
t = threading.Thread(target=consumer)
threads.append(t)
t.start()
# 生産者スレッドの実行が終了するまで待機する
pt.join()

# Queueに終了を示すデータを追加する
for i in range(3):
q.put(None)
# 消費者スレッドの実行が終了するまで待機する
for t in threads:
t.join()

この例では、producer関数がQueueにデータを追加し、consumer関数が`

Queueからデータを取得しています。consumer関数は、Queueからデータを取得するために無限ループを実行しています。Queueから取得したデータがNoneであれば、ループを終了します。producerスレッドは、Queueに10個のデータを追加しています。producerスレッドの実行が終了した後、consumerスレッドがNoneを取得するまで待機します。

Queueを使用することで、複数のスレッドが同時にデータを受け渡すことができます。Queueは、スレッドセーフなデータ構造であるため、複数のスレッドが同時にデータを取得しようとしても、競合が発生することはありません。

GIL(グローバルインタプリタロック)

Pythonには、GILと呼ばれる仕組みがあります。GILとは、Pythonの処理系が一度に実行できるスレッドを1つに制限する仕組みです。つまり、Pythonのスレッドは、実際には1つのCPUコアでしか実行されません。

GILは、Pythonのスレッドが競合状態を起こすことを防ぐために導入されたものです。しかし、GILの影響で、PythonのスレッドはCPUコアを複数使って並列に処理することができないため、CPUバウンドな処理には向いていません。

一方、I/Oバウンドな処理(ファイル入出力、ネットワーク通信など)には向いています。I/Oバウンドな処理では、CPUの処理がI/O待ちになることが多いため、スレッドを切り替えて別のI/O処理を実行することで、全体の処理時間を短縮することができます。また、GILは、C拡張モジュールを使用した処理など、Pythonの処理系外で実行される処理には影響を与えません。

スレッドに渡す引数

Pythonのthreadingモジュールを使ってスレッドを作成する場合、スレッドに渡す引数を指定することができます。スレッドに渡す引数は、スレッドを作成する際のargs引数にタプル形式で指定します。

以下は、引数を渡してスレッドを作成する例です。

import threading
def worker(num, name):
print("Worker %s: %s" % (num, name))
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i, "Thread %s" % i))
threads.append(t)
t.start()

Pythonのthreadingモジュールを使ってスレッドを作成する場合、スレッドに渡す引数を指定することができます。スレッドに渡す引数は、スレッドを作成する際のargs引数にタプル形式で指定します。

以下は、引数を渡してスレッドを作成する例です。

import threading
def worker(num, name):
print("Worker %s: %s" % (num, name))

threads = [] for i in range(5): t = threading.Thread(target=worker, args=(i, "Thread %s" % i)) threads.append(t) t.start()

この例では、workerという関数を作成し、引数としてnumnameを受け取っています。threadsというリストにスレッドオブジェクトを追加し、start()メソッドでスレッドを開始しています。

target引数には、スレッドが実行する関数を指定します。args引数には、worker関数に渡す引数をタプル形式で指定します。この例では、スレッドごとに異なるnumnameを指定しています。

スレッドに渡す引数は、タプルで指定することができます。引数が複数ある場合は、カンマで区切ってタプルにまとめます。例えば、(1, "test", True)という引数を渡す場合は、以下のように指定します。

t = threading.Thread(target=worker, args=(1, "test", True))

このように、スレッドに渡す引数を指定することで、スレッドの処理に必要な情報をスレッド内で利用することができます。

デーモンスレッド

Pythonのthreadingモジュールでは、デーモンスレッドを作成することができます。デーモンスレッドは、プログラムが終了した際に自動的に終了するスレッドであり、バックグラウンドで実行する処理に利用されます。

デーモンスレッドを作成する場合、スレッドオブジェクトのdaemon属性をTrueに設定します。デフォルトでは、daemon属性はFalseになっているため、明示的に設定する必要があります。

以下は、デーモンスレッドを作成する例です。

import threading
import time

def worker():
while True:
print("Worker thread is running...")
time.sleep(1)
t = threading.Thread(target=worker)
t.daemon = True
t.start()
print("Main thread is running...")
time.sleep(5)
print("Main thread is done.")

この例では、workerという関数を作成し、1秒ごとにメッセージを出力する無限ループを実行しています。tというスレッドオブジェクトを作成し、daemon属性をTrueに設定しています。t.start()でスレッドを開始しています。

daemon属性をTrueに設定することで、プログラムが終了する際に自動的にスレッドが終了するようになります。この例では、メインスレッドが開始され、5秒待機した後に終了するため、5秒後にスレッドが自動的に終了します。

デーモンスレッドを作成する場合、注意点があります。デーモンスレッドは、プログラムが終了した際に自動的に終了するため、スレッド内で実行している処理が不完全な状態で終了する可能性があります。したがって、デーモンスレッド内で実行する処理は、安全に中断できるように実装する必要があります。

生存中のThreadオブジェクト全てのリスト

Pythonのthreadingモジュールでは、enumerate()関数を使用して現在生存しているThreadオブジェクトのリストを取得することができます。enumerate()関数は、現在生存している全てのスレッドをリストとして返します。

以下は、enumerate()関数を使用して現在生存している全てのスレッドをリストとして取得する例です。

import threading
import time

def worker():
while True:
print("Worker thread is running...")
time.sleep(1)
t1 = threading.Thread(target=worker)
t1.start()

t2 = threading.Thread(target=worker)
t2.start()
while True:
threads = threading.enumerate()
print("Number of threads:", len(threads))
for thread in threads:
print("Thread name:", thread.getName())
time.sleep(5)

この例では、workerという関数を作成し、無限ループでメッセージを出力するスレッドを2つ作成しています。その後、無限ループで現在生存しているスレッドの数と各スレッドの名前を表示しています。time.sleep(5)により、5秒ごとに現在のスレッドリストが表示されます。

この例では、2つのスレッドを作成しているため、初回の出力で”Number of threads: 3″と表示されます。スレッドの数が3となる理由は、Threadオブジェクトを作成したスレッドの他に、メインスレッドが存在するためです。その後、5秒ごとにスレッドの数と各スレッドの名前が表示されます。

enumerate()関数は、現在生存している全てのスレッドをリストとして返すため、メインスレッドも含まれます。そのため、スレッドの数が常に1つ多くなることに注意してください。

タイマー

Pythonのthreadingモジュールを使用すると、指定された時間後に関数を実行するタイマーを作成することができます。Timerクラスを使用して、指定された秒数後に関数を実行することができます。以下は、Timerクラスを使用してタイマーを作成する簡単な例です。

import threading

def hello():
print("Hello, world!")
timer = threading.Timer(5.0, hello)
timer.start()

Pythonのthreadingモジュールを使用すると、指定された時間後に関数を実行するタイマーを作成することができます。Timerクラスを使用して、指定された秒数後に関数を実行することができます。以下は、Timerクラスを使用してタイマーを作成する簡単な例です。

python
import threading def hello(): print("Hello, world!") timer = threading.Timer(5.0, hello) timer.start()

この例では、helloという関数を定義し、その関数を5秒後に実行するタイマーを作成しています。Timerクラスのコンストラクタには、最初の引数として待機する秒数を指定し、2番目の引数として実行する関数を指定します。timer.start()により、タイマーが開始されます。

5秒後に、hello関数が実行され、”Hello, world!”というメッセージが表示されます。

また、Timerクラスは、スレッドを使用して実装されているため、タイマーをキャンセルすることもできます。cancel()メソッドを使用して、タイマーをキャンセルすることができます。以下は、タイマーをキャンセルする例です。

import threading

def hello():
print("Hello, world!")
timer = threading.Timer(5.0, hello)
timer.start()

# タイマーをキャンセルする
timer.cancel()

この例では、Timerクラスを使用してタイマーを作成していますが、timer.cancel()により、タイマーがキャンセルされます。このため、5秒後にhello関数は実行されません。

スレッドのLockとRLock

Pythonのthreadingモジュールでは、複数のスレッドが同時に同じリソースにアクセスする場合に、競合状態(コンカレンシー)を防ぐために、LockおよびRLockオブジェクトが使用されます。

Lockオブジェクトは、単一のスレッドがリソースにアクセスできるようにするために使用されます。Lockオブジェクトが取得されるまで、他のスレッドはブロックされます。

以下は、Lockオブジェクトを使用する例です。

import threading

shared_resource = 0
lock = threading.Lock()
def increment():
global shared_resource
lock.acquire()
shared_resource += 1
lock.release()
threads = []
for i in range(10):
thread = threading.Thread(target=increment)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()

print(shared_resource)

この例では、incrementという関数を定義し、共有リソースとしてshared_resource変数を使用します。Lockオブジェクトを使用して、単一のスレッドがshared_resourceにアクセスできるようにしています。

10個のスレッドを作成し、increment関数を呼び出すことで、shared_resourceの値を1ずつ増加させています。各スレッドは、lock.acquire()によりLockオブジェクトを取得し、lock.release()によりLockオブジェクトを解放します。

thread.join()により、各スレッドが終了するのを待っています。最後に、shared_resourceの値が10になることが期待されます。

RLockオブジェクトは、同じスレッドが同じリソースに再帰的にアクセスできるようにするために使用されます。つまり、同じスレッドがRLockオブジェクトを複数回取得できます。

以下は、RLockオブジェクトを使用する例です。

import threading

shared_resource = 0
lock = threading.RLock()
def increment():
global shared_resource
lock.acquire()
lock.acquire() # RLockを再帰的に取得できる
shared_resource += 1
lock.release()
lock.release()
threads = []
for i in range(10):
thread = threading.Thread(target=increment)
thread.start()
threads.append(thread)
for thread in threads:
thread.join()

print(shared_resource)

この例では、increment関数の中でRLockオブジェクトが再帰的に取得できるようにしています。increment関数内でlock.acquire()が2回呼び出されています。これにより、同じスレ

ッドが2回RLockオブジェクトを取得できます。この例では、10個のスレッドが作成され、increment関数を呼び出してshared_resourceの値を1ずつ増加させています。

最初のスレッドがlock.acquire()を呼び出すと、RLockオブジェクトが取得されます。2回目のlock.acquire()呼び出しは、スレッド自体によって取得されたRLockオブジェクトであるため、ブロックされずに処理を続行できます。lock.release()により、2回目に取得したRLockオブジェクトが解放されます。最後に、lock.release()により、最初に取得したRLockオブジェクトが解放されます。

なお、RLockオブジェクトは、Lockオブジェクトよりも複雑であるため、パフォーマンスが若干低下する場合があります。また、RLockオブジェクトは、多くの場合、スレッドセーフなデータ構造やアルゴリズムを実装する場合に有用です。

以上のように、LockおよびRLockオブジェクトは、競合状態を回避するための重要なツールであり、複数のスレッドが同時にリソースにアクセスする場合に使用されます。RLockオブジェクトは、再帰的なアクセスを許可するため、より複雑なアルゴリズムを実装する場合に便利です。

セマフォ

セマフォは、プロセスやスレッドの間で共有される整数変数で、並行処理における同期や排他制御を実現するために使用されます。セマフォには、値として非負整数が格納され、初期値を指定することができます。

セマフォは、2つの操作をサポートしています。

  • P (プロセスまたはプログラム) 操作: セマフォの値を1減らし、値が負になっている場合は、呼び出し元をブロックします。これは、リソースを使用するために待機するために使用されます。

  • V (VacateまたはRelease) 操作: セマフォの値を1増やし、値が0以下である場合は、待機中のプロセスまたはスレッドを起動して実行可能状態にします。これは、リソースの使用が完了したことを通知するために使用されます。

セマフォは、複数のプロセスやスレッドが同時にリソースにアクセスする場合に役立ちます。例えば、プロデューサーとコンシューマーの問題において、バッファに格納されたアイテム数を管理するためにセマフォが使用されます。プロデューサーは、アイテムをバッファに追加する前に、バッファに空きスロットがあることを確認するために、セマフォを使用します。同様に、コンシューマーは、アイテムを取り出す前に、バッファにアイテムが存在することを確認するために、セマフォを使用します。

Pythonでは、Semaphoreオブジェクトを使用してセマフォを実装できます。Semaphoreオブジェクトは、初期値を指定できます。Semaphoreオブジェクトを使用するためには、acquireメソッドを使用してリソースをロックし、releaseメソッドを使用してリソースを解放します。

以下は、Pythonでセマフォを使用した例です。

import threading

semaphore = threading.Semaphore(3)
def worker():
with semaphore:
print(f"{threading.current_thread().name} acquired semaphore")
# Do some work here
print(f"{threading.current_thread().name} released semaphore")
threads = [threading.Thread(target=worker) for _ in range(10)]
for thread in threads:
thread.start()

for thread in threads:
thread.join()

この例では、セマフォの初期値を3に設定し、10個のスレッドを作成しています

キュー

キューは、データを保持し、FIFO(先入れ先出し)の順序でデータを取り出すことができるデータ構造です。キューには、以下の2つの操作があります。

  • Enqueue(エンキュー):データをキューに追加する操作です。新しい要素は、キューの末尾に追加されます。

  • Dequeue(デキュー):キューからデータを取り出す操作です。最も古い要素は、キューの先頭から取り出されます。

キューは、複数のスレッドが共有することができるため、マルチスレッド環境でのデータ共有に使用されます。Pythonには、スレッドセーフなキューであるQueueオブジェクトがあります。Queueオブジェクトには、以下の2つの操作があります。

  • put(プット):データをキューに追加する操作です。

  • get(ゲット):キューからデータを取り出す操作です。

Queueオブジェクトには、他にもいくつかのメソッドがあります。たとえば、emptyメソッドは、キューが空であるかどうかを返します。また、fullメソッドは、キューが満杯であるかどうかを返します。

以下は、PythonでQueueオブジェクトを使用した例です。

import queue
import threading
def producer(q):
for i in range(10):
q.put(i)
def consumer(q):
while True:
item = q.get()
if item is None:
break
print(f"Got item: {item}")
q.task_done()
q = queue.Queue()
producer_thread = threading.Thread(target=producer, args=(q,))
consumer_thread = threading.Thread(target=consumer, args=(q,))
producer_thread.start()
consumer_thread.start()
q.join()
q.put(None)
producer_thread.join()
consumer_thread.join()

この例では、2つのスレッドを使用しています。1つのスレッドは、データを生成し、もう1つのスレッドは、データを消費します。Queueオブジェクトは、2つのスレッドで共有されます。producer関数は、putメソッドを使用してデータをキューに追加し、consumer関数は、getメソッドを使用してデータを取り出します。task_doneメソッドは、データの処理が完了したことを示します。最後に、putメソッドにNoneを渡して、キューが空であることを示します。joinメソッドは、すべての要素が処

理されるまで待ち合わせるために使用されます。

この例では、キューが空になるまで処理を待つため、q.join()が使用されます。joinメソッドは、task_doneメソッドが呼び出されるまでブロックされます。task_doneメソッドは、データの処理が完了したことを示します。

キューには、他にもいくつかの種類があります。たとえば、優先度付きキューは、各要素に優先度があり、優先度の高い要素が優先的に取り出されます。また、並列処理で使用されるプロセス間通信(IPC)キューもあります。Pythonでは、multiprocessingモジュールを使用してIPCキューを作成できます。

キューは、非常に一般的なデータ構造であり、マルチスレッドやマルチプロセスの並列処理で頻繁に使用されます。Queueオブジェクトは、Pythonの標準ライブラリに含まれているため、使用するための追加のライブラリをインストールする必要がありません。

イベント

Pythonのスレッディングモジュールには、イベントオブジェクトが用意されています。イベントオブジェクトは、スレッド間でシグナルを送信するために使用されます。イベントは、Eventクラスのインスタンスであり、 set()clear()wait()の3つのメソッドを提供します。

set()メソッドは、イベントオブジェクトを”セット”し、 wait()メソッドによってブロックされているスレッドを解放します。 clear()メソッドは、イベントオブジェクトを”クリア”し、以降のwait()メソッドの呼び出しをブロックします。

以下は、イベントオブジェクトを使用した簡単な例です。

import threading
event = threading.Event()
def worker():
print("Worker is waiting for event")
event.wait()
print("Worker has received the event")
t = threading.Thread(target=worker)
t.start()
print("Main is sleeping for 5 seconds")
time.sleep(5)
print("Main is setting the event")
event.set()

この例では、2つのスレッドが使用されています。worker()関数は、イベントがセットされるまで待機し、その後、メッセージを表示します。メインスレッドは、イベントをセットする前に5秒間待機し、set()メソッドを呼び出してイベントをセットします。

この例では、 wait()メソッドによってworker()関数のスレッドがブロックされます。event.set()によって、 wait()メソッドからのブロックが解除され、worker()関数は実行を再開して、メッセージを表示します。

コンディション

Pythonのスレッディングモジュールには、複数のスレッドが同期的に動作するためのConditionオブジェクトが用意されています。 Conditionオブジェクトは、Lockオブジェクトと組み合わせて使用されます。Conditionオブジェクトを使用すると、スレッド間での通信が容易になります。

Conditionオブジェクトには、wait()notify()、および notify_all()の3つのメソッドがあります。

wait()メソッドは、条件が満たされるまでスレッドをブロックし、他のスレッドがnotify()またはnotify_all()メソッドを呼び出すことで、条件が満たされたことを知らせます。

notify()メソッドは、wait()メソッドによってブロックされているスレッドのうち、最初に待機したスレッドを解放します。

notify_all()メソッドは、wait()メソッドによってブロックされているすべてのスレッドを解放します。

以下は、Conditionオブジェクトを使用した簡単な例です。

import threading
condition = threading.Condition()
def producer():
global x
for i in range(1, 6):
with condition:
x = i
print(f"Producer set x to {i}")
condition.notify()
condition.wait()
def consumer():
global x
while True:
with condition:
condition.wait()
print(f"Consumer got x = {x}")
condition.notify()
if x == 5:
break
t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start()
t2.start()
t1.join()
t2.join()

この例では、2つのスレッドが使用されています。producer()関数は、変数xに1〜5の値を設定し、 condition.notify()で通知します。consumer()関数は、 condition.wait()でブロックされ、 condition.notify()で通知を受け取ると、変数xの値を表示します。変数xが5になると、consumer()関数が終了します。

withブロック内の condition.wait()が呼び出されると、ロックがリリースされ、スレッドはブロックされます。通知が来た場合、スレッドはロックを再取得し、次の処理に進みます。withブロックを使用することで、ロックを明示的に取得および解放する必要がなくなり、コードが簡単になります。

バリア

Pythonのスレッディングモジュールには、複数のスレッドが同期的に動作するためのBarrierオブジェクトが用意されています。Barrierオブジェクトは、指定された数のスレッドがwait()メソッドを呼び出すまで、全員が待機します。指定された数のスレッドがwait()メソッドを呼び出すと、すべてのスレッドが同時に実行を再開します。

以下は、Barrierオブジェクトを使用した簡単な例です。

import threading
barrier = threading.Barrier(3)
def worker():
print(f"Worker {threading.current_thread().name} is waiting.")
barrier.wait()
print(f"Worker {threading.current_thread().name} resumed.")
t1 = threading.Thread(target=worker, name='A')
t2 = threading.Thread(target=worker, name='B')
t3 = threading.Thread(target=worker, name='C')
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()

この例では、3つのスレッドが使用されています。worker()関数は、最初にbarrier.wait()で待機し、すべてのスレッドが到着すると同時に実行を再開します。

Barrierオブジェクトは、特定の数のスレッドが同時に実行することが必要な場合に役立ちます。たとえば、データを収集する複数のスレッドがあり、すべてのスレッドがデータを収集し終えた後に、データを処理するスレッドが必要な場合に使用することができます。Barrierオブジェクトを使用することで、データ収集スレッドがすべてのデータを処理する前に、データ処理スレッドが処理を開始することを防止できます。

マルチプロセス

マルチプロセスは、複数のプロセスを同時に実行することで、複数のタスクを効率的に処理することができる並列処理の一つです。Pythonでは、multiprocessingモジュールを使用して、マルチプロセスを実現することができます。

multiprocessingモジュールは、スレッドと同じように、複数のタスクを並列に実行するための抽象化されたインタフェースを提供します。しかし、スレッドとは異なり、プロセスはそれぞれ独自のメモリ空間を持ちます。これにより、プロセス間でのメモリ共有が困難になるため、プロセス間通信(IPC)の機構が必要になります。

以下は、multiprocessingモジュールを使用した簡単な例です。

import multiprocessing
def worker(num):
"""各プロセスによって実行される関数"""
print(f"Worker {num} is starting...")
return
if __name__ == '__main__':
processes = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
processes.append(p)
p.start()
for p in processes:
p.join()

この例では、5つのプロセスが作成されます。各プロセスは、worker()関数を実行します。Processオブジェクトを作成する際に、target引数に実行する関数、args引数に関数に渡す引数を指定します。start()メソッドでプロセスを開始し、join()メソッドで各プロセスが終了するまで待機します。

マルチプロセスは、CPUバウンドなタスクを並列に処理する場合に効果的です。また、メモリを共有しないため、スレッドよりも安全であると言われています。ただし、プロセスの作成やプロセス間通信にはオーバーヘッドが発生するため、I/Oバウンドなタスクの場合はスレッドの方が効率的です。

ワーカープロセスのプールで非同期

multiprocessingモジュールには、ワーカープロセスのプールを管理するためのPoolクラスがあります。Poolクラスを使用すると、複数のワーカープロセスを起動し、それらにジョブを割り当てることができます。各ジョブは、ワーカープロセスのいずれかによって非同期に実行されます。

以下は、Poolクラスを使用してワーカープロセスのプールを作成し、非同期でジョブを実行する簡単な例です。

import multiprocessing
import time
def worker(num):
"""各プロセスによって実行される関数"""
print(f"Worker {num} is starting...")
time.sleep(2)
print(f"Worker {num} is done.")
return
if __name__ == '__main__':
with multiprocessing.Pool(processes=2) as pool:
jobs = [pool.apply_async(worker, args=(i,)) for i in range(4)]
for job in jobs:
job.get()

この例では、2つのワーカープロセスを持つプールが作成されます。apply_async()メソッドを使用して、4つのジョブがプールに追加されます。apply_async()メソッドは、非同期にジョブを実行し、AsyncResultオブジェクトを返します。AsyncResultオブジェクトのget()メソッドを呼び出すことで、ジョブが完了するまで待機します。

Poolクラスには、map()メソッドやimap()メソッドなど、他にも便利なメソッドがあります。map()メソッドを使用すると、複数のジョブを一度にプールに追加し、返された結果をリストとしてまとめることができます。imap()メソッドは、map()メソッドと同様にジョブを一度に追加しますが、返された結果をジェネレーターとして返すため、必要に応じて結果を処理することができます。

ワーカープロセスのプールでブロック

multiprocessingモジュールのPoolクラスを使用して、ワーカープロセスのプールを作成し、複数のジョブを並列実行することができます。しかし、ジョブが完了するまで待機する場合、デッドロックに陥る可能性があります。これは、ワーカープロセスが完了する前に、プール内のすべてのプロセスがジョブを実行し終えてしまったためです。この状態を回避するために、apply_async()メソッドの代わりにapply()メソッドを使用することができます。

apply()メソッドは、apply_async()メソッドと同様に、プールにジョブを追加しますが、ジョブが完了するまでブロックします。apply()メソッドを使用する場合、プール内のワーカープロセスがジョブを完了する前に新しいジョブを追加することができません。

以下は、apply()メソッドを使用して、ワーカープロセスのプールを作成し、複数のジョブを実行する例です。

import multiprocessing
import time
def worker(num):
"""各プロセスによって実行される関数"""
print(f"Worker {num} is starting...")
time.sleep(2)
print(f"Worker {num} is done.")
return
if __name__ == '__main__':
with multiprocessing.Pool(processes=2) as pool:
results = [pool.apply(worker, args=(i,)) for i in range(4)]
print(results)

この例では、2つのワーカープロセスを持つプールが作成されます。apply()メソッドを使用して、4つのジョブがプールに追加され、各ジョブが完了するまでブロックされます。apply()メソッドは、返された結果をリストとして返します。この例では、ジョブの結果を含むリストがresults変数に割り当てられます。

ワーカープロセスのプールでマップ

multiprocessingモジュールのPoolクラスを使用して、ワーカープロセスのプールを作成し、map()メソッドを使用して、シーケンス内の各要素に対して関数を適用することができます。この方法を使用すると、ジョブが完了するまで待機する必要がなく、複数のジョブを同時に実行できます。

map()メソッドは、関数と入力シーケンスを引数として受け取ります。map()メソッドは、入力シーケンスの各要素に対して関数を適用し、それらの結果をリストとして返します。

以下は、map()メソッドを使用して、ワーカープロセスのプールを作成し、複数のジョブを実行する例です。

import multiprocessing
import time
def worker(num):
"""各プロセスによって実行される関数"""
print(f"Worker {num} is starting...")
time.sleep(2)
print(f"Worker {num} is done.")
return num * 2
if __name__ == '__main__':
with multiprocessing.Pool(processes=2) as pool:
results = pool.map(worker, range(4))
print(results)

multiprocessingモジュールのPoolクラスを使用して、ワーカープロセスのプールを作成し、map()メソッドを使用して、シーケンス内の各要素に対して関数を適用することができます。この方法を使用すると、ジョブが完了するまで待機する必要がなく、複数のジョブを同時に実行できます。

map()メソッドは、関数と入力シーケンスを引数として受け取ります。map()メソッドは、入力シーケンスの各要素に対して関数を適用し、それらの結果をリストとして返します。

以下は、map()メソッドを使用して、ワーカープロセスのプールを作成し、複数のジョブを実行する例です。

import multiprocessing
import time
def worker(num):
"""各プロセスによって実行される関数"""
print(f"Worker {num} is starting...")
time.sleep(2)
print(f"Worker {num} is done.")
return num * 2
if __name__ == '__main__':
with multiprocessing.Pool(processes=2) as pool:
results = pool.map(worker, range(4))
print(results)

この例では、2つのワーカープロセスを持つプールが作成されます。map()メソッドを使用して、関数worker()がシーケンスrange(4)内の各要素に対して適用され、各ジョブが完了するまで待機しません。map()メソッドは、返された結果をリストとして返します。この例では、関数worker()の結果を含むリストがresults変数に割り当てられます。

map()メソッドは、各要素の処理が完了するまでブロックするため、大規模な入力シーケンスを使用する場合には注意が必要です。その場合、imap()メソッドを使用することができます。imap()メソッドは、map()メソッドと同様に、シーケンス内の各要素に対して関数を適用しますが、map()メソッドとは異なり、各要素の処理が完了するのを待たずに、イテレータを返します。これにより、シーケンスが大きく、メモリ使用量を抑えたい場合に有用です。

プロセス間通信に関して

プロセス間通信(Inter-Process Communication, IPC)とは、複数のプロセス間でデータや情報をやりとりするための手段です。複数のプロセスが同時に動作する場合、それぞれのプロセスは独立してメモリ空間を持つため、データを共有することができません。そのため、IPCを使ってプロセス間でデータをやりとりする必要があります。

Pythonでは、標準ライブラリのmultiprocessingモジュールを使って、プロセス間通信を実現することができます。以下に、multiprocessingモジュールでよく使われるプロセス間通信の手法について簡単に説明します。

  • パイプ(Pipe):2つのプロセス間で双方向の通信ができる方法です。親プロセスと子プロセス間でデータの受け渡しができます。
  • キュー(Queue):複数のプロセスで利用できるFIFO(First In First Out)のキューです。put()メソッドで要素を追加し、get()メソッドで要素を取得できます。
  • 共有メモリ(Shared Memory):複数のプロセスで共有できるメモリ領域です。ValueArrayといったデータ型を使ってアクセスできます。
  • マネージャー(Manager):複数のプロセスで共有できるPythonオブジェクトを提供します。ValueArrayといったデータ型の他、listdictなどのPython標準のデータ型を使ってアクセスできます。

これらの手法を使うことで、複数のプロセス間でデータを共有したり、同期したりすることができます。ただし、プロセス間通信を使う場合、データの受け渡しなどのオーバーヘッドがあるため、処理速度が遅くなる場合があります。また、プロセス間通信を実現するためのコードが複雑になることがあるため、注意が必要です。

パイプ

パイプ (Pipe) は、2つのプロセス間で双方向通信をするための方法の1つです。パイプは、親プロセスと子プロセスの2つのプロセスの間でデータの受け渡しを行うために使用されます。

パイプは、Unix系のオペレーティングシステムにおいて標準的に提供されているシステムコールで、Pythonのmultiprocessingモジュールでも利用することができます。

multiprocessingモジュールのPipe()関数を使用することで、双方向通信を行うパイプを作成することができます。この関数は、2つの接続済みソケットオブジェクトを返します。1つは読み込み専用で、もう1つは書き込み専用です。2つのプロセスがそれぞれ片方のソケットを持っているため、データを双方向に送信することができます。

以下は、パイプを使用して親プロセスと子プロセス間でデータを送受信する例です。

import multiprocessing
def child(conn):
data = conn.recv()
print('Child received:', data)
conn.send(data + ' - processed by child')
if __name__ == '__main__':
parent_conn, child_conn = multiprocessing.Pipe()
p = multiprocessing.Process(target=child, args=(child_conn,))
p.start()
parent_conn.send('Hello from parent')
response = parent_conn.recv()
print('Parent received:', response)
p.join()

この例では、親プロセスが子プロセスを生成し、パイプを介して文字列を送信します。子プロセスは、受信した文字列を加工して返信します。親プロセスは、子プロセスからの返信を受信して、その内容を表示します。

プロセス間での共有メモリ

プロセス間でデータを共有する方法の1つに、共有メモリを使用する方法があります。共有メモリは、複数のプロセスが同じメモリ領域を参照することができる仕組みです。これにより、プロセス間でデータを共有することができます。

Pythonでは、multiprocessingモジュールのValueやArrayクラスを使用して、共有メモリを実現することができます。Valueクラスは、1つの値を保持する共有メモリを表し、Arrayクラスは、複数の値を保持する共有メモリを表します。

以下は、Valueクラスを使用して、2つのプロセス間で共有変数を操作する例です。

import multiprocessing

def increment(value):
value.value += 1
print(value.value)
if __name__ == '__main__':
value = multiprocessing.Value('i', 0)
p1 = multiprocessing.Process(target=increment, args=(value,))
p2 = multiprocessing.Process(target=increment, args=(value,))
p1.start()
p2.start()

p1.join()
p2.join()

この例では、共有変数valueをValueクラスで生成し、2つのプロセスでincrement関数を呼び出して、valueの値を1ずつ増やしています。Valueクラスは、共有変数の型を指定する必要があります。ここでは、iを指定しています。iは整数型を表します。

共有メモリは、プロセス間で同期が必要な場合があります。例えば、2つのプロセスが同じ共有メモリを参照している場合、どちらかのプロセスがデータを更新していると、もう一方のプロセスが古いデータを参照してしまう可能性があります。このような問題を回避するために、Lockクラスなどを使用して、共有メモリへのアクセスを同期する必要があります。

マネージャー

Pythonのmultiprocessingモジュールでは、マルチプロセス間でデータを共有するためにManagerクラスが提供されています。このクラスを使用することで、複数のプロセスが同じPythonオブジェクトにアクセスできるようになります。

Managerクラスを使用する場合、共有するデータはValueArrayQueuedictlistNamespaceなど、いくつかの形式で作成することができます。これらのオブジェクトは、プロセス間で自動的に同期され、更新されます。

例えば、以下はManagerクラスを使用して、異なるプロセス間で共有されるリストを作成する方法です。

import multiprocessing

def worker(id, lst):
lst.append(id)
if __name__ == '__main__':
with multiprocessing.Manager() as manager:
lst = manager.list()
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i, lst))
jobs.append(p)
p.start()
for job in jobs:
job.join()

print(list(lst))

この例では、5つのプロセスを作成し、各プロセスはlstに自分のIDを追加します。lstManagerクラスを使用して作成された共有リストであるため、それぞれのプロセスがリストに対して変更を行っても、全てのプロセスがリストの最新状態を参照できるようになっています。最後に、lstの内容を出力することで、各プロセスが正しくリストに追加されたことを確認することができます。

Managerクラスは便利ですが、プロセス間通信にはコストがかかるため、使用には注意が必要です。また、共有されたデータが同期されるため、ロックの取得や解放の処理を行わなければならない場合があります。

別のマシンで走るプロセス間のネットワーク超しの共有

Pythonには、プロセス間通信を行うためのいくつかのモジュールが用意されています。そのうちの一つがmultiprocessingモジュールで、異なるマシン間でのプロセス間通信を行うための機能も提供しています。

multiprocessingモジュールを使用して異なるマシン間でのプロセス間通信を行う場合、以下のような手順を踏むことになります。

  • サーバープロセスを起動し、共有するオブジェクトを生成します。このとき、multiprocessing.Managerオブジェクトを使用すると便利です。
  • クライアントプロセスを起動し、サーバープロセスに接続します。
  • クライアントプロセスは、サーバープロセスが公開している共有オブジェクトにアクセスすることができます。アクセスする際には、通常のPythonオブジェクトのように操作することができます。

例えば、multiprocessing.Managerオブジェクトを使用して、異なるマシン間での共有データを管理する方法を示します。

from multiprocessing import Manager, Process
import time
def worker(d):
while True:
print(d)
time.sleep(1)
if __name__ == '__main__':
manager = Manager()
d = manager.dict()
p = Process(target=worker, args=(d,))
p.start()
while True:
d['time'] = time.time()
time.sleep(1)

この例では、multiprocessing.Managerオブジェクトを使用して、異なるマシン間で共有する辞書を作成しています。サーバー側のプロセスは辞書を定期的に表示するworker()関数を実行し、クライアント側のプロセスは定期的に辞書に現在の時刻を追加します。

このプログラムを実行するには、サーバーとクライアントのマシンでそれぞれプログラムを実行し、サーバー側のIPアドレスを指定して接続します。例えば、サーバー側のマシンで以下のように実行します。

python server.py

クライアント側のマシンで以下のように実行します。

python client.py <server-ip>

<server-ip>には、サーバー側のマシンのIPアドレスを指定します。これにより、クライアントプロセスはサーバープロセスに接続し、共有オブジェクトにアクセスすることができます。

高水準のインターフェース

高水準のインターフェースとは、プログラミングにおいて、低レベルな操作を隠蔽し、より簡単に操作できるインターフェースを提供する手法のことです。低レベルな操作とは、CPUやメモリなどのハードウェアに直接アクセスする操作や、ネットワーク通信におけるパケットの構築や解析などの操作を指します。

高水準のインターフェースを提供することにより、プログラマは実装の詳細について考える必要がなくなり、より簡単かつ効率的にプログラミングすることができます。また、高水準のインターフェースは、プログラマが理解しやすいドキュメントやチュートリアルが提供されていることが多く、学習コストも低くなります。

例えば、Pythonの標準ライブラリには、ファイル操作に関する高水準なインターフェースであるopen関数があります。この関数を使用することで、ファイルを読み書きするための詳細な操作を意識することなく、簡単にファイルの読み書きを行うことができます。

また、PythonのWebフレームワークであるDjangoは、高水準なインターフェースを提供することで、Webアプリケーションの開発を容易にします。Djangoを使用することで、HTTPリクエストやレスポンスの処理、データベースアクセス、認証などの機能を簡単に実装することができます。


というわけで、今回は以上です。大変大変お疲れ様でした。
引き続きで、徐々に発信していきます。

コメントや感想を受け付けています。ちょっとした感想でもいいので嬉しいです。

それでは、以上です。

最新情報をチェックしよう!