LLaVA Next と OpenVINO による視覚言語アシスタント#

この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。

GitHub

LLaVA-NeXT は、画像に対する高度な言語推論の画期的な進歩を示す、改良された OCR と拡張された世界知識を導入した新世代の LLaVA モデルファミリーです。LLaVA (Large Language and Vision Assistant) は、言語と画像の両方の指示に従ってさまざまな現実世界のタスクを完了できる汎用視覚アシスタントの開発を目的とした大規模なマルチモーダル・モデルです。アイデアは、大規模言語モデル (LLM) のパワーと CLIP などのビジョン・エンコーダーを組み合わせて、マルチモーダル指示を理解してそれに従って動作する、エンドツーエンドでトレーニングされたニューラル・アシスタントを作成することです。

このチュートリアルでは、マルチモーダル・チャットボットを作成するため、Transformers ライブラリーから LLaVA-NeXT モデルに変換および最適化する方法について説明します。マルチモーダル・チャットボットの作成には llava-v1.6-mistral-7b モデルを活用しますが、同様のアクションは、HuggingFace トランスフォーマー実装と互換性のある LLaVA ファミリーの他のモデルにも適用できます。さらに、LLM にステートフル変換を適用する方法と、NNCF を使用した重み圧縮と量子化などのモデル最適化手法を紹介します。

目次:

必要条件#

%pip install -q "openvino>=2024.0.0" "nncf>=2.9.0" "torch>=2.1" "transformers>=4.39.1" "accelerate" "pillow" "gradio>=4.26" "datasets>=2.14.6" "tqdm" --extra-index-url https://download.pytorch.org/whl/cpu
Note: you may need to restart the kernel to use updated packages.
from pathlib import Path 

MODEL_DIR = Path("model") 
IMAGE_ENCODER_PATH = MODEL_DIR / "image_encoder.xml" 
INPUT_EMBEDDING_PATH = MODEL_DIR / "input_embeddings.xml" 
LANGUAGE_MODEL_PATH = MODEL_DIR / "language_model.xml" 

requires_pt_model_loading = not all([p.exists() for p in [IMAGE_ENCODER_PATH, INPUT_EMBEDDING_PATH, LANGUAGE_MODEL_PATH]])

PyTorch モデルをダウンロード#

from transformers import LlavaNextProcessor, LlavaNextForConditionalGeneration 
import torch 
import gc 

processor = LlavaNextProcessor.from_pretrained("llava-hf/llava-v1.6-mistral-7b-hf") 
image_encoder_model, input_embedding_model, language_model = None, None, None 

class ImageEncoder(torch.nn.Module): 
    def __init__(self, config, vision_tower, multi_modal_projector): 
        super().__init__() 
        self.config = config 
        self.vision_tower = vision_tower 
        self.multi_modal_projector = multi_modal_projector 

    def forward(self, pixel_values): 
        batch_size, num_patches, num_channels, height, width = pixel_values.shape 
        reshaped_pixel_values = pixel_values.view(batch_size * num_patches, num_channels, height, width) 
        image_features = self.vision_tower(reshaped_pixel_values, output_hidden_states=True) 
        selected_image_feature = image_features.hidden_states[self.config.vision_feature_layer] 
        if self.config.vision_feature_select_strategy == "default": 
            selected_image_feature = selected_image_feature[:, 1:] 
        elif self.config.vision_feature_select_strategy == "full": 
            selected_image_feature = selected_image_feature 
        image_features = self.multi_modal_projector(selected_image_feature) 
        return image_features 

if requires_pt_model_loading: 
    model = LlavaNextForConditionalGeneration.from_pretrained("llava-hf/llava-v1.6-mistral-7b-hf", low_cpu_mem_usage=True) 
    model.config.save_pretrained(MODEL_DIR) 
    image_encoder_model = ImageEncoder(model.config, model.vision_tower, model.multi_modal_projector) 
    input_embedding_model = input_embedding_model = model.get_input_embeddings() 
    language_model = model.language_model 
    del model 
    gc.collect()
2024-04-04 12:27:23.875042: I tensorflow/core/util/port.cc:111] oneDNN custom operations are on.You may see slightly different numerical results due to floating-point round-off errors from different computation orders.To turn them off, set the environment variable TF_ENABLE_ONEDNN_OPTS=0.
2024-04-04 12:27:23.877406: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-04 12:27:23.907479: E tensorflow/compiler/xla/stream_executor/cuda/cuda_dnn.cc:9342] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered 
2024-04-04 12:27:23.907505: E tensorflow/compiler/xla/stream_executor/cuda/cuda_fft.cc:609] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered 
2024-04-04 12:27:23.907525: E tensorflow/compiler/xla/stream_executor/cuda/cuda_blas.cc:1518] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered 
2024-04-04 12:27:23.913713: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2024-04-04 12:27:23.914384: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-04-04 12:27:24.847675: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT 
Special tokens have been added in the vocabulary, make sure the associated word embeddings are fine-tuned or trained.

OpenVINO## Convert model to OpenVINO Intermediate Representation back to top ⬆️

OpenVINO は、OpenVINO 中間表現 (IR) への変換により PyTorch モデルをサポートします。これには、OpenVINO モデル・トランスフォーメーション API を使用する必要があります。ov.convert_model 関数は、元の PyTorch モデル・インスタンスとトレース用のサンプル入力を受け取り、OpenVINO フレームワークでこのモデルを表す ov.Model を返します。変換されたモデルは、ov.save_model 関数を使用してディスクに保存するか、core.complie_model を使用してデバイスに直接ロードできます。

LLaVA-NeXT は自己回帰トランスフォーマー生成モデルです。つまり、次の各モデルステップは、前のステップからのモデル出力に依存します。生成アプローチは、単語シーケンスの確率分布を条件付きの次の単語分布の積に分解できるという仮定に基づいています。言い換えると、モデルは、停止条件 (最大長の生成されたシーケンスまたは文字列トークンの終了が取得される) に達するまで、以前に生成されたトークンに基づいてループ内の次のトークンを予測します。予測される確率に基づいて次のトークンが選択される方法は、選択されたデコード方法によって決まります。最も一般的なデコード方法の詳細については、このブログをご覧ください。Hugging Face Transformers ライブラリーのモデル生成プロセスのエントリーポイントは、generate メソッドです。パラメーターと構成の詳細については、ドキュメントを参照してください。選択デコード方法論の柔軟性を維持するため、1 つのステップでモデル推論のみを変換します。

