産業用メーター読み取り#

この Jupyter ノートブックはオンラインで起動でき、ブラウザーのウィンドウで対話型環境を開きます。ローカルにインストールすることもできます。次のオプションのいずれかを選択します:

Binder / GitHub

このノートブックでは、OpenVINO ランタイムを使用して産業用メーターリーダーを作成する方法を説明します。事前トレーニングされた PPYOLOv2 PaddlePaddle モデルと DeepLabV3P を使用して、複数の推論タスク・パイプラインを構築します:

  1. 検出モデルを実行してメーターを見つけ、元の写真からトリミングします。

  2. これらのトリミングされたメーターに対してセグメント化モデルを実行して、ポインターとスケールのインスタンスを取得します。

  3. スケールマップでポインターの位置を見つけます。

workflow

ワークフロー#

目次:

import platform 

# Install the openvino package together with OpenCV and tqdm.
# `%pip` is an IPython magic, so this cell only runs inside a notebook kernel.
%pip install -q "openvino>=2023.1.0" opencv-python tqdm 

# matplotlib is capped below 3.7 on Windows only; every other platform just needs >=3.4
if platform.system() != "Windows":
    %pip install -q "matplotlib>=3.4" 
else: 
    %pip install -q "matplotlib>=3.4,<3.7"
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.

インポート#

import os 
from pathlib import Path 
import numpy as np 
import math 
import cv2 
import tarfile 
import matplotlib.pyplot as plt 
import openvino as ov 

# Fetch the `notebook_utils` helper module from the openvino_notebooks repository
import requests

r = requests.get(
    url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/notebook_utils.py",
)
# Fail here, with a clear HTTP error, instead of writing an error page to disk
# and hitting a confusing failure on the import below.
r.raise_for_status()

# Write through a context manager so the file is flushed and closed before it is imported.
with open("notebook_utils.py", "w", encoding="utf-8") as f:
    f.write(r.text)
from notebook_utils import download_file, segmentation_map_to_image

モデルとテストイメージを準備#

PaddlePaddle コミュニティーから PPYOLOv2 および DeepLabV3P の事前トレーニング済みモデルをダウンロードします。

MODEL_DIR = "model"
DATA_DIR = "data"
DET_MODEL_LINK = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/models/meter-reader/meter_det_model.tar.gz"
SEG_MODEL_LINK = "https://storage.openvinotoolkit.org/repositories/openvino_notebooks/models/meter-reader/meter_seg_model.tar.gz"
# The archive / image file names are the last path component of each URL
DET_FILE_NAME = DET_MODEL_LINK.split("/")[-1]
SEG_FILE_NAME = SEG_MODEL_LINK.split("/")[-1]
IMG_LINK = "https://user-images.githubusercontent.com/91237924/170696219-f68699c6-1e82-46bf-aaed-8e2fc3fa5f7b.jpg"
IMG_FILE_NAME = IMG_LINK.split("/")[-1]
IMG_PATH = Path(f"{DATA_DIR}/{IMG_FILE_NAME}")

os.makedirs(MODEL_DIR, exist_ok=True)

# Both model archives go through the same download-then-extract steps.
for model_kind, model_link, archive_name in (
    ("Detection", DET_MODEL_LINK, DET_FILE_NAME),
    ("Segmentation", SEG_MODEL_LINK, SEG_FILE_NAME),
):
    download_file(model_link, directory=MODEL_DIR, show_progress=True)
    try:
        # The context manager closes the archive even when extraction fails.
        # NOTE(review): extractall() trusts member paths; consider an extraction
        # filter once the minimum supported Python version provides one.
        with tarfile.open(f"{MODEL_DIR}/{archive_name}") as archive:
            archive.extractall(MODEL_DIR)
    except (tarfile.TarError, OSError):
        # extractall() returns None, so failure is only ever signalled by an exception;
        # report it and re-raise so later cells do not run against missing models.
        print(f"Error Extracting the {model_kind} model.Please check the network.")
        raise
    print(f'{model_kind} Model Extracted to "./{MODEL_DIR}".')

