7. 生波形の取得とプロット
ここまでは 集中度とリラックス度を取得していましたが、他のデータの取得も試してみましょう。ここからは少しプログラムが複雑になってきます。
アドレス一覧を再掲載します。
- /Attention:集中度
- /Meditation:リラックス度
- /EEG :生波形
- /Bandpower :δ, θ, α, β, γ
- /Acc:加速度
- /Gyro:ジャイロ
- /EulerAngle:デバイスの角度
ここでは、/EEGを取得してみましょう。GB Focus でポートを追加し、以下の写真の「ポート3」のとおりに設定してください。

ポートを追加したら、再度「接続」してください。
次に、プログラムの上雨の「受信するアドレス」に、/BandPower を追加します。
addresses = ["/Attention", "/Meditation", "/EEG"]

このまま実行すると、下記のようなエラーが出ます

よくよくみてみると、取得した生波形の計測値には、セミコロン区切りで複数の値があることがみて取れます。
経過時間: 0.00秒, アドレス: /EEG, 値: 5.8639884;-0.9840023;-7.6275864;-10.040013;-13.711001;-11.677492;-0.792469;7.9420476;15.070419;21.756023;19.217237;5.1986957;-6.3247585;-7.989966;-6.4195466;-2.4467392;-0.16022202;1.3484182;10.547297;13.079433;-2.6237447;-16.977041;-13.428343;1.5808055;15.033743;16.605637;-2.8496656;-33.38901;-38.707684;-11.341251;13.103121;15.915452;10.884375;9.852546;9.397429;9.347432;5.769152;-6.6483426;-12.356766;-0.24747655;16.953405;23.240318;15.524784;-0.68166816;-12.595848;-16.602974;-22.433119;-24.353012;-14.597003;3.710437
生波形のデータは、0.2秒に1回、50個のデータがまとめて送信されます(250 Hz)。したがって、このまとまったデータを数値データに変換してから、データを保存する必要があります。
このように、アドレスごとに送信されるデータの特徴は少しずつ異なっています。各アドレスに対応した処理を書いていきましょう。
まずは、handler関数の最後の部分を変更し、条件分岐を作成します。
# アドレスごとの処理
if address in ["/Attention", "/Meditation"]:
# データを保存
with data_lock: # データ競合を防ぐためにロックを使用
data_storage[address]["time"].append(seconds_passed) # 経過時間
data_storage[address]["value"].append(value) # 計測値
elif address == "/EEG":
pass

pass は「何もしない」という意味です。
次に、/EEG の場合の処理を書いていきます。
# EEGデータをリストに変換
eeg_values = [float(v) for v in value.split(";")]
# EEG用の時間データを作成(50サンプル/0.2秒 x 波形の長さ)
eeg_time = [seconds_passed + i * 0.2 / 50 for i in range(len(eeg_values))]
with data_lock: # データ競合を防ぐためにロックを使用
data_storage[address]["time"].extend(eeg_time) # 時間データ
data_storage[address]["value"].extend(eeg_values) # 計測値

処理の流れは下記の通りです。
- EEGの文字列データを数値データ(のリスト)に変換
- タイムスタンプが1点しかないので、サンプリング間隔から計算して時間列を作成(※)
- データの保存(今回は append ではなく extend であることに注意)
※厳密には、データ送信の時間間隔は均一ではない(ちょうど0.2秒ではない)ので、本当は次のデータ送信時刻から逆算して時間の配列を作成する必要があります。処理が複雑になるため、今回は簡易的な方法で実装しています。
これでエラーは出なくなりますが、プロットはまだおかしいです。
課題
- EEGデータの量が多いため、短い時間しか表示されない。集中・リラックスが見えない。
- EEGデータが途切れている。

7.1. 課題1の修正
課題1を解決するために、data_storage で設定した maxlen を変更します。ただ、EEGと集中・リラックスで別の値を設定する必要があります。
まずは、address にサンプリング周波数の情報を持たせます。今後の各調整も考慮し、NamedTuple型の Address クラスを作っておきます。
from typing import NamedTuple
class AddressConfig(NamedTuple):
address: str
sampling_rate: int

addresses を変更し、ついでにプロットの表示時間を設定する変数を作成しておきます。
addresses = [
AddressConfig("/Attention", 1),
AddressConfig("/Meditation", 1),
AddressConfig("/EEG", 250)
]
plot_dur = 20 # プロットの表示時間(秒)

