グループスレッド化ユーティリティー#

group ov_dev_api_threading

非同期操作用のタスクのエグゼキューターを提供するスレッド化 API。

Typedef

using Task = std::function<void()>#

OpenVINO タスク・エグゼキューターは、パラメーターなしでコピー可能な呼び出し可能オブジェクトをタスクとして出力できます。これは std::function オブジェクトにラップされます。

template<typename T>
using ThreadLocal = tbb::enumerable_thread_specific<T>#

オブジェクトをスレッドローカルに維持するラッパークラス。

テンプレート・パラメーター:

T – スレッドをローカルに保つオブジェクトのタイプ。

class ImmediateExecutor : public ov::threading::ITaskExecutor#
#include <immediate_executor.hpp>

run() メソッドの呼び出し中に現在のスレッドでタスクを実行するタスク・エグゼキューターの実装。

パブリックタイプ

using Ptr = std::shared_ptr<ImmediateExecutor>#

ImmediateExecutor オブジェクトへの共有ポインター。

パブリック関数

~ImmediateExecutor() override = default#

オブジェクトを破棄します。

inline virtual void run(Task task) override#

タスク・エグゼキューターのコンテキスト内で ov::Task を実行します。

パラメーター:

task – 開始するタスク

class CPUStreamsExecutor : public ov::threading::IStreamsExecutor#
#include <cpu_streams_executor.hpp>

CPU ストリーム・エグゼキューターの実装。実行プログラムは CPU をスレッドのグループに分割し、コアまたは NUMA ノードに固定することができます。カスタムスレッドを使用して、単一のキューからタスクを取得します。

パブリックタイプ

using Ptr = std::shared_ptr<CPUStreamsExecutor>#

CPUStreamsExecutor オブジェクトへの共有ポインター。

パブリック関数

explicit CPUStreamsExecutor(const Config &config)#

コンストラクター

パラメーター:

config – ストリーム・エグゼキューターのパラメーター

~CPUStreamsExecutor() override#

クラス・デストラクター

virtual void run(Task task) override#

タスク・エグゼキューターのコンテキスト内で ov::Task を実行します。

パラメーター:

task – 開始するタスク

virtual void execute(Task task) override#

ストリーム・エグゼキューターの構成と制約に従って、現在のスレッドでタスクを実行します。

パラメーター:

task – 開始するタスク

virtual int get_stream_id() override#

現在のストリームのインデックスを返します。

戻り値:

現在のストリームのインデックス。ストリームスレッドから呼び出されなかった場合は例外をスローします

virtual int get_numa_node_id() override#

現在の NUMA ノード の ID を返します。現在のストリームがいくつかの NUMA ノードにまたがる場合は 0 を返します。

戻り値:

現在の NUMA ノードID、またはストリームスレッドから呼び出されなかった場合は例外をスローします

virtual int get_socket_id() override#

現在のソケットの ID を返します。現在のストリームがいくつかのソケットにまたがる場合は 0 を返します。

戻り値:

現在のソケットの ID、またはストリームスレッドから呼び出されなかった場合は例外をスローします

virtual void run_sub_stream(Task task, int id) override#

タスク・エグゼキューターのサブストリーム内で ov::Task を実行します。

パラメーター:
  • task – 開始するタスク

  • id – サブストリーム id

interface ExecutorManager#
#include <executor_manager.hpp>

タスク実行マネージャーのインターフェイス。これは、文字列 ID によってタスク・エグゼキューター・オブジェクトを取得するグローバルポイントです。複数の非同期要求によるオーバーサブスクリプションを回避するには、一意のエグゼキューターが必要です。例えば、CPU デバイスに 2 つのタスク・エグゼキューターがあるとします (1 つは FPGA にあり、もう 1 つは OneDNN)。両方を並列実行すると、CPU 使用率は最適ではありません。単一のエグゼキューターを介して対応するタスクを 1 つずつ実行すると、より効率的です。

パブリック関数

virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0#

一意な識別子によってエグゼキューターを返します。

パラメーター:

id – デバイスの一意な識別子 (通常は TargetDevice の文字列表現)

戻り値:

既存または新規の ITaskExecutor への共有ポインター

virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0#

アイドル状態の CPU ストリーム・エグゼキューターを返します。

パラメーター:

config – ストリーム・エグゼキューターの設定

戻り値:

ストリーム・エグゼキューターの設定へのポインター

virtual void set_property(const ov::AnyMap &properties) = 0#

エグゼキューター・マネージャーを構成できます。

パラメーター:

properties – 設定付きマップ

virtual ov::Any get_property(const std::string &name) const = 0#

設定を返します。

パラメーター:

name – プロパティー名

戻り値:

プロパティー

virtual void execute_task_by_streams_executor(ov::hint::SchedulingCoreType core_type, ov::threading::Task task) = 0#

