グループスレッド化ユーティリティー#
- group ov_dev_api_threading
非同期操作用のタスクのエグゼキューターを提供するスレッド化 API。
Typedef
- using Task = std::function<void()>#
OpenVINO タスク・エグゼキューターは、パラメーターなしでコピー可能な呼び出し可能オブジェクトをタスクとして出力できます。これは std::function オブジェクトにラップされます。
- class ImmediateExecutor : public ov::threading::ITaskExecutor#
- #include <immediate_executor.hpp>
run() メソッドの呼び出し中に現在のスレッドでタスクを実行するタスク・エグゼキューターの実装。
パブリックタイプ
- using Ptr = std::shared_ptr<ImmediateExecutor>#
ImmediateExecutor オブジェクトへの共有ポインター。
- using Ptr = std::shared_ptr<ImmediateExecutor>#
- class CPUStreamsExecutor : public ov::threading::IStreamsExecutor#
- #include <cpu_streams_executor.hpp>
CPU ストリーム・エグゼキューターの実装。実行プログラムは CPU をスレッドのグループに分割し、コアまたは NUMA ノードに固定することができます。カスタムスレッドを使用して、単一のキューからタスクを取得します。
パブリックタイプ
- using Ptr = std::shared_ptr<CPUStreamsExecutor>#
CPUStreamsExecutor オブジェクトへの共有ポインター。
パブリック関数
- explicit CPUStreamsExecutor(const Config &config)#
コンストラクター
- パラメーター:
config – ストリーム・エグゼキューターのパラメーター
- ~CPUStreamsExecutor() override#
クラス・デストラクター
- virtual void execute(Task task) override#
ストリーム・エグゼキューターの構成と制約に従って、現在のスレッドでタスクを実行します。
- パラメーター:
task – 開始するタスク
- virtual int get_stream_id() override#
現在のストリームのインデックスを返します。
- 戻り値:
現在のストリームのインデックス。ストリームスレッドから呼び出されなかった場合は例外をスローします
- virtual int get_numa_node_id() override#
現在の NUMA ノード の ID を返します。現在のストリームがいくつかの NUMA ノードにまたがる場合は 0 を返します。
- 戻り値:
現在の NUMA ノード の
ID
、またはストリームスレッドから呼び出されなかった場合は例外をスローします
- virtual int get_socket_id() override#
現在のソケットの ID を返します。現在のストリームがいくつかのソケットにまたがる場合は 0 を返します。
- 戻り値:
現在のソケットの
ID
、またはストリームスレッドから呼び出されなかった場合は例外をスローします
- using Ptr = std::shared_ptr<CPUStreamsExecutor>#
- interface ExecutorManager#
- #include <executor_manager.hpp>
タスク実行マネージャーのインターフェイス。これは、文字列 ID によってタスク・エグゼキューター・オブジェクトを取得するグローバルポイントです。複数の非同期要求によるオーバーサブスクリプションを回避するには、一意のエグゼキューターが必要です。例えば、CPU デバイスに 2 つのタスク・エグゼキューターがあるとします (1 つは FPGA にあり、もう 1 つは OneDNN)。両方を並列実行すると、CPU 使用率は最適ではありません。単一のエグゼキューターを介して対応するタスクを 1 つずつ実行すると、より効率的です。
パブリック関数
- virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0#
一意な識別子によってエグゼキューターを返します。
- パラメーター:
id – デバイスの一意な識別子 (通常は TargetDevice の文字列表現)
- 戻り値:
既存または新規の ITaskExecutor への共有ポインター
- virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0#
アイドル状態の CPU ストリーム・エグゼキューターを返します。
- パラメーター:
config – ストリーム・エグゼキューターの設定
- 戻り値:
ストリーム・エグゼキューターの設定へのポインター
- virtual void set_property(const ov::AnyMap &properties) = 0#
エグゼキューター・マネージャーを構成できます。
- パラメーター:
properties – 設定付きマップ
- virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0#
- interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor#
- #include <istreams_executor.hpp>
Streams タスク・エグゼキューターのインターフェイス。このエグゼキューターは、ワーカースレッドを
streams
にグループ化します。- CPU
エグゼキューターは、1 つのストリームから提供されるスレッドによりすべての並列タスクを実行します。適切なピニング設定を行うと、メモリーにバインドされたワークロードのキャッシュミスが軽減されます。
- NUMA
NUMA ホストでは、GetNumaNodeId() メソッドを使用して、現在のストリームの NUMA ノードを定義できます。
ov::threading::CPUStreamsExecutor によってサブクラス化されます。
パブリックタイプ
- enum ThreadBindingType#
推論スレッドのバインディング・タイプを定義します。
値:
- enumerator NONE#
推論スレッドをバインドしてはなりません。
- enumerator CORES#
推論スレッドを CPU コアにバインド (ラウンドロビン)
- enumerator NUMA#
NUMA ノードにバインドします (‘CORES’ が実装されていない Windows*/macOS* 上の非ハイブリッド CPU のデフォルトモード)
- enumerator HYBRID_AWARE#
コアタイプに応じてランタイムが推論スレッドをバインドするようにします (ハイブリッド CPU のデフォルトモード)
- enumerator NONE#
- using Ptr = std::shared_ptr<IStreamsExecutor>#
IStreamsExecutor インターフェイスへの共有ポインター
パブリック関数
- ~IStreamsExecutor() override#
仮想デストラクター。
- virtual int get_stream_id() = 0#
現在のストリームのインデックスを返します。
- 戻り値:
現在のストリームのインデックス。ストリームスレッドから呼び出されなかった場合は例外をスローします
- virtual int get_numa_node_id() = 0#
現在の NUMA ノード の ID を返します。現在のストリームがいくつかの NUMA ノードにまたがる場合は 0 を返します。
- 戻り値:
現在の NUMA ノード の
ID
、またはストリームスレッドから呼び出されなかった場合は例外をスローします
- virtual int get_socket_id() = 0#
現在のソケットの ID を返します。現在のストリームがいくつかのソケットにまたがる場合は 0 を返します。
- 戻り値:
現在のソケットの
ID
、またはストリームスレッドから呼び出されなかった場合は例外をスローします
- virtual void execute(Task task) = 0#
ストリーム・エグゼキューターの構成と制約に従って、現在のスレッドでタスクを実行します。
- パラメーター:
task – 開始するタスク
- virtual void run_sub_stream(Task task, int id) = 0#
タスク・エグゼキューターのサブストリーム内で ov::Task を実行します。
- パラメーター:
task – 開始するタスク
id – サブストリーム id
- void run_sub_stream_and_wait(const std::vector<Task> &tasks)#
すべてのタスクを実行し、完了するまで待機します。デフォルトの run_sub_stream_and_wait() メソッド実装では、run_sub_stream() 純粋仮想メソッドと STL の高レベル同期プリミティブが使用されます。タスクは std::packaged_task にラップされ、std::future を返します。std::packaged_task はタスクを呼び出し、タスクが終了するかタスクから例外がスローされたことを std::future に通知します。その後、std::future を使用してタスク実行の完了とタスク例外の抽出を待機します。
注
run_sub_stream_and_wait() はタスクをコピーまたはキャプチャーすることはありません。
- パラメーター:
tasks – 実行するタスクのベクトル
- struct Config#
- #include <istreams_executor.hpp>
IStreamsExecutor 構成を定義します。
パブリックタイプ
パブリック関数
- inline Config(std::string name = "StreamsExecutor", int streams = 1, int threads_per_stream = 0, ov::hint::SchedulingCoreType thread_preferred_core_type = ov::hint::SchedulingCoreType::ANY_CORE, bool cpu_reservation = false, bool cpu_pinning = false, std::vector<std::vector<int>> streams_info_table = {})#
引数を持つコンストラクター。
- パラメーター:
name – [in] エグゼキューター名
streams – [in]
threads_per_stream – [in]
thread_preferred_core_type – [in]
cpu_reservation – [in]
cpu_pinning – [in]
streams_info_table – [in]
- inline Config(std::string name = "StreamsExecutor", int streams = 1, int threads_per_stream = 0, ov::hint::SchedulingCoreType thread_preferred_core_type = ov::hint::SchedulingCoreType::ANY_CORE, bool cpu_reservation = false, bool cpu_pinning = false, std::vector<std::vector<int>> streams_info_table = {})#
- interface ITaskExecutor#
- #include <itask_executor.hpp>
タスク・エグゼキューターのインターフェイス。OpenVINO は、すべての非同期内部タスクを実行するため
ov::ITaskExecutor
インターフェイスを使用します。タスク・エグゼキューターの各種実装は、さまざまな目的で使用できます:メモリーにバインドされた CPU タスクのキャッシュ局所性を向上させるため、一部のエグゼキューターはタスクのアフィニティーと最大同時実行性を制限します。
1 つのワーカースレッドを持つエグゼキューターを使用して、アクセラレーション・デバイスへのアクセスをシリアル化できます。
即時タスク・エグゼキューターを使用すると、
ov::ITaskExecutor
インターフェイスの制限を満たしつつ、現在のスレッドでタスクを実行できます。
同期#
タスクの実行完了を待機するのは、
ov::ITaskExecutor
ユーザーの責任です。タスクの完了を待機するc++11
の標準的な方法は、std::packaged_task
またはstd::promise
をstd::future
とともに使用することです。以下は、std::promise
を使用してタスクの完了を待機し、タスクの例外を処理する方法の例です。// std::promise は移動のみのオブジェクトなので、コピー呼び出し制約を満たすために std::shared_ptr を使用します auto promise = std::make_shared<std::promise<void>>(); // promise が作成されると、std::future を取得して結果を待機できます auto future = promise->get_future(); // かなり簡単なタスク ov::threading::Task task = [] { std::cout << "Some Output" << std::endl; }; // エグゼキューターを作成 ov::threading::ITaskExecutor::Ptr taskExecutor = std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{}); if (taskExecutor == nullptr) { // ProcessError(e); return; } // タスクと promise を取得。タスクがタスク実行コンテキストで実行される場合 // std::promise::set_value() メソッドを定期的に呼び出す taskExecutor->run([task, promise] { std::exception_ptr currentException; try { task(); } catch(...){ // 例外がある場合は、現在の例外へのポインタを保存 currentException = std::current_exception(); } if (nullptr == currentException) { promise->set_value(); // <-- 問題がなければ std::promise::set_value() を呼び出す } else { promise->set_exception(currentException); // <-- 例外がある場合は std::future オブジェクトに転送 } }); // タスクの完了を待つには std::future::wait メソッドを呼び出す future.wait(); // 現在のスレッドはここでブロックされ、std::promise::set_value() または、 // std::promise::set_exception() メソッドが呼び出されるまで待機 // 将来例外を保存すると、std::future::get メソッドで再スローされます try { future.get(); } catch(std::exception& /*e*/) { // ProcessError(e); }
注
実装ではすべてのメソッドのスレッド安全性を保証する必要があります
ov::threading::IStreamsExecutor、ov::threading::ImmediateExecutor でサブクラス化されます
パブリックタイプ
- using Ptr = std::shared_ptr<ITaskExecutor>#
ITaskExecutor インターフェイスへの共有ポインター
パブリック関数
- virtual ~ITaskExecutor() = default#
オブジェクトを破棄します。
- virtual void run_and_wait(const std::vector<Task> &tasks)#
すべてのタスクを実行し、完了するまで待機します。デフォルトの run_and_wait() メソッド実装では、run() 純粋仮想メソッドと STL の高レベル同期プリミティブが使用されます。タスクは std::packaged_task にラップされ、std::future を返します。std::packaged_task はタスクを呼び出し、タスクが終了するかタスクから例外がスローされたことを std::future に通知します。その後、std::future を使用してタスク実行の完了とタスク例外の抽出を待機します。
注
run_and_wait() はタスクをコピーまたはキャプチャーすることはありません。
- パラメーター:
tasks – 実行するタスクのベクトル
- using Task = std::function<void()>#