MiniCPM-V2 と OpenVINO による視覚言語アシスタント#

この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。

GitHub

MiniCPM-V 2 は、効率的なエンドサイド・デプロイを実現する強力なマルチモーダル大規模言語モデルです。このモデルは、SigLip-400M と MiniCPM-2.4B をベースに構築され、知覚再サンプラーによって接続されています。MiniCPM-V 2.0 には注目すべき機能がいくつかあります: * 多くのベンチマーク (OCRBench、TextVQA、MME、MMB、MathVista など) で人気モデルを上回るパフォーマンスを発揮します。強力な OCR 機能により、シーンテキスト理解において Gemini Pro と同等のパフォーマンスを実現します。* 信頼性のある動作。LLM は幻覚に悩まされていることで知られており、画像に基づいて事実とかけ離れたテキストを生成することがよくあります。MiniCPM-V 2.0 は、信頼性のある動作のためマルチモーダル RLHF を介して調整された最初のエンドサイド LLM です (最新の RLHF-V [CVPR’24] シリーズ技術を使用)。これにより、モデルは Object HalBench での幻覚の防止で GPT-4V と一致するようになります。* あらゆるアスペクト比の高解像度画像。MiniCPM-V 2.0 は、任意のアスペクト比で 180 万ピクセル (例: 1344 x 1344) の画像を受け入れます。これにより、小さな物体や光学的特徴などのきめ細かい視覚情報をより良く認識できるようになります。これは、LLaVA-UHD の最新技術によって実現されています。* 高い効率。視覚的なエンコーディングでは、モデルは知覚再サンプラーにより画像表現をはるかに少ないトークンに圧縮します。これにより、MiniCPM-V 2.0 は、高解像度の画像を扱う場合でも、推論中に有利なメモリーコストと速度で動作できるようになります。* バイリンガル・サポート。MiniCPM-V 2.0 は、英語と中国語の両方で強力なバイリンガル・マルチモーダル機能をサポートします。これは、VisCPM [ICLR’24] の手法である、言語間でのマルチモーダル機能を一般化することによって可能になります。

このチュートリアルでは、マルチモーダル・チャットボットを作成するため MiniCPM-V2 モデルを変換および最適化する方法について説明します。さらに、LLM にステートフル変換を適用する方法と、NNCF を使用した重み圧縮などのモデル最適化手法を紹介します。

目次:

必要条件#

%pip install -q "torch>=2.1" "torchvision" "timm" "transformers>=4.40" "Pillow" "gradio>=4.19" "tqdm" "sentencepiece" --extra-index-url https://download.pytorch.org/whl/cpu 
%pip install -q "openvino>=2024.2.0" "nncf>=2.11.0"
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.

PyTorch モデルをダウンロード#

from transformers import AutoModel, AutoTokenizer 
from pathlib import Path 

model_dir = Path("model") 
text_emb_path = model_dir / "language_model/embed_tokens.xml" 
image_encoder_path = model_dir / "image_encoder.xml" 
llm_path = model_dir / "language_model/language_model.xml" 

model = None 

if not all([text_emb_path.exists(), image_encoder_path.exists(), llm_path.exists()]): 
    model = AutoModel.from_pretrained("openbmb/MiniCPM-V-2", trust_remote_code=True) 
    model.eval() 
    model.config.save_pretrained(model_dir) 
    tokenizer = AutoTokenizer.from_pretrained("openbmb/MiniCPM-V-2", trust_remote_code=True) 
    tokenizer.save_pretrained(model_dir)
Loading checkpoint shards: 0%|          | 0/2 [00:00<?, ?it/s]

モデルを OpenVINO 中間表現に変換#

OpenVINO は、OpenVINO 中間表現 (IR) への変換により PyTorch モデルをサポートします。これには、OpenVINO モデル・トランスフォーメーション API を使用する必要があります。ov.convert_model 関数は、元の PyTorch モデル・インスタンスとトレース用のサンプル入力を受け取り、OpenVINO フレームワークでこのモデルを表す ov.Model を返します。変換されたモデルは、ov.save_model 関数を使用してディスクに保存するか、core.complie_model を使用してデバイスに直接ロードできます。

MiniCPM-V2 は自己回帰トランスフォーマー生成モデルです。つまり、次の各モデルステップは、前のステップからのモデル出力に依存します。生成アプローチは、単語シーケンスの確率分布を条件付きの次の単語分布の積に分解できるという仮定に基づいています。言い換えると、モデルは、停止条件 (最大長の生成されたシーケンスまたは文字列トークンの終了が取得される) に達するまで、以前に生成されたトークンに基づいてループ内の次のトークンを予測します。予測される確率に基づいて次のトークンが選択される方法は、選択されたデコード方法によって決まります。最も一般的なデコード方法の詳細については、このブログをご覧ください。Hugging Face Transformers ライブラリーのモデル生成プロセスのエントリーポイントは、generate メソッドです。パラメーターと構成の詳細については、ドキュメントを参照してください。選択デコード方法論の柔軟性を維持するため、1 つのステップでモデル推論のみを変換します。

推論フローは最初のステップと次のステップで異なります。最初のステップでは、モデルは前処理された入力命令と画像を受け入れ、input_embedding モデルと image_encoder モデルを使用して統合埋め込み空間に変換します。その後、モデルの LLM ベースの部分が入力埋め込みに対して実行され、次に生成されるトークンの確率を予測します。次のステップでは、language_model はサンプリング戦略に基づいて選択され、input_embedding モデルとキャッシュされたアテンション・キーと値によって処理された次のトークン ID のみを受け入れます。出力側は自動回帰であるため、出力トークンの非表示状態は、その後の生成ステップごとに計算されると同じままになります。したがって、新しいトークンを生成するたびに再計算するのは無駄であるように思えます。キャッシュを使用すると、モデルは計算後に非表示の状態を保存します。モデルは各タイムステップで最後に生成された出力トークンのみを計算し、保存された出力トークンを非表示のトークンに再利用します。これにより、変圧器モデルの生成の複雑さが O(n3) から O(n2) に軽減されます。仕組みの詳細については、この記事を参照してください。

上記をまとめると、モデルは 3 つの部分で構成されています:

  • 入力画像を埋め込み空間にエンコードする画像エンコーダー。SigLIP モデルと Resampler が含まれています。

  • 入力テキストトークンを埋め込み空間に変換する入力埋め込み

  • 画像エンコーダーと入力埋め込みモデルによって提供される入力埋め込みに基づいて回答を生成する言語モデル

各モデルパーツを変換してみます。

テキスト埋め込み#

LLM では、入力埋め込みは言語モデルの一部ですが、マルチモーダルの場合、このモデル部分によって生成される最初のステップの隠し状態は、画像埋め込みと共通の埋め込み空間に統合される必要があります。このモデル部分を再利用し、llm モデル・インスタンスの導入を回避するため、個別に使用します。

import openvino as ov 
import torch 
import gc 

def cleanup_torchscript_cache(): 
    """ 
    Helper for removing cached model representation 
    """
    torch._C._jit_clear_class_registry() 
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore() 
    torch.jit._state._clear_class_state() 

if not text_emb_path.exists(): 
    ov_model = ov.convert_model(model.llm.model.embed_tokens, example_input=torch.ones([1, 10], dtype=torch.long)) 

    ov.save_model(ov_model, text_emb_path) 
    del ov_model 
    cleanup_torchscript_cache() 
    gc.collect()
['input']

言語モデル#

言語モデルは、MiniCPM-V の回答の生成を担当します。この部分は、テキスト生成の標準 LLM と非常によく似ています。ここでのモデルでは、ベース LLM として MiniCPM-2.4B を使用します。生成プロセスを最適化し、メモリーをさらに効率良く使用するため、HuggingFace トランスフォーマー API は、入力と出力で use_cache=True パラメーターと past_key_values 引数を使用してモデル状態を外部にキャッシュするメカニズムを提供します。キャッシュを使用すると、モデルは計算後に非表示の状態を保存します。モデルは各タイムステップで最後に生成された出力トークンのみを計算し、保存された出力トークンを非表示のトークンに再利用します。これにより、変圧器モデルの生成の複雑さが O(n3) から O(n2) に軽減されます。このオプションを使用すると、モデルは前のステップの非表示状態 (キャッシュされたアテンション・キーと値) を入力として取得し、さらに現在のステップの非表示状態を出力として提供します。これは、次のすべての反復では、前のステップから取得した新しいトークンと、次のトークン予測を取得するためのキャッシュされたキー値のみを提供するだけで十分であることを意味します。