特定のタスクを実行する一時的なエグゼキューターを作成

パラメーター:
  • core_type – cpu コアタイプ

  • task – 実行するタスク

interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor#
#include <istreams_executor.hpp>

Streams タスク・エグゼキューターのインターフェイス。このエグゼキューターは、ワーカースレッドを streams にグループ化します。

CPU

エグゼキューターは、1 つのストリームから提供されるスレッドによりすべての並列タスクを実行します。適切なピニング設定を行うと、メモリーにバインドされたワークロードのキャッシュミスが軽減されます。

NUMA

NUMA ホストでは、GetNumaNodeId() メソッドを使用して、現在のストリームの NUMA ノードを定義できます。

ov::threading::CPUStreamsExecutor によってサブクラス化されます。

パブリックタイプ

enum ThreadBindingType#

推論スレッドのバインディング・タイプを定義します。

値:

enumerator NONE#

推論スレッドをバインドしてはなりません。

enumerator CORES#

推論スレッドを CPU コアにバインド (ラウンドロビン)

enumerator NUMA#

NUMA ノードにバインドします (‘CORES’ が実装されていない Windows*/macOS* 上の非ハイブリッド CPU のデフォルトモード)

enumerator HYBRID_AWARE#

コアタイプに応じてランタイムが推論スレッドをバインドするようにします (ハイブリッド CPU のデフォルトモード)

using Ptr = std::shared_ptr<IStreamsExecutor>#

IStreamsExecutor インターフェイスへの共有ポインター

パブリック関数

~IStreamsExecutor() override#

仮想デストラクター。

virtual int get_stream_id() = 0#

現在のストリームのインデックスを返します。

戻り値:

現在のストリームのインデックス。ストリームスレッドから呼び出されなかった場合は例外をスローします

virtual int get_numa_node_id() = 0#

現在の NUMA ノード の ID を返します。現在のストリームがいくつかの NUMA ノードにまたがる場合は 0 を返します。

戻り値:

現在の NUMA ノードID、またはストリームスレッドから呼び出されなかった場合は例外をスローします

virtual int get_socket_id() = 0#

現在のソケットの ID を返します。現在のストリームがいくつかのソケットにまたがる場合は 0 を返します。

戻り値:

現在のソケットの ID、またはストリームスレッドから呼び出されなかった場合は例外をスローします

virtual void execute(Task task) = 0#

ストリーム・エグゼキューターの構成と制約に従って、現在のスレッドでタスクを実行します。

パラメーター:

task – 開始するタスク

virtual void run_sub_stream(Task task, int id) = 0#

タスク・エグゼキューターのサブストリーム内で ov::Task を実行します。

パラメーター:
  • task – 開始するタスク

  • id – サブストリーム id

void run_sub_stream_and_wait(const std::vector<Task> &tasks)#

すべてのタスクを実行し、完了するまで待機します。デフォルトの run_sub_stream_and_wait() メソッド実装では、run_sub_stream() 純粋仮想メソッドと STL の高レベル同期プリミティブが使用されます。タスクは std::packaged_task にラップされ、std::future を返します。std::packaged_task はタスクを呼び出し、タスクが終了するかタスクから例外がスローされたことを std::future に通知します。その後、std::future を使用してタスク実行の完了とタスク例外の抽出を待機します。

run_sub_stream_and_wait() はタスクをコピーまたはキャプチャーすることはありません。

パラメーター:

tasks – 実行するタスクのベクトル

struct Config#
#include <istreams_executor.hpp>

IStreamsExecutor 構成を定義します。

パブリックタイプ

enum class StreamsMode#

この列挙タイプには、メインストリームの状況を示す各サブ・ストリーム・モードの定義が含まれます。

値:

enumerator SUB_STREAMS_NULL#

サブストリームを作成しないでください。

enumerator SUB_STREAMS_FOR_SOCKET#

メインストリーム内に複数のソケットのサブストリームを作成します。

enumerator LATENCY#

レイテンシー・モード

enumerator THROUGHPUT#

スループット・モード

パブリック関数

inline Config(std::string name = "StreamsExecutor", int streams = 1, int threads_per_stream = 0, ov::hint::SchedulingCoreType thread_preferred_core_type = ov::hint::SchedulingCoreType::ANY_CORE, bool cpu_reservation = false, bool cpu_pinning = false, std::vector<std::vector<int>> streams_info_table = {})#

引数を持つコンストラクター。

パラメーター:
  • name[in] エグゼキューター名

  • streams[in]

  • threads_per_stream[in]

  • thread_preferred_core_type[in]

  • cpu_reservation[in]

  • cpu_pinning[in]

  • streams_info_table[in]

void set_property(const ov::AnyMap &properties)#

