OpenVINO モデルサーバーで Python を実行

はじめに

この機能は現在プレビュー段階であり、将来のバージョンでは機能とユーザー・インターフェイスの一部の動作が変更される可能性があります。

バージョン 2023.3 以降、OpenVINO モデルサーバーはカスタム Python コードの実行をサポートしています。このようなコードは、単純な前処理または後処理だけでなく、画像やテキストの生成などの複雑なタスクも実行できます。

Python の実行は、Python コードを実行するグ​​ラフノードを作成できるビルトインの PythonExecutorCalculator により MediaPipe 経由で有効になります。Python ノードは、スタンドアロンのサーバブル (単一ノードグラフ) として使用することも、より大きな MediaPipe グラフの一部として使用することもできます。

この機能の使用方法を示す簡単な例は、クイックスタート・ガイドを参照してください。

実際のユースケースは、生成 AI のデモをご覧ください。

Docker イメージのビルド

Docker Hub で公開されている openvino/model_server イメージは Python をサポートしていますが、外部モジュールはインストールされていません。Python だけが必要な場合は、パブリックイメージを変更なしで使用できます。それ以外は、Python コードの実行に必要なモジュールをインストールする追加レイヤーを使用してパブリックイメージを拡張する必要があります。例えば、コードに numpy が必要なケースを考えます。この場合、Dockerfile は次のようになります。

FROM openvino/model_server:latest
USER root
ENV LD_LIBRARY_PATH=/ovms/lib
ENV PYTHONPATH=/ovms/lib/python
RUN apt update && apt install -y python3-pip git
RUN pip3 install numpy
ENTRYPOINT [ `/ovms/bin/ovms` ]

Python デモやリポジトリーの最上位ディレクトリーから次を実行して、requirements.txt を変更することもできます: make python_image

OvmsPythonModel クラス

Python ノードをデプロイする場合、モデルサーバーは OvmsPythonModel クラスが実装された Python ファイルを想定します。

class OvmsPythonModel:

    def initialize(self, kwargs):
        """
        `initialize` is called when model server loads graph definition. 
        It allows to initialize and maintain state between subsequent execute() calls 
        and even graph instances. For gRPC unary, graphs are recreated per request. 
        For gRPC streaming, there can be multiple graph instances existing at the same time. 
        OvmsPythonModel object is initialized with this method and then shared between all graph instances. 
        Implementing this function is optional.

        Parameters:
        -----------
        kwargs : dict
            Available arguments:
            * node_name: string
                Name of the node in the graph
            * input_names: list of strings
                List of input stream names defined for the node in graph
                configuration
            * output_names: list of strings
                List of output stream names defined for the node in graph
                configuration
        -----------
        """
        print("Running initialize...")

    def execute(self, inputs):
        """
        `execute` is called in `Process` method of PythonExecutorCalculator
        which in turn is called by the MediaPipe framework. How MediaPipe
        calls the `Process` method for the node depends on the configuration
        and the two configurations supported by PythonExecutorCalculator are:
        
        * Regular: `execute` is called with a set of inputs and returns a set of outputs. 
        For unary endpoints it's the only possible configuration.
        
        * Generative: `execute` is called with a set of inputs and returns a generator. 
        The generator is then called multiple times with no additional input data and produces
        multiple sets of outputs over time. Works only with streaming endpoints. 

        Implementing this function is required.

        Parameters:
        -----------
        * inputs: list of pyovms.Tensor
        -----------
        
        Returns: list of pyovms.Tensor or generator
        """
        ...
        return outputs

    def finalize(self):
        """
        `finalize` is called when model server unloads graph definition.
        It allows to perform any cleanup actions before the graph definition
        is removed. Implementing this function is optional.
        """
        print("Running finalize...")

initialize

initialize は、モデルサーバーがグラフ定義をロードするときに呼び出されます。これにより、後続の execute 呼び出し間やグラフ・インスタンス間での状態の初期化と維持が可能になります。

gRPC 単項の場合、グラフは要求ごとに再作成されます。

gRPC ストリーミングでは、複数のグラフ・インスタンスが同時に存在する可能性があります。

OvmsPythonModel オブジェクトはこのメソッドで初期化され、すべてのグラフ・インスタンス間で共有されます。

パラメーターと戻り値

initialize は辞書である kwargs パラメーターを指定して呼び出されます。kwargs にはノード設定からの情報が含まれます。サンプルを検討してみます。

node {
  name: <NODE_NAME>
  ...
  input_stream: "<INPUT_TAG>:<INPUT_NAME>"
  input_stream: "<INPUT_TAG>:<INPUT_NAME>"
  ...
  output_stream: "<OUTPUT_TAG>:<OUTPUT_NAME>"
  output_stream: "<OUTPUT_TAG>:<OUTPUT_NAME>"
  ...
}

すべてのキーは文字列です。
利用可能なキーと値:

キー

値のタイプ

説明

node_name

文字列

グラフ内のノードの名前。上記のサンプルの <NODE_NAME>

input_names

文字列のリスト

上記のサンプルのすべての入力ストリームからの <INPUT_NAME> のリスト

outputs_names

文字列のリスト

上記のサンプルのすべての出力ストリームからの <OUTPUT_NAME> のリスト

initialize は戻り値を返しません。