推論フローは最初のステップと次のステップで異なります。最初のステップでは、モデルは前処理された入力命令と画像を受け入れ、input_embedding モデルと image_encoder モデルを使用して統合埋め込み空間に変換します。その後、モデルの LLM ベースの部分が入力埋め込みに対して実行され、次に生成されるトークンの確率を予測します。次のステップでは、language_model はサンプリング戦略に基づいて選択され、input_embedding モデルとキャッシュされたアテンション・キーと値によって処理された次のトークン ID のみを受け入れます。出力側は自動回帰であるため、出力トークンの非表示状態は、その後の生成ステップごとに計算されると同じままになります。したがって、新しいトークンを生成するたびに再計算するのは無駄であるように思えます。キャッシュを使用すると、モデルは計算後に非表示の状態を保存します。モデルは各タイムステップで最後に生成された出力トークンのみを計算し、保存された出力トークンを非表示のトークンに再利用します。これにより、変圧器モデルの生成の複雑さが O(n3) から O(n2) に軽減されます。仕組みの詳細については、この記事を参照してください。

上記をまとめると、モデルは 3 つの部分で構成されています:

  • 入力画像を埋め込み空間にエンコードする画像エンコーダー

  • 入力テキストトークンを埋め込み空間に変換する入力埋め込み

  • 画像エンコーダーと入力埋め込みモデルによって提供される入力埋め込みに基づいて回答を生成する言語モデル

各モデルパーツを変換してみます。

画像エンコーダー#

画像エンコーダーは、事前トレーニング済みの CLIP モデルによって LLaVA で表現されます。

import torch 
import openvino as ov 
import gc 

def cleanup_torchscript_cache(): 
    """ 
    Helper for removing cached model representation 
    """ 
    torch._C._jit_clear_class_registry() 
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore() 
    torch.jit._state._clear_class_state() 

if not IMAGE_ENCODER_PATH.exists(): 
    ov_image_encoder = ov.convert_model(image_encoder_model, example_input=torch.zeros((1, 5, 3, 336, 336))) 
    ov.save_model(ov_image_encoder, IMAGE_ENCODER_PATH) 
    del ov_image_encoder 
    cleanup_torchscript_cache() 

del image_encoder_model 
gc.collect();

テキスト埋め込み#

LLM では、入力埋め込みは言語モデルの一部ですが、LLaVA の場合、このモデル部分によって生成される最初のステップの隠し状態は、画像埋め込みと共通の埋め込み空間に統合される必要があります。このモデル部分を再利用し、llm モデル・インスタンスの導入を回避するため、個別に使用します。

llm_input = None 

if not LANGUAGE_MODEL_PATH.exists(): 
    llm_input = input_embedding_model(torch.ones((2, 2), dtype=torch.int64)) 

if not INPUT_EMBEDDING_PATH.exists(): 
    ov_input_embeddings_model = ov.convert_model(input_embedding_model, example_input=torch.ones((2, 2), dtype=torch.int64)) 
    ov.save_model(ov_input_embeddings_model, INPUT_EMBEDDING_PATH) 
    del ov_input_embeddings_model 
    cleanup_torchscript_cache() 

del input_embedding_model 
gc.collect();

言語モデル#

言語モデルは、LLaVA での回答の生成を行います。この部分は、テキスト生成の標準 LLM と非常によく似ています。ここでのモデルでは、ベース LLM として mistralai/Mistral-7B-Instruct-v0.2 を使用します。生成プロセスを最適化し、メモリーをさらに効率良く使用するため、HuggingFace トランスフォーマー API は、入力と出力で use_cache=True パラメーターと past_key_values 引数を使用してモデル状態を外部にキャッシュするメカニズムを提供します。キャッシュを使用すると、モデルは計算後に非表示の状態を保存します。モデルは各タイムステップで最後に生成された出力トークンのみを計算し、保存された出力トークンを非表示のトークンに再利用します。これにより、変圧器モデルの生成の複雑さが O(n3) から O(n2) に軽減されます。このオプションを使用すると、モデルは前のステップの非表示状態 (キャッシュされたアテンション・キーと値) を入力として取得し、さらに現在のステップの非表示状態を出力として提供します。これは、次のすべての反復では、前のステップから取得した新しいトークンと、次のトークン予測を取得するためのキャッシュされたキー値のみを提供するだけで十分であることを意味します。

最新の LLM のようにモデルサイズが大きくなると、アテンション・ブロックの数と過去のキー値テンソルのサイズもそれぞれ増加します。推論サイクルでキャッシュ状態をモデルの入力と出力として処理する方法は、特にチャットボットのシナリオなどで長い入力シーケンスを処理する場合、メモリー制限のあるシステムのボトルネックになる可能性があります。OpenVINO は、モデル内にキャッシュ処理ロジックを保持しながら、モデルからキャッシュテンソルを含む入力と対応する出力を削除する変換を提案します。このようなモデルはステートフルとも呼ばれます。ステートフル・モデルは、2 つの連続する推論呼び出しの間でデータを暗黙的に保存するモデルです。1 回の実行で保存されるテンソルは、state または variable と呼ばれる内部メモリーバッファーに保持され、モデルの出力となることはなく、次の実行に渡される可能性があります。キャッシュを非表示にすると、デバイスに適した表現でキャッシュ値を保存および更新できるようになります。メモリーの消費を削減し、さらにモデルのパフォーマンスを最適化するのに役立ちます。ステートフル・モデルとステートの操作の詳細については、OpenVINO のドキュメントを参照してください。

from typing import Optional, Tuple, List 
from openvino.runtime 
import opset13 import numpy as np 

def model_has_state(ov_model: ov.Model): 
    # TODO: 変数の可用性に基づいてさらに良い方法を提供しますが、OV Python API は必要なメソッドを公開していません 
    return len(ov_model.get_sinks()) > 0 

def model_has_input_output_name(ov_model: ov.Model, name: str): 
    """
    Helper function for checking that model has specified input or output name 

    Parameters: 
        ov_model (ov.Model): # TODO: モデルトポロジーから次元を導き出すことはできますか? 
    name (str): 
        name of input or output 

    Returns:
        True if input or output with requested name exists else False 
    """ 
    return name in sum([list(t.get_names()) for t in ov_model.inputs + ov_model.outputs], []) 

