Magika: OpenVINO を使用した AI による高速かつ効率良いファイルタイプ識別#

この Jupyter ノートブックはオンラインで起動でき、ブラウザーのウィンドウで対話型環境を開きます。ローカルにインストールすることもできます。次のオプションのいずれかを選択します:

BinderGoogle ColabGitHub

Magika は、最新のディープラーニングを活用して正確な検出を実現する、AI を利用した新しいファイルタイプ検出ツールです。Magika は、内部的にわずか 1MB 程の高度に最適化されたカスタムモデルを採用しており、単一 CPU で実行している場合でも数ミリ秒以内に正確なファイル識別を可能にします。

ファイルタイプの識別が難しい理由#

コンピューターの初期の頃から、ファイルの種類を正確に検出することは、ファイルの処理方法を決定する上で非常に重要でした。Linux* には、50 年以上にわたってファイルタイプ識別の標準として機能してきた libmagicfile ユーティリティーがあります。現在、ウェブブラウザー、コードエディター、その他のソフトウェアは、ファイルタイプ検出を利用して、ファイルを適切にレンダリングする方法を決定しています。例えば、最新のコードエディターでは、ファイルタイプ検出を使用して、開発者が新しいファイルで入力を開始するときに使用する構文の色彩の配合スキームを選択します。

ファイル形式ごとに構造が異なるか、全く構造がないため、ファイルタイプを正確に検出することは非常に難しい課題です。これは、テキスト形式とプログラミング言語の構造が非常に似ているため、特に困難になります。これまで、libmagicc や他のファイルタイプ識別ソフトウェアは、各ファイル形式を検出するために、手作業で作成されたヒューリスティックとカスタムルールのコレクションに依存してきました。

この手動アプローチは、手作業で一般化されたルールを作成するのが難しいため、時間がかかり、エラーが発生しやすくなります。特にセキュリティー・アプリケーションでは、攻撃者が敵対的に作成されたペイロードで検出を混乱させようと試みているため、信頼性の高い検出を行うことは特に困難です。

この問題を解決し、高速かつ正確なファイルタイプの検出を実現するため、Magika が開発されました。アプローチとモデルの詳細については、元のリポジトリーGoogle のブログ投稿をご覧ください。

このチュートリアルでは、OpenVINO のパワーを Magika に導入する方法について説明します。

目次:

必要条件#

%pip install -q magika "openvino>=2024.1.0" "gradio>=4.19"
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts. openvino-dev 2024.2.0 requires openvino==2024.2.0, but you have openvino 2024.3.0.dev20240711 which is incompatible. 
tensorflow 2.12.0 requires numpy<1.24,>=1.22, but you have numpy 1.24.4 which is incompatible. Note: you may need to restart the kernel to use updated packages.

モデル・ロード・クラスの定義#

推論時に、Magika は ONNX を推論エンジンとして使用し、ファイルが数ミリ秒で識別されるようにします。これは、CPU でも非 AI ツールとほぼ同じ速度です。以下のコードは、OpenVINO API を使用してオリジナルの Magika 推論クラスを拡張します。提供されるコードは、オリジナルの Magika Python API と完全に互換性があります。

import time 
from pathlib import Path 
from functools import partial 
from typing import List, Tuple, Optional, Dict 

from magika import Magika 
from magika.types import ModelFeatures, ModelOutput, MagikaResult 
from magika.prediction_mode import PredictionMode 
import numpy.typing as npt 
import numpy as np 

import openvino as ov 