最新の LLM のようにモデルサイズが大きくなると、アテンション・ブロックの数と過去のキー値テンソルのサイズもそれぞれ増加します。推論サイクルでキャッシュ状態をモデルの入力と出力として処理する方法は、特にチャットボットのシナリオなどで長い入力シーケンスを処理する場合、メモリー制限のあるシステムのボトルネックになる可能性があります。OpenVINO は、モデル内にキャッシュ処理ロジックを保持しながら、モデルからキャッシュテンソルを含む入力と対応する出力を削除する変換を提案します。このようなモデルはステートフルとも呼ばれます。ステートフル・モデルは、2 つの連続する推論呼び出しの間でデータを暗黙的に保存するモデルです。1 回の実行で保存されるテンソルは、state または variable と呼ばれる内部メモリーバッファーに保持され、モデルの出力となることはなく、次の実行に渡される可能性があります。キャッシュを非表示にすると、デバイスに適した表現でキャッシュ値を保存および更新できるようになります。メモリーの消費を削減し、さらにモデルのパフォーマンスを最適化するのに役立ちます。ステートフル・モデルとステートの操作の詳細については、OpenVINO のドキュメントを参照してください。

from typing import Optional, Tuple, List 
from openvino.runtime 
import opset13 import numpy as np 

def model_has_state(ov_model: ov.Model): 
    # TODO: 変数の可用性に基づいてさらに良い方法を提供しますが、OV Python API は必要なメソッドを公開していません 
    return len(ov_model.get_sinks()) > 0 

def model_has_input_output_name(ov_model: ov.Model, name: str): 
    """
    Helper function for checking that model has specified input or output name 

    Parameters: 
        ov_model (ov.Model): # TODO: モデルトポロジーから次元を導き出すことはできますか? 
    name (str): 
        name of input or output 

    Returns:
        True if input or output with requested name exists else False 
    """ 
    return name in sum([list(t.get_names()) for t in ov_model.inputs + ov_model.outputs], []) 

def fuse_cache_reorder( 
    ov_model: ov.Model, 
    not_kv_inputs: List[str], 
    key_value_input_names: List[str], 
    gather_dim: int, 
): 
    """
    Fuses reored_cache during generate cycle into ov.Model. Used with stateful models, because we can not modify model state directly.
 
    Adds a new beam_idx parameter and Gather op per each kv-cache input in a given model. 
    Should be run before make_stateful.Implements optimumum's _reorder_cache 
    inside the model in the beginning of each iteration. 
    Gather works along given gather_dim dimension that may vary from model to model. 
    KV-cache inputs are identified based on names in key_value_input_names. 
    Append the new beam_idx parameter to not_kv_inputs.

    Parameters: 
        ov_model (`ov.Model`): 
            openvino model for processing 
        not_kv_inputs (`List[str]`): 
            list of input nodes in model that not related to past key values 
        key_value_input_names (`List[str]`): 
            list of names for key value input layers 
        gather_dim (int): 
            dimension for gathering cache during reorder pass 
    """

    if model_has_input_output_name(ov_model, "beam_idx"): 
        raise ValueError("Model already has fused cache") 
    input_batch = ov_model.input("inputs_embeds").get_partial_shape()[0] 
    beam_idx = opset13.parameter(name="beam_idx", dtype=ov.Type.i32, shape=ov.PartialShape([input_batch])) 
    beam_idx.output(0).get_tensor().add_names({"beam_idx"}) # why list is not accepted? 
    Ov_model.add_parameters([beam_idx]).add_parameters([beam_idx]) 
    not_kv_inputs.append(ov_model.inputs[-1]) 
    # すべてのキャッシュ・パラメーターを確認し、新しいパラメーター beam_idx によって提供されるインデックスと _reorder_cache を融合 
    for input_name in key_value_input_names: 
        parameter_output_port = ov_model.input(input_name) 
        consumers = parameter_output_port.get_target_inputs() 
        gather = opset13.gather(parameter_output_port, beam_idx, opset13.constant(gather_dim)) 
        for consumer in consumers: 
            consumer.replace_source_output(gather.output(0)) 
    ov_model.validate_nodes_and_infer_types() 

def build_state_initializer(ov_model: ov.Model, batch_dim: int): 
    """ 
    Build initialization ShapeOf Expression for all ReadValue ops 

    Parameters: 
        ov_model (ov.Model): 
            openvino model 
        batch_dim (int): 
            index of dimension corresponding to batch size 
    """ 
    input_ids = ov_model.input("inputs_embeds") 
    batch = opset13.gather( 
        opset13.shape_of(input_ids, output_type="i64"), 
        opset13.constant([0]), 
        opset13.constant(0), 
    ) 
    for op in ov_model.get_ops(): 
        if op.get_type_name() == "ReadValue": 
            dims = [dim.min_length for dim in list(op.get_output_partial_shape(0))] 
            dims[batch_dim] = batch 
            dims = [(opset13.constant(np.array([dim], dtype=np.int64)) if isinstance(dim, int) else dim) for dim in dims] 
            shape = opset13.concat(dims, axis=0) 
            broadcast = opset13.broadcast(opset13.constant(0.0, dtype=op.get_output_element_type(0)), shape) 
            op.set_arguments([broadcast]) 
    ov_model.validate_nodes_and_infer_types() 

def make_stateful( 
    ov_model: ov.Model, 
    not_kv_inputs: List[str], 
    key_value_input_names: List[str], 
    key_value_output_names: List[str], 
    batch_dim: int, 
    num_attention_heads: int, 
    num_beams_and_batch: int = None, 
): 
    """ 
    Hides kv-cache inputs and outputs inside the model as variables.

    Parameters: 
        ov_model (ov.Model): 
            openvino model 
        not_kv_inputs (`List[str]`): 
            list of input nodes in model that not related to past key values 
        key_value_input_names (`List[str]`): 
            list of names for key value input layers 
        key_value_output_names (`List[str]`): 
            list of names for key value input layers 
        batch_dim (int): 
            index of batch dimension in key value layers 
        num_attention_heads (int): 
            number of attention heads for batch dimension initialization 
        num_beams_an_batch (int): 
            precalculated number of beams and batch for shapes initialization 
    """ 
    from openvino._offline_transformations import apply_make_stateful_transformation 

    input_output_map = {} 
    if num_beams_and_batch is not None:
        # モデルの最後から ReadValue に動的次元が伝播されるのを避けるため、input_ids とアテンション・マスクのバッチサイズを設定 
        for input in not_kv_inputs: 
            shape = input.get_partial_shape() 
            if shape.rank.get_length() <= 2: # == 1 for beam_index 
                shape[0] = num_beams_and_batch 
                input.get_node().set_partial_shape(shape) 
    for kv_name_pair in zip(key_value_input_names, key_value_output_names): 
        input_output_map[kv_name_pair[0]] = kv_name_pair[1] 
        if num_beams_and_batch is not None: 
            input = ov_model.input(kv_name_pair[0]) 
            shape = input.get_partial_shape() 
            shape[batch_dim] = num_beams_and_batch * num_attention_heads 
            input.get_node().set_partial_shape(shape) 

    if num_beams_and_batch is not None:
        # 上記の形状が変更された場合の再検証モデル 
        ov_model.validate_nodes_and_infer_types() 

    apply_make_stateful_transformation(ov_model, input_output_map) 
    if num_beams_and_batch is None: 
        build_state_initializer(ov_model, batch_dim) 

def patch_stateful(ov_model): 
    key_value_input_names = [key.get_any_name() for key in ov_model.inputs[2:-1]] 
    key_value_output_names = [key.get_any_name() for key in ov_model.outputs[1:]] 
    not_kv_inputs = [input for input in ov_model.inputs if not any(name in key_value_input_names for name in input.get_names())]
    if not key_value_input_names or not key_value_output_names: 
        return 
    batch_dim = 0 
    num_attention_heads = 1 

    fuse_cache_reorder(ov_model, not_kv_inputs, key_value_input_names, batch_dim) 
    make_stateful( 
        ov_model, 
        not_kv_inputs, 
        key_value_input_names, 
        key_value_output_names, 
        batch_dim, 
        num_attention_heads, 
        None, 
    )
import types 
from transformers.cache_utils import Cache, DynamicCache 
from transformers.modeling_attn_mask_utils import _prepare_4d_causal_attention_mask 
from transformers.modeling_outputs import BaseModelOutputWithPast, CausalLMOutputWithPast 
from typing import Union 

def forward_wrap(self, attention_mask, position_ids, past_key_values, inputs_embeds): 
    result = self._orig_forward( 
        input_ids=None, attention_mask=attention_mask, position_ids=position_ids, past_key_values=past_key_values, inputs_embeds=inputs_embeds 
    ) 
    return tuple(result.values()) 