def fuse_cache_reorder( 
    ov_model: ov.Model, 
    not_kv_inputs: List[str], 
    key_value_input_names: List[str], 
    gather_dim: int, 
): 
    """
    Fuses reored_cache during generate cycle into ov.Model.Used with stateful models, because we can not modify model state directly. 
    Adds a new beam_idx parameter and Gather op per each kv-cache input in a given model. 
    Should be run before make_stateful.Implements optimumum's _reorder_cache 
    inside the model in the beginning of each iteration. 
    Gather works along given gather_dim dimension that may vary from model to model. 
    KV-cache inputs are identified based on names in key_value_input_names. 
    Append the new beam_idx parameter to not_kv_inputs.

    Parameters: 
        ov_model (`ov.Model`): 
            openvino model for processing 
        not_kv_inputs (`List[str]`): 
            list of input nodes in model that not related to past key values 
        key_value_input_names (`List[str]`): 
            list of names for key value input layers 
        gather_dim (int): 
            dimension for gathering cache during reorder pass 
    """ 

    if model_has_input_output_name(ov_model, "beam_idx"): 
        raise ValueError("Model already has fused cache") 
    input_batch = ov_model.input("inputs_embeds").get_partial_shape()[0] 
    beam_idx = opset13.parameter(name="beam_idx", dtype=ov.Type.i32, shape=ov.PartialShape([input_batch])) 
    beam_idx.output(0).get_tensor().add_names({"beam_idx"}) # why list is not accepted? 
    ov_model.add_parameters([beam_idx]) 
    not_kv_inputs.append(ov_model.inputs[-1]) 
    # すべてのキャッシュ・パラメーターを確認し、新しいパラメーター beam_idx によって提供されるインデックスと _reorder_cache を融合 
    for input_name in key_value_input_names: 
        parameter_output_port = ov_model.input(input_name) 
        consumers = parameter_output_port.get_target_inputs() 
        gather = opset13.gather(parameter_output_port, beam_idx, opset13.constant(gather_dim)) 
        for consumer in consumers: 
            consumer.replace_source_output(gather.output(0)) 
    ov_model.validate_nodes_and_infer_types() 

def build_state_initializer(ov_model: ov.Model, batch_dim: int): 
    """ 
    Build initialization ShapeOf Expression for all ReadValue ops 

    Parameters: 
        ov_model (ov.Model): 
            openvino model 
        batch_dim (int): 
            index of dimension corresponding to batch size 
    """ 
    input_ids = ov_model.input("inputs_embeds") 
    batch = opset13.gather( 
        opset13.shape_of(input_ids, output_type="i64"), 
        opset13.constant([0]), 
        opset13.constant(0), 
    ) 
    for op in ov_model.get_ops(): 
        if op.get_type_name() == "ReadValue": 
            dims = [dim.min_length for dim in list(op.get_output_partial_shape(0))] 
            dims[batch_dim] = batch 
            dims = [(opset13.constant(np.array([dim], dtype=np.int64)) if isinstance(dim, int) else dim) for dim in dims] 
            shape = opset13.concat(dims, axis=0) 
            broadcast = opset13.broadcast(opset13.constant(0.0, dtype=op.get_output_element_type(0)), shape) 
            op.set_arguments([broadcast]) 
    ov_model.validate_nodes_and_infer_types() 

def make_stateful( 
    ov_model: ov.Model, 
    not_kv_inputs: List[str], 
    key_value_input_names: List[str], 
    key_value_output_names: List[str], 
    batch_dim: int, 
    num_attention_heads: int, 
    num_beams_and_batch: int = None, 
): 
    """ 
    Hides kv-cache inputs and outputs inside the model as variables.
     Parameters: 
        ov_model (ov.Model): 
            openvino model 
        not_kv_inputs (`List[str]`): 
            list of input nodes in model that not related to past key values 
        key_value_input_names (`List[str]`): 
            list of names for key value input layers 
        key_value_output_names (`List[str]`): 
            list of names for key value input layers 
        batch_dim (int): 
            index of batch dimension in key value layers 
        num_attention_heads (int): 
            number of attention heads for batch dimension initialization 
        num_beams_an_batch (int): 
            precalculated number of beams and batch for shapes initialization 
    """ 
    from openvino._offline_transformations import apply_make_stateful_transformation 

    input_output_map = {}
 
    if num_beams_and_batch is not None:
        # モデルの最後から ReadValue に動的次元が伝播されるのを避けるため、input_ids とアテンション・マスクのバッチサイズを設定 
        for input in not_kv_inputs: 
            shape = input.get_partial_shape() 
            if shape.rank.get_length() <= 2:  # == 1 for beam_index 
                shape[0] = num_beams_and_batch 
                input.get_node().set_partial_shape(shape) 
    for kv_name_pair in zip(key_value_input_names, key_value_output_names): 
        input_output_map[kv_name_pair[0]] = kv_name_pair[1] 
        if num_beams_and_batch is not None: 
            input = ov_model.input(kv_name_pair[0]) 
            shape = input.get_partial_shape() 
            shape[batch_dim] = num_beams_and_batch * num_attention_heads 
            input.get_node().set_partial_shape(shape) 

    if num_beams_and_batch is not None:
        # 上記の形状が変更された場合の再検証モデル 
        ov_model.validate_nodes_and_infer_types() 

    apply_make_stateful_transformation(ov_model, input_output_map) 
    if num_beams_and_batch is None: 
        build_state_initializer(ov_model, batch_dim) 

def patch_stateful(ov_model): 
    key_value_input_names = [key.get_any_name() for key in ov_model.inputs[2:-1]] 
    key_value_output_names = [key.get_any_name() for key in ov_model.outputs[1:]] 
    not_kv_inputs = [input for input in ov_model.inputs if not any(name in key_value_input_names for name in input.get_names())]
    if not key_value_input_names or not key_value_output_names: 
        return 
    batch_dim = 0 
    num_attention_heads = 1 

    fuse_cache_reorder(ov_model, not_kv_inputs, key_value_input_names, batch_dim) 
    make_stateful( 
        ov_model, 
        not_kv_inputs, 
        key_value_input_names, 
        key_value_output_names, 
        batch_dim, 
        num_attention_heads, 
        None, 
    )
make_stateful_model = True 
core = ov.Core() 