class OVMagika(Magika): 
    def __init__( 
        self, 
        model_dir: Optional[Path] = None, 
        prediction_mode: PredictionMode = PredictionMode.HIGH_CONFIDENCE, 
        no_dereference: bool = False, 
        verbose: bool = False, 
        debug: bool = False, 
        use_colors: bool = False, 
        device="CPU", 
    ) -> None: 
        self._device = device 
        super().__init__(model_dir, prediction_mode, no_dereference, verbose, debug, use_colors) 

    def _init_onnx_session(self):
        # OpenVINO を使用したオーバーロード・モデルの読み込み 
        start_time = time.time() 
        core = ov.Core() 
        ov_model = core.compile_model(self._model_path, self._device.upper()) 
        elapsed_time = 1000 * (time.time() - start_time) 
        self._log.debug(f'ONNX DL model "{self._model_path}" loaded in {elapsed_time:.03f} ms on {self._device}') 
        return ov_model 

    def _get_raw_predictions(self, features: List[Tuple[Path, ModelFeatures]]) -> npt.NDArray:         """ 
        Given a list of (path, features), return a (files_num, features_size) 
        matrix encoding the predictions.         """ 

        dataset_format = self._model_config["train_dataset_info"]["dataset_format"] 
        assert dataset_format == "int-concat/one-hot" 
        start_time = time.time() 
        X_bytes = [] 
        for _, fs in features: 
            sample_bytes = [] 
            if self._input_sizes["beg"] > 0: 
                sample_bytes.extend(fs.beg[: self._input_sizes["beg"]]) 
            if self._input_sizes["mid"] > 0: 
                sample_bytes.extend(fs.mid[: self._input_sizes["mid"]]) 
            if self._input_sizes["end"] > 0: 
                sample_bytes.extend(fs.end[-self._input_sizes["end"] :])         X_bytes.append(sample_bytes) X = np.array(X_bytes).astype(np.float32) 
        elapsed_time = time.time() - start_time 
        self._log.debug(f"DL input prepared in {elapsed_time:.03f} seconds") 

        start_time = time.time() 
        raw_predictions_list = [] 
        samples_num = X.shape[0] 

        max_internal_batch_size = 1000 
        batches_num = samples_num // max_internal_batch_size 
        if samples_num % max_internal_batch_size != 0: 
            batches_num += 1 

        for batch_idx in range(batches_num): 
            self._log.debug(f"Getting raw predictions for (internal) batch {batch_idx+1}/{batches_num}") 
            start_idx = batch_idx * max_internal_batch_size 
            end_idx = min((batch_idx + 1) * max_internal_batch_size, samples_num) 
            batch_raw_predictions = self._onnx_session({"bytes": X[start_idx:end_idx, :]})["target_label"] 
            raw_predictions_list.append(batch_raw_predictions) 
        elapsed_time = time.time() - start_time 
        self._log.debug(f"DL raw prediction in {elapsed_time:.03f} seconds") 
        return np.concatenate(raw_predictions_list) 

    def _get_topk_model_outputs_from_features(self, all_features: List[Tuple[Path, ModelFeatures]], k: int = 5) -> List[Tuple[Path, List[ModelOutput]]]:         """ 
        Helper function for getting top k the highest ranked model results for each feature 
        """ 
        raw_preds = self._get_raw_predictions(all_features) 
        top_preds_idxs = np.argsort(raw_preds, axis=1)[:, -k:][:, ::-1] 
        scores = [raw_preds[i, idx] for i, idx in enumerate(top_preds_idxs)] 
        results = [] 
        for (path, _), scores, top_idxes in zip(all_features, raw_preds, top_preds_idxs): 
            model_outputs_for_path = [] 
                for idx in top_idxes: 
                    ct_label = self._target_labels_space_np[idx] 
                    score = scores[idx] 
                    model_outputs_for_path.append(ModelOutput(ct_label=ct_label, score=float(score))) 
                results.append((path, model_outputs_for_path)) 
            return results 

    def _get_results_from_features_topk(self, all_features: List[Tuple[Path, ModelFeatures]], top_k=5) -> Dict[str, MagikaResult]:         """ 
        Helper function for getting top k the highest ranked model results for each feature 
        """ 
        # 必要なファイルに対して推論を実行するようになりました 

        if len(all_features) == 0:
            # nothing to be done 
            return {} 

        outputs: Dict[str, MagikaResult] = {} 

        for path, model_output in self._get_topk_model_outputs_from_features(all_features, top_k):
            # DL モデルのコンテンツ・タイプ・ラベルに加えて、 
            # 他のロジックでそのような結果を上書きすることもできます。 
            # デバッグと情報提供の目的で、JSON 出力には、 
            # 生の DL モデル出力とユーザーに返される最終出力の 
            # 両方が保存されます。 
            results = [] 
            for out in model_output: 
                output_ct_label = self._get_output_ct_label_from_dl_result(out.ct_label, out.score) 

                results.append( 
                    self._get_result_from_labels_and_score( 
                                path, 
                                dl_ct_label=out.ct_label, 
                                output_ct_label=output_ct_label, 
                                score=out.score, 
                            ) 
                ) 
            outputs[str(path)] = results return outputs 

    def identify_bytes_topk(self, content: bytes, top_k=5) -> MagikaResult:
        # バイトから topk の結果を取得するヘルパー関数 
        _get_results_from_features = self._get_results_from_features 
        self._get_results_from_features = partial(self._get_results_from_features_topk, top_k=top_k) 
        result = super().identify_bytes(content) 
        self._get_results_from_features = _get_results_from_features 
        return result

