複数の接続に対応するには?

サーバー側の現在の実装では、1つのクライアントとの接続しか処理できないことがわかりました。 ではこのような問題に対応するにはどうしたらいいでしょうか。

複数のクライアントに対応する方法

1つのサーバーで複数のクライアントからの接続を同時に処理するためには、以下の方法があります。

  • プロセスを使う: 各クライアントの接続を別々のプロセスで処理します。これにより、各プロセスが独立して動作し、同時に複数のクライアントを処理できます。

  • スレッドを使う: 各クライアントの接続を別々のスレッドで処理します。これにより、同時に複数のクライアントからのリクエストを処理できます。

  • 非同期I/Oを使う: Pythonのasyncioモジュールを使って、非同期にクライアントの接続を処理します。これにより、I/O待ちの間も他のクライアントの処理が可能になります。

プロセスやスレッドは、OSにおけるプログラムの実行単位というレベルの話になります。

プロセスは、それぞれが独立した状態です。 そのため、プロセス間でデータの共有や受け渡しを行うためにはなんらかの準備が必要となります。 逆に言えばそれだけ独立性が高く、1つのプロセスがクラッシュしても他のプロセスには影響しにくい構造になっています。

スレッドは、同じプロセス内で小分けにされたような感じで考えてください。 そのためスレッド間のデータはプロセス内で共有された形になります。 処理はしやすくなる反面、共有しているデータに対するアクセス方法[1]に注意が必要です。

        sequenceDiagram
    participant Thread1 as スレッド1
    participant Thread2 as スレッド2
    participant Shared as 共有データ

    Thread1->>Shared: 書き込み(更新)
    Thread2->>Shared: 読み取り(同時アクセス)
    Note over Thread1,Thread2: レースコンディション発生
    

注釈

レースコンディション: 複数のスレッドが同時に共有データにアクセスし、予期しない結果を引き起こす状態を指します。

レースコンディションを防ぐためには、共有するデータに対して『今は書き込み禁止』『今は書き込み可能』といったフラグのようなものを用意する必要があります。 この手の技術としては、ミューテックス(Mutex)やセマフォ(Semaphore)などがあります。 言語によっては特定の操作の前後にロックをかけるといった方法も用意されることがあります[2]

非同期I/Oは比較的最近の技術で、Pythonではasyncioモジュールを使うことで実現できます。

マルチプロセス型のサーバー

ここでは比較的実装の簡単な、マルチプロセス型のサーバーを実装してみます。 マルチプロセス型のサーバーは、各クライアントの接続を別々のプロセスで処理するため、各プロセスが独立して動作します。

 1import socket
 2import multiprocessing  # マルチプロセス対応のため追加
 3
 4HOST = '127.0.0.1'  # ローカルホスト
 5PORT = 10000        # 任意の空いているポート番号
 6
 7def handle_client(conn, addr):
 8    print(f"[server] Connected by {addr}")
 9    # ソケットをファイルのように扱うため、makefile()でファイルオブジェクトを作成
10    # 'rw'は読み書き両用、encoding='utf-8'で送受信する文字列のエンコーディングを指定
11    # newline='\n'は、行末をLFに統一
12    with conn.makefile('rw', encoding='utf-8', newline='\n') as f:
13        while True:
14            # 5. f.readline()で一行受信 (read)
15            expr = f.readline().strip()
16            if not expr:
17                print("[server] Connection closed (EOF)")
18                break
19            if expr == "quit":
20                print("[server] Connection closed by client")
21                break
22            try:
23                result = str(eval(expr, {"__builtins__": {}}))
24            except Exception as e:
25                result = f"Error: {e}"
26            # 6. f.write()で一行送信 (write)
27            f.write(result + '\n')
28            # バッファをフラッシュして、データを即座に送信
29            f.flush()
30
31def run_server():
32    # 1. ソケットの作成
33    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
34    # サーバーを閉じた後、すぐに同じポートで再起動できるように設定
35    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
36    try:
37        # 2. bind()で、作成したソケットにIPアドレスとポート番号を割り当てる
38        sock.bind((HOST, PORT))
39        # 3. listen()で、クライアントからの接続を待ち受ける状態に移行
40        sock.listen()
41        print(f"[server] Listening on {HOST}:{PORT}")
42        while True:
43            # 4. accept()で、クライアントからの接続要求を受け入れ、
44            #    新しいソケット(conn)とクライアントのアドレス(addr)を取得
45            conn, addr = sock.accept()
46            # マルチプロセスでクライアントを処理
47            process = multiprocessing.Process(target=handle_client, args=(conn, addr))
48            process.start()
49            conn.close() # 親プロセス側では閉じておく
50    finally:
51        print("[server] Shutting down")
52        sock.close()
53
54if __name__ == '__main__':
55    run_server()

少しわかりにくいですね、変更点を出すとこうなります。

 1--- /home/runner/work/2025-network-doc/2025-network-doc/source/transport/tcp-programming/source/calc_server.py
 2+++ /home/runner/work/2025-network-doc/2025-network-doc/source/transport/tcp-programming/source/calc_server_multiprocess.py
 3@@ -1,5 +1,5 @@
 4 import socket
 5-import ast # 現在は不要でした(削除可能)
 6+import multiprocessing  # マルチプロセス対応のため追加
 7 
 8 HOST = '127.0.0.1'  # ローカルホスト
 9 PORT = 10000        # 任意の空いているポート番号
10@@ -43,8 +43,10 @@
11             # 4. accept()で、クライアントからの接続要求を受け入れ、
12             #    新しいソケット(conn)とクライアントのアドレス(addr)を取得
13             conn, addr = sock.accept()
14-            with conn:
15-                handle_client(conn, addr)
16+            # マルチプロセスでクライアントを処理
17+            process = multiprocessing.Process(target=handle_client, args=(conn, addr))
18+            process.start()
19+            conn.close() # 親プロセス側では閉じておく
20     finally:
21         print("[server] Shutting down")
22         sock.close()

見ての通りで、接続状態を保持する connを利用して、multiprocessing.Processを使って新しい子プロセスを生成しています。

現代OSの基礎概念のひとつで、プロセスからプロセスを生成する際の考え方があります。 自身(実行中のプロセス自身)のコピーをコピーするもので、UNIX(Linux)界隈ではforkと呼ばれています。

        flowchart TD
    subgraph 親プロセス
        P[カーネル空間]
        A[ユーザープロセス]
    end
    P -- fork要求 --> K[カーネル]
    K -- プロセス複製 --> C[子プロセス]
    C -.->|独立して実行| A
    style K fill:#f9f,stroke:#333,stroke-width:2px
    style C fill:#bbf,stroke:#333,stroke-width:2px
    style A fill:#bfb,stroke:#333,stroke-width:2px
    

注釈

Windowsではforkは存在しません、代わりにCreateProcessというAPIが存在します。 これは、子プロセスを生成することに変わりませんが、親プロセスの状態をコピーするわけではありません。 実際に起動させたいプログラムを指定して起動させます。

forkでは親プロセスの状態をコピーして動くため、『開いているファイル』や『開いているソケット』などもコピーを持つことになります。 そこで、コピー元(親プロセス)側は即座にソケット(接続)を閉じておくことにしましょう。これで『なぜか二股になっていた』状態が解消します。 そしてwhileのループの冒頭に戻り、次の sock.accept() で次の接続を待つようになります。