download_file(IMG_LINK, directory=DATA_DIR, show_progress=True)
if IMG_PATH.is_file():
    print(f'Test Image Saved to "./{DATA_DIR}".')
else:
    print("Error Downloading the Test Image.Please check the network.")
model/meter_det_model.tar.gz: 0%|          | 0.00/192M [00:00<?, ?B/s]
Detection Model Extracted to "./model".
model/meter_seg_model.tar.gz: 0%|          | 0.00/94.9M [00:00<?, ?B/s]
Segmentation Model Extracted to "./model".
data/170696219-f68699c6-1e82-46bf-aaed-8e2fc3fa5f7b.jpg: 0%|          | 0.00/183k [00:00<?, ?B/s]
Test Image Saved to "./data".

設定

読み取り計算用のパラメーター設定を追加します。

# Target [height, width] passed to roi_process() for every cropped meter
METER_SHAPE = [512, 512] 
# Centre and outermost sampling radius used by circle_to_rectangle()
CIRCLE_CENTER = [256, 256] 
CIRCLE_RADIUS = 250 
PI = math.pi 
# Size of the unrolled (rectangular) label map produced by circle_to_rectangle()
RECTANGLE_HEIGHT = 120 
RECTANGLE_WIDTH = 1570 
# calculate_reading() uses METER_CONFIG[0] when more scale marks than this are found,
# otherwise METER_CONFIG[1]
TYPE_THRESHOLD = 40 
# Colour table handed to segmentation_map_to_image(); presumably one row per SEG_LABEL id — TODO confirm
COLORMAP = np.array([[28, 28, 28], [238, 44, 44], [250, 250, 250]]) 

# The test image dataset contains two types of meters.
# Only "scale_interval_value" is read by the code visible in this notebook section.
METER_CONFIG = [ 
    {"scale_interval_value": 25.0 / 50.0, "range": 25.0, "unit": "(MPa)"}, 
    {"scale_interval_value": 1.6 / 32.0, "range": 1.6, "unit": "(MPa)"}, ] 

# Label ids that rectangle_to_line() compares against the segmentation maps
SEG_LABEL = {"background": 0, "pointer": 1, "scale": 2}

モデルのロード#

モデルの読み込みと推論の共通クラスを定義します

# Initialize the OpenVINO Runtime; this shared `core` is used by Model below
core = ov.Core() 

class Model:
    """
    Thin wrapper that reads, reshapes and compiles an OpenVINO model.
    """

    def __init__(self, model_path, new_shape, device="CPU"):
        """
        Read the model, apply the requested input shape and compile it

        Param:
            model_path (string): path of inference model
            new_shape (dict): new shape of model input
            device (string): device name handed to compile_model, "CPU" by default
        """
        loaded = core.read_model(model=model_path)
        loaded.reshape(new_shape)
        self.model = loaded
        self.compiled_model = core.compile_model(model=loaded, device_name=device)
        # Only the first output of the compiled model is ever returned by predict()
        self.output_layer = self.compiled_model.output(0)

    def predict(self, input_image):
        """
        Run inference and return the first model output

        Param:
            input_image (np.array | dict): input data forwarded to the compiled model

        Returns:
            result (np.array): model output data
        """
        outputs = self.compiled_model(input_image)
        return outputs[self.output_layer]

データ処理#

各モデルの前処理タスクと後処理タスクが含まれます。