def _update_causal_mask( 
    self, 
    attention_mask: torch.Tensor, 
    input_tensor: torch.Tensor, 
    cache_position: torch.Tensor, 
    past_key_values: Cache, 
    output_attentions: bool, 
): 
    past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0 

    dtype, device = input_tensor.dtype, input_tensor.device 
    min_dtype = torch.finfo(dtype).min 
    sequence_length = input_tensor.shape[1] 

    target_length = attention_mask.shape[-1] if isinstance(attention_mask, torch.Tensor) else past_seen_tokens + sequence_length + 1 

    if attention_mask is not None and attention_mask.dim() == 4:
        # この場合、マスクはすでに反転された形で提供されており、反転やスライスは不要であると想定します 
        if attention_mask.max() != 0: 
            raise ValueError("Custom 4D attention mask should be passed in inverted form with max==0`") 
        causal_mask = attention_mask 
    else: 
        causal_mask = torch.full((sequence_length, target_length), fill_value=min_dtype, dtype=dtype, device=device) 
        if sequence_length != 1: 
            causal_mask = torch.triu(causal_mask, diagonal=1) 
        causal_mask *= torch.arange(target_length, device=device) > cache_position.reshape(-1, 1) 
        causal_mask = causal_mask[None, None, :, :].expand(input_tensor.shape[0], 1, -1, -1) 
        if attention_mask is not None: 
            causal_mask = causal_mask.clone() # インプレース編集のために連続したメモリーにコピー 
            mask_length = attention_mask.shape[-1] 
            padding_mask = causal_mask[:, :, :, :mask_length] + attention_mask[:, None, None, :] 
            padding_mask = padding_mask == 0 
            causal_mask[:, :, :, :mask_length] = causal_mask[:, :, :, :mask_length].masked_fill(padding_mask, min_dtype)

    return causal_mask 

def _model_forward( 
    self, 
    input_ids: torch.LongTensor = None, 
    attention_mask: Optional[torch.Tensor] = None, 
    position_ids: Optional[torch.LongTensor] = None, 
    past_key_values: Optional[List[torch.FloatTensor]] = None, 
    inputs_embeds: Optional[torch.FloatTensor] = None, 
    use_cache: Optional[bool] = None, 
    output_attentions: Optional[bool] = None, 
    output_hidden_states: Optional[bool] = None, 
    return_dict: Optional[bool] = None, 
) -> Union[Tuple, BaseModelOutputWithPast]: 
    output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions 
    output_hidden_states = output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states 
    use_cache = use_cache if use_cache is not None else self.config.use_cache 

    return_dict = return_dict if return_dict is not None else self.config.use_return_dict 

    # input_ids と inputs_embeds を取得 
    if input_ids is not None and inputs_embeds is not None: 
        raise ValueError("You cannot specify both input_ids and inputs_embeds at the same time") 
    elif input_ids is not None: 
        batch_size, seq_length = input_ids.shape[:2] 
    elif inputs_embeds is not None: 
        batch_size, seq_length = inputs_embeds.shape[:2] 
    else: 
        raise ValueError("You have to specify either input_ids or inputs_embeds") 

    past_key_values_length = 0 
    if use_cache: 
        use_legacy_cache = not isinstance(past_key_values, Cache) 
        if use_legacy_cache: 
            past_key_values = DynamicCache.from_legacy_cache(past_key_values) 
        past_key_values_length = past_key_values.get_usable_length(seq_length) 

    if position_ids is None: 
        device = input_ids.device if input_ids is not None else inputs_embeds.device 
        position_ids = torch.arange( 
            past_key_values_length, 
            seq_length + past_key_values_length, 
            dtype=torch.long, 
            device=device, 
        ) 
        position_ids = position_ids.unsqueeze(0) 

    if inputs_embeds is None: 
        inputs_embeds = self.embed_tokens(input_ids) * self.config.scale_emb 
    if self._use_sdpa and not output_attentions:
        # output_attentions=True は SDPA 使用時にはサポートされないため、 
        # すべてのケースで 4D 因果マスクを必要とする手動実装に頼ることになります 
        past_seen_tokens = past_key_values.get_seq_length() if past_key_values is not None else 0 
        cache_position = torch.arange(past_seen_tokens, past_seen_tokens + inputs_embeds.shape[1], device=inputs_embeds.device) 
        attention_mask = self._update_causal_mask(attention_mask, inputs_embeds, cache_position, past_key_values, output_attentions) 
    else:
        # 4D マスクはレイヤーを通過 
        attention_mask = _prepare_4d_causal_attention_mask( 
            attention_mask, 
            (batch_size, seq_length), 
            inputs_embeds, 
            past_key_values_length, 
        ) 

    # 埋め込み位置 
    hidden_states = inputs_embeds 

    # デコーダーレイヤー 
    all_hidden_states = () if output_hidden_states else None 
    all_self_attns = () if output_attentions else None 
    next_decoder_cache = None 

    for decoder_layer in self.layers: 
        if output_hidden_states: 
            all_hidden_states += (hidden_states,) 

        layer_outputs = decoder_layer( 
            hidden_states, 
            attention_mask=attention_mask, 
            position_ids=position_ids, 
            past_key_value=past_key_values, 
            output_attentions=output_attentions, 
            use_cache=use_cache, 
        ) 

        hidden_states = layer_outputs[0] 

        if use_cache: 
            next_decoder_cache = layer_outputs[2 if output_attentions else 1] 

        if output_attentions: 
            all_self_attns += (layer_outputs[1],) 

    hidden_states = self.norm(hidden_states) 

    # 最後のデコーダーレイヤーから隠れた状態を追加 
    if output_hidden_states: 
        all_hidden_states += (hidden_states,) 

    next_cache = None 
    if use_cache: 
        next_cache = next_decoder_cache.to_legacy_cache() if use_legacy_cache else next_decoder_cache 
    if not return_dict: 
        return tuple(v for v in [hidden_states, next_cache, all_hidden_states, all_self_attns] if v is not None)
    return BaseModelOutputWithPast( 
        last_hidden_state=hidden_states, 
        past_key_values=next_cache, 
        hidden_states=all_hidden_states, 
        attentions=all_self_attns, 
    ) 