if not LANGUAGE_MODEL_PATH.exists(): 
    pkv = language_model(inputs_embeds=llm_input, attention_mask=torch.ones((2, 2), dtype=torch.int64))[1] 
    model_inputs = ["attention_mask", "position_ids"] 
    model_outputs = ["logits"] 
    for idx in range(len(pkv)): 
        model_inputs.extend([f"past_key_values.{idx}.key", f"past_key_values.{idx}.value"]) 
        model_outputs.extend([f"present.{idx}.key", f"present.{idx}.value"]) 
    model_inputs.append("inputs_embeds") 
    language_model.config.torchscript = True 
    position_ids = torch.tensor([[2, 3], [2, 3]]) 
    ov_model = ov.convert_model( 
        language_model, 
        example_input={ 
            "inputs_embeds": llm_input, 
            "attention_mask": torch.ones((2, 4)), 
            "past_key_values": pkv, 
            "position_ids": position_ids, 
        }, 
    ) 

    for input, input_name in zip(ov_model.inputs, model_inputs): 
        input.get_tensor().set_names({input_name}) 

    for output, output_name in zip(ov_model.outputs, model_outputs): 
        output.get_tensor().set_names({output_name}) 
    if make_stateful_model: 
        patch_stateful(ov_model) 
    ov.save_model(ov_model, LANGUAGE_MODEL_PATH) 
    del ov_model 
    cleanup_torchscript_cache() 
    del language_model 
    gc.collect()

言語モデルの重みを 4 ビットに圧縮#

メモリー消費を削減するため、NNCF を使用して重み圧縮を最適化できます。重み圧縮は、モデルのメモリー使用量を削減することを目的としています。また、大規模言語モデル (LLM) など、メモリーに依存する大規模なモデルのパフォーマンスが大幅に向上する可能性もあります。LLM やその他のモデルは、推論中に重みを保存する大量のメモリーを必要とするため、次の方法で重み圧縮の利点を得られます:

  • デバイスのメモリーに格納できない大規模なモデルの推論を可能にします。

  • 線形レイヤーなどの重みを使用した演算を行う際のメモリーアクセス・レイテンシーを短縮することで、モデルの推論パフォーマンスを向上させます。

ニューラル・ネットワーク圧縮フレームワーク (NNCF) は、主に LLM の最適化向けに設計された圧縮方法として、4 ビット / 8 ビット混合重み量子化を提供します。重み圧縮とフルモデル量子化 (トレーニング後の量子化) 違いは、重み圧縮のでは、活性化が浮動小数点のままであるため、精度が向上することです。LLM の重み圧縮は、完全なモデル量子化のパフォーマンスに匹敵する推論パフォーマンスの向上をもたらします。さらに、重み圧縮はデータに依存せず、キャリブレーション・データセットも必要としないため、容易に利用できます。

nncf.compress_weights 関数は重み圧縮の実行に使用できます。この関数は、OpenVINO モデルとその他の圧縮パラメーターを受け入れます。INT8 圧縮と比較して、INT4 圧縮はパフォーマンスをさらに向上させますが、予測品質は若干低下します。

重み圧縮の詳細については、OpenVINO のドキュメントを参照してください。

注: 重み圧縮プロセスを実行するには、さらに時間とメモリーが必要になる場合があります。以下のウィジェットで無効化できます:

import ipywidgets as widgets 

to_compress_weights = widgets.Checkbox( 
    value=True, 
    description="Weights Compression", 
    disabled=False, 
) 

to_compress_weights
Checkbox(value=True, description='Weights Compression')
import nncf 

compression_configuration = { 
    "mode": nncf.CompressWeightsMode.INT4_SYM, 
    "group_size": 64, 
    "ratio": 0.6, 
} 

LANGUAGE_MODEL_PATH_INT4 = LANGUAGE_MODEL_PATH.parent / LANGUAGE_MODEL_PATH.name.replace(".xml", "-int4.xml") 
if to_compress_weights.value and not LANGUAGE_MODEL_PATH_INT4.exists(): 
    ov_model = core.read_model(LANGUAGE_MODEL_PATH) 
    ov_compressed_model = nncf.compress_weights(ov_model, **compression_configuration) 
    ov.save_model(ov_compressed_model, LANGUAGE_MODEL_PATH_INT4) 
    del ov_compressed_model 
    del ov_model 
    gc.collect()
INFO:nncf:NNCF initialized successfully.Supported frameworks detected: torch, tensorflow, onnx, openvino

画像エンコーダーを 8 ビットに量子化#

このチュートリアルは、NNCF(Neural Network Compression Framework) から 8 ビットのトレーニング後の量子化を適用して画像エンコーダーを高速化し、OpenVINO™ ツールキットを介して量子化されたモデルを推論する方法を示します。NNCF は、量子化レイヤーをモデルグラフに追加し、トレーニング・データセットのサブセットを使用してこれらの追加の量子化レイヤーのパラメーターを初期化することで、トレーニング後の量子化を可能にします。量子化操作は FP32/FP16 ではなく INT8 で実行されるため、モデル推論が高速化されます。最適化プロセスには次の手順が含まれます:

  1. 量子化データセットを準備

  2. NNCF を使用して変換された OpenVINO モデルを量子化します。

  3. 次回に備えて量子化されたモデルをディスクに保存します。

注: 量子化プロセスを実行するには、さらに時間とメモリーが必要になる場合があります。以下のウィジェットで無効化できます:

to_quantize = widgets.Checkbox( 
    value=True, 
    description="Quantization", 
    disabled=False, 
) 

to_quantize
Checkbox(value=True, description='Quantization')
IMAGE_ENCODER_PATH_INT8 = IMAGE_ENCODER_PATH.parent / IMAGE_ENCODER_PATH.name.replace(".xml", "-int8.xml") 

import requests 

r = requests.get( 

url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py", 
) 
open("skip_kernel_extension.py", "w").write(r.text) 

%load_ext skip_kernel_extension

データセットの準備#

キャプションのアノテーションが付けられた約 330 万個の画像で構成される Conceptual Captions データセットは、モデルの量子化に使用されます。

%%skip not $to_quantize.value 

import requests 
from io import BytesIO 
import numpy as np 
from PIL import Image 
from requests.packages.urllib3.exceptions import InsecureRequestWarning 
requests.packages.urllib3.disable_warnings(InsecureRequestWarning) 

def get_pil_from_url(url):
    """ 
    Downloads and converts an image from a URL to a PIL Image object.
    """ 
    response = requests.get(url, verify=False, timeout=20) 
    image = Image.open(BytesIO(response.content)) 
    return image.convert("RGB") 

def collate_fn(example, image_column="image_url"):
    """ 
    Preprocesses an example by loading and transforming image and text data.
    Checks if the text data in the example is valid by calling the `check_text_data` function.
    Downloads the image specified by the URL in the image_column by calling the `get_pil_from_url` function.
    If there is any error during the download process, returns None.
    Returns the preprocessed inputs with transformed image and text data.
    """ 
    assert len(example) == 1 
    example = example[0] 

    url = example[image_column] 
    try: 
        image = get_pil_from_url(url) 
        h, w = image.size 
        if h == 1 or w == 1: 
            return None 
    except Exception: 
        return None 

    inputs = processor.image_processor(images=[image], return_tensors="pt") 
    return inputs
