【ステップアップ】「Pythonの実践」簡単速習‼【キューイングシステム/応用⑪】

こんにちはヤク学長です。
データサイエンティスト兼ファーマシストで、アルゴリズムやBI開発を行っています。

本記事の目的は、「pythonの基本操作を知る」ことを目的としています。

【ステップアップ】「Pythonの実践」簡単速習‼【データ解析/応用⑩】

【本記事のもくじ】

まず、「Python」に真剣に取り組むための概要を解説します。
下記の方法で、簡単に概要を抑えることができます。

  • 1.キューイングシステム

それでは、上から順番に見ていきます。
なお、本上記の方法を順番に抑えれば成果が出ます。

記事の内容は「転載 & 引用OK」問題ありません。

1.キューイングシステム

キューイングシステムとは

キューイングシステムとは、複数のタスクがある場合に、それらのタスクを一時的に保存し、順番に実行するシステムです。キューは、データ構造の一種であり、先入れ先出し(FIFO)のルールに従って、最初に追加されたものが最初に処理されるようになっています。

キューイングシステムは、主に次のような場面で使用されます。

  • コンピュータネットワークにおけるパケット転送や、プロセスの実行を管理するためのスケジューリングシステム。
  • サーバーやレストランなどの待ち行列管理システム。
  • ログ処理やデータ処理のためのバッチ処理システム。

キューイングシステムには、多数のアルゴリズムがあります。例えば、最も単純なFIFO方式の他、優先度付きキュー、ラウンドロビン、最短ジョブファーストなどがあります。

ZeroMQのPubとSub

ZeroMQは、高速でスケーラブルなメッセージングおよび通信ライブラリです。略してZMQとも呼ばれ、マルチプロセス間の通信や、分散システムの開発に利用されることがあります。

ZMQは、多くの言語で利用可能なAPIを提供していますが、Pythonにおいてはpyzmqというライブラリが提供されています。pyzmqを使用することで、PythonでZMQを利用することができます。

ZMQは、様々なパターンの通信方式をサポートしています。たとえば、pub-subパターンやreq-repパターンなどです。これらのパターンを使用することで、簡単かつ柔軟な通信システムを実現することができます。

以下は、Pythonでpub-subパターンを使用して、簡単なZMQプログラムを実行する例です。

import zmq
# パブリッシャー
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")
while True:
message = "A".encode('utf-8')
socket.send(message)
# サブスクライバー
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect("tcp://localhost:5556")
socket.setsockopt(zmq.SUBSCRIBE, b'')
while True:
message = socket.recv()
print(message)

上記の例では、pub-subパターンを使用しています。パブリッシャーは、”tcp://*:5556″のアドレスでバインドされ、ループ内で”A”というメッセージを送信しています。サブスクライバーは、”tcp://localhost:5556″のアドレスに接続し、空のフィルターで受信待ち状態になっています。パブリッシャーがメッセージを送信すると、サブスクライバーはそのメッセージを受信し、標準出力に表示することで、受信したことを確認しています。

ZeroMQのPushとPull

ZeroMQは、異なるプログラミング言語で書かれたアプリケーション間でメッセージを送受信するための高速なプロトコルを提供するライブラリです。ZeroMQには、多数の通信パターンがあり、その中には、”PUSH-PULL” パターンがあります。

PUSH-PULLパターンでは、複数の “Pusher” と “Puller” と呼ばれるエンティティがあります。 Pusherはメッセージをキューに入れ、Pullerはキューからメッセージを取得します。 Pusherは1つ以上のPullerにメッセージを送信できます。複数のPusherがある場合、Pullerはそれらからメッセージを受信します。つまり、Pusherは複数のコンシューマにメッセージを配信し、Pullerはキューから順番にメッセージを取得することができます。

PythonでZeroMQを使うには、pyzmqと呼ばれるライブラリをインストールする必要があります。以下は、PythonでPUSH-PULLパターンを実現する例です。

import zmq
import time
import threading
context = zmq.Context()
def pusher():
socket = context.socket(zmq.PUSH)
socket.bind("tcp://*:5557")
for i in range(5):
message = "Pusher message #%s" % i
socket.send_string(message)
print(f"Sent: {message}")
time.sleep(1)
socket.close()
def puller():
socket = context.socket(zmq.PULL)
socket.connect("tcp://localhost:5557")
while True:
message = socket.recv_string()
print(f"Received: {message}")
pusher_thread = threading.Thread(target=pusher)
puller_thread = threading.Thread(target=puller)
pusher_thread.start()
puller_thread.start()

この例では、Pusherが5つのメッセージを生成し、Pullerがそれらを取得します。各メッセージは1秒間隔で送信され、Pullerは受信するたびにメッセージを印刷します。

WindowsにRabbitMQをインストールする