def det_preprocess(input_image, target_size):
    """
    Resize, normalize and batch one image for the detection model

    Param:
        input_image (np.array): input image, indexed as [height, width, channel]
        target_size (int): side length of the square image required by the model input layer

    Returns:
        img (np.array): float32 array with a leading batch axis and channels first

    NOTE(review): the caller loads images with cv2.imread, while these statistics
    look like the usual ImageNet RGB values — confirm the intended channel order.
    """
    channel_mean = np.array([0.485, 0.456, 0.406]).reshape((3, 1, 1))
    channel_std = np.array([0.229, 0.224, 0.225]).reshape((3, 1, 1))

    resized = cv2.resize(input_image, (target_size, target_size))
    # Channels first, divided by 255, then a leading batch axis of size 1
    batch = np.expand_dims(np.transpose(resized, [2, 0, 1]) / 255, 0)
    batch -= channel_mean
    batch /= channel_std
    return batch.astype(np.float32)

def filter_bboxes(det_results, score_threshold):
    """
    Keep only the detections whose confidence exceeds the threshold

    Param:
        det_results (np.array): detection results, one row per box; column 1 is read as the score
        score_threshold (float): confidence threshold

    Returns:
        filtered_results (list[np.array]): rows whose score is strictly greater than the threshold
    """
    return [detection for detection in det_results if detection[1] > score_threshold]

def roi_crop(image, results, scale_x, scale_y):
    """
    Crop the area of each detected meter out of the original image

    Box coordinates are scaled to image pixels and then clamped to the image, so a
    box that spills over the border yields the visible part of the meter instead of
    an empty or wrapped-around slice.

    Param:
        image (np.array): original image, indexed as [y, x, channel]
        results (list[np.array]): detection results; elements 2..5 of each entry are
                                  read as xmin, ymin, xmax, ymax
        scale_x (float): the scale value in x axis
        scale_y (float): the scale value in y axis

    Returns:
        roi_imgs (list[np.array]): the list of meter images
        loc (list[list[int]]): [xmin, ymin, xmax, ymax] of every meter, in image pixels
    """
    img_h, img_w = image.shape[:2]

    def _clamp(value, upper):
        # Keep a pixel coordinate inside [0, upper]
        return min(max(value, 0), upper)

    roi_imgs = []
    loc = []
    for result in results:
        bbox = result[2:]
        xmin = _clamp(int(bbox[0] * scale_x), img_w - 1)
        ymin = _clamp(int(bbox[1] * scale_y), img_h - 1)
        xmax = _clamp(int(bbox[2] * scale_x), img_w - 1)
        ymax = _clamp(int(bbox[3] * scale_y), img_h - 1)
        # The max corner is inclusive, hence the +1 on the slice end
        roi_imgs.append(image[ymin : ymax + 1, xmin : xmax + 1, :])
        loc.append([xmin, ymin, xmax, ymax])
    return roi_imgs, loc

def roi_process(input_images, target_size, interp=cv2.INTER_LINEAR):
    """
    Resize the cropped meter images and normalize them for the segmentation task

    Param:
        input_images (list[np.array]): the list of meter images
        target_size (list|tuple): height and width of resized image, e.g [height, width]
        interp (int): the interpolation method for image resizing

    Returns:
        img_list (list[np.array]): normalized, channels-first images for the model
        resize_list (list[np.array]): the resized images before normalization, for visualization
    """
    target_h = float(target_size[0])
    target_w = float(target_size[1])
    channel_mean = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))
    channel_std = np.array([0.5, 0.5, 0.5]).reshape((3, 1, 1))

    img_list = []
    resize_list = []
    for roi in input_images:
        src_h, src_w = roi.shape[0], roi.shape[1]
        # Resize through per-axis scale factors rather than an explicit output size
        resized = cv2.resize(
            roi,
            None,
            None,
            fx=target_w / float(src_w),
            fy=target_h / float(src_h),
            interpolation=interp,
        )
        resize_list.append(resized)

        chw = resized.transpose(2, 0, 1) / 255
        chw -= channel_mean
        chw /= channel_std
        img_list.append(chw)
    return img_list, resize_list