%%skip not $to_quantize.value 

import torch 
from datasets import load_dataset 
from tqdm.notebook import tqdm 

def prepare_calibration_data(dataloader, init_steps):
    """ 
    This function prepares calibration data from a dataloader for a specified number of initialization steps.
    It iterates over the dataloader, fetching batches and storing the relevant data.
    """ 
    data = [] 
    print(f"Fetching {init_steps} samples for the initialization...") 
    with tqdm(total=init_steps) as pbar: 
        for batch in dataloader: 
            if len(data) == init_steps: 
                break 
            if batch: 
                pbar.update(1) 
                with torch.no_grad(): 
                    data.append( { "pixel_values": batch["pixel_values"].to("cpu") } ) 
    return data 

def prepare_dataset(opt_init_steps=50, max_train_samples=1000):
    """ 
    Prepares a vision-text dataset for quantization.
    """ 
    dataset = load_dataset("google-research-datasets/conceptual_captions", trust_remote_code=True) 
    train_dataset = dataset["train"].shuffle(seed=42) 
    dataloader = torch.utils.data.DataLoader(train_dataset, collate_fn=collate_fn, batch_size=1) 
    calibration_data = prepare_calibration_data(dataloader, opt_init_steps) 
    return calibration_data
%%skip not $to_quantize.value 

vcalibration_data = [] 
if not IMAGE_ENCODER_PATH_INT8.exists(): 
    calibration_data = prepare_dataset()

量子化を行います#

事前トレーニングされたモデルから量子化モデルを作成します。

: 量子化は時間とメモリーを消費する操作です。以下の量子化コードの実行には時間がかかる場合があります。

%%skip not $to_quantize.value 

if not IMAGE_ENCODER_PATH_INT8.exists(): 
    if len(calibration_data) == 0: 
        raise RuntimeError( 'Calibration dataset is empty.Please check internet connection and try to download images manually.'         ) 
    ov_model = core.read_model(IMAGE_ENCODER_PATH) 
    calibration_dataset = nncf.Dataset(calibration_data) 
    quantized_model = nncf.quantize( 
        model=ov_model, 
        calibration_dataset=calibration_dataset, 
        model_type=nncf.ModelType.TRANSFORMER, 
        subset_size=len(calibration_data), 
        # スムーズ・クォンタム・アルゴリズムは活性化量子化誤差を削減し、グリッドサーチを通じて最適なアルファ値を取得 
        advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alpha=0.6)
    ) 
    ov.save_model(quantized_model, IMAGE_ENCODER_PATH_INT8) 
    del ov_model 
    del quantized_model 
    gc.collect()

モデル推論パイプラインの準備#

image0

クラスは、生成シナリオでモデルを使用するのに使いやすいインターフェイスを提供します。これは、HuggingFace Transformers ライブラリーに実装されている生成のすべてのリーチ機能を再利用する可能性をもたらす、transformers.generation.GenerationMixin に基づいています。このインターフェイスの詳細については、HuggingFace のドキュメントを参照してください。

import torch 
from transformers.generation import GenerationConfig, GenerationMixin 
from transformers.modeling_outputs import CausalLMOutputWithPast 
from transformers import AutoConfig 
from transformers.models.llava_next.modeling_llava_next import ( 
    get_anyres_image_grid_shape, 
    unpad_image, 
) 
import openvino as ov 

class OVLlavaForCausalLM(GenerationMixin): 
    def __init__( 
        self, 
        core, 
        image_encoder_path, 
        input_embedding_path, 
        language_model_path, 
        device, 
    ): 
        self.image_encoder = core.compile_model(core.read_model(image_encoder_path), device) 
        self.input_embeddings = core.compile_model(core.read_model(input_embedding_path), device) 
        self.model = core.read_model(language_model_path) 
        self.input_names = {key.get_any_name(): idx for idx, key in enumerate(self.model.inputs)} 
        self.output_names = {idx: key for idx, key in enumerate(self.model.outputs)} 
        self.key_value_input_names = [key for key in list(self.input_names) if key not in ["beam_idx", "inputs_embeds", "attention_mask", "position_ids"]] 
        self.key_value_output_names = [key for key in list(self.output_names)[1:]] 
        self.stateful = len(self.key_value_input_names) == 0 
        compiled_model = core.compile_model(self.model, device) 
        self.request = compiled_model.create_infer_request() 
        self.config = AutoConfig.from_pretrained(Path(language_model_path).parent) 
        self.generation_config = GenerationConfig.from_model_config(self.config) 
        self.main_input_name = "input_ids" 
        self.device = torch.device("cpu") 
        self.num_pkv = 2 
        self.next_beam_idx = None 
        self.image_newline = torch.zeros(self.config.text_config.hidden_size, dtype=torch.float32) 
        self.pad_token_id = self.config.pad_token_id if self.config.pad_token_id is not None else -1 
        self.past_len = 0 
        self._supports_cache_class = False 