エラー処理

問題が発生したことを通知するには、例外をスローする必要があります。モデルサーバーが初期化の例外をキャッチすると、グラフ内のすべての Python リソース (正しくロードされたノードに属するリソースを含む) がクリーンアップされ、グラフ全体が使用不可の状態になります。

注: --log_level DEBUG パラメーターを指定してモデルサーバーを実行し、サーバーログ内のエラー情報を取得します。

この関数の実装はオプションです

execute

executePythonExecutorCalculatorProcess メソッドで呼び出され、その後 MediaPipe フレームワークによって呼び出されます。MediaPipe がノードの Process を呼び出す方法は構成によって異なります。PythonExecutorCalculator でサポートされる 2 つの構成は次のとおりです。

通常

execute は入力を指定して呼び出され、出力を返します。単項エンドポイントの場合、これが唯一可能な構成です。実装側でそのモードを使用するには、execute は出力を返す必要があります。

def execute(self, inputs):
    ...
    return outputs

説明されている構成に関する詳細については、実行モードのセクションを参照してください。

生成

execute は一連の入力を使用して呼び出され、ジェネレーターを返します。その後、ジェネレーターは追加の入力データなしで複数回呼び出され、時間の経過とともに複数の出力セットを生成します。ストリーミング・エンドポイントでのみ動作します。実装側でそのモードを使用するには、execute は出力を生成する必要があります。

def execute(self, inputs):
    # For single set on inputs generate 10 sets of outputs
    for _ in range(10):
        ... 
        yield outputs

説明されている構成に関する詳細については、実行モードのセクションを参照してください。

パラメーターと戻り値

execute は、pyovms.Tensorリストである inputs パラメーターを使用して呼び出されます。

モードに応じて、次を返す必要があります。

  • 通常モードの場合: list of pyovms.Tensor

  • 生成モードの場合: list of pyovms.Tensor を生成する generator

したがって、モードに応じて、execute は常に list of pyovms.Tensorreturn または yield する必要があります。

グラフから複数の Python 出力を返す

このメソッドは出力をリストとして返しますが、各出力は MediaPipe フロー内の個別のパケットであるため、それらは同時にデスティネーションに到着しないことに注意してください。ノードの出力がグラフからの出力でもある場合、使用されるエンドポイントの種類に応じて動作は異なります。

  • 単項エンドポイントの場合、モデルサーバーはグラフからすべての出力を収集し、それらを 1 つの応答にまとめて送信します

  • ストリーミング・エンドポイントの場合、モデルサーバーは出力をパックし、到着するとすぐに応答で送信します。これは、executeX 個の出力のリストを返した場合、クライアントはそれらの出力を X 個の個別の応答で受け取ることを意味します。その後、受信した応答に含まれるタイムスタンプを使用して出力を収集できます。

エラー処理

問題が発生したことを通知するには、例外をスローする必要があります。例外は PythonExecutorCalculator によってキャッチされ、ログに記録されて非 OK ステータスが返されます。次に、モデルサーバーはそのステータスを読み取り、グラフをエラー状態に設定します。次に、すべてのグラフの入力ストリームを閉じ、進行中のアクションが終了するのを待機します。それが完了すると、グラフは削除されます。

この動作は、使用される gRPC エンドポイントの種類 (単項またはストリーミング) によってクライアント与える影響は異なります。

  • 単項

    単項エンドポイントを使用すると、要求ごとにグラフが作成、実行、破棄されます。execute でエラーが発生すると、モデルサーバーはログに記録し、応答としてエラーメッセージを送信します。

  • ストリーミング

    ストリーミング・エンドポイントを使用すると、ストリーム内の最初の要求に対してグラフが作成され、その後のすべての要求で再利用されます。

    最初の要求で execute がエラーが発生した場合 (例えば、Python コードが期待どおりに動作しないなど)、モデルサーバーはそれをログに記録し、応答としてエラーメッセージを送信します。グラフが破棄されます。

    後続の要求の execute でエラーが発生した場合 (例えば、間違ったデータが受信されたなど)、モデルサーバーはそれをログに記録し、MediaPipe はグラフにエラーを設定しますが、クライアントは別の要求を送信するまでエラー メッセージを受け取りません。次の要求がストリームから読み取られると、モデルサーバーはグラフにエラーがあるかどうかを確認し、それを破棄してクライアントに応答を送信します。エラーがすぐに送信されるように、動作を修正する予定です。

    現時点ではグラフは回復できないので、execute でエラーが発生するとストリームが閉じられるため、新しいストリームを作成する必要があります。

注: --log_level DEBUG パラメーターを指定してモデルサーバーを実行し、サーバーログ内のエラー情報を取得します。

この機能の実装は必須です。

finalize

finalize は、モデルサーバーがグラフ定義をアンロードするときに呼び出されます。これにより、グラフが削除される前にクリーンアップ・アクションを実行できます。

パラメーターと戻り値

finalize にはパラメーターがなく値を返しません。

エラー処理

問題が発生したことを通知するには、例外をスローする必要があります。モデルサーバーはファイナライズの例外をキャッチすると、それをログに記録してアンロードを続行します。

注: --log_level DEBUG パラメーターを指定してモデルサーバーを実行し、サーバーログ内のエラー情報を取得します。