OpenVINO モデル推論を実行#

実際のモデル推論の結果を確認します。

デバイスを選択#

ワークを開始するには、ドロップダウン・リストからデバイスを選択します。

import ipywidgets as widgets 

core = ov.Core() 
device = widgets.Dropdown( 
    options=core.available_devices + ["AUTO"], 
    value="AUTO", 
    description="Device:", 
    disabled=False, 
) 

device
Dropdown(description='Device:', index=1, options=('CPU', 'AUTO'), value='AUTO')

モデルを作成#

前述のように、OpenVINO 拡張 OVMagika クラスには、元のクラスと同じ API があります。インターフェイス・インスタンスを作成し、さまざまな入力形式で起動してみます。

ov_magika = OVMagika(device=device.value)

バイト入力で推論を実行#

result = ov_magika.identify_bytes(b"# Example\nThis is an example of markdown!") 
print(f"Content type: {result.output.ct_label} - {result.output.score * 100:.4}%")
Content type: markdown - 99.29%

ファイル入力で推論を実行#

import requests 

input_file = Path("./README.md") 
if not input_file.exists(): 
    r = requests.get("https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/README.md") 
    with open("README.md", "w") as f: 
        f.write(r.text) 
result = ov_magika.identify_path(input_file) 
print(f"Content type: {result.output.ct_label} - {result.output.score * 100:.4}%")
Content type: markdown - 100.0%

インタラクティブなデモ#

これで、独自のファイルでモデルを試すことができます。入力ファイルウィンドウにファイルをアップロードし、送信ボタンをクリックして、予測されるファイルタイプを確認します。

import gradio as gr 

def classify(file_path):     """Classify file using classes listing.     Args: 
        file_path): path to input file 
    Returns: 
        (dict): Mapping between class labels and class probabilities.     """ 
    results = ov_magika.identify_bytes_topk(file_path) 

    return {result.dl.ct_label: float(result.output.score) for result in results} 

demo = gr.Interface( 
    classify, 
    [ 
        gr.File(label="Input file", type="binary"), 
    ], 
    gr.Label(label="Result"), 
    examples=[["./README.md"]], 
    allow_flagging="never", 
) 
try: 
    demo.launch(debug=False) 
except Exception: 
    demo.launch(share=True, debug=False) 
# リモートで起動する場合は、server_name と server_port を指定します 
# demo.launch(server_name='your server name', server_port='server port in int') 
# 詳細については、ドキュメントをご覧ください: https://gradio.app/docs/
ローカル URL で実行中: http://127.0.0.1:7860 パブリックリンクを作成するには、launch()share=True を設定します。