構成のセット。

パラメーター:

properties – プロパティーのマップ

void set_property(const std::string &key, const ov::Any &value)#

構成のセット。

パラメーター:
  • key – プロパティー名

  • value – プロパティー値

ov::Any get_property(const std::string &key) const#

設定値を返します。

パラメーター:

key – 設定キー

戻り値:

ov::Any にラップされた設定値

パブリック静的関数

static Config make_default_multi_threaded(const Config &initial)#

ハードウェア・プロパティーを使用して、初期設定から未設定の値を適切にマルチスレッド構成ファイルで作成します。

パラメーター:

initial – 初期設定

戻り値:

設定値

interface ITaskExecutor#
#include <itask_executor.hpp>

タスク・エグゼキューターのインターフェイス。OpenVINO は、すべての非同期内部タスクを実行するため ov::ITaskExecutor インターフェイスを使用します。タスク・エグゼキューターの各種実装は、さまざまな目的で使用できます:

  • メモリーにバインドされた CPU タスクのキャッシュ局所性を向上させるため、一部のエグゼキューターはタスクのアフィニティーと最大同時実行性を制限します。

  • 1 つのワーカースレッドを持つエグゼキューターを使用して、アクセラレーション・デバイスへのアクセスをシリアル化できます。

  • 即時タスク・エグゼキューターを使用すると、ov::ITaskExecutor インターフェイスの制限を満たしつつ、現在のスレッドでタスクを実行できます。

同期#

タスクの実行完了を待機するのは、ov::ITaskExecutor ユーザーの責任です。タスクの完了を待機する c++11 の標準的な方法は、std::packaged_task または std::promisestd::future とともに使用することです。以下は、std::promise を使用してタスクの完了を待機し、タスクの例外を処理する方法の例です。

    // std::promise は移動のみのオブジェクトなので、コピー呼び出し制約を満たすために std::shared_ptr を使用します 
    auto promise = std::make_shared<std::promise<void>>(); 
    // promise が作成されると、std::future を取得して結果を待機できます 
    auto future = promise->get_future(); 
    // かなり簡単なタスク 
ov::threading::Task task = [] { 
        std::cout << "Some Output" << std::endl; 
    }; 
    // エグゼキューターを作成 
    ov::threading::ITaskExecutor::Ptr taskExecutor = 
        std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{}); 
    if (taskExecutor == nullptr) { 
        // ProcessError(e); 
        return; 
    } 
    // タスクと promise を取得。タスクがタスク実行コンテキストで実行される場合 
    // std::promise::set_value() メソッドを定期的に呼び出す 
    taskExecutor->run([task, promise] { 
        std::exception_ptr currentException; 
        try { 
            task(); 
        } catch(...){ 
            // 例外がある場合は、現在の例外へのポインタを保存 
            currentException = std::current_exception(); 
        } 

        if (nullptr == currentException) { 
            promise->set_value(); // <-- 問題がなければ std::promise::set_value() を呼び出す 
        } else { 
            promise->set_exception(currentException); // <-- 例外がある場合は std::future オブジェクトに転送 
        } 
    }); 
    // タスクの完了を待つには std::future::wait メソッドを呼び出す 
    future.wait(); // 現在のスレッドはここでブロックされ、std::promise::set_value() または、 
    // std::promise::set_exception() メソッドが呼び出されるまで待機 
        // 将来例外を保存すると、std::future::get メソッドで再スローされます 
    try { 
        future.get(); 
    } catch(std::exception& /*e*/) { 
        // ProcessError(e); 
    }

実装ではすべてのメソッドのスレッド安全性を保証する必要があります

ov::threading::IStreamsExecutorov::threading::ImmediateExecutor でサブクラス化されます

パブリックタイプ

using Ptr = std::shared_ptr<ITaskExecutor>#

ITaskExecutor インターフェイスへの共有ポインター

パブリック関数

virtual ~ITaskExecutor() = default#

オブジェクトを破棄します。

virtual void run(Task task) = 0#

タスク・エグゼキューターのコンテキスト内で ov::Task を実行します。

パラメーター:

task – 開始するタスク

virtual void run_and_wait(const std::vector<Task> &tasks)#

すべてのタスクを実行し、完了するまで待機します。デフォルトの run_and_wait() メソッド実装では、run() 純粋仮想メソッドと STL の高レベル同期プリミティブが使用されます。タスクは std::packaged_task にラップされ、std::future を返します。std::packaged_task はタスクを呼び出し、タスクが終了するかタスクから例外がスローされたことを std::future に通知します。その後、std::future を使用してタスク実行の完了とタスク例外の抽出を待機します。

run_and_wait() はタスクをコピーまたはキャプチャーすることはありません。

パラメーター:

tasks – 実行するタスクのベクトル