【中級者向け】Python と UDP計測(GB Focus) で脳波計測をリアルタイム描画する方法

6. 集中度・リラックス度をリアルタイムプロット

ここからは、matplotlibなどを用いて、データをリアルタイムでプロットしていきます。

6.1. モジュールのインポート

まずは、新たに4つのライブラリからモジュールをインポートします。

import threading
from datetime import datetime
from collections import deque
from matplotlib import pyplot as plt
モジュールをインポート(1行目)

それぞれ以下のような用途で使います

  • datetime: タイムスタンプの解析と経過時間の計算
  • threading: OSC サーバーをバックグラウンドで実行する
  • deque: 効率的なデータ保存用データ構造
  • pyplot: データのプロット

6.2. データ保存用変数の作成

次に、データ保存用の変数を、「受信するポート番号」の定義の下に追記します(addressesのところにエラーが出ますが気にしないでください)。

# データ保存用
data_storage = {
    address: {
        "time": deque(maxlen=100),  # 時間データ
        "value": deque(maxlen=100)   # データ値
    }
    for address in addresses
}

ここでは、アドレスごとに、時間とデータ値のdequeを格納する辞書を作成しています。maxlen=100とすることで、データが100個を超えると古いデータから削除されるようになります。

addressesは、今回追記した部分より後(27行目あたり)で定義しているので、今回追記した部分の手前に移動します。

これでエラーが消えました。

コードが長くなってきので、設定と処理がわかりやすいように、コメントを入れておきましょう

# 設定 ===================================
…
# 処理 ===================================

さらに、下記の2行を「処理」の下に追記します。

data_lock = threading.Lock() # おまじない(OSCとプロットのスレッド間のデータ競合を防ぐ)
start_time = None  # 最初のデータの時刻を記録

今、全体のコードは下記の通りとなっています。

import threading
from datetime import datetime
from collections import deque
from matplotlib import pyplot as plt
from pythonosc import dispatcher, osc_server

# 設定 ===================================

# 受信するポート番号
PORT = 9000

# 受信するアドレス
addresses = ["/Attention", "/Meditation"]


# 処理 ===================================

data_lock = threading.Lock() # おまじない(OSCとプロットのスレッド間のデータ競合を防ぐ)
start_time = None  # 最初のデータの時刻を記録

# データ保存用
data_storage = {
    address: {
        "time": deque(maxlen=100),  # 時間データ
        "value": deque(maxlen=100)   # データ値
    }
    for address in addresses
}

# 受信したデータを処理する関数
def handler(address, *args):
    print(f"{address} → {args}")

# ディスパッチャ(アドレスと関数を紐付ける)を作成
disp = dispatcher.Dispatcher()

# アドレスと関数を紐付ける
for address in addresses:
    disp.map(address, handler)

# サーバーを起動
print(f"> OSC 受信 (UDP:{PORT}) … Ctrl-C で終了")
server = osc_server.ThreadingOSCUDPServer(("", PORT), disp) # サーバーを作成
server.serve_forever() # サーバーを起動(Ctrl+Cで終了)

6.3. ハンドラーの拡張

次に、handler 関数を拡張していきます。

まずは、経過時間と計測値を取得できるようにしましょう。

handler関数を以下の通りに更新します。

# 受信したデータを処理する関数
def handler(address, *args):
    # argsから計測値とタイムスタンプを取得
    value, timestamp_str = args 

    # おまじない(グローバル変数`start_time`を使うことを宣言)
    global start_time 

    # タイムスタンプを文字列からdatetime型に変換
    timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S.%f")  

    # 最初のデータの時刻を記録
    if start_time is None:
        start_time = timestamp  

    # 経過時間(秒)を計算
    seconds_passed = (timestamp - start_time).total_seconds()

    print(f"経過時間: {seconds_passed:.2f}秒, アドレス: {address}, 値: {value}")

この状態で、コードをターミナルで実行してみると、以下のように、経過時間と計測値が出力されます。

これで、経過時間と計測値が取得できるようになりました!

あとは、この値を保存するコードを追記しましょう。

   # データを保存
    with data_lock:  # データ競合を防ぐためにロックを使用
        data_storage[address]["time"].append(seconds_passed)  # 経過時間
        data_storage[address]["value"].append(value)  # 計測値

6.4. プロット処理を追加

いよいよ、プロットを行います。

まずは、プロットの処理を書く場所をわかりやすくコメントで示しておきましょう。以下のコードを、「サーバーを起動」というコメントの上に追記します。

…
# プロット ----------

# OSCサーバー ----------
…

次に、プロットの初期設定をします。plt.ion() でインタラクティブモード(リアルタイムでプロットするモード)を起動し、figure の作成や軸ラベルなどの設定をします。

# インタラクティブモードを有効にする
plt.ion()  

# 図の作成と設定
fig, ax = plt.subplots(figsize=(10, 6), dpi=200)
ax.set_xlabel("Time (s)") # X軸のラベルを設定
ax.set_ylim(0, 100) # Y軸の範囲を設定

# 各アドレスごとにラインを作成
lines = {
    address: ax.plot([], [], label=address)[0]
    for address in addresses
}
ax.legend()  # 凡例を表示

次に、データを更新する関数を作成します。少し長いですが、下記の関数を追記してください。