def erode(seg_results, erode_kernel):
    """
    Erode the segmentation result to get a clearer instance of pointer and scale

    The input list is left untouched; a new list is returned.

    Param:
        seg_results (list[np.array]): label maps, one per meter
        erode_kernel (int): side length of the square erosion kernel

    Return:
        eroded_results (list[np.array]): the eroded label maps, as uint8
    """
    kernel = np.ones((erode_kernel, erode_kernel), np.uint8)
    # Build a fresh list so the caller's label maps are not overwritten in place
    return [cv2.erode(label_map.astype(np.uint8), kernel) for label_map in seg_results]

def circle_to_rectangle(seg_results):
    """
    Unroll each circular label map into a rectangular one by polar sampling

    Column `col` corresponds to the angle 2 * PI * (col + 1) / RECTANGLE_WIDTH and
    row `row` to the radius CIRCLE_RADIUS - row - 1, both taken around CIRCLE_CENTER.

    Param:
        seg_results (list[np.array]): 2D label maps, indexed as [y, x]

    Return:
        rectangle_meters (list[np.array]): uint8 label maps of shape
                                           (RECTANGLE_HEIGHT, RECTANGLE_WIDTH)
    """
    if len(seg_results) == 0:
        return []

    # The source pixel of every (row, col) depends only on module constants, so the
    # lookup table is built once and shared by all meters.
    src_y = np.empty((RECTANGLE_HEIGHT, RECTANGLE_WIDTH), dtype=np.intp)
    src_x = np.empty((RECTANGLE_HEIGHT, RECTANGLE_WIDTH), dtype=np.intp)
    for col in range(RECTANGLE_WIDTH):
        theta = PI * 2 * (col + 1) / RECTANGLE_WIDTH
        cos_theta = math.cos(theta)
        sin_theta = math.sin(theta)
        for row in range(RECTANGLE_HEIGHT):
            # The radius of the meter circle is mapped onto the height of the rectangle
            rho = CIRCLE_RADIUS - row - 1
            # Adding 0.5 before int() rounds non-negative coordinates to the nearest pixel
            src_y[row, col] = int(CIRCLE_CENTER[0] + rho * cos_theta + 0.5)
            src_x[row, col] = int(CIRCLE_CENTER[1] - rho * sin_theta + 0.5)

    return [np.asarray(label_map)[src_y, src_x].astype(np.uint8) for label_map in seg_results]

def rectangle_to_line(rectangle_meters):
    """
    Collapse each rectangular label map from 2D to 1D by counting labels per column

    Param:
        rectangle_meters (list[np.array]): 2D rectangular label maps

    Return:
        line_scales (list[np.array]): per-column count of scale pixels (uint8)
        line_pointers (list[np.array]): per-column count of pointer pixels (uint8)
    """
    line_scales = []
    line_pointers = []
    for label_map in rectangle_meters:
        # Boolean masks summed down the rows give the per-column pixel counts
        pointer_hits = label_map == SEG_LABEL["pointer"]
        scale_hits = label_map == SEG_LABEL["scale"]
        line_scales.append(scale_hits.sum(axis=0).astype(np.uint8))
        line_pointers.append(pointer_hits.sum(axis=0).astype(np.uint8))
    return line_scales, line_pointers

def mean_binarization(data_list):
    """
    Binarize every array against its own mean

    Values below the mean become 0, all others become 1. The inputs are not
    modified; new arrays with the same dtype are returned.

    Param:
        data_list (list[np.array]): input data, one 1D array per meter

    Return:
        binaried_data_list (list[np.array]): output data
    """
    binaried_data_list = []
    for data in data_list:
        values = np.asarray(data)
        mean_data = np.mean(values)
        # np.where allocates a new array, so the caller's data stays intact
        binaried_data_list.append(np.where(values < mean_data, 0, 1).astype(values.dtype))
    return binaried_data_list