この関数の実装はオプションです。

Python テンソル

PythonExecutorCalculator は、名前、形状、データタイプなどの追加情報とともにデータをラップする専用の Tensor クラスで動作します。そのクラスのオブジェクトは、入力として execute メソッドに渡され、出力として返されます。また、グラフ内のノード間、およびグラフとモデルサーバーのコア間でラップされ、交換されます。

この Tensor クラスは、Python バッファープロトコルを実装する Python バインディングを備えた C++ クラスです。これは、ビルトインモジュール pyovms にあります。

Tensor コンテンツへのアクセス

pyovms.Tensor 属性:

名前

タイプ

説明

名前

文字列

ノードの入力または出力ストリームにも関連付けられる文字列の名前

形状

タプル

テンソルの形状を定義する数値のタプル

データタイプ

文字列

バッファー内の要素のタイプ

データ

メモリービュー

ベースとなるデータバッファーのメモリービュー

サイズ

番号

データバッファーのサイズ (バイト単位)

注: datatype 属性はバッファープロトコル実装の一部ではありません。バッファープロトコルは、構造体フォーマット文字を使用する format 値を使用します。これは data メモリービューから読み取ることができます。これら 2 つはマッピングされています。データタイプに関する考慮事項を参照してください。

pyovms.Tensor はバッファープロトコルを実装するため、別のタイプに変換できます。

def execute(self, inputs):
    input_tensor_bytes = bytes(inputs[0])
    ...
    import numpy as np
    input_tensor_ndarray = np.array(inputs[1])
    ...

出力テンソルの作成

入力は execute 関数に提供されますが、出力はユーザーが準備する必要があります。出力オブジェクトは、pyovms.Tensor クラス・コンストラクターを使用して作成できます。

Tensor(name, data)

  • name: Tensor データを特定の名前に関連付ける文字列。この名前は、ノード内の正しい出力ストリームにデータをプッシュするため PythonExecutorCalculator によっても使用されます。詳細については、ノード構成セクションを参照してください。

  • data: Python バッファープロトコルを実装するオブジェクト。これは、byte などのビルトインタイプ、または numpy.ndarray などの外部モジュールからのタイプ・インスタンスである可能性があります。

from pyovms import Tensor

class OvmsPythonModel:
    def execute(self, inputs):
        # Create Tensor called my_output with encoded text
        output = Tensor("my_output", "some text".encode())
        # A list of Tensors is expected, even if there's only one output
        return [output]

Tensor は別のタイプから作成されると、バッファープロトコルに必要なすべてのフィールドを独自に適応させます。このような場合、データタイプ形状も明示的に定義されません。データタイプに関する考慮事項を参照してください。

ノードが別の Python ノードに接続されている場合、このノードの出力にプッシュされた Tensor は別のノードの入力になります。

データタイプに関する考慮事項

pyovms.Tensor オブジェクトが作成およびアクセスされる場所は 2 つあります。

  • OvmsPythonModel クラスの execute メソッド内

  • Python ノードの入力または出力がグラフの入力または出力である場合、シリアル化および逆シリアル化中のモデルサーバーのコア内

モデルサーバーは、テンソルの予期されるデータタイプを定義する KServe API を介して gRPC インターフェイスで要求を受信し、応答を送信します。一方、Python バッファープロトコルでは、形式を構造体形式の文字として指定する必要があります。

クライアント側で構造体形式の文字の使用を強制せずに、ユーザーが KServe タイプを操作できるようにするため、モデルサーバーは要求から Tensor オブジェクトを作成する際に次のマッピングを実行しようとします。

KServe タイプ

文字形式

BOOL

?

UINT8

B

UINT16

H

UINT32

I

UINT64

Q

INT8

b

INT16

h

INT32

i

INT64

q

FP16

e

FP32

f

FP64

d

execute メソッドで別の Python オブジェクトから Tensor を作成する場合、同じマッピングが逆に適用されます。

Tensor オブジェクトは常に Tensor.datatype 属性と Tensor.data.format 属性の両方の値を保持するため、逆シリアル化とシリアル化だけでなく、グラフ内の別のノードでも使用できます。

状況によっては、ユーザーは上記にリストされていない複雑なタイプを扱うことができ、モデルサーバーでもそれが可能になります。

カスタムタイプ

テンソルの datatype フィールドは文字列ですが、モデルサーバーは上記の KServe タイプにないデータタイプを拒否しません。要求でカスタムタイプが定義されており、サーバーがそれをフォーマット文字にマップできない場合、それを 1 次元の生バイナリーバッファーとして扱い B に変換します。一貫性を保つために、ベースとなるバッファーの形状も、要求で定義される形状とは異なります。次の例を見てみます。

  1. モデルサーバーは、次の入力を含む要求を受け取ります。

    • データタイプ: “my_string”

    • 形状: (3,)

    • データ: UTF-8 でエンコードされた “null0terminated0string0” 文字列

  2. モデルサーバーは以下を使用して pyovms.Tensor を作成します。

    • Tensor.datatype: “my_string”

    • Tensor.shape: (3,)

    • Tensor.data.format: “B”

    • Tensor.data.shape: (23,)

execute で、ユーザーは要求からの情報と内部バッファーの様子の両方にアクセスできます。