if not llm_path.exists(): 
    model.llm.model.forward = types.MethodType(_model_forward, model.llm.model) 
    model.llm.model._update_causal_mask = types.MethodType(_update_causal_mask, model.llm.model) 
    llm_input = torch.zeros([2, 2, 2304]) 
    pkv = model.llm(inputs_embeds=llm_input, attention_mask=torch.ones((2, 2), dtype=torch.int64))[1] 
    model_inputs = ["attention_mask", "position_ids"] 
    model_outputs = ["logits"] 
    for idx in range(len(pkv)): 
        model_inputs.extend([f"past_key_values.{idx}.key", f"past_key_values.{idx}.value"]) 
        model_outputs.extend([f"present.{idx}.key", f"present.{idx}.value"]) 
    model_inputs.append("inputs_embeds") 
    model.llm._orig_forward = model.llm.forward 

    model.llm.forward = types.MethodType(forward_wrap, model.llm) 
    position_ids = torch.tensor([[2, 3], [2, 3]]) 
    ov_model = ov.convert_model( 
        model.llm, 
        example_input={ 
            "inputs_embeds": llm_input, 
            "attention_mask": torch.ones([2, 4], dtype=torch.int64), 
            "past_key_values": pkv, 
            "position_ids": position_ids, 
        }, 
    ) 

    for input, input_name in zip(ov_model.inputs, model_inputs): 
        input.get_tensor().set_names({input_name}) 

    for output, output_name in zip(ov_model.outputs, model_outputs): 
        output.get_tensor().set_names({output_name}) 
    patch_stateful(ov_model) 

    ov.save_model(ov_model, llm_path) 
    model.llm.config.save_pretrained(llm_path.parent) 
    del ov_model 
    cleanup_torchscript_cache() 
    del model.llm 
    gc.collect()
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/modeling_utils.py:4565: FutureWarning: _is_quantized_training_enabled is going to be deprecated in transformers 4.39.0.Please use model.hf_quantizer.is_trainable instead 
  warnings.warn( /tmp/ipykernel_150470/514161198.py:38: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! 
  if sequence_length != 1: /opt/home/k8sworker/.cache/huggingface/modules/transformers_modules/openbmb/MiniCPM-V-2/187851962daa9b63072d40ec802f597b71bff532/modeling_minicpm.py:176: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! 
  if seq_len > self.max_seq_len_cached: /opt/home/k8sworker/.cache/huggingface/modules/transformers_modules/openbmb/MiniCPM-V-2/187851962daa9b63072d40ec802f597b71bff532/modeling_minicpm.py:883: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! 
  if attention_mask.size() != (bsz, 1, q_len, kv_seq_len): /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/torch/jit/_trace.py:165: UserWarning: The .grad attribute of a Tensor that is not a leaf Tensor is being accessed. Its .grad attribute won't be populated during autograd.backward(). If you indeed want the .grad field to be populated for a non-leaf Tensor, use .retain_grad() on the non-leaf Tensor.If you access the non-leaf Tensor by mistake, make sure you access the leaf Tensor instead.See github.com/pytorch/pytorch/pull/30531 for more informations.(Triggered internally at aten/src/ATen/core/TensorBody.h:489.) 
  if a.grad is not None:
['attention_mask', 'position_ids', 'past_key_values', 'inputs_embeds']

言語モデルの重みを 4 ビットに圧縮#

メモリー消費を削減するため、NNCF を使用して重み圧縮を最適化できます。重み圧縮は、モデルのメモリー使用量を削減することを目的としています。また、大規模言語モデル (LLM) など、メモリーに依存する大規模なモデルのパフォーマンスが大幅に向上する可能性もあります。LLM やその他のモデルは、推論中に重みを保存する大量のメモリーを必要とするため、次の方法で重み圧縮の利点を得られます:

  • デバイスのメモリーに格納できない大規模なモデルの推論を可能にします。

  • 線形レイヤーなどの重みを使用した演算を行う際のメモリーアクセス・レイテンシーを短縮することで、モデルの推論パフォーマンスを向上させます。

ニューラル・ネットワーク圧縮フレームワーク (NNCF) は、主に LLM の最適化向けに設計された圧縮方法として、4 ビット / 8 ビット混合重み量子化を提供します。重み圧縮とフルモデル量子化 (トレーニング後の量子化) 違いは、重み圧縮のでは、活性化が浮動小数点のままであるため、精度が向上することです。LLM の重み圧縮は、完全なモデル量子化のパフォーマンスに匹敵する推論パフォーマンスの向上をもたらします。さらに、重み圧縮はデータに依存せず、キャリブレーション・データセットも必要としないため、容易に利用できます。

nncf.compress_weights 関数は重み圧縮の実行に使用できます。この関数は、OpenVINO モデルとその他の圧縮パラメーターを受け入れます。INT8 圧縮と比較して、INT4 圧縮はパフォーマンスをさらに向上させますが、予測品質は若干低下します。

重み圧縮の詳細については、OpenVINO のドキュメントを参照してください。

注: 重み圧縮プロセスを実行するには、さらに時間とメモリーが必要になる場合があります。以下のウィジェットで無効化できます:

import ipywidgets as widgets 

to_compress_weights = widgets.Checkbox( 
    value=True, 
    description="Weights Compression", 
    disabled=False, 
) 

to_compress_weights
Checkbox(value=True, description='Weights Compression')
import nncf 
import shutil 

compression_configuration = { 
    "mode": nncf.CompressWeightsMode.INT4_SYM, 
    "group_size": 64, 
    "ratio": 0.6, 
} 

core = ov.Core() 
llm_int4_path = llm_path.parent.parent / "language_model_int4" / llm_path.name 
if to_compress_weights.value and not llm_int4_path.exists(): 
    ov_model = core.read_model(llm_path) 
    ov_compressed_model = nncf.compress_weights(ov_model, **compression_configuration) 
    ov.save_model(ov_compressed_model, llm_int4_path) 
    del ov_compressed_model 
    del ov_model 
    gc.collect() 
    shutil.copy(text_emb_path, llm_int4_path.parent / text_emb_path.name) 
    shutil.copy(text_emb_path.with_suffix(".bin"), llm_int4_path.parent / text_emb_path.with_suffix(".bin").name) 
    shutil.copy(llm_path.parent / "config.json", llm_int4_path.parent / "config.json")
INFO:nncf:NNCF initialized successfully.Supported frameworks detected: torch, tensorflow, onnx, openvino
2024-07-13 01:04:47.035322: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on.You may see slightly different numerical results due to floating-point round-off errors from different computation orders.To turn them off, set the environment variable TF_ENABLE_ONEDNN_OPTS=0.
2024-07-13 01:04:47.077265: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-07-13 01:04:47.647632: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
Output()
INFO:nncf:Statistics of the bitwidth distribution: 
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑ 
│ Num bits (N)   │ % all parameters (layers)   │ % ratio-defining parameters (layers)   │ 
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥ 
│              8 │ 46% (123 / 281)             │ 40% (122 / 280)                        │ 
├────────────────┼─────────────────────────────┼────────────────────────────────────────┤ 
│              4 │ 54% (158 / 281)             │ 60% (158 / 280)                        │ 
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()

画像エンコーダー#

画像エンコーダーは、事前トレーニング済みの SigLIP モデルによって MiniCPM-V で表現されます。さらに、MiniCPM は画像表現を圧縮する知覚再サンプラーを使用します。これらを 1 つのモデルに組み合わせます。

class ImageEncoder(torch.nn.Module): 
    def __init__(self, vpm, resampler): 
        super().__init__() 
        self.vpm = vpm 
        self.resampler = resampler 

    def forward(self, pixel_values, tgt_size): 
        vision_embedding = self.vpm.forward_features(pixel_values) 
        if hasattr(self.vpm, "num_prefix_tokens") and self.vpm.num_prefix_tokens > 0: 
            vision_embedding = vision_embedding[:, self.vpm.num_prefix_tokens :] 
        if self.resampler.adaptive: 
            pos_embed = ( 
                self.get_2d_sincos_pos_embed(self.resampler.embed_dim, 
tgt_size).float().to(device=vision_embedding.device, dtype=vision_embedding.dtype) 
            ) 
        else: 
            pos_embed = self.get_abs_pos(self.resampler.pos_embed, tgt_size) 

        x = self.resampler.kv_proj(vision_embedding) 
        x = self.resampler.ln_kv(x).permute(1, 0, 2) 

        N = x.shape[1] 
        q = self.resampler.ln_q(self.resampler.query) 
        out = self.resampler.attn(self.resampler._repeat(q, N) + self.resampler.pos_embed.unsqueeze(1), x + pos_embed.unsqueeze(1), x, attn_mask=None)[0] 
        x = out.permute(1, 0, 2)
 
        x = self.resampler.ln_post(x) 
        x = x @ self.resampler.proj 
        return x 

    def get_2d_sincos_pos_embed(self, embed_dim, grid_size, cls_token=False): 
        """ 
        grid_size: int of the grid height and width 
        return: 
        pos_embed: [grid_size*grid_size, embed_dim] or [1+grid_size*grid_size, embed_dim] (w/ or w/o cls_token) 
        """ 

        grid_h_size, grid_w_size = grid_size[0], grid_size[1] 

        grid_h = torch.arange(grid_h_size, dtype=torch.float32) 
        grid_w = torch.arange(grid_w_size, dtype=torch.float32) 
        grid = torch.meshgrid(grid_w, grid_h) # here w goes first 
        grid = torch.stack(grid, dim=0)
 
        grid = grid.reshape([2, 1, grid_h.shape[0], grid_w.shape[0]]) 
        pos_embed = self.get_2d_sincos_pos_embed_from_grid(embed_dim, grid) 
        if cls_token: 
            pos_embed = torch.cat([torch.zeros([1, embed_dim]), pos_embed], dim=0) 
        return pos_embed 

    def get_2d_sincos_pos_embed_from_grid(self, embed_dim, grid):
        # 次元の半分を使用して grid_h をエンコード 
        emb_h = self.get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[0]) # (H*W, D/2) 
        emb_w = self.get_1d_sincos_pos_embed_from_grid(embed_dim // 2, grid[1]) # (H*W, D/2) 

        emb = torch.cat([emb_h, emb_w], dim=1) # (H*W, D) 
        return emb 

    def get_1d_sincos_pos_embed_from_grid(self, embed_dim, pos): 
        """ 
        embed_dim: output dimension for each position 
        pos: a list of positions to be encoded: size (M,) 
        out: (M, D) 
        """ 
        assert embed_dim % 2 == 0 
        omega = torch.arange(embed_dim // 2, dtype=torch.float32) 
        omega /= embed_dim / 2.0 
        omega = 1.0 / 10000**omega # (D/2,) 

        pos = pos.reshape(-1) # (M,) 
        out = torch.einsum("m,d->md", pos, omega) # (M, D/2), outer product 

        emb_sin = torch.sin(out) # (M, D/2) 
        emb_cos = torch.cos(out) # (M, D/2) 

        emb = torch.cat([emb_sin, emb_cos], axis=1) # (M, D) 
        return emb 

if not image_encoder_path.exists(): 
    image_encoder = ImageEncoder(model.vpm, model.resampler) 
    ov_model = ov.convert_model(image_encoder, example_input=[torch.ones([1, 3, 448, 448]), torch.tensor([32, 32], dtype=torch.int32)]) 
    ov.save_model(ov_model, image_encoder_path) 
    del ov_model 
    cleanup_torchscript_cache() 

del model 
gc.collect()
WARNING:tensorflow:Please fix your imports.Module tensorflow.python.training.tracking.base has been moved to tensorflow.python.trackable.base.The old module will be deleted in version 2.11.
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/timm/layers/pos_embed.py:29: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! 
  if num_new_tokens == num_pos_tokens and new_size[0] == new_size[1]: /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/timm/layers/pos_embed.py:33: TracerWarning: Converting a tensor to a Python float might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! 
  hw = int(math.sqrt(num_pos_tokens - num_prefix_tokens)) 
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/torch/functional.py:512: UserWarning: torch.meshgrid: in an upcoming release, it will be required to pass the indexing argument. (Triggered internally at ../aten/src/ATen/native/TensorShape.cpp:3587.) 
  return _VF.meshgrid(tensors, **kwargs) # type: ignore[attr-defined]
['pixel_values', 'tgt_size']
3680

モデル推論パイプラインの準備#

image0

前述したように、このモデルは、回答を生成する画像エンコーダーと LLM (テキスト埋め込み部分が分離されている) で構成されています。生成サイクルを表す LLM 推論クラスを定義します。これは HuggingFace Transformers GenerationMixin に基づいており、LLM 推論に使用される Optimum Intel OVModelForCausalLM に似ています。

from transformers.generation import GenerationMixin 
from transformers import AutoConfig, GenerationConfig 

core = ov.Core() 

class OvModelForCausalLMWithEmb(GenerationMixin): 
    def __init__(self, model_dir, device="CPU", ov_config=None, compile=True) -> None: 
        self._supports_cache_class = False 
        self.config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True) 
        self.config.is_decoder = True 
        self.config.is_encoder_decoder = False 
        self.generation_config = GenerationConfig.from_model_config(self.config) 
        model_dir = Path(model_dir) 
        self.model = core.read_model(model_dir / "language_model.xml") 
        self.token_emb = core.read_model(model_dir / "embed_tokens.xml") 
        self.request = None 
        self.token_emb_request = None 
        self._device = device.upper() 
        self.device = torch.device("cpu") 
        self.ov_config = ov_config 
        self.next_beam_idx = None 
        self._past_length = None 
        self.input_names = [input_t.get_any_name() for input_t in self.model.inputs] 
        self.main_input_name = "input_ids" 
        if compile: 
            self.compile() 

    def compile(self): 
        if self.request is None: 
            self.request = core.compile_model(self.model, self._device, self.ov_config).create_infer_request() 
        self._compile_token_emb() 

    def _compile_token_emb(self): 
        if self.token_emb_request is None: 
            self.token_emb_request = core.compile_model(self.token_emb, self._device, self.ov_config) 

    def to(self, device: str): 
        if isinstance(device, str): 
            self._device = device.upper() 
            self.clear_requests() 

        return self 

    def clear_requests(self): 
        del self.request 
        del self.token_emb_request 
        self.request = None 
        self.token_emb_request = None 

    def embed_tokens(self, input_ids: torch.LongTensor): 
        self._compile_token_emb() 
        res = self.token_emb_request(input_ids, share_inputs=True) 
        return res[0] 

    def prepare_inputs( 
        self, 
        input_ids: torch.LongTensor, 
        attention_mask: Optional[torch.LongTensor] = None, 
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None, 
        position_ids: Optional[torch.LongTensor] = None, 
        inputs_embeds: Optional[torch.FloatTensor] = None, 
        **kwargs, 
    ): 
        batch_size = input_ids.shape[0] if input_ids is not None else inputs_embeds.shape[0] 

        inputs = {} 
        # past_key_values は明示的には使用されず、モデル内で処理されます 
        if past_key_values is None:
            # これはシーケンスの最初の反復であり、すべての状態をリセットします 
            if self.request is not None: 
                self.request.reset_state() 
                # 現在の反復で使用される次の beam_idx 入力の初期値を設定し、 
                # beam_search が使用される場合は次の反復で _reorder_cache によってオプションで更新されます 
                self.next_beam_idx = np.arange(batch_size, dtype=int) 
                self._past_length = 0 
        past_len = self._get_past_length(past_key_values) 

        if inputs_embeds is None: 
            inputs_embeds = self.embed_tokens(input_ids if past_key_values is None else input_ids[:, -1:])* self.config.scale_emb 
        inputs["inputs_embeds"] = inputs_embeds 

        # 必要に応じて attention_mask 入力を追加 
        if "attention_mask" in self.input_names or "position_ids" in self.input_names: 
            if attention_mask is not None: 
                attention_mask = np.array(attention_mask) 
            else: 
                attention_mask = np.ones((inputs_embeds.shape[0], inputs_embeds.shape[1] + past_len), dtype=int) 

        if "attention_mask" in self.input_names: 
            inputs["attention_mask"] = attention_mask 

        if "position_ids" in self.input_names: 
            if position_ids is not None: 
                position_ids = np.array(position_ids) 
            else: 
                position_ids = np.cumsum(attention_mask, axis=1) - 1 
                position_ids[attention_mask == 0] = 1 
                if past_key_values: 
                    position_ids = position_ids[:, -input_ids.shape[1] :] 

            inputs["position_ids"] = position_ids 

        if "beam_idx" in self.input_names: 
            inputs["beam_idx"] = self.next_beam_idx if self.next_beam_idx is not None else np.arange(batch_size, dtype=int) 

        return inputs 

    def forward( 
        self, 
        input_ids: torch.LongTensor, 
        attention_mask: Optional[torch.LongTensor] = None, 
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None, 
        position_ids: Optional[torch.LongTensor] = None, 
        inputs_embeds: Optional[torch.LongTensor] = None, 
        **kwargs, 
    ): 
        self.compile() 

        inputs = self.prepare_inputs( 
            input_ids=input_ids, 
            attention_mask=attention_mask, 
            past_key_values=past_key_values, 
            position_ids=position_ids, 
            inputs_embeds=inputs_embeds, 
            **kwargs, 
        ) 

        # 推論を実行 
        self.request.start_async(inputs, share_inputs=True) 
        self.request.wait() 
        logits = self.request.get_tensor("logits").data 
        logits = torch.from_numpy(logits).to(self.device) 
        past_key_values = ((),) 
        self._past_length += inputs["inputs_embeds"].shape[1] 

        return CausalLMOutputWithPast(logits=logits, past_key_values=past_key_values) 

    # transformers.models.llama.modeling_llama.LlamaForCausalLM.prepare_inputs_for_generation からトランスフォーム 
    def prepare_inputs_for_generation(self, input_ids, past_key_values=None, inputs_embeds=None, **kwargs):
        # モデルがエンコーダー・デコーダー・モデルでデコーダーとして使用される場合、デコーダー・アテンション・マスクはオンザフライで作成されます 
        attention_mask = kwargs.get("attention_mask", None) 
        use_cache = kwargs.get("use_cache", None) 

        if past_key_values is not None: 
            past_len = self._get_past_length(past_key_values) 
            # 未処理のトークンのみ保存:
            # 1 - attention_mask の長さが input_ids の長さを超える場合、 
            # 入力の一部がキャッシュの一部として排他的に渡される設定になります 
            # (例: input_embeds を入力として渡す場合) 
            if attention_mask is not None and input_ids is not None and attention_mask.shape[1] > input_ids.shape[1]: 
                input_ids = input_ids[:, -(attention_mask.shape[1] - past_len) :] 
            # 2 - past_length が input_ids より小さい場合、input_ids はすべての入力トークンを保持します。past_length に基づいて 
            # input_ids を破棄できます 
            elif input_ids is not None and past_len < input_ids.shape[1]: 
                input_ids = input_ids[:, past_len:]
            # 3 - それ以外の場合 (past_length >= input_ids.shape[1])、input_ids には未処理のトークンのみがあると仮定 
        position_ids = kwargs.get("position_ids", None) 
        if attention_mask is not None and position_ids is None and "position_ids" in self.input_names:
            # バッチ生成のために position_ids をオンザフライで作成 
            position_ids = attention_mask.long().cumsum(-1) - 1 
            position_ids.masked_fill_(attention_mask == 0, 1) 
            if past_key_values and input_ids is not None: 
                position_ids = position_ids[:, -input_ids.shape[1] :] 

            model_inputs = { 
                "input_ids": input_ids, 
                "past_key_values": past_key_values, 
                "use_cache": use_cache, 
                "position_ids": position_ids, 
                "attention_mask": attention_mask, 
                "inputs_embeds": inputs_embeds if past_key_values is None else None, 
            } 

            return model_inputs 

        def _get_past_length(self, past_key_values=None): 
            if past_key_values is None: 
                return 0 
            return self._past_length 

    # transformers.models.gpt2.modeling_gpt2.GPT2LMHeadModel._reorder_cache から適用 
    def _reorder_cache(self, past_key_values: Tuple[Tuple[torch.Tensor]], beam_idx: torch.Tensor) -> Tuple[Tuple[torch.Tensor]]: 
        """ 
        This function is used to re-order the `past_key_values` cache if [`~PreTrainedModel.beam_search`] or 
        [`~PreTrainedModel.beam_sample`] is called. 
        This is required to match `past_key_values` with the correct beam_idx at every generation step.
        """ 
        self.next_beam_idx = np.array(beam_idx) # 次の反復で入力として使用するために beam_idx を保存 
        return past_key_values 

    def can_generate(self): 
        """Returns True to validate the check that the model using `GenerationMixin.generate()` can indeed generate.""" 

        return True 

    def __call__(self, *args, **kwargs): 
        return self.forward(*args, **kwargs)

これは LLM を使用して画像処理や回答生成などのチャットボット機能を処理する汎用マルチモーダル・モデル・クラス OvMiniCPMVModel の順序です。

from typing import List, Optional 
import math 
import json 
import torch 
from torchvision import transforms 
from timm.data import IMAGENET_INCEPTION_MEAN, IMAGENET_INCEPTION_STD 
from PIL import Image 

def pad(orig_items, key, max_length=None, padding_value=0, padding_side="left"): 
    items = [] 
    if isinstance(orig_items[0][key], list): 
        assert isinstance(orig_items[0][key][0], torch.Tensor) 
        for it in orig_items: 
            for tr in it[key]: 
                items.append({key: tr})
    else: 
        assert isinstance(orig_items[0][key], torch.Tensor) 
        items = orig_items 

    batch_size = len(items) 
    shape = items[0][key].shape 
    dim = len(shape) 
    assert dim <= 3 
    if max_length is None: 
        max_length = 0 
    max_length = max(max_length, max(item[key].shape[-1] for item in items)) 
    min_length = min(item[key].shape[-1] for item in items) 
    dtype = items[0][key].dtype 

    if dim == 1: 
        return torch.cat([item[key] for item in items], dim=0) 
    elif dim == 2: 
        if max_length == min_length: 
            return torch.cat([item[key] for item in items], dim=0) 
        tensor = torch.zeros((batch_size, max_length), dtype=dtype) + padding_value 
    else: 
        tensor = torch.zeros((batch_size, max_length, shape[-1]), dtype=dtype) + padding_value 

    for i, item in enumerate(items): 
        if dim == 2: 
            if padding_side == "left": 
                tensor[i, -len(item[key][0]) :]= item[key][0].clone() 
            else: 
                tensor[i, : len(item[key][0])] = item[key][0].clone() 
        elif dim == 3: 
            if padding_side == "left": 
                tensor[i, -len(item[key][0]) :, :]= item[key][0].clone() 
            else: 
                tensor[i, : len(item[key][0]), :]= item[key][0].clone() 

    return tensor 

def slice_image(image, max_slice_nums=9, scale_resolution=448, patch_size=14, never_split=False): 
    original_size = image.size 
    original_width, original_height = original_size 
    log_ratio = math.log(original_width / original_height) 
    ratio = original_width * original_height / (scale_resolution * scale_resolution) 
    multiple = min(math.ceil(ratio), max_slice_nums) 

    source_image = None 
    best_grid = None 
    patches = [] 

    if multiple <= 1 or never_split:
        # スライスやアップサンプリングは不要 
        best_size = find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=True) 
        source_image = image.resize(best_size, Image.Resampling.BICUBIC) 
    else: 
        candidate_split_grids_nums = [] 
        for i in [multiple - 1, multiple, multiple + 1]: 
            if i == 1 or i > max_slice_nums: 
                continue 
            candidate_split_grids_nums.append(i) 

        # ソース画像、ダウンサンプリング、patch_size で割った値 
        best_resize = find_best_resize(original_size, scale_resolution, patch_size) 
        source_image = image.copy().resize(best_resize, Image.Resampling.BICUBIC) 
        candidate_grids = [] 

        # 最適なグリッドを検出  
        for split_grids_nums in candidate_split_grids_nums: 
            m = 1 
            while m <= split_grids_nums: 
                if split_grids_nums % m == 0: 
                    candidate_grids.append([m, split_grids_nums // m]) 
                m += 1 

        best_grid = [1, 1] 
        min_error = float("inf") 
        for grid in candidate_grids: 
            error = abs(log_ratio - math.log(grid[0] / grid[1])) 
            if error < min_error: 
                best_grid = grid 
                min_error = error 

        refine_size = get_refine_size(original_size, best_grid, scale_resolution, patch_size, allow_upscale=True) 

        refine_image = image.resize(refine_size, Image.Resampling.BICUBIC) 
        patches = split_to_patches(refine_image, best_grid) 

    return source_image, patches, best_grid 

def ensure_divide(length, patch_size): 
    return max(round(length / patch_size) * patch_size, patch_size) 

def find_best_resize(original_size, scale_resolution, patch_size, allow_upscale=False): 
    width, height = original_size 
    if (width * height > scale_resolution * scale_resolution) or allow_upscale: 
        r = width / height 
        height = int(scale_resolution / math.sqrt(r)) 
        width = int(height * r) 
    best_width = ensure_divide(width, patch_size) 
    best_height = ensure_divide(height, patch_size) 
    return (best_width, best_height) 

def get_refine_size(original_size, grid, scale_resolution, patch_size, allow_upscale=False): 
    width, height = original_size 
    grid_x, grid_y = grid 

    refine_width = ensure_divide(width, grid_x) 
    refine_height = ensure_divide(height, grid_y) 
    grid_width = refine_width / grid_x 
    grid_height = refine_height / grid_y 

    best_grid_size = find_best_resize( 
        (grid_width, grid_height), 
        scale_resolution, 
        patch_size, 
        allow_upscale=allow_upscale, 
    ) 

    refine_size = (best_grid_size[0] * grid_x, best_grid_size[1] * grid_y) 

    return refine_size 

def split_to_patches(image, grid): 
    patches = [] 
    width, height = image.size 
    grid_x = int(width / grid[0]) 
    grid_y = int(height / grid[1]) 

    for i in range(0, height, grid_y): 
        images = [] 
        for j in range(0, width, grid_x): 
            box = (j, i, j + grid_x, i + grid_y) 
            patch = image.crop(box) 
            images.append(patch) 
        patches.append(images) 

    return patches 

def get_grid_placeholder(tokenizer, grid, query_num): 
    image_placeholder = tokenizer.im_start + tokenizer.unk_token * query_num + tokenizer.im_end 

    cols = grid[0] 
    rows = grid[1] 
    slices = [] 
    for i in range(rows): 
        lines = [] 
        for j in range(cols): 
            lines.append(image_placeholder) 
        slices.append("".join(lines)) 
    slice_placeholder = tokenizer.slice_start + "\n".join(slices) + tokenizer.slice_end 
    return slice_placeholder 

class OvMiniCPMVModel: 
    def __init__(self, config, vpm, llm, tokenizer) -> None: 
        self.config = config 
        self.vpm = vpm 
        self.llm = llm 
        self.transform = self.init_transform() 
        self.tokenizer = tokenizer 
        self.device = torch.device("cpu") 

    def init_transform(self): 
        return transforms.Compose( 
            [ 
                transforms.ToTensor(), 
                transforms.Normalize(mean=IMAGENET_INCEPTION_MEAN, std=IMAGENET_INCEPTION_STD), 
            ] 
        ) 

    def get_vision_embedding(self, pixel_values): 
        res = [] 
        for pixel_value in pixel_values: 
            h, w = pixel_value.shape[-2:] 
            tgt_size = torch.from_numpy(np.array([math.ceil(h / self.config.patch_size), math.ceil(w / self.config.patch_size)])) 
            vision_embedding = self.vpm([pixel_value.unsqueeze(0), tgt_size])[0] 
            res.append(vision_embedding) 
        return np.vstack(res) 

    def get_vllm_embedding(self, data): 
        if "vision_hidden_states" not in data: 
            pixel_values_list = data["pixel_values"] 
            vision_hidden_states = [] 
            for pixel_values in pixel_values_list: 
                if len(pixel_values) > 0: 
                    vision_hidden_states.append(torch.from_numpy(self.get_vision_embedding(pixel_values))) 
                else
                    : vision_hidden_states.append([]) 
        else: 
            vision_hidden_states = data["vision_hidden_states"] 

        vllm_embedding = torch.from_numpy(self.llm.embed_tokens(data["input_ids"])) * self.llm.config.scale_emb 
        bs = len(data["input_ids"]) 
        for i in range(bs): 
            cur_vs_hs = vision_hidden_states[i] 
            if len(cur_vs_hs) > 0: 
                cur_vllm_emb = vllm_embedding[i] 
                cur_image_bound = data["image_bound"][i] 
                if len(cur_image_bound) > 0: 
                    image_indices = torch.stack([torch.arange(r[0], r[1], dtype=torch.long) for r in cur_image_bound]) 

                    cur_vllm_emb.scatter_( 
                        0, 
                        image_indices.view(-1, 1).repeat(1, 
                        cur_vllm_emb.shape[-1]), cur_vs_hs.view(-1, cur_vs_hs.shape[-1]), 
                    ) 

        return vllm_embedding 

    def forward(self, data, **kwargs): 
        vllm_embedding = self.get_vllm_embedding(data) 
        position_ids = data["position_ids"] 
        if position_ids.dtype != torch.int64: 
            position_ids = position_ids.long() 

        return self.llm(input_ids=None, position_ids=position_ids, inputs_embeds=vllm_embedding, **kwargs) 

    def _convert_to_tensors(self, tokenizer, input_str, max_inp_length: Optional[int] = None): 
        if tokenizer.add_bos_token: 
            input_ids = tokenizer.encode(input_str) 
        else: 
            input_ids = [tokenizer.bos_id] + tokenizer.encode(input_str) 
        if max_inp_length is not None: 
            input_ids = input_ids[:max_inp_length] 
        input_ids = torch.tensor(input_ids, dtype=torch.int32) 

        image_start_tokens = torch.where(input_ids == tokenizer.im_start_id)[0] 
        # 跳过 im_start 
        image_start_tokens += 1 
        image_end_tokens = torch.where(input_ids == tokenizer.im_end_id)[0] 
        valid_image_nums = max(len(image_start_tokens), len(image_end_tokens)) 
        image_bound = torch.hstack( 
            [ 
                image_start_tokens[:valid_image_nums].unsqueeze(-1), 
                image_end_tokens[:valid_image_nums].unsqueeze(-1), 
            ] 
        ) 

        model_input = {} 
        model_input["input_ids"] = input_ids.unsqueeze(0) 
        model_input["image_bound"] = image_bound 

        return model_input 

    def _process_list(self, tokenizer, data_list: List[str], max_inp_length: Optional[int] = None): 
        pad_keys = ["input_ids"] 
        input_tensors = [] 
        for data in data_list: 
            input_tensors.append(self._convert_to_tensors(tokenizer, data, max_inp_length)) 
        padded = {} 
        for key in pad_keys: 
            padded[key] = pad(input_tensors, key, padding_side="left").to(self.device) 
        padded["image_bound"] = [i["image_bound"] for i in input_tensors] 
        return padded 

    def _decode(self, inputs_embeds, tokenizer, **kwargs): 
        output = self.llm.generate(inputs_embeds=inputs_embeds, pad_token_id=0, eos_token_id=tokenizer.eos_token_id, **kwargs) 
        return self._decode_text(output, tokenizer) 

    def _decode_text(self, result_ids, tokenizer): 
        result_text = [] 
        for result in result_ids: 
            result = result[result != 0] 
            if result[0] == tokenizer.bos_id: 
                result = result[1:] 
            if result[-1] == tokenizer.eos_id: 
                result = result[:-1] 
            result_text.append(tokenizer.decode(result).strip()) 
        return result_text 

    def slice_image(self, image): 
        return slice_image( 
            image, 
            self.config.max_slice_nums, 
            self.config.scale_resolution, 
            self.config.patch_size, 
        ) 

    def get_slice_image_placeholder(self, image, tokenizer): 
        image_placeholder = tokenizer.im_start + tokenizer.unk_token * self.config.query_num + tokenizer.im_end 

        slice_images = [] 

        source_image, patches, best_grid = slice_image( 
            image, 
            self.config.max_slice_nums, 
            self.config.scale_resolution, 
            self.config.patch_size, 
        ) 

        slice_images.append(source_image) 
        final_placeholder = image_placeholder 

        if len(patches) > 0: 
            for i in range(len(patches)): 
                for j in range(len(patches[0])): 
                    slice_images.append(patches[i][j]) 

            final_placeholder += get_grid_placeholder(tokenizer, best_grid, self.config.query_num) 
        return slice_images, final_placeholder 

    def generate(self, data_list=None, img_list=None, tokenizer=None, max_inp_length: Optional[int] = None, vision_hidden_states=None, **kwargs): 
        assert data_list is not None 
        bs = len(data_list) 
        if img_list is None: 
            img_list = [[] for i in range(bs)] 
        assert bs == len(img_list) 

        model_inputs = self._process_list(tokenizer, data_list, max_inp_length) 

        if vision_hidden_states is None: 
            pixel_values = [] 
            for i in range(bs): 
                img_inps = [] 
                for img in img_list[i]: 
                    img_inps.append(self.transform(img).to(self.device)) 
                if img_inps: 
                    pixel_values.append(img_inps) 
                else: 
                    pixel_values.append([]) 
            model_inputs["pixel_values"] = pixel_values 
        else: 
            model_inputs["vision_hidden_states"] = vision_hidden_states 

        with torch.inference_mode(): 
            model_inputs["inputs_embeds"] = self.get_vllm_embedding(model_inputs) 

            result = self._decode(model_inputs["inputs_embeds"], tokenizer, **kwargs) 

        return result 

    def chat(self, image, msgs, context, tokenizer, vision_hidden_states=None, max_new_tokens=1024, sampling=True, max_inp_length=2048, **kwargs): 
        if isinstance(msgs, str): 
            msgs = json.loads(msgs) 
        # プロンプトメッセージ 
        prompt = "" 
        for i, msg in enumerate(msgs): 
            role = msg["role"] 
            content = msg["content"] 
            assert role in ["user", "assistant"] 
            if i == 0: 
                if image is None: 
                    images = [] 
                else: 
                    assert role == "user", "The role of first msg should be user" 
                    if self.config.slice_mode: 
                        images, final_placeholder = self.get_slice_image_placeholder(image, tokenizer) content = final_placeholder + "\n" + content 
                    else: 
                        images = [image] 
                        content = tokenizer.im_start + tokenizer.unk_token * self.config.query_num + tokenizer.im_end + "\n" + content 
            prompt += "<用户>" if role == "user" else "<AI>" 
            prompt += content 
        prompt += "<AI>" 
        final_input = prompt 

        if sampling: 
            generation_config = { 
                "top_p": 0.8, 
                "top_k": 100, 
                "temperature": 0.7, 
                "do_sample": True, 
                "repetition_penalty": 1.05, 
                "streamer": None, 
            } 
        else: 
            generation_config = { 
                "num_beams": 3, 
                "repetition_penalty": 1.2, 
                "streamer": None, 
            } 

        generation_config.update((k, kwargs[k]) for k in generation_config.keys() & kwargs.keys()) 

        with torch.inference_mode(): 
            res = self.generate( 
                data_list=[final_input], 
                max_inp_length=max_inp_length, 
                img_list=[images], 
                tokenizer=tokenizer, 
                max_new_tokens=max_new_tokens, 
                vision_hidden_states=vision_hidden_states, 
                **generation_config 
            ) 
        answer = res[0] 
        context = msgs.copy() 
        context.append({"role": "assistant", "content": answer}) 

        return answer, context, generation_config

OpenVINO モデル推論を実行#

デバイスを選択#

core = ov.Core() 

support_devices = core.available_devices 
if "NPU" in support_devices: 
    support_devices.remove("NPU") 

device = widgets.Dropdown( 
    options=support_devices + ["AUTO"], 
    value="CPU", 
    description="Device:", 
    disabled=False, 
) 

device
Dropdown(description='Device:', options=('CPU', 'AUTO'), value='CPU')

モデルバリアントの選択#

use_int4_lang_model = widgets.Checkbox( 
    value=llm_int4_path.exists(), 
    description="INT4 language model", 
    disabled=not llm_int4_path.exists(), 
) 

use_int4_lang_model
Checkbox(value=True, description='INT4 language model')
llm = OvModelForCausalLMWithEmb(llm_path.parent if not use_int4_lang_model.value else llm_int4_path.parent, device.value)
visual_encoder = core.compile_model(image_encoder_path, device.value)
config = AutoConfig.from_pretrained(model_dir, trust_remote_code=True) 
tokenizer = AutoTokenizer.from_pretrained(model_dir, trust_remote_code=True)
model = OvMiniCPMVModel(config, visual_encoder, llm, tokenizer)
import requests 

url = "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/d5fbbd1a-d484-415c-88cb-9986625b7b11" 
image = Image.open(requests.get(url, stream=True).raw) 
question = "What is unusual on this image?" 

print(f"Question:\n{question}") 
image
Question: 
What is unusual on this image?
../_images/minicpm-v-multimodal-chatbot-with-output_27_1.png
from transformers import TextStreamer 

msgs = [{"role": "user", "content": question}] 

streamer = TextStreamer(tokenizer=tokenizer, skip_special_tokens=True) 

print("Answer:") 
res, context, _ = model.chat(image=image, msgs=msgs, context=None, tokenizer=tokenizer, sampling=True, temperature=0.7, streamer=streamer)
Answer: 
The unusual aspect of this image is the presence of a cat laying inside an open cardboard box.

インタラクティブなデモ#

import gradio as gr 
import traceback 
import re 
from transformers import TextIteratorStreamer 
from threading import Thread 

ERROR_MSG = "Error, please retry" 
model_name = "MiniCPM-V 2.0" 

form_radio = {"choices": ["Beam Search", "Sampling"], "value": "Sampling", "interactive": True, "label": "Decode Type"} 
# ビームフォーム 
num_beams_slider = {"minimum": 0, "maximum": 5, "value": 3, "step": 1, "interactive": True, "label": "Num Beams"} 
repetition_penalty_slider = {"minimum": 0, "maximum": 3, "value": 1.2, "step": 0.01, "interactive": True, "label": "Repetition Penalty"} 
repetition_penalty_slider2 = {"minimum": 0, "maximum": 3, "value": 1.05, "step": 0.01, "interactive": True, "label": "Repetition Penalty"} 
max_new_tokens_slider = {"minimum": 1, "maximum": 4096, "value": 1024, "step": 1, "interactive": True, "label": "Max New Tokens"} 

top_p_slider = {"minimum": 0, "maximum": 1, "value": 0.8, "step": 0.05, "interactive": True, "label": "Top P"} 
top_k_slider = {"minimum": 0, "maximum": 200, "value": 100, "step": 1, "interactive": True, "label": "Top K"} 
temperature_slider = {"minimum": 0, "maximum": 2, "value": 0.7, "step": 0.05, "interactive": True, "label": "Temperature"} 

def create_component(params, comp="Slider"): 
    if comp == "Slider": 
        return gr.Slider( 
            minimum=params["minimum"], 
            maximum=params["maximum"], 
            value=params["value"], 
            step=params["step"], 
            interactive=params["interactive"], 
            label=params["label"], 
        ) 
    elif comp == "Radio": 
        return gr.Radio(choices=params["choices"], value=params["value"], interactive=params["interactive"], label=params["label"]) 
    elif comp == "Button": 
        return gr.Button(value=params["value"], interactive=True) 

def chat(img, msgs, ctx, params=None, vision_hidden_states=None): 
    default_params = {"num_beams": 3, "repetition_penalty": 1.2, "max_new_tokens": 1024} 
    if params is None: 
        params = default_params 
    if img is None: 
        return -1, "Error, invalid image, please upload a new image", None, None 
    try: 
        image = img.convert("RGB") 
        streamer = TextIteratorStreamer(tokenizer, **{"skip_special_tokens": True}) 
        generation_params = {"image": image, "msgs": msgs, "context": None, "tokenizer": tokenizer, "streamer": streamer, **params} 
        thread = Thread(target=model.chat, kwargs=generation_params) 
        thread.start() 

        buffer = "" 

        for res in streamer: 
            res = re.sub(r"(<box>.*</box>)", "", 
            res) res = res.replace("<ref>", "") 
            res = res.replace("</ref>", "") 
            res = res.replace("<box>", "") 
            new_text = res.replace("</box>", "") 
            buffer += new_text 
            yield -1, buffer, None, None 
    except Exception as err: 
        print(err) 
        traceback.print_exc() 
        return -1, ERROR_MSG, None, None 

def upload_img(image, _chatbot, _app_session): 
    image = Image.fromarray(image) 

    _app_session["sts"] = None 
    _app_session["ctx"] = [] 
    _app_session["img"] = image 
    _chatbot.append(("", "Image uploaded successfully, you can talk to me now")) 
    return _chatbot, _app_session 

def respond(_question, _chat_bot, _app_cfg, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature): 
    if _app_cfg.get("ctx", None) is None:
        _chat_bot.append((_question, "Please upload an image to start")) 
        return "", _chat_bot, _app_cfg 

    _context = _app_cfg["ctx"].copy() 
    if _context:
         _context.append({"role": "user", "content": _question}) 
    else:
         _context = [{"role": "user", "content": _question}] 

    if params_form == "Beam Search": 
        params = {"sampling": False, "num_beams": num_beams, "repetition_penalty": repetition_penalty, "max_new_tokens": 896} 
    else: 
        params = { 
            "sampling": True, 
            "top_p": top_p, 
            "top_k": top_k, 
            "temperature": temperature, 
            "repetition_penalty": repetition_penalty_2, 
            "max_new_tokens": 896, 
        } 

    _context.append({"role": "assistant", "content": ""}) 
    _chat_bot.append([_question, ""]) 
    for code, _answer, _, sts in chat(_app_cfg["img"], _context, None, params):
        _context[-1]["content"] = _answer 
        _chat_bot[-1][-1] = _answer 

        if code == 0:
            _app_cfg["ctx"] = _context 
            _app_cfg["sts"] = sts 
        yield "", _chat_bot, _app_cfg 

def regenerate_button_clicked(_question, _chat_bot, _app_cfg, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature): 
    if len(_chat_bot) <= 1:
        _chat_bot.append(("Regenerate", "No question for regeneration.")) 
        return "", _chat_bot, _app_cfg 
    elif _chat_bot[-1][0] == "Regenerate": 
        return "", _chat_bot, _app_cfg 
    else:
        _question = _chat_bot[-1][0] 
        _chat_bot = _chat_bot[:-1] 
        _app_cfg["ctx"] = _app_cfg["ctx"][:-2] 
    for text, _chatbot, _app_cfg in respond( 
        _question, _chat_bot, _app_cfg, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature 
    ): 
        yield text, _chatbot, _app_cfg 

with gr.Blocks() as demo: 
    with gr.Row(): 
        with gr.Column(scale=1, min_width=300): 
            params_form = create_component(form_radio, comp="Radio") 
            with gr.Accordion("Beam Search") as beams_according: 
                num_beams = create_component(num_beams_slider) 
                repetition_penalty = create_component(repetition_penalty_slider) 
            with gr.Accordion("Sampling") as sampling_according: 
                top_p = create_component(top_p_slider) 
                top_k = create_component(top_k_slider) 
                temperature = create_component(temperature_slider) 
                repetition_penalty_2 = create_component(repetition_penalty_slider2) 
            regenerate = create_component({"value": "Regenerate"}, comp="Button") 
        with gr.Column(scale=3, min_width=500): 
            app_session = gr.State({"sts": None, "ctx": None, "img": None}) 
            bt_pic = gr.Image(label="Upload an image to start") 
            chat_bot = gr.Chatbot(label=f"Chat with {model_name}") 
            txt_message = gr.Textbox(label="Input text") 

            regenerate.click( 
                regenerate_button_clicked, 
                [txt_message, chat_bot, app_session, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature], 
                [txt_message, chat_bot, app_session], 
            ) 
            txt_message.submit( 
                respond, 
                [txt_message, chat_bot, app_session, params_form, num_beams, repetition_penalty, repetition_penalty_2, top_p, top_k, temperature], 
            [txt_message, chat_bot, app_session], 
            )
             bt_pic.upload(lambda: None, None, chat_bot, queue=False).then(upload_img, inputs=[bt_pic, chat_bot, app_session], outputs=[chat_bot, app_session]) 

try: 
    demo.launch(debug=False) 
except Exception: 
    demo.launch(debug=False, share=True) 
# リモートで起動する場合は、server_name と server_port を指定 
# demo.launch(server_name='your server name', server_port='server port in int') 
# 詳細はドキュメントをご覧ください: https://gradio.app/docs/
ローカル URL で実行中: http://127.0.0.1:7860 
パブリックリンクを作成するには、launch()share=True を設定します。