def locate_scale(line_scales):
    """
    Find the centre of every scale mark along each binarized line

    A mark opens at the first of two consecutive non-zero samples and closes at the
    first of two consecutive zero samples; a mark still open at the end of the line
    is not reported.

    Param:
        line_scales (list[np.array]): the list of binarized scale lines

    Return:
        scale_locations (list[list]): centre position of each scale mark, per line
    """
    scale_locations = []
    for line in line_scales:
        centers = []
        run_start = None
        for idx in range(line.shape[0] - 1):
            current, following = line[idx], line[idx + 1]
            if run_start is None and current > 0 and following > 0:
                run_start = idx
            if run_start is not None and current == 0 and following == 0:
                # The mark ended on the sample before this pair of zeros
                centers.append((run_start + (idx - 1)) / 2)
                run_start = None
        scale_locations.append(centers)
    return scale_locations

def locate_pointer(line_pointers):
    """
    Find the centre of the pointer along each binarized line

    Only the first run is reported: it opens at the first of two consecutive
    non-zero samples and closes at the first of two consecutive zero samples.
    When no closed run exists the location is 0.

    Param:
        line_pointers (list[np.array]): the list of binarized pointer lines

    Return:
        pointer_locations (list): location of the pointer, one value per line
    """
    pointer_locations = []
    for line in line_pointers:
        center = 0
        run_start = None
        for idx in range(line.shape[0] - 1):
            current, following = line[idx], line[idx + 1]
            if run_start is None:
                if current > 0 and following > 0:
                    run_start = idx
            elif current == 0 and following == 0:
                # The pointer ended on the sample before this pair of zeros
                center = (run_start + (idx - 1)) / 2
                break
        pointer_locations.append(center)
    return pointer_locations

def get_relative_location(scale_locations, pointer_locations):
    """
    Express each pointer position as a (fractional) scale-mark number

    Param:
        scale_locations (list[list]): location of each scale mark, per meter
        pointer_locations (list): location of the pointer, per meter

    Return:
        pointed_scales (list[dict]): a list of dict with:
                                    'num_scales': total number of scales
                                    'pointed_scale': 1-based fractional index of the pointed
                                                     scale, or -1 when the pointer lies
                                                     outside every pair of neighbouring marks
    """
    pointed_scales = []
    for ticks, pointer in zip(scale_locations, pointer_locations):
        pointed_scale = -1
        for idx, (left, right) in enumerate(zip(ticks, ticks[1:])):
            if left <= pointer < right:
                # The small epsilon guards against a zero-width interval
                pointed_scale = idx + (pointer - left) / (right - left + 1e-05) + 1
        pointed_scales.append({"num_scales": len(ticks), "pointed_scale": pointed_scale})
    return pointed_scales

def calculate_reading(pointed_scales):
    """
    Convert the pointed scale number of each meter into a reading

    Param:
        pointed_scales (list[dict]): dicts carrying 'num_scales' and 'pointed_scale'

    Return:
        readings (list[float]): the list of values read from meter
    """
    readings = []
    for meter in pointed_scales:
        # The meter type is chosen from the total number of scale marks found
        config_index = 0 if meter["num_scales"] > TYPE_THRESHOLD else 1
        readings.append(meter["pointed_scale"] * METER_CONFIG[config_index]["scale_interval_value"])
    return readings

メイン関数#

モデルとパラメーターを初期化#

OpenVINO を使用して推論を実行するためにドロップダウン・リストからデバイスを選択します

import ipywidgets as widgets 

# Offer every device reported by the OpenVINO core, plus "AUTO" (the preselected value)
device = widgets.Dropdown( 
    options=core.available_devices + ["AUTO"], 
    value="AUTO", 
    description="Device:", 
    disabled=False, 
) 

# Trailing bare expression: shown as the cell output when run in a notebook
device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')

検出ネットワークが検出するメーターの数は、シナリオによって変わる可能性があります。そのため、セグメント化ネットワーク入力のバッチサイズは動的次元となり、静的次元で使用する正の数ではなく、-1 または ov.Dimension() (C++ では ov::Dimension()) として指定する必要があります。この場合、メモリー消費を最適化するために、入力バッチサイズの下限と上限を指定することもできます (例: ov.Dimension(1, 2))。