def can_generate(self): 
    """Returns True to validate the check that the model using `GenerationMixin.generate()` can indeed generate.""" 
        return True 

    def __call__( 
        self, 
        input_ids: torch.LongTensor, 
        pixel_values: torch.Tensor, 
        attention_mask: Optional[torch.LongTensor] = None, 
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None, 
        position_ids: Optional[torch.LongTensor] = None, 
        image_sizes=None, 
        **kwargs, 
    ) -> CausalLMOutputWithPast: 
        return self.forward( 
            input_ids, 
            pixel_values, 
            attention_mask, 
            past_key_values, 
            position_ids, 
            image_sizes, 
            **kwargs, 
        ) 

    def forward( 
        self, 
        input_ids: torch.LongTensor, 
        pixel_values: torch.Tensor, 
        attention_mask: Optional[torch.LongTensor] = None, 
        past_key_values: Optional[Tuple[Tuple[torch.FloatTensor]]] = None, 
        position_ids: Optional[torch.LongTensor] = None, 
        image_sizes=None, 
        **kwargs, 
    ) -> CausalLMOutputWithPast: 
        """General inference method""" 
        inputs = {} 
        if past_key_values is not None: 
            inputs = {} 
            if not self.stateful: 
                past_key_values = tuple(past_key_value for pkv_per_layer in past_key_values for past_key_value in pkv_per_layer) 
                # past_key_values をデコーダー入力に追加 
                inputs = dict(zip(self.key_value_input_names, past_key_values)) 
            # input_ids = np.array(input_ids)[:, -1:] 
            inputs_embeds = self.input_embeddings(input_ids)[0] 
            inputs["inputs_embeds"] = inputs_embeds 
            # inputs["attention_mask"] = attention_mask 
            if "beam_idx" in self.input_names: 
                inputs["beam_idx"] = self.next_beam_idx if self.next_beam_idx is not None else np.arange(batch_size, dtype=int) 

            if not self.stateful: 
                first_layer_past_key_value = torch.from_numpy(past_key_values[0][0][:, :, :, 0]) 
            else: 
                first_layer_past_key_value = torch.from_numpy(self.request.query_state()[0].state.data[:, :, :, 0]) 

            # ランダムエラーを避けるために、head_dim (-2) のすべての次元を合計:
            # https://github.com/huggingface/transformers/pull/28032#issuecomment-1863691941 
            batch_index, non_attended_tokens = torch.where(first_layer_past_key_value.float().sum(-2) == 0) 

            # ターゲット長を取得 
            target_length = input_ids.shape[1] 
            past_length = first_layer_past_key_value.shape[-1] 

            extended_attention_mask = torch.ones( 
                (attention_mask.shape[0], past_length), 
                dtype=attention_mask.dtype, 
                device=attention_mask.device, 
            ) 

            # 無視できるトークンのみをフィルタリングします。 
            # これは、最初の反復でキャッシュがすでに十分に大きい Llava + Fused モジュールを使用する場合、 
            # またはカスタムキャッシュを渡す場合に発生する可能性があります。 
            valid_indices = non_attended_tokens < extended_attention_mask.size(-1) 
            new_batch_index = batch_index[valid_indices] 
            new_non_attended_tokens = non_attended_tokens[valid_indices] 

            # 必要のない場所をゼロにする 
            extended_attention_mask[new_batch_index, new_non_attended_tokens] = 0 

            attention_mask = torch.cat((extended_attention_mask, attention_mask[:, -target_length:]), dim=1) 
            position_ids = torch.sum(attention_mask, dim=1).unsqueeze(-1) - 1 
            inputs["attention_mask"] = attention_mask 
            inputs["position_ids"] = position_ids
        else: 
            inputs = self.prepare_multimodal_input(input_ids, pixel_values, attention_mask, position_ids, image_sizes) 

        # 推論を実行 
        self.request.start_async(inputs, share_inputs=True) 
        self.request.wait() 

        logits = torch.from_numpy(self.request.get_tensor(self.output_names[0]).data) 

        if not self.stateful:
            # 長さが等しいタプル: レイヤー数 * デコーダーレイヤーごとの past_key_value の数 (2 は自己注意レイヤーに対応) 
            past_key_values = tuple(self.request.get_tensor(key).data for key in self.key_value_output_names) 
            # 長さが `n_layers` のタプルのタプル。各タプルの長さは 2 (自己注意の k/v) に等しい 
            past_key_values = tuple(past_key_values[i : i + self.num_pkv] for i in range(0, len(past_key_values), self.num_pkv)) 
        else: 
            past_key_values = ((),) 
        self.past_len += inputs["inputs_embeds"].shape[1] 
        return CausalLMOutputWithPast(logits=logits, past_key_values=past_key_values) 

    def prepare_multimodal_input(self, input_ids, pixel_values, attention_mask, position_ids, image_sizes=None): 
        """Preprocessing function for embedding multimodal data""」 
        inputs = {} 
        inputs_embeds = torch.from_numpy(self.input_embeddings(input_ids)[0]) 
        batch_size = input_ids.shape[0] 
        if not self.stateful: 
            for input_name in self.key_value_input_names: 
                model_inputs = self.modeget_anyres_image_grid_shapel.input(input_name) 
                shape = model_inputs.get_partial_shape() 
                shape[0] = batch_size 
                if shape[2].is_dynamic: 
                    shape[2] = 0 
                else: 
                    shape[1] = 0 
                inputs[input_name] = ov.Tensor(model_inputs.get_element_type(), shape.get_shape()) 
        else: 
            self.past_len = 0 
            self.request.reset_state() 
            # 現在の反復で使用される次の beam_idx 入力の初期値を設定し、 
            # beam_search が使用される場合は次の反復で _reorder_cache によってオプションで更新されます 
            self.next_beam_idx = np.arange(batch_size, dtype=int) 

        if "beam_idx" in self.input_names: 
            inputs["beam_idx"] = self.next_beam_idx if self.next_beam_idx is not None else np.arange(batch_size, dtype=int) 
        if pixel_values is None: 
            inputs["inputs_embeds"] = inputs_embeds 
            inputs["attention_mask"] = attention_mask 
            if position_ids is None: 
                position_ids = torch.cumsum(attention_mask, axis=1) - 1 
                position_ids[attention_mask == 0] = 1 
            inputs["position_ids"] = position_ids 
        res = self.image_encoder(pixel_values) 
        image_features = torch.from_numpy(res[0]) 
        split_sizes = [image.shape[0] for image in pixel_values] 
        image_features = torch.split(image_features, split_sizes, dim=0) 

        # 注: multimodal_patch_merge_type == "spatial_unpad" のみをサポートします 
        height = width = self.config.vision_config.image_size // self.config.vision_config.patch_size 

        new_image_features = [] 
        for image_idx, image_feature in enumerate(image_features): 
            if image_feature.shape[0] > 1: 
                base_image_feature = image_feature[0] 
                image_feature = image_feature[1:] 

                if height * width != base_image_feature.shape[0]: 
                    raise ValueError("The number of patches is not consistent with the image size.") 
                num_patch_height, num_patch_width = get_anyres_image_grid_shape( 
                    image_sizes[image_idx], 
                    self.config.image_grid_pinpoints, 
                    self.config.vision_config.image_size, 
                ) 
                image_feature = image_feature.view(num_patch_height, num_patch_width, height, width, -1) 

                image_feature = image_feature.permute(4, 0, 2, 1, 3).contiguous() 
                image_feature = image_feature.flatten(1, 2).flatten(2, 3) 
                image_feature = unpad_image(image_feature, image_sizes[image_idx]) 
                image_feature = torch.cat( 
                    ( 
                        image_feature, 
                        self.image_newline[:, None, None].expand(*image_feature.shape[:-1], 1), 
                    ), 
                    dim=-1, 
                ) 
                image_feature = image_feature.flatten(1, 2).transpose(0, 1) 
                image_feature = torch.cat((base_image_feature, image_feature), dim=0) 
            else: 
                image_feature = image_feature[0] 
                image_feature = torch.cat((image_feature, self.image_newline[None]), dim=0) 
            new_image_features.append(image_feature) 
        image_features = torch.stack(new_image_features, dim=0) 

        ( 
            inputs_embeds, 
            attention_mask, 
            position_ids, 
        ) = self._merge_input_ids_with_image_features(image_features, inputs_embeds, input_ids, attention_mask, None) 
        inputs["inputs_embeds"] = inputs_embeds 
        inputs["attention_mask"] = attention_mask 
        inputs["position_ids"] = position_ids 

        return inputs 

    def _merge_input_ids_with_image_features(self, image_features, inputs_embeds, input_ids, attention_mask, labels): 
        num_images, num_image_patches, embed_dim = image_features.shape 
        batch_size, sequence_length = input_ids.shape 
        left_padding = not torch.sum(input_ids[:, -1] == torch.tensor(self.pad_token_id)) 
        # 1. 特別な画像トークンがどこにあるかを知るマスクを作成 
        special_image_token_mask = input_ids == self.config.image_token_index 
        num_special_image_tokens = torch.sum(special_image_token_mask, dim=-1) 
        # 最大埋め込み次元を計算 
        max_embed_dim = (num_special_image_tokens.max() * (num_image_patches - 1)) + sequence_length 
        batch_indices, non_image_indices = torch.where(input_ids != self.config.image_token_index) 

        # 2. テキストを書き込む位置を計算 
        # 結合された画像からテキストへのシーケンス内のテキストトークンの新しい位置を計算 
        # special_image_token_mask` は画像トークンを識別。各画像トークンは `nb_text_tokens_per_images - 1` 個のテキストトークンに置き換えられます。 
        # `torch.cumsum` は、各画像トークンが後続のテキストトークンの位置をどのようにシフトするかを計算
        # `cumsum`  は本質的にインデックスを 1 ずつ増やすため、ゼロベースのインデックスを調整するために 1 を指定します。 
        new_token_positions = torch.cumsum((special_image_token_mask * (num_image_patches - 1) + 1), -1) - 1 
        nb_image_pad = max_embed_dim - 1 - new_token_positions[:, -1] 
        if left_padding: 
            new_token_positions += nb_image_pad[:, None] # 左パディングのオフセット 
        text_to_overwrite = new_token_positions[batch_indices, non_image_indices] 

        # 3. 最大位置までパディングされた完全な埋め込みを作成 
        final_embedding = torch.zeros( 
            batch_size, 
            max_embed_dim, 
            embed_dim, 
            dtype=inputs_embeds.dtype, 
            device=inputs_embeds.device, 
        ) 
        final_attention_mask = torch.zeros( 
            batch_size, 
            max_embed_dim, 
            dtype=attention_mask.dtype, 
            device=inputs_embeds.device, 
        ) 
        # ビジョンモデルまたは言語モデルが CPU にオフロードされている場合は、 
        # 対応するテンソルを正しいターゲットデバイスに手動で設定する必要があります 
        target_device = inputs_embeds.device 
        batch_indices, non_image_indices, text_to_overwrite = ( 
            batch_indices.to(target_device), 
            non_image_indices.to(target_device), 
            text_to_overwrite.to(target_device), 
        ) 
        attention_mask = attention_mask.to(target_device) 

        # 4. マスクに基づいて埋め込みを埋めます。["hey" "<image>", "how", "are"] がある場合、テキストについては 
        #  [0, 577, 578, 579] に、画像機能については [1:576] にコピーをインデックスする必要があります。 
        final_embedding[batch_indices, text_to_overwrite] = inputs_embeds[batch_indices, non_image_indices] 
        final_attention_mask[batch_indices, text_to_overwrite] = attention_mask[batch_indices, non_image_indices] 
        if labels is not None: 
            final_labels[batch_indices, text_to_overwrite] = labels[batch_indices, non_image_indices] 

        # 5. 画像に対応する埋め込みを入力。まだゼロになっているものはすべて埋めます 
        image_to_overwrite = torch.all(final_embedding == 0, dim=-1) 
        image_to_overwrite &= image_to_overwrite.cumsum(-1) - 1 >= nb_image_pad[:, None].to(target_device) 
        if image_to_overwrite.sum() != image_features.shape[:-1].numel(): 
            raise ValueError( 
                f"The input provided to the model are wrong.The number of image tokens is {torch.sum(special_image_token_mask)} while" 
                f" the number of image given to the model is {num_images}. This prevents correct indexing and breaks batch generation."            ) 

        final_embedding[image_to_overwrite] = image_features.contiguous().reshape(-1, embed_dim).to(target_device) 
        final_attention_mask |= image_to_overwrite 
        position_ids = (final_attention_mask.cumsum(-1) - 1).masked_fill_((final_attention_mask == 0), 1) 

        # 6. 後で past_key_value 値を使用して未対応のトークンを決定するため、パディング位置での埋め込みをマスク 
        batch_indices, pad_indices = torch.where(input_ids == self.pad_token_id) 
        indices_to_mask = new_token_positions[batch_indices, pad_indices] 

        final_embedding[batch_indices, indices_to_mask] = 0 

        return final_embedding, final_attention_mask, position_ids 

    def prepare_inputs_for_generation( 
        self, 
        input_ids, 
        past_key_values=None, 
        inputs_embeds=None, 
        pixel_values=None, 
        image_sizes=None, 
        attention_mask=None, 
        **kwargs, 
    ): 
        if past_key_values is not None: 
            if not self.stateful: 
                cache_length = past_length = past_key_values[0][0].shape[2] 
            else: 
                cache_length = past_length = self.past_len 

            # 未処理のトークンのみ保存:
            # 1 - attention_mask の長さが input_ids の長さを超える場合、 
            # 入力の一部がキャッシュの一部として排他的に渡される設定になります 
            # (例: input_embeds を入力として渡す場合) 
            if attention_mask is not None and attention_mask.shape[1] > input_ids.shape[1]: 
                input_ids = input_ids[:, -(attention_mask.shape[1] - past_length) :] 
            # 2 - past_length が input_ids より小さい場合、input_ids はすべての入力トークンを保持します。 
            # past_length.llava に基づいて input_ids を破棄できます 
            elif past_length < input_ids.shape[1]: 
                input_ids = input_ids[:, past_length:]
            # 3 - それ以外の場合 (past_length >= input_ids.shape[1])、input_ids には未処理のトークンのみがあると仮定 
            elif self.config.image_token_index in input_ids: 
                input_ids = input_ids[:, input_ids.shape[1] - 1 :] 
            # キャッシュが保持できる数を超えるトークンが検出された場合、キャッシュにはサイズ制限があります。 
            # 対応する値は入力の一部ではないため、古いアテンション値を破棄します 
            if cache_length < past_length and attention_mask is not None: 
                attention_mask = attention_mask[:, -(cache_length + input_ids.shape[1]) :] 

        position_ids = kwargs.get("position_ids", None) 
        if attention_mask is not None and position_ids is None:
            # バッチ gllavaenerationsubset_siz の position_ids をオンザフライで作成 
            position_ids = attention_mask.long().cumsum(-1) - 1 
            position_ids.masked_fill_(attention_mask == 0, 1) 
            if past_key_values: 
                position_ids = position_ids[:, -input_ids.shape[1] :]
        # `inputs_embeds` が渡された場合、最初の生成ステップでのみ使用 
        if inputs_embeds is not None and past_key_values is None: 
            model_inputs = {"inputs_embeds": inputs_embeds} 
        else: 
            model_inputs = {"input_ids": input_ids} 

        model_inputs.update( 
            { 
                "position_ids": position_ids, 
                "past_key_values": past_key_values, 
                "use_cache": kwargs.get("use_cache"), 
                "attention_mask": attention_mask, 
                "pixel_values": pixel_values, 
                "image_sizes": image_sizes, 
            } 
        ) 
        return model_inputs

OpenVINO モデル推論を実行#

デバイスを選択#

core = ov.Core() 

support_devices = core.available_devices 
if "NPU" in support_devices: 
    support_devices.remove("NPU") 

device = widgets.Dropdown( 
    options=support_devices + ["AUTO"], 
    value="CPU", 
    description="Device:", 
    disabled=False, 
) 

device
Dropdown(description='Device:', options=('CPU', 'GPU.0', 'GPU.1'), value='CPU')
use_int4_lang_model = widgets.Checkbox( 
    value=LANGUAGE_MODEL_PATH_INT4.exists(), 
    description="INT4 language model", 
    disabled=not LANGUAGE_MODEL_PATH_INT4.exists(), 
) 

use_int4_lang_model
Checkbox(value=True, description='INT4 language model')
use_int8_image_encoder = widgets.Checkbox( 
    value=IMAGE_ENCODER_PATH_INT8.exists(), 
    description="INT8 image encoder", 
    disabled=not IMAGE_ENCODER_PATH_INT8.exists(), 
) 

use_int8_image_encoder
Checkbox(value=True, description='INT4 language model')
lang_model_path = LANGUAGE_MODEL_PATH_INT4 if use_int4_lang_model.value else LANGUAGE_MODEL_PATH 
image_encoder_path = IMAGE_ENCODER_PATH_INT8 if use_int8_image_encoder.value else IMAGE_ENCODER_PATH 

ov_llava_model = OVLlavaForCausalLM(core, image_encoder_path, INPUT_EMBEDDING_PATH, lang_model_path, device.value)
from PIL import Image 
import requests 

from transformers import TextStreamer 
url = "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/d5fbbd1a-d484-415c-88cb-9986625b7b11" 
image = Image.open(requests.get(url, stream=True).raw) 
question = "What is unusual on this image?" 
prompt = f"[INST] <image>\n{question}[/INST]" 
streamer = TextStreamer(processor, skip_special_tokens=True, skip_prompt=True) 

inputs = processor(prompt, image, return_tensors="pt") 
print(f"Question:\n{question}") 
image
Question: What is unusual on this image?
../_images/llava-next-multimodal-chatbot-with-output_32_1.png
print("Answer:") 
streamer = TextStreamer(processor, skip_special_tokens=True, skip_prompt=True) 
output = ov_llava_model.generate(**inputs, max_new_tokens=49, streamer=streamer)
Setting pad_token_id to eos_token_id:2 for open-end generation.
Answer: The image shows a cat lying on its back inside a cardboard box. What's unusual is that the cat appears to be in a relaxed and somewhat human-like pose, with its paws up in the air and its belly exposed.

インタラクティブなデモ#

import gradio as gr 
from transformers import TextIteratorStreamer 
from threading import Thread 
from PIL import Image 
import torch 

example_image_urls = [ 
    ( 
        "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/1d6a0188-5613-418d-a1fd-4560aae1d907", 
        "bee.jpg",
    ), 
    ( 
        "https://github.com/openvinotoolkit/openvino_notebooks/assets/29454499/6cc7feeb-0721-4b5d-8791-2576ed9d2863", 
        "baklava.png", 
    ), 
] 
for url, file_name in example_image_urls:
     Image.open(requests.get(url, stream=True).raw).save(file_name) 

def bot_streaming(message, history): 
    print(message) 
    if message["files"]: 
        image = message["files"][-1]["path"] if isinstance(message["files"][-1], dict) else message["files"][-1] 
    else:
        # このターンに画像がアップロードされていない場合は、 
        # タプル内に保存されている過去のターンの画像を探し、最後のものを取得 
        for hist in history: 
            if isinstance(hist[0], tuple): 
                image = hist[0][0] 

    if image is None: 
        gr.Error("You need to upload an image for LLaVA to work.") 
    prompt = f"[INST] <image>\n{message['text']} [/INST]" 
    image = Image.open(image).convert("RGB") 
    inputs = processor(prompt, image, return_tensors="pt") 

    streamer = TextIteratorStreamer(processor, **{"skip_special_tokens": True}) 
    generation_kwargs = dict(inputs, streamer=streamer, max_new_tokens=100) 

    thread = Thread(target=ov_llava_model.generate, kwargs=generation_kwargs) 
    thread.start() 

    text_prompt = f"[INST] \n{message['text']} [/INST]" 

    buffer = "" 
    for new_text in streamer: 
        buffer += new_text 
        generated_text_without_prompt = buffer[len(text_prompt) :] 
        yield generated_text_without_prompt 

demo = gr.ChatInterface( 
    fn=bot_streaming, 
    title="LLaVA NeXT", 
    examples=[ 
        {"text": "What is on the flower?", "files": ["./bee.jpg"]}, 
        {"text": "How to make this pastry?", "files": ["./baklava.png"]}, 
    ], 
    description="Try [LLaVA NeXT](https://huggingface.co/docs/transformers/main/en/model_doc/llava_next) in this demo using OpenVINO. Upload an image and start chatting about it, or simply try one of the examples below. If you don't upload an image, you will receive an error.", 
    stop_btn="Stop Generation", 
    multimodal=True, 
) 

try: 
    demo.launch(debug=False) 
except Exception: 
    demo.launch(debug=False, share=True)