Pythonでキューイングシステムを使用するためには、まずキューイングシステムをインストールする必要があります。代表的なキューイングシステムとしては、RabbitMQ、Apache Kafka、AWS SQS、Google Cloud Pub/Subなどがあります。

ここでは、RabbitMQを使用したキューイングシステムの具体例を紹介します。

まず、RabbitMQをインストールします。インストール方法は、オフィシャルサイトのドキュメントを参照してください。

https://www.rabbitmq.com/download.html

次に、pikaというPythonライブラリをインストールします。pikaは、RabbitMQのクライアントライブラリで、PythonからRabbitMQを操作するために使用されます。インストールには、pipコマンドを使用します。

pip install pika

pikaを使用して、RabbitMQに接続し、メッセージを送信する例を示します。

import pika
# RabbitMQに接続する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
# メッセージを送信する
channel.basic_publish(exchange='', routing_key='hello', body='Hello World!')
print(" [x] Sent 'Hello World!'")
# 接続を閉じる
connection.close()

上記のコードでは、まずRabbitMQに接続しています。次に、キューを作成しています。そして、メッセージを送信しています。

次に、pikaを使用して、RabbitMQからメッセージを受信する例を示します。

import pika
# RabbitMQに接続する
connection = pika.BlockingConnection(pika.ConnectionParameters(host='localhost'))
channel = connection.channel()
# キューを作成する
channel.queue_declare(queue='hello')
# メッセージを受信するためのコールバック関数
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
# メッセージを受信する
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel.start_consuming()

上記のコードでは、まずRabbitMQに接続しています。次に、キューを作成しています。そして、メッセージを受信するためのコールバック関数を定義しています。最後に、メッセージを受信する処理を開始しています。

celeryとは

Pythonでキューイングシステムを使用するためには、いくつかのライブラリやフレームワークがあります。代表的なものとしては、CeleryやRQ(Redis Queue)、huey、mrqなどがあります。

以下は、Celeryを使用したキューイングシステムの例です。Celeryは、分散タスクキューとして動作し、複数のワーカーを使用してタスクを実行します。

from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y

上記のコードでは、Celeryオブジェクトを作成し、タスクをデコレートすることで、Celeryのタスクとして実行できます。タスクは、add関数のように、引数を受け取り、結果を返すように定義されます。brokerパラメータは、Celeryが使用するメッセージブローカーを指定します。この例では、pyamqpを使用していますが、Redisなどの他のメッセージブローカーを使用することもできます。

タスクを実行するには、Celeryのコマンドラインツールを使用します。以下は、addタスクを呼び出す例です。

$ celery -A tasks worker --loglevel=info
$ python
>>> from tasks import add
>>> result = add.delay(4, 4)
>>> result.wait()
8

上記のコードでは、celery workerコマンドを使用して、Celeryワーカーを起動し、addタスクを実行します。add.delay()メソッドを使用して、タスクを非同期に呼び出し、result.wait()メソッドを使用して、タスクの実行が完了するまで待機します。

Celeryは、非常に強力で拡張性があり、分散タスクキューとしての使用に加え、タスクスケジューリング、リトライ、結果の保存などの機能も提供しています。

celeryでタスクを非同期で実行する

Celeryは、Pythonで実装された分散タスクキューシステムで、非同期タスクキューとして使用されることがあります。Celeryは、タスクを非同期で実行できるため、長時間実行されるタスクやバッチ処理など、実行に時間がかかる処理を実行する際に役立ちます。

以下は、Celeryを使用して非同期タスクを実行する例です。まず、必要なライブラリをインストールします。

pip install celery

以下は、Celeryを使用して非同期でタスクを実行する例です。

from celery import Celery
app = Celery('tasks', broker='pyamqp://guest@localhost//')
@app.task
def add(x, y):
return x + y
result = add.delay(4, 4)
# 結果の取得
print(result.get())

この例では、Celeryを使用して add() 関数を非同期で呼び出しています。add() 関数は2つの引数を受け取り、それらを加算して結果を返します。 @app.task デコレータを使用して、 add() 関数をCeleryタスクとしてマークしています。

タスクを実行するには、 add() 関数を delay() メソッドで呼び出します。このメソッドは、タスクを実行するのに必要なすべての情報を含む AsyncResult オブジェクトを返します。

AsyncResult オブジェクトの get() メソッドを使用して、タスクの結果を取得できます。 get() メソッドは、タスクが完了するまでブロックされ、結果が返されます。

この例では、 add() 関数が非同期で呼び出され、実行が開始された後、 get() メソッドが使用されるまでブロックされます。


というわけで、今回は以上です。大変大変お疲れ様でした。
引き続きで、徐々に発信していきます。

コメントや感想を受け付けています。ちょっとした感想でもいいので嬉しいです。

それでは、以上です。

【ステップアップ】「Pythonの実践」簡単速習‼【非同期処理 asynico/応用⑫】

最新情報をチェックしよう!