img_file = f"{DATA_DIR}/{IMG_FILE_NAME}" 
det_model_path = f"{MODEL_DIR}/meter_det_model/model.pdmodel" 
# Static input shapes for the detection model, keyed by input name
det_model_shape = {
    "image": [1, 3, 608, 608], 
    "im_shape": [1, 2], 
    "scale_factor": [1, 2], 
} 
seg_model_path = f"{MODEL_DIR}/meter_seg_model/model.pdmodel" 
# The batch dimension is bounded (1 to 2) rather than fixed, so a variable number of meters fits
seg_model_shape = {"image": [ov.Dimension(1, 2), 3, 512, 512]} 

erode_kernel = 4  # kernel side length passed to erode()
score_threshold = 0.5  # detections scoring at or below this are dropped by filter_bboxes()
seg_batch_size = 2  # number of meter crops sent to the segmenter per call
input_shape = 608  # side length of the square detection input

# Initialize the model objects on the device chosen in the dropdown
detector = Model(det_model_path, det_model_shape, device.value) 
segmenter = Model(seg_model_path, seg_model_shape, device.value) 

# Visualize the original input photo
# NOTE(review): cv2.imread is not checked for a failed read here — confirm the file exists
image = cv2.imread(img_file) 
rgb_image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB) 
plt.imshow(rgb_image)
<matplotlib.image.AxesImage at 0x7f2d2dd77130>
../_images/meter-reader-with-output_16_1.png

メーター検出モデルの実行#

メーターの位置を検出し、セグメント化用の ROI 画像を準備します。

# Prepare the input data for the meter detection model
im_shape = np.array([[input_shape, input_shape]]).astype("float32") 
scale_factor = np.array([[1, 2]]).astype("float32") 
input_image = det_preprocess(image, input_shape) 
inputs_dict = {"image": input_image, "im_shape": im_shape, "scale_factor": scale_factor} 

# Run the meter detection model
det_results = detector.predict(inputs_dict) 

# Filter out the bounding boxes with low confidence
filtered_results = filter_bboxes(det_results, score_threshold) 

# Scale factors that map detector coordinates back onto the original image.
# NOTE(review): scale_x carries a `* 2` that mirrors the 2 in scale_factor above; the two
# appear to compensate each other — confirm against the detection model's input contract.
scale_x = image.shape[1] / input_shape * 2 
scale_y = image.shape[0] / input_shape 

# Create an individual image for each detected meter, then prepare them for segmentation
roi_imgs, loc = roi_crop(image, filtered_results, scale_x, scale_y) 
roi_imgs, resize_imgs = roi_process(roi_imgs, METER_SHAPE) 

# Build the picture of the detection results (resized crops side by side)
# NOTE(review): np.hstack needs at least one array, so this presumes at least one meter was kept
roi_stack = np.hstack(resize_imgs) 

if cv2.imwrite(f"{DATA_DIR}/detection_results.jpg", roi_stack): 
    print('The detection result image has been saved as "detection_results.jpg" in data') 
    plt.imshow(cv2.cvtColor(roi_stack, cv2.COLOR_BGR2RGB))
The detection result image has been saved as "detection_results.jpg" in data
../_images/meter-reader-with-output_18_1.png

メーターセグメント化モデルの実行#

検出された ROI に対するセグメント化タスクの結果を取得します。

seg_results = list() 
mask_list = list() 
num_imgs = len(roi_imgs) 

# Run the meter segmentation model on all detected meters, seg_batch_size crops per call
for i in range(0, num_imgs, seg_batch_size): 
    batch = roi_imgs[i : min(num_imgs, i + seg_batch_size)] 
    seg_result = segmenter.predict({"image": np.array(batch)}) 
    seg_results.extend(seg_result) 
