OpenVINO™ によるリアルタイムの人間の姿勢推定

この Jupyter ノートブックはオンラインで起動でき、ブラウザーのウィンドウで対話型環境を開きます。ローカルにインストールすることもできます。次のオプションのいずれかを選択します。

Binder GitHub

このノートブックは、Open Model Zoo の OpenPose human-pose-estimation-0001 モデルを使用して、OpenVINO によるリアルタイムの姿勢推定について説明します。このノートブックの最後では、ウェブカメラからのリアルタイムの推論結果が表示されます。 さらに、ビデオファイルをアップロードすることもできます。

注:ウェブカメラを使用するには、ウェブカメラを備えたコンピューター上でこの Jupyter ノートブックを実行する必要があります。サーバー上で実行すると、ウェブカメラは機能しなくなります。ただし、最終ステップではビデオに対して推論を行うことができます。

目次

%pip install -q "openvino>=2023.1.0"
DEPRECATION: pytorch-lightning 1.6.5 has a non-standard dependency specifier torch>=1.8.*. pip 24.1 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of pytorch-lightning or contact the author to suggest that they release a version with a conforming dependency specifiers. Discussion can be found at https://github.com/pypa/pip/issues/12063
Note: you may need to restart the kernel to use updated packages.

インポート

import collections
import sys
import time
from pathlib import Path

import cv2
import numpy as np
from IPython import display
from numpy.lib.stride_tricks import as_strided
import openvino as ov

from decoder import OpenPoseDecoder

sys.path.append("../utils")
import notebook_utils as utils

モデル

モデルのダウンロード

notebook_utils ファイルの関数である download_file を使用します。ディレクトリー構造が自動的に作成され、選択したモデルがダウンロードされます。

別のモデルをダウンロードする場合は、以下のコード内のモデルの名前と精度を置き換えます。

注: これには、別のポーズデコーダーが必要になることがあります。

# A directory where the model will be downloaded.
base_model_dir = Path("model")

# The name of the model from Open Model Zoo.
model_name = "human-pose-estimation-0001"
# Selected precision (FP32, FP16, FP16-INT8).
precision = "FP16-INT8"

model_path = base_model_dir / "intel" / model_name / precision / f"{model_name}.xml"

if not model_path.exists():
    model_url_dir = f"https://storage.openvinotoolkit.org/repositories/open_model_zoo/2022.1/models_bin/3/{model_name}/{precision}/"
    utils.download_file(model_url_dir + model_name + '.xml', model_path.name, model_path.parent)
    utils.download_file(model_url_dir + model_name + '.bin', model_path.with_suffix('.bin').name, model_path.parent)