次に、data_storageで、maxlen = サンプリング周波数 x 表示時間 秒 に変更します。
# データ保存用
data_storage = {
conf.address: {
"time": deque(maxlen=conf.sampling_rate * plot_dur), # 時間データ
"value": deque(maxlen=conf.sampling_rate * plot_dur) # データ値
}
for conf in addresses
}

そして、以降でaddressを使用している3箇所を修正します。
…
# アドレスと関数を紐付ける
for conf in addresses:
disp.map(conf.address, handler)
…
# 各アドレスごとにラインを作成
lines = {
conf.address: ax.plot([], [], label=conf.address)[0]
for conf in addresses
}
…
with data_lock: # データ競合を防ぐためにロックを使用
for conf in addresses:
address = conf.address
# データが存在するかチェック



これで多少マシになりました。

7.2. 課題2の修正
次に、課題2(EEGが見切れている問題)を修正するため、サブプロットを増やしましょう。
まずは、プログラム上部の設定を変更します。
- AddressConfig に データカテゴリの情報を追加
- addresses でデータカテゴリを設定
- subplot_settings を追加
class AddressConfig(NamedTuple):
address: str
sampling_rate: int
category: str
…
# addresses = [
AddressConfig("/Attention", 1, "Mental State"),
AddressConfig("/Meditation", 1, "Mental State"),
AddressConfig("/EEG", 250, "EEG")
]
plot_dur = 20 # プロットの表示時間(秒)
subplot_settings = {
"Mental State": {
"ylim": (0, 100),
"xlabel": None,
"ylabel": "Mental State"
},
"EEG": {
"ylim": (-50, 50),
"xlabel": "Time (s)",
"ylabel": "Voltage (μV)"
}
}

プロットの初期設定も変更します。
nrows = len(subplot_settings)
fig, axes = plt.subplots(nrows, figsize=(10, 6), dpi=200, sharex=True)
axes_dict = {
title: ax
for title, ax in zip(subplot_settings.keys(), axes)
}
for title, ax in axes_dict.items():
ax.set_title(title) # タイトルを設定
ax.set_xlabel(subplot_settings[title]["xlabel"]) # X軸のラベルを設定
ax.set_ylabel(subplot_settings[title]["ylabel"]) # Y軸のラベルを設定
ax.set_ylim(subplot_settings[title]["ylim"]) # Y軸の範囲を設定
# 各アドレスごとにラインを作成
lines = {
conf.address: axes_dict[conf.category].plot([], [], label=conf.address)[0]
for conf in addresses
}
for ax in axes:
ax.legend() # 凡例を表示

さらに、update_plot() 関数内で一箇所修正を行います。
# X軸の範囲を更新
for ax in axes:
ax.set_xlim(min_time, max_time)

うまくプログラムが書けていれば、以下のように、プロットがいい感じになっているはずです!