上記のシナリオは、グラフ入力に直接接続されているノードにのみ当てはまります。execute 内で生成された pyovms.Tensor オブジェクトは、その作成元のオブジェクトからフィールドを継承するため、ユーザーはデータタイプを手動で設定できません。その場合、tensor は前述のマッピングに従ってバッファープロトコル formatdatatype にマップしようとします。

失敗した場合、datatypeformat に設定され、そのようなテンソルがグラフの出力テンソルである場合、クライアントは出力データのタイプに関する情報を受け取ります。

構成とデプロイ

Python は、ビルトインの PythonExecutorCalculator によって MediaPipe 経由で有効になるため、OVMS で Python コードを実行するには、この計算機を使用するノードを含むグラフを作成する必要があります。

グラフの構成方法は、デプロイ全体に大きな影響を与えます。次を定義します。

  • グラフの入力と出力

  • グラフ内の各ノードの入力と出力

  • ノード間の接続

  • グラフとノードのオプション

  • 入力ストリームハンドラー (ノードで Process を起動するために満たすべき条件を定義します)

PythonExecutorCalculator

構成の大部分はノードの設定です。Python ノードは PythonExecutorCalculator を使用し、名前を付ける必要があります。基本的な例を示します。

node {
  name: "python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:input"
  output_stream: "OUTPUT:output"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/model.py"
    }
  }
}

詳細を確認します。

  • name: モデルサーバー内でノードを識別する名前。グラフ内のすべての Python ノードには一意の名前が必要です。

  • calculator: ノードで使用される計算機を示します。PythonExecutorCalculator である必要があります。

  • input_side_packet: モデルサーバーから Python ノードに渡される共有データ。これにより、複数のグラフ・インスタンス間で OvmsPythonModel の状態を共有できます。PYTHON_NODE_RESOURCES:py である必要があります。

  • input_stream: [TAG]:[NAME] の形式で入力を定義します。MediaPipe では、インデックス [TAG]:[INDEX]:[NAME] を使用した構成が許可されますが、PythonExecutorCalculator ではそれが無視されます。

  • output_stream: [TAG]:[NAME] の形式で出力を定義します。MediaPipe では、インデックス [TAG]:[INDEX]:[NAME] を使用した構成が許可されますが、PythonExecutorCalculator ではそれが無視されます。

  • handler_path: PythonExecutorCalculator のオプションは 1 つだけです。これは、OvmsPythonModel 実装を含む Python ファイルへのパスです。

Python コードの入力と出力ストリーム

ノードの入力ストリームと出力ストリームの構成方法は、OvmsPythonModelexecute メソッド内の pyovms.Tensor オブジェクトの名前に直接影響します。これまでの単純な構成には次のものがあります。

input_stream: "INPUT:input"
output_stream: "OUTPUT:output"

入力と出力ストリームは両方とも [TAG]:[NAME] として構築されます。この例では次のようになります。

  • タグ INPUT と名前を使用した入力 input

  • タグ OUTPUT と名前を含む出力 output

Python コードでは、常に [NAME] 部分を参照する必要があります。したがって、execute 内には次のようになります。

from pyovms import Tensor

class OvmsPythonModel:
    def execute(self, inputs):
        my_input = inputs[0]
        my_input.name == "input" # true             
        my_output = Tensor("output", "some text".encode())
        return [my_output]

インデックス経由で入力にアクセス

基本的な構成では、予期されるすべての入力を使用して execute が実行される場合、inputs リスト内の Tensors の順序はランダムではありません。PythonExecutorCalculator が入力ストリームを反復処理して Tensors を作成する際に、ストリームはタグによってソートされます。この知識は、特定のストリームからデータに直接アクセスするための execute メソッドを作成するときに役立ちます。例を参照してください。

node {
  name: "python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "B:b"
  input_stream: "A:a"
  input_stream: "C:c"
  output_stream: "OUTPUT:output"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/model.py"
    }
  }
}

入力には次のようにアクセスできます。

from pyovms import Tensor

class OvmsPythonModel:
    def execute(self, inputs):
        a = inputs[0] # Tensor with name "a" from input stream with tag "A"
        b = inputs[1] # Tensor with name "b" from input stream with tag "B"
        c = inputs[2] # Tensor with name "c" from input stream with tag "C"
        ...

注: ノード構成と execute 実装は常に一致する必要があります。例えば、ノードが不完全な入力を処理するように構成されている場合、インデックスを介して Tensors にアクセスするのは有用ではありません。

グラフの入力と出力ストリーム

ここまではノードの入力と出力ストリームについてのみ説明しましたが、この構成ではグラフの入力と出力ストリームを定義する必要もあります。このルールはノードレベルでの動作と類似しているため、ストリームは [TAG]:[NAME] の形式で記述されますが、それだけではありません。

グラフレベルでは、[TAG] はストリームで予期されるオブジェクト・タイプに関する情報を提供することで、モデルサーバーの逆シリアル化とシリアル化を支援します。モデルサーバーはタグを読み取り、事前定義されたプリフィクスの 1 つで始まることを期待します。グラフ入力ストリームが Python ノードに接続されている場合、タグは OVMS_PY_TENSOR で始まる必要があります。これは、要求から pyovms.Tensor オブジェクトへの入力を逆シリアル化する必要があることをサーバーに指示します。

