thread/threadingモジュールを使ってエコーサーバーのテストを書く

webサーバー/クライアントを実装するとき、server/clientが正しく連携するかテストしたいかと思います。しかし、serverとclientをシングルスレッドで同時に扱うことはできないので、単純にunittestを使っただけでは完全な自動テストを書くことはできません。このような場合、serverとclientを別スレッドで走らせ、それぞれのスレッド同士のタイミングをthreading.Eventを使って制御する方法を取ることができます。エコーサーバーを使った例だと、次のようになります。

#!/usr/bin/env python2.6
# -*- coding: utf-8 -*-
import thread
import socket
import threading
HOST = ''                 # Symbolic name meaning the local host
PORT = 50009             # Arbitrary non-privileged port
class ThreadTest:
  def main(self):
    self.server_ready = threading.Event()#サーバーのlistenまで実行したことをset()で通知
    self.client_ready = threading.Event()#クライアントが生成されたことを通知するため
    self.done = threading.Event()#クライアント・スレッドの関数が実行し終えたことを通知する
    #クライアント・スレッド上でself.client_ready_to_recv()を実行する。
    thread.start_new_thread(self.client_ready_to_recv, ())
    self.server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.server.bind((HOST, PORT))
    self.server.listen(1)
    if not self.server_ready.is_set():
      #server側の準備ができたことを通知。このタイミングでself.client_ready_to_recvの
      #self.server_ready.wait()より下を実行しはじめる。
      self.server_ready.set() 
    #client側の準備を待つ
    self.client_ready.wait()
    conn, addr = self.server.accept()
    print 'Connected by', addr
    while 1:
      data = conn.recv(1024)
      if not data: break
      conn.send(data)
    conn.close()
    #mainスレッドがclient用のスレッドが終わる前に終了することを防止する
    self.done.wait()

  def client_ready_to_recv(self):
    try:
      #サーバー側の準備を待つ
      self.server_ready.wait()
      self.client_ready.set()
      self.client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
      self.client.connect((HOST, PORT))
      self.client.send('Hello, world')
      data = self.client.recv(1024)
      self.client.close()
      assert('Hello, world' ==  data)
      print 'Received', repr(data)
    finally:
      #mainスレッドに実行し終えたことを通知する
      self.done.set()
      #このスレッドを終了させる
      thread.exit()
if __name__ == '__main__':
  for i in range(10):
    ThreadTest().main()

このプログラムのシングルスレッド版のserver/clientのプログラムは公式リファレンスから持って来ました。このプログラムを実行すると、次のようになります。


Connected by ('127.0.0.1', 39469)
Received 'Hello, world'
Connected by ('127.0.0.1', 39470)
Received 'Hello, world'
Connected by ('127.0.0.1', 39471)
Received 'Hello, world'
Connected by ('127.0.0.1', 39472)
Received 'Hello, world'
Connected by ('127.0.0.1', 39473)
Received 'Hello, world'
Connected by ('127.0.0.1', 39474)
Received 'Hello, world'
Connected by ('127.0.0.1', 39475)
Received 'Hello, world'
Connected by ('127.0.0.1', 39476)
Received 'Hello, world'
Connected by ('127.0.0.1', 39477)
Received 'Hello, world'
Connected by ('127.0.0.1', 39478)
Received 'Hello, world'

今回は、threading.Eventでタイミングを合わせる処理を行いました。このthreading.Eventはthreading.Lock(cpythonではCで実装されてる)とthireading.RLock(C実装もあるが、threading.Lockを使ったPython実装もある)をうまく組み合わせて実装されています。

このプログラムをunittestと連携させた例はCPythonのLib/test/test_socket.pyで見つけることができます。