7.3. ここまでのソースコード
import threading
from datetime import datetime
from collections import deque
from matplotlib import pyplot as plt
from pythonosc import dispatcher, osc_server
from typing import NamedTuple
class AddressConfig(NamedTuple):
address: str
sampling_rate: int
category: str
# 設定 ===================================
# 受信するポート番号
PORT = 9000
# 受信するアドレス
addresses = [
AddressConfig("/Attention", 1, "Mental State"),
AddressConfig("/Meditation", 1, "Mental State"),
AddressConfig("/EEG", 250, "EEG")
]
plot_dur = 20 # プロットの表示時間(秒)
subplot_settings = {
"Mental State": {
"ylim": (0, 100),
"xlabel": None,
"ylabel": "Mental State"
},
"EEG": {
"ylim": (-50, 50),
"xlabel": "Time (s)",
"ylabel": "Voltage (μV)"
}
}
# 処理 ===================================
data_lock = threading.Lock() # おまじない(OSCとプロットのスレッド間のデータ競合を防ぐ)
start_time = None # 最初のデータの時刻を記録
# データ保存用
data_storage = {
conf.address: {
"time": deque(maxlen=conf.sampling_rate * plot_dur), # 時間データ
"value": deque(maxlen=conf.sampling_rate * plot_dur) # データ値
}
for conf in addresses
}
# 受信したデータを処理する関数
def handler(address, *args):
# argsから計測値とタイムスタンプを取得
value, timestamp_str = args
# おまじない(グローバル変数`start_time`を使うことを宣言)
global start_time
# タイムスタンプを文字列からdatetime型に変換
timestamp = datetime.strptime(timestamp_str, "%Y-%m-%d %H:%M:%S.%f")
# 最初のデータの時刻を記録
if start_time is None:
start_time = timestamp
# 経過時間(秒)を計算
seconds_passed = (timestamp - start_time).total_seconds()
print(f"経過時間: {seconds_passed:.2f}秒, アドレス: {address}")
# アドレスごとの処理
if address in ["/Attention", "/Meditation"]:
# データを保存
with data_lock: # データ競合を防ぐためにロックを使用
data_storage[address]["time"].append(seconds_passed) # 経過時間
data_storage[address]["value"].append(value) # 計測値
elif address == "/EEG":
# EEGデータをリストに変換
eeg_values = [float(v) for v in value.split(";")]
# EEG用の時間データを作成(50サンプル/0.2秒 x 波形の長さ)
eeg_time = [seconds_passed + i * 0.2 / 50 for i in range(len(eeg_values))]
with data_lock: # データ競合を防ぐためにロックを使用
data_storage[address]["time"].extend(eeg_time) # 時間データ
data_storage[address]["value"].extend(eeg_values) # 計測値
# ディスパッチャ(アドレスと関数を紐付ける)を作成
disp = dispatcher.Dispatcher()
# アドレスと関数を紐付ける
for conf in addresses:
disp.map(conf.address, handler)
# プロット ----------
# インタラクティブモードを有効にする
plt.ion()
# 図の作成と設定
nrows = len(subplot_settings)
fig, axes = plt.subplots(nrows, figsize=(10, 6), dpi=200, sharex=True)
axes_dict = {
title: ax
for title, ax in zip(subplot_settings.keys(), axes)
}
for title, ax in axes_dict.items():
ax.set_title(title) # タイトルを設定
ax.set_xlabel(subplot_settings[title]["xlabel"]) # X軸のラベルを設定
ax.set_ylabel(subplot_settings[title]["ylabel"]) # Y軸のラベルを設定
ax.set_ylim(subplot_settings[title]["ylim"]) # Y軸の範囲を設定
# 各アドレスごとにラインを作成
lines = {
conf.address: axes_dict[conf.category].plot([], [], label=conf.address)[0]
for conf in addresses
}
for ax in axes:
ax.legend() # 凡例を表示
def update_plot():
""" プロットを更新する関数 """
min_time = 0 # 最小時間を初期化
max_time = 0 # 最大時間を初期化
with data_lock: # データ競合を防ぐためにロックを使用
for conf in addresses:
address = conf.address
# データが存在するかチェック
if data_storage[address]["time"] and data_storage[address]["value"]:
# ラインのデータを更新
lines[address].set_data(
data_storage[address]["time"],
data_storage[address]["value"]
)
# 最小・最大時間を更新
min_time = max(min_time, min(data_storage[address]["time"]))
max_time = max(max_time, max(data_storage[address]["time"]))
else:
# データが存在しない場合は空にする
lines[address].set_data([], [])
# X軸の範囲を更新
for ax in axes:
ax.set_xlim(min_time, max_time)
# 再描画
fig.canvas.draw()
fig.canvas.flush_events()
# OSCサーバー ----------
def start_server():
"""OSCサーバーを起動する関数"""
print(f"> OSC 受信 (UDP:{PORT}) … プロットウィンドウを閉じると終了")
server = osc_server.ThreadingOSCUDPServer(("", PORT), disp)
server.serve_forever()
# サーバーをバックグラウンドで開始
server_thread = threading.Thread(target=start_server, daemon=True)
server_thread.start()
# プロット表示とリアルタイム更新
plt.show()
try:
while plt.get_fignums(): # ウィンドウが開いている間
update_plot()
plt.pause(0.1) # 100ms待機
except KeyboardInterrupt:
print("\n終了")
8. まとめ
本記事では、Python と UDP計測を用いて、脳波のリアルタイム描画を行いました。UDP計測を利用すれば、フィードバックを伴う実験や、脳波を用いたライブ感のある創作活動を行うことができます。
興味のある方は、ぜひ一度ソリューションページをご覧ください。