入力ストリーム間に同じタグが 2 つ以上存在してはなりません。また、出力ストリーム間に同じタグが 2 つ以上存在することもできません。この場合は、プリフィクスの後に一意の文字列を続ける必要があります。

input_stream: "OVMS_PY_TENSOR_IMAGE:image"
input_stream: "OVMS_PY_TENSOR_TEXT:text"
output_stream: "OVMS_PY_TENSOR:output"

注: 同じルールがノードの入力と出力ストリームに適用されます。

[NAME] は、グラフの入力と出力をノードに接続するのに使用されます。これらは、サーバーの要求と応答の入力と出力名でもあります。

複数のノード

同じグラフ内に複数の Python ノードを含める場合に理解すべきことがあります。

  • すべての Python ノードにはグラフスコープ内で一意の名前が必要です。

  • すべての Python ノードには OvmsPythonModel の独自のインスタンスがあり、2 つのノードの handler_path が同一であっても共有されません。

  • PythonExecutorCalculator ベースのノードは、コンバーターを必要とせずに直接接続できます。

  • ノードは同じ Python ファイルを再利用できますが、サーバーで使用されるすべての Python ファイルには一意の名前が必要です。そうしないと、一部のノードが期待どおりに動作しない可能性があります。例えば、/ovms/workspace1/model.py および /ovms/workspace2/model.py では、実質的にロードされる model.py は 1 つだけです (これは将来のバージョンで変更される予定です)。

基本的な例

3 つの Python ノードが順番に設定された構成の例を見てみましょう。

input_stream: "OVMS_PY_TENSOR:first_number"
output_stream: "OVMS_PY_TENSOR:last_number"

node {
  name: "first_python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:first_number"
  output_stream: "OUTPUT:second_number"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/incrementer.py"
    }
  }
}

node {
  name: "second_python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:second_number"
  output_stream: "OUTPUT:third_number"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/incrementer.py"
    }
  }
}

node {
  name: "third_python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:third_number"
  output_stream: "OUTPUT:last_number"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/incrementer.py"
    }
  }
}

この例では、クライアントは first_number という入力を送信し、last_number という出力を受け取ります。ユーザーは Python コードの入力名と出力名にアクセスできるため、インクリメント用のコードは汎用であり、すべてのノードで再利用できます。

incrementer.py

from pyovms import Tensor

def increment(input):
    # Some code for input incrementation
    ...
    return output

class OvmsPythonModel:
    # Assuming this code is used with nodes
    # that have single input and single output
    
    def initialize(self, kwargs):
        self.output_name = kwargs["output_names"][0]

    def execute(self, inputs):
        my_input = inputs[0]           
        my_output = Tensor(self.output_name, increment(my_input))
        return [my_output]

モデルサーバー構成ファイル

Python コードとグラフ構成を含む pbtxt ファイルの準備が完了すると、モデルサーバーの構成は非常に単純であり、次のようになります。

{
    "model_config_list": [],
    "mediapipe_config_list": [
        {
            "name":"python_graph",
            "graph_path":"/ovms/workspace/graph.pbtxt"
        }
    ]
}

ここで、name はグラフの名前を定義し、graph_path にはグラフ構成ファイルへのパスが含まれます。

クライアント側の考慮事項

推論 API と利用可能なエンドポイント

Python の実行は MediaPipe サービングフロー経由でサポートされているため、その機能拡張と制限は継承されます。注意すべきことは、MediaPipe グラフは KServe API 経由でのみ利用できるということです。

クライアントの視点から見ると、モデルサーバーはグラフを提供し、ユーザーはグラフを操作します。グラフ内の単一ノードには外部からアクセスできません。

グラフ・クライアントでは次のことができます。

  • 要求状態 (gRPC と REST)

  • 要求メタデータ (gRPC と REST)

  • 要求推論 (gRPC)

OpenVINO モデルサーバーで MediaPipe フローがどのように機能するかについて詳しく学びます

推測では、グラフ入力ストリームの形式が OvmsPyTensor の場合、KServe 要求内のデータは、KServe API に基づいて raw_input_contents フィールドにカプセル化される必要があります。グラフに OvmsPyTensor 出力ストリームがある場合、KServe 応答のデータは raw_output_contents フィールドにあります。

raw_input_contents で渡されたデータは、pyovms.Tensor オブジェクトの data 属性を介してグラフ入力に接続されたノードの execute メソッドでアクセスできます。

入力と出力では、形状データタイプのパラメーターも定義されます。これらの値は、pyovms.Tensor でもアクセスできます。ただし、出力の場合は、それらの値を応答に直接提供しません。データタイプに関する考慮事項を参照してください。

次の例を見てみます。

# client.py

import tritonclient.grpc as grpcclient
...
client = grpcclient.InferenceServerClient("localhost:9000")
inputs = []
with open("image_path", 'rb') as f:
    image_data = f.read()
image_input = grpcclient.InferInput("image", [len(image_data)], "BYTES")
image_input._raw_content = image_data

text_encoded = "some text".encode()
text_input = grpcclient.InferInput("text", [len(text_encoded)], "BYTES")
text_input._raw_content = text_encoded

