gRPC ストリーミング API

はじめに

OpenVINO モデルサーバーは、双方向ストリーミングに個別の RPC を追加する gRPC KServe 拡張機能を実装しています。これは、クライアントが単一の要求を送信して単一の応答を返す単項 RPC 以外に、クライアントは ModelStreamInfer プロシージャーを使用して接続を開始し、任意の数と順序でメッセージを送受信できることを意味します。

service GRPCInferenceService
{
  ...
  // Standard
  rpc ModelInfer(ModelInferRequest) returns (ModelInferResponse) {}
  // Extension
  rpc ModelStreamInfer(stream ModelInferRequest) returns (stream ModelStreamInferResponse) {}
  ...
}

これは、MediaPipe グラフを提供する場合に非常に役立ちます。単項推論 RPC では、各要求は個別の独立した MediaPipe グラフ・インスタンスを使用します。ただし、ストリーミング推論では、RPC MediaPipe グラフはクライアント・セッションごとに 1 回だけ作成され、同じ gRPC ストリームの後続の要求で再利用されます。

  • これにより、グラフの初期化のオーバーヘッドが回避され、全体のスループットが向上します

  • 後続の受信要求間で状態を保持できるようにします

  • ソース計算機が入力パケットをフィードせず複数の応答をストリーミングできるようにします

diagram

グラフ選択

ストリームを開いた後、最初の gRPC 要求は、どのグラフ定義が実行に選択されるか定義します (model_name フィールド)。その後、後続の要求はサーバブル名とバージョンを一致させる必要があります。一致しない場合、エラーが報告され、入力パケットはグラフにプッシュされません。ただし、グラフは正しい要求に対して引き続き使用できます。

注: 要求されたグラフが存在しないか、リタイアした場合、サーバーは最初の要求後にストリームを閉じます。

タイムスタンプ

MediaPipe グラフでは、同期のためパケットにタイムスタンプ情報を含める必要があります。グラフ内の各入力ストリームでは、タイムスタンプは単調増加します。詳細は、MediaPipe タイムスタンプを参照してください。

自動タイムスタンプ

デフォルトでは、OpenVINO モデルサーバーはタイムスタンプを自動的に割り当てます。各 gRPC 要求は、タイムライン 0 から始まる個別のポイントとして扱われます。各要求は順番に逆シリアル化され、タイムスタンプが 1 つずつ増加します。

注: これは、同じタイムスタンプを持つ複数の入力を送信するには、クライアントがそれを 1 つの要求にパックする (または手動のタイムスタンプを使用する) 必要があることを意味します。

手動タイムスタンプ

オプションで、クライアントは要求パラメーター OVMS_MP_TIMESTAMP を介してタイムスタンプを手動で含めることもできます。これは、要求から逆シリアル化されたすべてのパケットに適用されます。

手動/自動タイムスタンプを混在させることができます。正しい逆シリアル化ステップの後、デフォルトの自動タイムスタンプは常に previous_timestamp + 1 と等しくなります。

tritonclient (Python pip パッケージ) を使用して要求にタイムスタンプを追加します。

FRAME_TIMESTAMP_US = 43166520112  # example value
TIMESTAMP_PARAM_NAME = 'OVMS_MP_TIMESTAMP'

triton_client.async_stream_infer(model_name=graph_name,
                                        inputs=inputs,
                                        parameters={TIMESTAMP_PARAM_NAME: FRAME_TIMESTAMP_US})

ストリーム応答の読み取り

MediaPipe グラフで 1 つの出力パケットが準備できると、すぐに gRPC 応答にシリアル化されクライアントに返されます。すべての MediaPipe パケットにはタイムスタンプ情報が含まれるため、OpenVINO モデルサーバーはそれをメッセージに含めます。これは、クライアント側でのアプリケーションの同期に役立ちます。

TIMESTAMP_PARAM_NAME = 'OVMS_MP_TIMESTAMP'

def callback(result, error):
  ...
  timestamp = result.get_response().parameters[TIMESTAMP_PARAM_NAME].int64_param

要求間の状態の保持

ストリーム内の後続の要求は、MediaPipe グラフの同じインスタンスにアクセスできることに注意してください。これにより、中間状態を保存し、ステートフルに動作するグラフを実装できます。これは、オブジェクト追跡のユースケースでは利点かもしれません。

エラー処理

MediaPipe グラフでは、後続のクライアント・ストリーム読み取り操作の間に内部処理エラーがないかチェックされます。グラフで回復不能なエラーが発生した場合、そのメッセージがクライアントに返され、ストリームが閉じられます。