# argmax over axis 0 (presumably the class axis) turns each output into a per-pixel label map
results = [] 
for i in range(len(seg_results)): 
    results.append(np.argmax(seg_results[i], axis=0)) 
seg_results = erode(results, erode_kernel) 

# Build the picture of the segmentation results (colourized masks side by side)
for i in range(len(seg_results)): 
    mask_list.append(segmentation_map_to_image(seg_results[i], COLORMAP)) 
mask_stack = np.hstack(mask_list) 

if cv2.imwrite(f"{DATA_DIR}/segmentation_results.jpg", cv2.cvtColor(mask_stack, cv2.COLOR_RGB2BGR)): 
    print('The segmentation result image has been saved as "segmentation_results.jpg" in data') 
    plt.imshow(mask_stack)
The segmentation result image has been saved as "segmentation_results.jpg" in data
../_images/meter-reader-with-output_20_1.png

モデルの結果を後処理し最終的な読み取り値を計算#

OpenCV 関数を使用して、スケールマップ内のポインターの位置を見つけます。

# Find the pointer position on the scale map and compute the meter readings:
# unroll to a rectangle -> per-column counts -> binarize -> locate marks and pointer -> reading
rectangle_meters = circle_to_rectangle(seg_results) 
line_scales, line_pointers = rectangle_to_line(rectangle_meters) 
binaried_scales = mean_binarization(line_scales) 
binaried_pointers = mean_binarization(line_pointers) 
scale_locations = locate_scale(binaried_scales) 
pointer_locations = locate_pointer(binaried_pointers) 
pointed_scales = get_relative_location(scale_locations, pointer_locations) 
meter_readings = calculate_reading(pointed_scales) 

rectangle_list = list() 
# Plot the rectangular (unrolled) meters
for i in range(len(rectangle_meters)): 
    rectangle_list.append(segmentation_map_to_image(rectangle_meters[i], COLORMAP)) 
rectangle_meters_stack = np.hstack(rectangle_list) 

if cv2.imwrite( 
    f"{DATA_DIR}/rectangle_meters.jpg", 
    cv2.cvtColor(rectangle_meters_stack, cv2.COLOR_RGB2BGR), 
): 
    print('The rectangle_meters result image has been saved as "rectangle_meters.jpg" in data') 
    plt.imshow(rectangle_meters_stack)
The rectangle_meters result image has been saved as "rectangle_meters.jpg" in data
../_images/meter-reader-with-output_22_1.png

メーターの画像で読み取り結果を取得#

# Print the readings and draw them onto a copy of the input photo
for meter_number, reading in enumerate(meter_readings, start=1):
    print(f"Meter {meter_number}: {reading:.3f}")

result_image = image.copy()
font = cv2.FONT_HERSHEY_SIMPLEX
box_color = (0, 150, 0)
for i, (xmin, ymin, xmax, ymax) in enumerate(loc):
    # Outline of the detected meter
    cv2.rectangle(result_image, (xmin, ymin), (xmax, ymax), box_color, 3)
    # Filled 100x40 label background anchored at the top-left corner of the box
    cv2.rectangle(result_image, (xmin, ymin), (xmin + 100, ymin + 40), box_color, -1)
    cv2.putText(
        result_image,
        f"#{meter_readings[i]:.3f}",
        (xmin, ymin + 25),
        font,
        0.8,
        (255, 255, 255),
        2,
        cv2.LINE_AA,
    )
if cv2.imwrite(f"{DATA_DIR}/reading_results.jpg", result_image):
    print('The reading results image has been saved as "reading_results.jpg" in data')
    plt.imshow(cv2.cvtColor(result_image, cv2.COLOR_BGR2RGB))
Meter 1: 1.100 
Meter 2: 6.185 
The reading results image has been saved as "reading_results.jpg" in data
../_images/meter-reader-with-output_24_1.png

あなたのメーター写真で試してください#