numpy_array = np.array([[1.0, 2.0, 3.0], [4.0, 5.0, 6.0]])
numpy_input = grpcclient.InferInput("numpy", numpy_array.shape, "FP32")
numpy_input.set_data_from_numpy(numpy_array)

results = client.infer("model_name", [image_input, text_input, numpy_input])
# model.py

from pyovms import Tensor
from PIL import Image
import io
import numpy as np
...
class OvmsPythonModel:

    def execute(self, inputs):
        # Read inputs 
        image_input = inputs[0]
        print(image_input.shape) # (<image_binary_size>, )
        print(image_input.datatype) # "BYTES"

        text_input = inputs[1]
        print(text_input.shape) # (<string_binary_size>, )
        print(text_input.datatype) # "BYTES"

        numpy_input = inputs[2]
        print(text_input.shape) # (2, 3)
        print(text_input.datatype) # "FP32"
        
        # Convert pyovms.Tensor objects to more useful formats

        # Pillow Image created from image bytes
        image = Image.open(io.BytesIO(bytes(image_data)))
        # Python string "some text"
        text = bytes(text_input).decode()
        # Numpy array with shape (2, 3) and dtype float32
        ndarray = np.array(numpy_input)
        ...

タイムスタンプ

Mediapipe グラフはパケットを処理し、すべてのパケットにはタイムスタンプがあります。すべてのストリーム (入力と出力の両方) のパケットのタイムスタンプは昇順である必要があります。

推論を要求するとき、ユーザーは自動タイムスタンプを使用するか、OVMS_MP_TIMESTAMP パラメーターとして要求とともにタイムスタンプ自体を送信するかを決定できます。タイムスタンプの詳細は、こちらを参照してください。

Python ノード PythonExecutorCalculator に関しては、次のようになります。

  • 通常の実行モードの場合は、単にタイムスタンプを伝播します。つまり、入力タイムスタンプを出力タイムスタンプとして使用します。

  • 生成実行モードの場合、入力のタイムスタンプを保存し、このタイムスタンプを使用して最初の出力セットをダウンストリームに送信します。その後、タイムスタンプはセットごとに増加するため、出力パッケージの次のセットには昇順のタイムスタンプが付きます。

単一のグラフ・インスタンスでの複数の生成サイクル

ノードはタイムスタンプを保持し、新しい入力が到着するたびにタイムスタンプを上書きすることに注意してください。これは、単一のグラフ・インスタンスで複数の生成サイクルを実行する場合は、手動のタイムスタンプを使用する必要があることを意味します。次の要求のタイムスタンプは、最後の応答で受信したタイムスタンプより大きくなければなりません

gRPC ストリーミングで出力を同期

タイムスタンプは、入力と出力の両方、およびグラフ内の異なるストリームからのパケットを同期するときに重要な役割を果たします。MediaPipe はグラフの出力をモデルサーバーに提供します。次に何が起こるかは、使用されるエンドポイントによって異なります。

  • gRPC 単項エンドポイントでは、サーバーは必要なすべての出力からのパケットを待機し、それらを 1 つの応答で送信します。

  • gRPC ストリーミング・エンドポイントでは、サーバーは出力パケットが到着するとすぐにシリアル化し、個別の応答で送り返します。

つまり、グラフに 2 つ以上の出力があり、gRPC ストリーミング・エンドポイントを使用している場合は、出力の収集を行う必要があります。これには、OVMS_MP_TIMESTAMP を使用します。

timestamp = result.get_response().parameters["OVMS_MP_TIMESTAMP"].int64_param

高度な構成

実行モード

Python ノードは、通常と生成の 2 つの実行モードで実行するように構成できます。

通常の実行モードでは、ノードは 1 セットの入力ごとに 1 セットの出力を生成します。これは、gRPC 単項エンドポイントとストリーミング・エンドポイントの両方を介して動作し、コンピューター・ビジョンなどのユースケースで一般的なモードです。

生成実行モードでは、ノードは単一の入力セットごとに、時間の経過とともに複数の出力セットを生成します。これは gRPC ストリーミング・エンドポイント経由でのみ機能し、合計処理時間が大きく、実行が完了する前に中間結果を返したいユースケースに役立ちます。このモードは、より対話的な方法でサービスを提供する大規模言語モデルに適しています。

使用するモードに応じて、Python コードとグラフ構成の両方が一致している必要があります。

通常モード

通常モードを使用する場合、OvmsPythonModel クラスの execute メソッドは値を返す必要があります。

from pyovms import Tensor
...
  def execute(self, inputs):
        ...           
        my_output = Tensor("output", data)
        return [my_output]

execute が戻ると、PythonExecutorCalculator は出力を取得してグラフにプッシュします。ノード Process メソッドは入力セットごとに 1 回呼び出されます。このような実装は、次のような基本的なグラフ設定と組み合わせることができます。

node {
  name: "python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:input"
  output_stream: "OUTPUT:output"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/model.py"
    }
  }
}

生成モード

生成モードを使用する場合、OvmsPythonModel クラスの execute メソッドは yield 値を生成する必要があります。

from pyovms import Tensor
...
  def execute(self, inputs):
        ...
        for data in data_stream:         
          my_output = Tensor("output", data)
          yield [my_output]