def update_plot():
    """ プロットを更新する関数 """
    min_time = 0  # 最小時間を初期化
    max_time = 0  # 最大時間を初期化

    with data_lock:  # データ競合を防ぐためにロックを使用
        for address in addresses:
            # データが存在するかチェック
            if data_storage[address]["time"] and data_storage[address]["value"]:
                # ラインのデータを更新
                lines[address].set_data(
                    data_storage[address]["time"],
                    data_storage[address]["value"]
                )

                # 最小・最大時間を更新
                min_time = max(min_time, min(data_storage[address]["time"]))
                max_time = max(max_time, max(data_storage[address]["time"]))
            else:
                # データが存在しない場合は空にする
                lines[address].set_data([], []) 

    # X軸の範囲を更新
    ax.set_xlim(min_time, max_time)

    # 再描画
    fig.canvas.draw()
    fig.canvas.flush_events()

処理の流れは下記の通りです。

  1. 最小・最大時間を記憶する変数を用意
  2. データをもとにプロットの折れ線(ライン)を更新
  3. X軸の調整
  4. 図の再描画

最後に、この関数を使用してリアルタイムプロットできるようにしましょう。

まずは、OSCデータを取得するサーバーの処理を少し修正します(サーバーをバックグラウンドで動かすようにします)。

def start_server():
    """OSCサーバーを起動する関数"""
    print(f"> OSC 受信 (UDP:{PORT}) … プロットウィンドウを閉じると終了")
    server = osc_server.ThreadingOSCUDPServer(("", PORT), disp)
    server.serve_forever()

# サーバーをバックグラウンドで開始
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

そして、この下にプロットを更新する処理を書きます。

# プロット表示とリアルタイム更新
plt.show()
try:
    while plt.get_fignums():  # ウィンドウが開いている間
        update_plot()
        plt.pause(0.1)  # 100ms待機
except KeyboardInterrupt:
    print("\n終了")

これでプログラムは完成です!(次項に、ここまでのコード全体を記載しています。)

うまく書けていれば、下記のようなプロットが表示されるはずです。

6.5. ここまでのソースコード

import threading
from datetime import datetime
from collections import deque
from matplotlib import pyplot as plt
from pythonosc import dispatcher, osc_server

# 設定 ===================================

# 受信するポート番号
PORT = 9000

# 受信するアドレス
addresses = ["/Attention", "/Meditation"]


# 処理 ===================================

data_lock = threading.Lock() # おまじない(OSCとプロットのスレッド間のデータ競合を防ぐ)
start_time = None  # 最初のデータの時刻を記録

# データ保存用
data_storage = {
    address: {
        "time": deque(maxlen=100),  # 時間データ
        "value": deque(maxlen=100)   # データ値
    }
    for address in addresses
}

# 受信したデータを処理する関数
def handler(address, *args):
    # argsから計測値とタイムスタンプを取得
    value, timestamp_str = args 

    # おまじない(グローバル変数`start_time`を使うことを宣言)
    global start_time 

    # タイムスタンプを文字列からdatetime型に変換
    timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S.%f")  

    # 最初のデータの時刻を記録
    if start_time is None:
        start_time = timestamp  

    # 経過時間(秒)を計算
    seconds_passed = (timestamp - start_time).total_seconds()

    print(f"経過時間: {seconds_passed:.2f}秒, アドレス: {address}, 値: {value}")

    # データを保存
    with data_lock:  # データ競合を防ぐためにロックを使用
        data_storage[address]["time"].append(seconds_passed)  # 経過時間
        data_storage[address]["value"].append(value)  # 計測値

# ディスパッチャ(アドレスと関数を紐付ける)を作成
disp = dispatcher.Dispatcher()

# アドレスと関数を紐付ける
for address in addresses:
    disp.map(address, handler)


# プロット ----------
# インタラクティブモードを有効にする
plt.ion()  

# 図の作成と設定
fig, ax = plt.subplots(figsize=(10, 6), dpi=200)
ax.set_xlabel("Time (s)") # X軸のラベルを設定
ax.set_ylim(0, 100) # Y軸の範囲を設定

# 各アドレスごとにラインを作成
lines = {
    address: ax.plot([], [], label=address)[0]
    for address in addresses
}
ax.legend()  # 凡例を表示

def update_plot():
    """ プロットを更新する関数 """
    min_time = 0  # 最小時間を初期化
    max_time = 0  # 最大時間を初期化

    with data_lock:  # データ競合を防ぐためにロックを使用
        for address in addresses:
            # データが存在するかチェック
            if data_storage[address]["time"] and data_storage[address]["value"]:
                # ラインのデータを更新
                lines[address].set_data(
                    data_storage[address]["time"],
                    data_storage[address]["value"]
                )

                # 最小・最大時間を更新
                min_time = max(min_time, min(data_storage[address]["time"]))
                max_time = max(max_time, max(data_storage[address]["time"]))
            else:
                # データが存在しない場合は空にする
                lines[address].set_data([], []) 

    # X軸の範囲を更新
    ax.set_xlim(min_time, max_time)

    # 再描画
    fig.canvas.draw()
    fig.canvas.flush_events()

# OSCサーバー ----------

def start_server():
    """OSCサーバーを起動する関数"""
    print(f"> OSC 受信 (UDP:{PORT}) … プロットウィンドウを閉じると終了")
    server = osc_server.ThreadingOSCUDPServer(("", PORT), disp)
    server.serve_forever()

# サーバーをバックグラウンドで開始
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()

# プロット表示とリアルタイム更新
plt.show()
try:
    while plt.get_fignums():  # ウィンドウが開いている間
        update_plot()
        plt.pause(0.1)  # 100ms待機
except KeyboardInterrupt:
    print("\n終了")

次のページでは、生波形をリアルタイム描画します。