Group Threading utilities¶
- group ov_dev_api_threading
Threading API providing task executors for asynchronous operations.
Typedefs
-
using Task = std::function<void()>¶
OpenVINO Task Executor can use any copyable callable without parameters and output as a task. It would be wrapped into std::function object.
-
class ImmediateExecutor : public ov::threading::ITaskExecutor
- #include <immediate_executor.hpp>
Task executor implementation that just run tasks in current thread during calling of run() method.
Public Types
-
using Ptr = std::shared_ptr<ImmediateExecutor>
A shared pointer to a ImmediateExecutor object.
Public Functions
-
~ImmediateExecutor() override = default
Destroys the object.
-
inline virtual void run(Task task) override
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
-
using Ptr = std::shared_ptr<ImmediateExecutor>
-
class CPUStreamsExecutor : public ov::threading::IStreamsExecutor
- #include <cpu_streams_executor.hpp>
CPU Streams executor implementation. The executor splits the CPU into groups of threads, that can be pinned to cores or NUMA nodes. It uses custom threads to pull tasks from single queue.
Public Types
-
using Ptr = std::shared_ptr<CPUStreamsExecutor>
A shared pointer to a CPUStreamsExecutor object.
Public Functions
-
explicit CPUStreamsExecutor(const Config &config)
Constructor.
- Parameters
config – Stream executor parameters
-
~CPUStreamsExecutor() override
A class destructor.
-
virtual void run(Task task) override
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
-
virtual void execute(Task task) override
Execute the task in the current thread using streams executor configuration and constraints.
- Parameters
task – A task to start
-
virtual int get_stream_id() override
Return the index of current stream.
- Returns
An index of current stream. Or throw exceptions if called not from stream thread
-
virtual int get_numa_node_id() override
Return the id of current NUMA Node Return 0 when current stream cross some NUMA Nodes.
- Returns
ID
of current NUMA Node, or throws exceptions if called not from stream thread
-
virtual int get_socket_id() override
Return the id of current socket Return 0 when current stream cross some sockets.
- Returns
ID
of current socket, or throws exceptions if called not from stream thread
-
using Ptr = std::shared_ptr<CPUStreamsExecutor>
-
interface ExecutorManager¶
- #include <executor_manager.hpp>
Interface for tasks execution manager. This is global point for getting task executor objects by string id. It’s necessary in multiple asynchronous requests for having unique executors to avoid oversubscription. E.g. There 2 task executors for CPU device: one - in FPGA, another - in OneDNN. Parallel execution both of them leads to not optimal CPU usage. More efficient to run the corresponding tasks one by one via single executor.
Public Functions
-
virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0¶
Returns executor by unique identificator.
- Parameters
id – An unique identificator of device (Usually string representation of TargetDevice)
- Returns
A shared pointer to existing or newly ITaskExecutor
-
virtual std::shared_ptr<ov::threading::IStreamsExecutor> get_idle_cpu_streams_executor(const ov::threading::IStreamsExecutor::Config &config) = 0¶
Returns idle cpu streams executor.
- Parameters
config – Streams executor config
- Returns
pointer to streams executor config
-
virtual void set_property(const ov::AnyMap &properties) = 0¶
Allows to configure executor manager.
- Parameters
properties – map with configuration
-
virtual std::shared_ptr<ov::threading::ITaskExecutor> get_executor(const std::string &id) = 0¶
-
interface IStreamsExecutor : public virtual ov::threading::ITaskExecutor¶
- #include <istreams_executor.hpp>
Interface for Streams Task Executor. This executor groups worker threads into so-called
streams
.- CPU
The executor executes all parallel tasks using threads from one stream. With proper pinning settings it should reduce cache misses for memory bound workloads.
- NUMA
On NUMA hosts GetNumaNodeId() method can be used to define the NUMA node of current stream
Subclassed by ov::threading::CPUStreamsExecutor
Public Types
-
enum ThreadBindingType¶
Defines inference thread binding type.
Values:
-
enumerator NONE¶
Don’t bind the inference threads.
-
enumerator CORES¶
Bind inference threads to the CPU cores (round-robin)
-
enumerator NUMA¶
Bind to the NUMA nodes (default mode for the non-hybrid CPUs on the Win/MacOS, where the ‘CORES’ is not implemeneted)
-
enumerator HYBRID_AWARE¶
Let the runtime bind the inference threads depending on the cores type (default mode for the hybrid CPUs)
-
enumerator NONE¶
-
using Ptr = std::shared_ptr<IStreamsExecutor>¶
A shared pointer to IStreamsExecutor interface
Public Functions
-
~IStreamsExecutor() override¶
A virtual destructor.
-
virtual int get_stream_id() = 0¶
Return the index of current stream.
- Returns
An index of current stream. Or throw exceptions if called not from stream thread
-
virtual int get_numa_node_id() = 0¶
Return the id of current NUMA Node Return 0 when current stream cross some NUMA Nodes.
- Returns
ID
of current NUMA Node, or throws exceptions if called not from stream thread
-
virtual int get_socket_id() = 0¶
Return the id of current socket Return 0 when current stream cross some sockets.
- Returns
ID
of current socket, or throws exceptions if called not from stream thread
-
struct Config¶
- #include <istreams_executor.hpp>
Defines IStreamsExecutor configuration.
Public Functions
-
inline Config(std::string name = "StreamsExecutor", int streams = 1, int threadsPerStream = 0, ThreadBindingType threadBindingType = ThreadBindingType::NONE, int threadBindingStep = 1, int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, std::vector<std::vector<int>> streamsInfoTable = {}, bool cpuReservation = false)¶
A constructor with arguments.
- Parameters
name – [in] The executor name
streams – [in]
threadsPerStream – [in]
threadBindingType – [in]
threadBindingStep – [in]
threadBindingOffset – [in]
threads – [in]
threadPreferredCoreType – [in]
streamsInfoTable – [in]
cpuReservation – [in]
-
void set_property(const ov::AnyMap &properties)¶
Sets configuration.
- Parameters
properties – map of properties
-
inline Config(std::string name = "StreamsExecutor", int streams = 1, int threadsPerStream = 0, ThreadBindingType threadBindingType = ThreadBindingType::NONE, int threadBindingStep = 1, int threadBindingOffset = 0, int threads = 0, PreferredCoreType threadPreferredCoreType = PreferredCoreType::ANY, std::vector<std::vector<int>> streamsInfoTable = {}, bool cpuReservation = false)¶
-
interface ITaskExecutor¶
- #include <itask_executor.hpp>
Interface for Task Executor. OpenVINO uses
ov::ITaskExecutor
interface to run all asynchronous internal tasks. Different implementations of task executors can be used for different purposes:To improve cache locality of memory bound CPU tasks some executors can limit task’s affinity and maximum concurrency.
The executor with one worker thread can be used to serialize access to acceleration device.
Immediate task executor can be used to satisfy
ov::ITaskExecutor
interface restrictions but run tasks in current thread.
Synchronization¶
It is
ov::ITaskExecutor
user responsibility to wait for task execution completion. Thec++11
standard way to wait task completion is to usestd::packaged_task
orstd::promise
withstd::future
. Here is an example of how to usestd::promise
to wait task completion and process task’s exceptions:// std::promise is move only object so to satisfy copy callable constraint we use std::shared_ptr auto promise = std::make_shared<std::promise<void>>(); // When the promise is created we can get std::future to wait the result auto future = promise->get_future(); // Rather simple task ov::threading::Task task = [] { std::cout << "Some Output" << std::endl; }; // Create an executor ov::threading::ITaskExecutor::Ptr taskExecutor = std::make_shared<ov::threading::CPUStreamsExecutor>(ov::threading::IStreamsExecutor::Config{}); if (taskExecutor == nullptr) { // ProcessError(e); return; } // We capture the task and the promise. When the task is executed in the task executor context // we munually call std::promise::set_value() method taskExecutor->run([task, promise] { std::exception_ptr currentException; try { task(); } catch(...) { // If there is some exceptions store the pointer to current exception currentException = std::current_exception(); } if (nullptr == currentException) { promise->set_value(); // <-- If there is no problems just call std::promise::set_value() } else { promise->set_exception(currentException); // <-- If there is an exception forward it to std::future object } }); // To wait the task completion we call std::future::wait method future.wait(); // The current thread will be blocked here and wait when std::promise::set_value() // or std::promise::set_exception() method will be called. // If the future store the exception it will be rethrown in std::future::get method try { future.get(); } catch(std::exception& /*e*/) { // ProcessError(e); }
Note
Implementation should guaranty thread safety of all methods
Subclassed by ov::threading::IStreamsExecutor, ov::threading::ImmediateExecutor
Public Types
-
using Ptr = std::shared_ptr<ITaskExecutor>¶
A shared pointer to ITaskExecutor interface
Public Functions
-
virtual ~ITaskExecutor() = default¶
Destroys the object.
-
virtual void run(Task task) = 0¶
Execute ov::Task inside task executor context.
- Parameters
task – A task to start
-
virtual void run_and_wait(const std::vector<Task> &tasks)¶
Execute all of the tasks and waits for its completion. Default run_and_wait() method implementation uses run() pure virtual method and higher level synchronization primitives from STL. The task is wrapped into std::packaged_task which returns std::future. std::packaged_task will call the task and signal to std::future that the task is finished or the exception is thrown from task Than std::future is used to wait for task execution completion and task exception extraction.
Note
run_and_wait() does not copy or capture tasks!
- Parameters
tasks – A vector of tasks to execute
-
using Task = std::function<void()>¶