execute が成功すると、PythonExecutorCalculator はジェネレーターを保存します。その後、生成されたシーケンスの最後に到達するまで繰り返し呼び出します。ノード Process メソッドは、単一の入力セットごとに複数回呼び出されます。この動作をトリガーするには、特定のグラフ構成が必要です。次の画面を参照してください。

node {
  name: "python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:input"
  input_stream: "LOOPBACK:loopback"
  input_stream_info: {
    tag_index: 'LOOPBACK:0',
    back_edge: true
  }
  input_stream_handler {
    input_stream_handler: "SyncSetInputStreamHandler",
    options {
      [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
        sync_set {
          tag_index: "LOOPBACK:0"
        }
      }
    }
  }
  output_stream: "OUTPUT:output"
  output_stream: "LOOPBACK:loopback"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/model.py"
    }
  }
}

通常モードにも存在する基本構成とは別に、このグラフにはいくつかの追加コンテンツが含まれています。見直してみます。

  1. LOOPBACK 入力と出力ストリーム

    input_stream: "LOOPBACK:loopback"
    ...
    output_stream: "LOOPBACK:loopback"
    

    この追加の入力および出力ストリームのセットにより、ノード内の内部サイクルが有効になります。これは、受信パケットなしで Process 呼び出しをトリガーするのに使用されるため、新しいデータなしでジェネレーターを呼び出します。入力と出力ストリームの値は両方とも全く同じである必要があり、PythonExecutorCalculator はタグが常に LOOPBACK であることを想定します。

    LOOPBACK 入力は execute メソッドに渡されず、ユーザーはいかなる方法でもメソッドと対話しません。

  2. バックエッジのアノテーション

    input_stream_info: {
      tag_index: 'LOOPBACK:0',
      back_edge: true
    }
    

    ここでは、タグ LOOPBACK とインデックス 0 を持つ入力ストリームがサイクルの作成に使用されることを示します。LOOPBACK タグに複数のインデックスがある場合、PythonExecutorCalculator はそれを無視します。

  3. SyncSetInputStreamHandler

    input_stream_handler {
      input_stream_handler: "SyncSetInputStreamHandler",
      options {
        [mediapipe.SyncSetInputStreamHandlerOptions.ext] {
          sync_set {
            tag_index: "LOOPBACK:0"
          }
        }
      }
    }
    

    通常の構成では、DefaultInputStreamHandler がデフォルトで使用されますが、生成モードの場合はそれでは十分ではありません。デフォルトのハンドラーが定義されている場合、ノードは Process を呼び出す前にすべての入力ストリームを待ちます。生成モードでは、Process はグラフからのデータに対して 1 回呼び出され、その後 LOOPBACK で信号を受信することによってのみ複数回呼び出されます。ただし、グラフと LOOPBACK からの入力が同時に存在することはありません。

    生成モードが機能するには、グラフからの入力と LOOPBACK を分離する必要があります。つまり、Process はグラフからの一連の入力を使用して呼び出すことができますが、LOOPBACK のみを使用して呼び出すこともできます。これは、SyncSetInputStreamHandler を介して実現できます。上記の設定サンプルでは、LOOPBACK を使用してセットを作成しますが、これにより、残りのすべての入力を含む別のセットも暗黙的に作成されます。実際には、互いに依存しない 2 つのセットが存在します。

    • LOOPBACK

    • … ユーザーが指定した他のすべての入力

サイクルが終了したときに、同じグラフ・インスタンスを再利用しないことをお勧めします。代わりに、新しいデータを生成する場合は、新しい gRPC ストリームを作成します。

動作する構成とコードサンプルについては、デモを参照してください。

不完全な入力

ノード構成で定義された入力のサブセットのみを使用してプロセスを起動するユースケースがあります。デフォルトでは、ノードは同じタイムスタンプを持つすべての入力を待ち、すべてが利用可能になったら Process を起動します。この動作は、デフォルトで使用される DefaultInputStreamHandler によって実装されます。入力のサブセットのみを使用して Process を起動するようにノードを構成するには、異なる入力ポリシーに対して異なる入力ストリームハンドラーを使用する必要があります。

このような構成は生成実行モードで使用されますが、別の例を見てみます。

node {
  name: "python_node"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT1:labels"
  input_stream: "INPUT2:image"
  input_stream_handler {
    input_stream_handler: "ImmediateInputStreamHandler",
  }
  output_stream: "OUTPUT:result"
  node_options: {
    [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/model.py"
    }
  }
}

ImmediateInputStreamHandler で構成されるノードは、入力が到着するとプロセスを起動します (同期は行われません)。このような構成は、OvmsPythonModel クラスの実装と一致する必要があります。
例:

from pyovms import Tensor

class OvmsPythonModel:

    def initialize(self, kwargs: dict):
        self.model = load_model(...)
        self.labels = []

    def execute(self, inputs: list):
        outputs = []
        for input in inputs:
            if input.name == "labels":
                self.labels = prepare_new_labels(input)
            else: # the only other name is "image"
                output = self.model.process(input, self.labels)
                return [Tensor("result", output)]

上記のシナリオでは、ノードは提供されたラベルのセットを使用して画像に対して何らかの処理を実行します。構成で不完全な入力の送信が許可されている場合、クライアントはラベルを 1 回だけ送信し、その後は画像のみを送信できます。