model/intel/human-pose-estimation-0001/FP16-INT8/human-pose-estimation-0001.xml:   0%|          | 0.00/474k [0…
model/intel/human-pose-estimation-0001/FP16-INT8/human-pose-estimation-0001.bin:   0%|          | 0.00/4.03M […

モデルのロード

ダウンロードされたモデルは、ベンダー、モデル名、精度を示す固定構造内にあります。

モデルを実行するには、数行のコードで済みます。まず、OpenVINO ランタイムを初期化します。次に、.bin および .xml ファイルからネットワーク・アーキテクチャーとモデルの重みを読み取り、目的のデバイス用にコンパイルします。OpenVINO を使用して推論を実行するデバイスをドロップダウン・リストから選択します。

import ipywidgets as widgets

core = ov.Core()

device = widgets.Dropdown(
    options=core.available_devices + ["AUTO"],
    value='AUTO',
    description='Device:',
    disabled=False,
)

device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')
# Initialize OpenVINO Runtime
core = ov.Core()
# Read the network from a file.
model = core.read_model(model_path)
# Let the AUTO device decide where to load the model (you can use CPU, GPU as well).
compiled_model = core.compile_model(model=model, device_name=device.value, config={"PERFORMANCE_HINT": "LATENCY"})

# Get the input and output names of nodes.
input_layer = compiled_model.input(0)
output_layers = compiled_model.outputs

# Get the input size.
height, width = list(input_layer.shape)[2:]

入力レイヤーには入力ノードの名前が含まれ、出力レイヤーにはネットワークの出力ノードの名前が含まれます。OpenPose モデルの場合、1 つの入力と 2 つの出力 (PAF とキーポイントのヒートマップ) があります。

input_layer.any_name, [o.any_name for o in output_layers]
('data', ['Mconv7_stage2_L1', 'Mconv7_stage2_L2'])

処理

OpenPose デコーダー

ニューラル・ネットワークからの生の結果を姿勢推定に変換するには、OpenPose デコーダーが必要です。Open Model Zoo で提供されており、human-pose-estimation-0001 モデルと互換性があります。

human-pose-estimation-0001 以外のモデルを選択する場合は、別のデコーダー (AssociativeEmbeddingDecoder など) が必要になります。これは、Open Model Zoo のデモセクションで入手できます。

decoder = OpenPoseDecoder()

処理結果

結果をポーズに変換するいくつかの関数が用意されています。

まず、ヒートマップをプールします。numpy ではプーリングが利用できないため、numpy で直接行う方法を使用してください。次に、非最大抑制を使用してヒートマップからキーポイントを取得します。その後、デコーダーを使用してポーズをデコードします。入力イメージはネットワーク出力よりも大きいため、すべてのポーズ座標にスケール係数を乗算する必要があります。

# 2D pooling in numpy (from: https://stackoverflow.com/a/54966908/1624463)
def pool2d(A, kernel_size, stride, padding, pool_mode="max"):
    """
    2D Pooling

    Parameters:
        A: input 2D array
        kernel_size: int, the size of the window
        stride: int, the stride of the window
        padding: int, implicit zero paddings on both sides of the input
        pool_mode: string, 'max' or 'avg'
    """
    # Padding
    A = np.pad(A, padding, mode="constant")

    # Window view of A
    output_shape = (
        (A.shape[0] - kernel_size) // stride + 1,
        (A.shape[1] - kernel_size) // stride + 1,
    )
    kernel_size = (kernel_size, kernel_size)
    A_w = as_strided(
        A,
        shape=output_shape + kernel_size,
        strides=(stride * A.strides[0], stride * A.strides[1]) + A.strides
    )
    A_w = A_w.reshape(-1, *kernel_size)

    # Return the result of pooling.
    if pool_mode == "max":
        return A_w.max(axis=(1, 2)).reshape(output_shape)
    elif pool_mode == "avg":
        return A_w.mean(axis=(1, 2)).reshape(output_shape)


# non maximum suppression
def heatmap_nms(heatmaps, pooled_heatmaps):
    return heatmaps * (heatmaps == pooled_heatmaps)


# Get poses from results.
def process_results(img, pafs, heatmaps):
    # This processing comes from
    # https://github.com/openvinotoolkit/open_model_zoo/blob/master/demos/common/python/models/open_pose.py
    pooled_heatmaps = np.array(
        [[pool2d(h, kernel_size=3, stride=1, padding=1, pool_mode="max") for h in heatmaps[0]]]
    )
    nms_heatmaps = heatmap_nms(heatmaps, pooled_heatmaps)

    # Decode poses.
    poses, scores = decoder(heatmaps, nms_heatmaps, pafs)
    output_shape = list(compiled_model.output(index=0).partial_shape)
    output_scale = img.shape[1] / output_shape[3].get_length(), img.shape[0] / output_shape[2].get_length()
    # Multiply coordinates by a scaling factor.
    poses[:, :, :2] *= output_scale
    return poses, scores

ポーズオーバーレイの描画

画像上にポーズ・オーバーレイを描画して、推定されたポーズを視覚化します。関節は円として描画され、手足は線で描画されます。このコードは、Open Model Zoo の Human Pose Estimation Demo をベースにしています。

colors = ((255, 0, 0), (255, 0, 255), (170, 0, 255), (255, 0, 85), (255, 0, 170), (85, 255, 0),
          (255, 170, 0), (0, 255, 0), (255, 255, 0), (0, 255, 85), (170, 255, 0), (0, 85, 255),
          (0, 255, 170), (0, 0, 255), (0, 255, 255), (85, 0, 255), (0, 170, 255))

default_skeleton = ((15, 13), (13, 11), (16, 14), (14, 12), (11, 12), (5, 11), (6, 12), (5, 6), (5, 7),
                    (6, 8), (7, 9), (8, 10), (1, 2), (0, 1), (0, 2), (1, 3), (2, 4), (3, 5), (4, 6))


def draw_poses(img, poses, point_score_threshold, skeleton=default_skeleton):
    if poses.size == 0:
        return img

    img_limbs = np.copy(img)
    for pose in poses:
        points = pose[:, :2].astype(np.int32)
        points_scores = pose[:, 2]
        # Draw joints.
        for i, (p, v) in enumerate(zip(points, points_scores)):
            if v > point_score_threshold:
                cv2.circle(img, tuple(p), 1, colors[i], 2)
        # Draw limbs.
        for i, j in skeleton:
            if points_scores[i] > point_score_threshold and points_scores[j] > point_score_threshold:
                cv2.line(img_limbs, tuple(points[i]), tuple(points[j]), color=colors[j], thickness=4)
    cv2.addWeighted(img, 0.4, img_limbs, 0.6, 0, dst=img)
    return img

メイン処理関数

指定されたソースで姿勢推定を実行します。ウェブカメラまたはビデオファイルのいずれか。

# Main processing function to run pose estimation.
def run_pose_estimation(source=0, flip=False, use_popup=False, skip_first_frames=0):
    pafs_output_key = compiled_model.output("Mconv7_stage2_L1")
    heatmaps_output_key = compiled_model.output("Mconv7_stage2_L2")
    player = None
    try:
        # Create a video player to play with target fps.
        player = utils.VideoPlayer(source, flip=flip, fps=30, skip_first_frames=skip_first_frames)
        # Start capturing.
        player.start()
        if use_popup:
            title = "Press ESC to Exit"
            cv2.namedWindow(title, cv2.WINDOW_GUI_NORMAL | cv2.WINDOW_AUTOSIZE)

        processing_times = collections.deque()

        while True:
            # Grab the frame.
            frame = player.next()
            if frame is None:
                print("Source ended")
                break
            # If the frame is larger than full HD, reduce size to improve the performance.
            scale = 1280 / max(frame.shape)
            if scale < 1:
                frame = cv2.resize(frame, None, fx=scale, fy=scale, interpolation=cv2.INTER_AREA)

            # Resize the image and change dims to fit neural network input.
            # (see https://github.com/openvinotoolkit/open_model_zoo/tree/master/models/intel/human-pose-estimation-0001)
            input_img = cv2.resize(frame, (width, height), interpolation=cv2.INTER_AREA)
            # Create a batch of images (size = 1).
            input_img = input_img.transpose((2,0,1))[np.newaxis, ...]

            # Measure processing time.
            start_time = time.time()
            # Get results.
            results = compiled_model([input_img])
            stop_time = time.time()

            pafs = results[pafs_output_key]
            heatmaps = results[heatmaps_output_key]
            # Get poses from network results.
            poses, scores = process_results(frame, pafs, heatmaps)

            # Draw poses on a frame.
            frame = draw_poses(frame, poses, 0.1)

            processing_times.append(stop_time - start_time)
            # Use processing times from last 200 frames.
            if len(processing_times) > 200:
                processing_times.popleft()

            _, f_width = frame.shape[:2]
            # mean processing time [ms]
            processing_time = np.mean(processing_times) * 1000
            fps = 1000 / processing_time
            cv2.putText(frame, f"Inference time: {processing_time:.1f}ms ({fps:.1f} FPS)", (20, 40),
                        cv2.FONT_HERSHEY_COMPLEX, f_width / 1000, (0, 0, 255), 1, cv2.LINE_AA)

            # Use this workaround if there is flickering.
            if use_popup:
                cv2.imshow(title, frame)
                key = cv2.waitKey(1)
                # escape = 27
                if key == 27:
                    break
            else:
                # Encode numpy array to jpg.
                _, encoded_img = cv2.imencode(".jpg", frame, params=[cv2.IMWRITE_JPEG_QUALITY, 90])
                # Create an IPython image.
                i = display.Image(data=encoded_img)
                # Display the image in this notebook.
                display.clear_output(wait=True)
                display.display(i)
    # ctrl-c
    except KeyboardInterrupt:
        print("Interrupted")
    # any different error
    except RuntimeError as e:
        print(e)
    finally:
        if player is not None:
            # Stop capturing.
            player.stop()
        if use_popup:
            cv2.destroyAllWindows()

実行

リアルタイムの姿勢推定の実行

ウェブカメラをビデオ入力として使用します。デフォルトでは、プライマリー・ウェブカメラは source=0 に設定されます。複数のウェブカメラがある場合、0 から始まる連続した番号が割り当てられます。前面カメラを使用する場合は、flip=True を設定します。一部のウェブブラウザー、特に Mozilla Firefox ではちらつきが発生する場合があります。ちらつきが発生する場合は、use_popup=True を設定してください。

注: このノートブックをウェブカメラで使用するには、ウェブカメラを備えたコンピューター上でノートブックを実行する必要があります。ノートブックをサーバー (Binder など) 上で実行する場合、ウェブカメラは機能しません。このノートブックをリモート・コンピューター (Binder など) で実行する場合、ポップアップ・モードは機能しない可能性があります。

ウェブカメラがない場合でも、ビデオファイルを使用してこのデモを実行できます。OpenCV でサポートされている形式であればどれでも機能します。最初の N フレームをスキップしてビデオを早送りできます。

姿勢推定を実行します。

USE_WEBCAM = False
cam_id = 0
video_file = "https://github.com/intel-iot-devkit/sample-videos/blob/master/store-aisle-detection.mp4?raw=true"
source = cam_id if USE_WEBCAM else video_file

additional_options = {"skip_first_frames": 500} if not USE_WEBCAM else {}
run_pose_estimation(source=source, flip=isinstance(source, int), use_popup=False, **additional_options)
../_images/402-pose-estimation-with-output_20_0.png
Source ended