注: OvmsPythonModel オブジェクトのメンバーは、すべてのグラフ・インスタンス間で共有されることに注意してください。これは、上記のシナリオで 1 つのグラフ内の 1 つのクライアントがラベルを変更した場合、その変更は他のすべてのグラフ・インスタンス (そのグラフに要求を送信する他のすべてのクライアント) にも反映されることを意味します。単一のグラフ・インスタンスに限定される実行間のデータの保存は、将来のバージョンでサポートされる予定です。

不完全な出力

PythonExecutorCalculator は、execute メソッドで不完全な出力セットを返すことを許可します。これは、各グラフ出力を個別の応答でシリアル化するストリーミング・エンドポイントを操作する場合に役立ちます。例を参照してください。

node {
 name: "python_node"
 calculator: "PythonExecutorCalculator"
 input_side_packet: "PYTHON_NODE_RESOURCES:py"
 input_stream: "INPUT:input"
 output_stream: "OUTPUT:result"
 output_stream: "ERROR:error_message"
 node_options: {
   [type.googleapis.com / mediapipe.PythonExecutorCalculatorOptions]: {
     handler_path: "/ovms/workspace/model.py"
   }
 }
}

このノードを実行する Python コードは次のようになります。

from pyovms import Tensor

class OvmsPythonModel:

    def initialize(self, kwargs: dict):
        self.model = load_model(...)

    def execute(self, inputs: list):
        input = inputs[0]
        try:
            output = self.model(input)
        except Exception:
          return [Tensor("error_message", "Error occurred during execution".encode())]
        return [Tensor("result", output)]

この場合、クライアントは、ストリーム上で受信する出力に応じて、異なるアクションを実装できます。

別の構成例は、生成モードで実行中に生成が終了したことを通知することです。このソリューションはテキスト生成デモで使用されます。

計算機のタイプ変換

Python ノードは、C++ 側と Python 側の両方で使用できる専用の Python Tensor オブジェクトを操作します。このアプローチの欠点は、通常、他の計算機がそのようなオブジェクトを読み取ったり作成したりできないことです。これは、Python ノードを Python ノード以外に直接接続できないことを意味します。

そのためコンバーター計算機が存在します。これらはノード間のアダプターとして機能し、2 つの異なるタイプのパケットを処理する計算機の接続に必要な変換を実装します。

PyTensorOvTensorConverterCalculator

OpenVINO モデルサーバーには、Python TensorOV Tensor 間の変換を提供するビルトイン PyTensorOvTensorConverterCalculator が備わっています。

現在、PyTensorOvTensorConverterCalculator は 1 つの入力と 1 つの出力のみで動作します。

  • Python Tensor を期待するストリームにはタグ OVMS_PY_TENSOR必要です

  • OV Tensor を期待するストリームにはタグ OVTENSOR必要です

将来のバージョンでは、コンバーター計算機は複数の入力を受け入れ、複数の出力を生成するようになりますが、現時点では、1 つの入力ストリームと 1 つの出力ストリームを使用する構成が設定されています。変換方向に応じて、これらのストリームの 1 つはタグ OVMS_PY_TENSOR を持ち、もう 1 つはタグ OVTENSOR を持つ必要があります

PyTensorOvTensorConverterCalculator は、tag_to_output_tensor_names マップでノードオプションを使用するように構成することもでき、OV Tensor から Python Tensor への変換で使用されます。出力ストリームのタグに基づいて、Python Tensor が作成される名前を定義します。

グラフ内で両方の変換が行われる例を参照してください。

input_stream: "OVMS_PY_TENSOR:input"
output_stream: "OVMS_PY_TENSOR:output"

node {
  name: "PythonPreprocess"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:input"
  output_stream: "OUTPUT:preprocessed_py"
  node_options: {
    [type.googleapis.com/mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/preprocess.py"
    }
  }
}

node {
  calculator: "PyTensorOvTensorConverterCalculator"
  input_stream: "OVMS_PY_TENSOR:preprocessed_py"
  output_stream: "OVTENSOR:preprocessed_ov"
}

node {
  calculator: "OpenVINOInferenceCalculator"
  input_side_packet: "SESSION:session" # inference session
  input_stream: "OVTENSOR:preprocessed_ov"
  output_stream: "OVTENSOR:result_ov"
}

node {
  calculator: "PyTensorOvTensorConverterCalculator"
  input_stream: "OVTENSOR:result_ov"
  output_stream: "OVMS_PY_TENSOR:result_py"
  node_options: {
    [type.googleapis.com/mediapipe.PyTensorOvTensorConverterCalculatorOptions]: {
      tag_to_output_tensor_names {
        key: "OVMS_PY_TENSOR"
        value: "result_py"
      }
    }
  }
}

node {
  name: "PythonPostprocess"
  calculator: "PythonExecutorCalculator"
  input_side_packet: "PYTHON_NODE_RESOURCES:py"
  input_stream: "INPUT:result_py"
  output_stream: "OUTPUT:output"
  node_options: {
    [type.googleapis.com/mediapipe.PythonExecutorCalculatorOptions]: {
      handler_path: "/ovms/workspace/postprocess.py"
    }
  }
}

Python ノード、OV 推論ノード、コンバーター・ノードを使用するグラフの完全な例については、CLIP デモを参照してください。