Stable Diffusion v2 と OpenVINO™ によるテキストからの画像#

この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。

GitHub

Stable Diffusion v2 は、Stability AILAION の研究者とエンジニアによって作成された、テキストから画像への潜在拡散モデルである Stable Diffusion の次世代モデルです。

一般に、拡散モデルは、画像などの対象サンプルを取得するために、ランダムなガウスノイズを段階的に除去するようにトレーニングされたマシン・ラーニング・システムです。拡散モデルは、画像データを生成するため最先端の結果を達成することが示されています。しかし、拡散モデルには、逆ノイズ除去プロセスが遅いという欠点があります。さらに、このモデルはピクセル空間で動作するため大量のメモリーを消費し、高解像度の画像を生成するには高いコストがかかります。そのため、このモデルをトレーニングし、推論に使用するのは困難です。OpenVINO は、インテルのハードウェア上でモデル推論を実行する機能を提供し、誰もが拡散モデルの素晴らしい世界への扉を開くことができます。

以前のノートブックでは、Stable Diffusion v1 を使用して Text-to-Image 生成と Image-to-Image 生成を実行し、ControlNet を使用してその生成プロセスを制御する方法について説明しました。ここでは Stable Diffusion v2 を使用します。

Stable Diffusion v2: 新機能#

新しい stable diffusion モデルは、最初の反復の導入以降に登場した他のモデルからヒントを得た新機能を提供します。新しいモデルに搭載されている機能の一部には次のものがあります:

  • このモデルには、LAION によって作成され、Stability AI によってサポートされた新しい強力なエンコーダー OpenCLIP が付属しています。このバージョン v2 では、生成される写真がバージョン V1 よりも大幅に強化されています。

  • モデルは 768x768 の解像度で画像を生成できるようになり、生成された画像に表示される情報がさらに増えました。

  • v-objective で微調整されたモデル。v-パラメーター化は、モデルの漸進的蒸留を可能にするために、拡散プロセス全体にわたって数値安定性を保つのに特に役立ちます。高解像度で動作するモデルの場合、v-パラメーター化によって、高解像度拡散モデルに影響することが知られている色シフト・アーティファクトが回避され、ビデオ設定では、Stable Diffusion v1 で使用されるイプシロン予測でたびたび発生する一時的な色シフトが回避されることも判明しました。

  • このモデルには、生成された画像のアップスケーリングを実行できる新しい拡散モデルも付属しています。アップスケールされた画像は、元の画像の最大 4 倍まで調整できます。別モデルとして提供されており、詳細については stable-diffusion-x4-upscaler を参照してください。

  • このモデルには、画像間の設定で前世代のレイヤーからのコンテキストを保持できる、新しい深度アーキテクチャーが搭載されています。この構造の保存により、オブジェクトの形と影を保存しながら、内容が異なる画像を生成することができます。

  • このモデルには、以前のモデルに基づいて構築された更新されたインペインティング・モジュールが付属しています。このテキストガイドによる修復により、画像内の部分の切り替えが以前よりも簡単になります。

このノートブックでは、OpenVINO を使用して Stable Diffusion v2 モデルを変換および実行する方法を示します。

これには次の手順が含まれます:

  1. Diffusers ライブラリーを使用して PyTorch モデル・パイプラインを作成します。

  2. モデル・トランスフォーメーション API を使用して、PyTorch モデルを OpenVINO IR 形式に変換します。

  3. NNCF を使用して、ハイブリッド・ポストトレーニング量子化を UNet モデルに適用します。

  4. OpenVINO を使用して Stable Diffusion v2 テキストから画像へのパイプラインを実行します。

注: これは、Stable Diffusion テキストから画像への実装の完全バージョンです。すぐにノートブックを実行したい場合は、stable-diffusion-v2-text-to-image-demo notebook を確認してください。

目次:

必要条件#

必要なパッケージをインストール

%pip install -q "diffusers>=0.14.0" "openvino>=2023.1.0" "datasets>=2.14.6" "transformers>=4.25.1" "gradio>=4.19" "torch>=2.1" Pillow opencv-python --extra-index-url https://download.pytorch.org/whl/cpu 
%pip install -q "nncf>=2.9.0"
Note: you may need to restart the kernel to use updated packages.

テキストからの画像生成向けの Stable Diffusion v2#

最初に、Stable Diffusion v2 のテキストから画像への変換プロセスを見てみましょう。これには、Stable Diffusion v2-1 モデルを使用します。Stable Diffusion v2 と Stable Diffusion v2.1 の主な違いは、より多くのデータ、より多くのトレーニング、データセットの制限の少ないフィルター処理であり、これにより、幅広い入力テキストプロンプトの選択よる結果が得られます。モデルの詳細については、Stability AI のブログ投稿と元のモデル・リポジトリーを参照してください。

Diffusers ライブラリーの Stable Diffusion#

Stable Diffusion v2 を使用するには、Hugging Face Diffusers ライブラリーを使用します。Stable Diffusion モデルを試すために、Diffusers は他の Diffusers パイプラインと同様に StableDiffusionPipeline を公開します。以下のコードは、stable-diffusion-2-1 を使用して StableDiffusionPipeline を作成する方法を示しています:

from diffusers import StableDiffusionPipeline

pipe = StableDiffusionPipeline.from_pretrained("stabilityai/stable-diffusion-2-1-base").to("cpu") 

# メモリー消費量を削減するために、パイプラインからすべてのコンポーネントを個別に取得 
text_encoder = pipe.text_encoder 
text_encoder.eval() 
unet = pipe.unet unet.eval() 
vae = pipe.vae 
vae.eval() 

conf = pipe.scheduler.config 

del pipe
Loading pipeline components...: 0%|          | 0/6 [00:00<?, ?it/s]

モデルを OpenVINO 中間表現 (IR) 形式に変換#

2023.0 リリース以降、OpenVINO はモデル・トランスフォーメーション API を介して PyTorch モデルを直接サポートします。ov.convert_model 関数は、PyTorch モデルのインスタンスとトレース用のサンプル入力を受け入れ、ov.Model クラスのオブジェクトを返します。このオブジェクトは、すぐに使用したり、ov.save_model 関数でディスクに保存したりできます。

パイプラインは次の 3 つの重要な部分で構成されます:

  • テキストプロンプトから画像を生成する条件を作成するテキスト・エンコーダー。

  • 段階的にノイズを除去する潜像表現のための U-Net。

  • 潜在空間を画像にデコードするオート・エンコーダー (VAE)。

各パーツを変換してみましょう:

テキスト・エンコーダー#

テキスト・エンコーダーは、入力プロンプト (例えば、“馬に乗った宇宙飛行士の写真”) を、U-Net が理解できる埋め込みスペースに変換する役割を果たします。これは通常、入力トークンのシーケンスを潜在テキスト埋め込みのシーケンスにマッピングする単純なトランスフォーマー・ベースのエンコーダーです。

テキスト・エンコーダーの入力はテンソル input_ids です。これには、トークナイザーによって処理され、モデルによって受け入れられる最大長までパディングされたテキストからのトークン・インデックスが含まれます。モデルの出力は第 2 章テンソルです: モデルの出力は 2 つのテンソルです: last_hidden_state - モデル内の最後の MultiHeadtention レイヤーからの非表示状態、および pooler_out - モデル全体の非表示状態のプールされた出力。

from pathlib import Path 

sd2_1_model_dir = Path("sd2.1") 
sd2_1_model_dir.mkdir(exist_ok=True)
import gc 
import torch 
import openvino as ov 

TEXT_ENCODER_OV_PATH = sd2_1_model_dir / "text_encoder.xml" 

def cleanup_torchscript_cache(): 
    """ 
    Helper for removing cached model representation 
    """ 
    torch._C._jit_clear_class_registry() 
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore() 
    torch.jit._state._clear_class_state() 

def convert_encoder(text_encoder: torch.nn.Module, ir_path: Path): 
    """ 
    Convert Text Encoder model to IR.
    Function accepts pipeline, prepares example inputs for conversion 
    Parameters: 
        text_encoder (torch.nn.Module): text encoder PyTorch model 
        ir_path (Path): File for storing model 
    Returns:
        None 
    """ 
    if not ir_path.exists(): 
        input_ids = torch.ones((1, 77), dtype=torch.long) 
        # モデルを推論モードに切り替え 
        text_encoder.eval() 

        # メモリー消費量を減らすために勾配計算を無効化 
        with torch.no_grad():
            # エクスポート・モデル 
            ov_model = ov.convert_model( 
                text_encoder, # モデル・インスタンス 
                example_input=input_ids, # モデルトレースの入力例 
                input=([1, 77],), # 変換のための入力形状 
            ) 
            ov.save_model(ov_model, ir_path) 
            del ov_model 
            cleanup_torchscript_cache() 
        print("Text Encoder successfully converted to IR") 

if not TEXT_ENCODER_OV_PATH.exists(): 
    convert_encoder(text_encoder, TEXT_ENCODER_OV_PATH) 
else: 
    print(f"Text encoder will be loaded from {TEXT_ENCODER_OV_PATH}") 

del text_encoder 
gc.collect();
Text encoder will be loaded from sd2.1/text_encoder.xml

U-Net#

U-Net モデルは、テキスト・エンコーダーの隠れ状態に基づいて、潜在画像表現のノイズを段階的に除去します。

U-Net モデルには 3 つの入力があります:

  • sample - 前のステップからの潜在画像サンプル。生成プロセスはまだ開始されていないため、ランダムノイズを使用します。

  • timestep - 現在のスケジューラー・ステップ。

  • encoder_hidden_state - テキスト・エンコーダーの非表示状態。

モデルは次のステップの sample の状態を予測します。

一般的に、U-Net モデル変換プロセスは Stable Diffusion v1 と同じままですが、入力サンプルサイズに変更が加えられることが予想されます。ここで使用するモデルは、解像度 768x768 の画像を生成するように事前トレーニングされており、初期潜在サンプルサイズは 96x96 です。さらに、修復や画像深度生成モデルなどのさまざまなユースケースでは、深度マップやマスクを初期潜在サンプルとチャネル単位で連結した追加の画像情報も受け入れることができます。このようなユースケースに合わせて U-Net モデルを変換するには、入力チャネル数を変更する必要があります。

import numpy as np 

UNET_OV_PATH = sd2_1_model_dir / "unet.xml" 

def convert_unet( 
    unet: torch.nn.Module, 
    ir_path: Path, 
    num_channels: int = 4, 
    width: int = 64, 
    height: int = 64, 
): 
    """ 
    Convert Unet model to IR format.
    Function accepts pipeline, prepares example inputs for conversion 
    Parameters: 
        unet (torch.nn.Module): UNet PyTorch model 
        ir_path (Path): File for storing model 
        num_channels (int, optional, 4): number of input channels 
        width (int, optional, 64): input width 
        height (int, optional, 64): input height 
    Returns:
        None 
    """ 
    dtype_mapping = {torch.float32: ov.Type.f32, torch.float64: ov.Type.f64} 
    if not ir_path.exists():
        # 入力を準備 
        encoder_hidden_state = torch.ones((2, 77, 1024)) 
        latents_shape = (2, num_channels, width, height) 
        latents = torch.randn(latents_shape) 
        t = torch.from_numpy(np.array(1, dtype=np.float32)) 
        unet.eval() 
        dummy_inputs = (latents, t, encoder_hidden_state) 
        input_info = [] 
        for input_tensor in dummy_inputs: 
            shape = ov.PartialShape(tuple(input_tensor.shape)) 
            element_type = dtype_mapping[input_tensor.dtype] 
            input_info.append((shape, element_type)) 

        with torch.no_grad(): 
            ov_model = ov.convert_model(unet, example_input=dummy_inputs, input=input_info) 
        ov.save_model(ov_model, ir_path) 
        del ov_model 
        cleanup_torchscript_cache() 
        print("U-Net successfully converted to IR") 

if not UNET_OV_PATH.exists(): 
    convert_unet(unet, UNET_OV_PATH, width=96, height=96) 
    del unet 
    gc.collect() 
else: 
    del unet 
gc.collect();

VAE#

VAE モデルには、エンコーダーとデコーダーの 2 つのパーツがあります。エンコーダーは、画像を低次元の潜在表現に変換するのに使用され、これが U-Net モデルの入力となります。逆に、デコーダーは潜在表現を変換して画像に戻します。

潜在拡散トレーニング中、エンコーダーは、順拡散プロセス用の画像の潜在表現 (潜在) を取得するために使用され、各ステップでより多くのノイズが適用されます。論中、逆拡散プロセスによって生成されたノイズ除去された潜在は、VAE デコーダーによって画像に変換されます。テキストから画像への推論を実行する場合、開始点となる初期画像はありません。この手順をスキップして、初期のランダムノイズを直接生成することもできます。

テキストから画像へのパイプラインを実行する場合、VAE デコーダーのみが必要であり、VAE エンコーダー変換を保持することがわかります。これは、チュートリアルの次の章で役立ちます。

注: このプロセスには数分かかり、大量の RAM (少なくとも 32 GB を推奨) が使用されます。

VAE_ENCODER_OV_PATH = sd2_1_model_dir / "vae_encoder.xml" 

def convert_vae_encoder(vae: torch.nn.Module, ir_path: Path, width: int = 512, height: int = 512): 
    """ 
    Convert VAE model to IR format.
    VAE model, creates wrapper class for export only necessary for inference part, 
    prepares example inputs for onversion 
    Parameters: 
        vae (torch.nn.Module): VAE PyTorch model 
        ir_path (Path): File for storing model 
        width (int, optional, 512): input width 
        height (int, optional, 512): input height 
    Returns:
        None 
    """ 

    class VAEEncoderWrapper(torch.nn.Module): 
        def __init__(self, vae): 
            super().__init__() 
            self.vae = vae 

        def forward(self, image): 
            return self.vae.encode(x=image)["latent_dist"].sample() 

        if not ir_path.exists(): 
            vae_encoder = VAEEncoderWrapper(vae) 
            vae_encoder.eval() 
            image = torch.zeros((1, 3, width, height)) 
            with torch.no_grad(): 
                ov_model = ov.convert_model(vae_encoder, example_input=image, input=([1, 3, width, height],)) 
            ov.save_model(ov_model, ir_path) 
            del ov_model 
            cleanup_torchscript_cache() 
            print("VAE encoder successfully converted to IR") 

def convert_vae_decoder(vae: torch.nn.Module, ir_path: Path, width: int = 64, height: int = 64): 
    """ 
    Convert VAE decoder model to IR format.
    Function accepts VAE model, creates wrapper class for export only necessary for inference part, 
    prepares example inputs for conversion, 
    Parameters: 
        vae (torch.nn.Module): VAE model 
        ir_path (Path): File for storing model 
        width (int, optional, 64): input width 
        height (int, optional, 64): input height 
    Returns:
        None 
    """ 

    class VAEDecoderWrapper(torch.nn.Module): 
        def __init__(self, vae): 
            super().__init__() 
            self.vae = vae 

        def forward(self, latents): 
            return self.vae.decode(latents) 

    if not ir_path.exists(): 
        vae_decoder = VAEDecoderWrapper(vae) 
        latents = torch.zeros((1, 4, width, height)) 

        vae_decoder.eval() 
        with torch.no_grad(): 
            ov_model = ov.convert_model(vae_decoder, example_input=latents, input=([1, 4, width, height],)) 
        ov.save_model(ov_model, ir_path) 
        del ov_model 
        cleanup_torchscript_cache() 
        print("VAE decoder successfully converted to IR") 

if not VAE_ENCODER_OV_PATH.exists(): 
    convert_vae_encoder(vae, VAE_ENCODER_OV_PATH, 768, 768) 
else: 
    print(f"VAE encoder will be loaded from {VAE_ENCODER_OV_PATH}") 

VAE_DECODER_OV_PATH = sd2_1_model_dir / "vae_decoder.xml" 

if not VAE_DECODER_OV_PATH.exists(): 
    convert_vae_decoder(vae, VAE_DECODER_OV_PATH, 96, 96) 
else: 
    print(f"VAE decoder will be loaded from {VAE_DECODER_OV_PATH}") 

del vae 
gc.collect();
VAE encoder will be loaded from sd2.1/vae_encoder.xml 
VAE decoder will be loaded from sd2.1/vae_decoder.xml

推論パイプラインの準備#

すべてをまとめた論理フローを図から、モデルが推論でどのように機能するかを詳しく見てみましょう。

text2img-stable-diffusion v2

text2img-stable-diffusion v2#

Stable diffusion モデルは、潜在シードとテキストプロンプトの両方を入力として受け取ります。次に、潜在シードを使用して、サイズ 96×96 のランダムな潜在画像表現を生成します。ここで、テキストプロンプトは、OpenCLIP のテキスト・エンコーダーを介してサイズ 77×1024 のテキスト埋め込みに変換されます。

次に、U-Net モデルは、テキスト埋め込みを条件として、ランダムな潜在画像表現を繰り返しノイズ除去します。U-Net の出力はノイズ残差であり、スケジューラー・アルゴリズムを介してノイズ除去された潜在画像表現を計算するために使用されます。この計算にはさまざまなスケジューラー・アルゴリズムを使用できますが、それぞれに長所と短所があります。Stable Diffusion の場合、次のいずれかを使用することを推奨します:

スケジューラーのアルゴリズム関数が動作する理論は、このノートブックの範囲外ですが、簡単に言うと、以前のノイズ表現と予測されたノイズ残差から、予測されたノイズ除去画像表現を計算することを覚えてください。詳細については、拡散ベースの生成モデルの設計空間の解明を参照することを推奨します。

上記のチャートはノートブックの Stable Diffusion V1 と非常によく似ていますが、細部に若干の違いがあります:

  • U-Net モデルの入力解像度を変更しました。

  • テキスト・エンコーダーが変更され、その結果、非表示状態の埋め込みのサイズも変更されました。

  • さらに、画像生成の品質を向上させるため、否定プロンプトを導入しました。技術的には、肯定プロンプトは拡散をそれに関連付けられた画像に向けて誘導し、否定プロンプトは拡散をそれから離れるように誘導します。言い換えれば、否定プロンプトは生成画像に対して望ましくない概念を宣言します。例えば、カラフルで明るい画像が必要な場合、グレースケール画像は避けたい結果になりますが、この場合、グレースケールは否定プロンプトとして扱うことができます。肯定プロンプトと否定プロンプトは同等です。どちらか一方だけを常に使用することも、もう一方なしで使用することもできます。動作の仕組みの詳細については、この記事を参照してください。

import inspect 
from typing import List, Optional, Union, Dict 

import PIL 
import cv2 
import torch 

from transformers import CLIPTokenizer 
from diffusers import DiffusionPipeline 
from diffusers.schedulers import DDIMScheduler, LMSDiscreteScheduler, PNDMScheduler 
    """ 
    Preprocessing helper function for calculating image size for resize with peserving original aspect ratio 
    and fitting image to specific window size 

    Parameters: 
        dst_width (int): destination window width 
        dst_height (int): destination window height 
        image_width (int): source image width 
        image_height (int): source image height 
    Returns: 
        result_width (int): calculated width for resize 
        result_height (int): calculated height for resize 
    """ 
    im_scale = min(dst_height / image_height, dst_width / image_width) 
    return int(im_scale * image_width), int(im_scale * image_height) 

def preprocess(image: PIL.Image.Image): 
    """ 
    Image preprocessing function.Takes image in PIL.Image format, resizes it to keep aspect ration and fits to model input window 512x512, 
    then converts it to np.ndarray and adds padding with zeros on right or bottom side of image (depends from aspect ratio), after that 
    converts data to float32 data type and change range of values from [0, 255] to [-1, 1], finally, converts data layout from planar NHWC to NCHW.The function returns preprocessed input tensor and padding size, which can be used in postprocessing.

    Parameters: 
        image (PIL.Image.Image): input image 
    Returns: 
        image (np.ndarray): preprocessed image tensor 
        meta (Dict): dictionary with preprocessing metadata info 
    """ 
    src_width, src_height = image.size 
    dst_width, dst_height = scale_fit_to_window(512, 512, src_width, src_height) 
    image = np.array(image.resize((dst_width, dst_height), resample=PIL.Image.Resampling.LANCZOS))[None, :] 
    pad_width = 512 - dst_width 
    pad_height = 512 - dst_height 
    pad = ((0, 0), (0, pad_height), (0, pad_width), (0, 0)) 
    image = np.pad(image, pad, mode="constant") 
    image = image.astype(np.float32) / 255.0 
    image = 2.0 * image - 1.0 image = image.transpose(0, 3, 1, 2) 
    return image, {"padding": pad, "src_width": src_width, "src_height": src_height} 

class OVStableDiffusionPipeline(DiffusionPipeline): 
    def __init__( 
        self, vae_decoder: ov.Model, 
        text_encoder: ov.Model, 
        tokenizer: CLIPTokenizer, 
        unet: ov.Model, 
        scheduler: Union[DDIMScheduler, PNDMScheduler, LMSDiscreteScheduler], 
        vae_encoder: ov.Model = None, 
    ): 
        """ 
        Pipeline for text-to-image generation using Stable Diffusion.
        Parameters: 
            vae_decoder (Model):
                Variational Auto-Encoder (VAE) Model to decode images to and from latent representations. 
            text_encoder (Model): 
                Frozen text-encoder.Stable Diffusion uses the text portion of 
                [CLIP](https://huggingface.co/docs/transformers/model_doc/clip#transformers.CLIPTextModel), specifically 
                the clip-vit-large-patch14(https://huggingface.co/openai/clip-vit-large-patch14) variant. 
            tokenizer (CLIPTokenizer):
                Tokenizer of class CLIPTokenizer(https://huggingface.co/docs/transformers/v4.21.0/en/model_doc/clip#transformers.CLIPTokenizer). 
            unet (Model):
                 Conditional U-Net architecture to denoise the encoded image latents. 
            vae_encoder (Model):
                Variational Auto-Encoder (VAE) Model to encode images to latent representation. 
            scheduler (SchedulerMixin): 
                A scheduler to be used in combination with unet to denoise the encoded image latents.Can be one of 
                DDIMScheduler, LMSDiscreteScheduler, or PNDMScheduler.
        """ 
        super().__init__() 
        self.scheduler = scheduler 
        self.vae_decoder = vae_decoder 
        self.vae_encoder = vae_encoder 
        self.text_encoder = text_encoder 
        self.unet = unet 
        self.register_to_config(unet=unet) 
        self._text_encoder_output = text_encoder.output(0) 
        self._unet_output = unet.output(0) 
        self._vae_d_output = vae_decoder.output(0) 
        self._vae_e_output = vae_encoder.output(0) if vae_encoder is not None else None 
        self.height = self.unet.input(0).shape[2] * 8 
        self.width = self.unet.input(0).shape[3] * 8 
        self.tokenizer = tokenizer 

    def __call__( 
        self, prompt: Union[str, List[str]], 
        image: PIL.Image.Image = None, 
        negative_prompt: Union[str, List[str]] = None, 
        num_inference_steps: Optional[int] = 50, 
        guidance_scale: Optional[float] = 7.5, 
        eta: Optional[float] = 0.0, 
        output_type: Optional[str] = "pil", 
        seed: Optional[int] = None, 
        strength: float = 1.0, 
    ): 
        """ 
        Function invoked when calling the pipeline for generation.
        Parameters: 
            prompt (str or List[str]):
                The prompt or prompts to guide the image generation. 
            image (PIL.Image.Image, *optional*, None):
                Intinal image for generation. 
            negative_prompt (str or List[str]):
                The negative prompt or prompts to guide the image generation. 
            num_inference_steps (int, *optional*, defaults to 50): 
                The number of denoising steps.More denoising steps usually lead to a higher quality image at the 
                expense of slower inference. 
            guidance_scale (float, *optional*, defaults to 7.5):
                Guidance scale as defined in Classifier-Free Diffusion Guidance(https://arxiv.org/abs/2207.12598). 
                guidance_scale is defined as `w` of equation 2.
                Higher guidance scale encourages to generate images that are closely linked to the text prompt, 
                usually at the expense of lower image quality. 
            eta (float, *optional*, defaults to 0.0): Corresponds to parameter eta (η) in the DDIM paper: https://arxiv.org/abs/2010.02502.Only applies to 
                [DDIMScheduler], will be ignored for others. 
            output_type (`str`, *optional*, defaults to "pil"): The output format of the generate image.Choose between 
            [PIL](https://pillow.readthedocs.io/en/stable/):
                PIL.Image.Image or np.array. 
            seed (int, *optional*, None):
                Seed for random generator state initialization. 
            strength (int, *optional*, 1.0): 
                strength between initial image and generated in Image-to-Image pipeline, do not used in Text-to-Image 
        Returns:
            Dictionary with keys: 
                sample - the last generated image PIL.Image.Image or np.array 
        """ 
        if seed is not None: 
            np.random.seed(seed) 
        # ここで、`guidance_scale` は、Imagen 論文の式 (2) のガイダンス重み `w` と 
        # 同様に定義されます: https://arxiv.org/pdf/2205.11487.pdf`guidance_scale = 1` 
        # 分類子のフリーガイダンスを行わないことに相当します 
        do_classifier_free_guidance = guidance_scale > 1.0 
        # プロンプトテキスト埋め込みを取得 
        text_embeddings = self._encode_prompt( 
            prompt, 
            do_classifier_free_guidance=do_classifier_free_guidance, 
            negative_prompt=negative_prompt, 
        ) 
        # タイムステップを設定 
        accepts_offset = "offset" in set(inspect.signature(self.scheduler.set_timesteps).parameters.keys()) 
        extra_set_kwargs = {} 
        if accepts_offset: 
            extra_set_kwargs["offset"] = 1 

        self.scheduler.set_timesteps(num_inference_steps, **extra_set_kwargs) 
        timesteps, num_inference_steps = self.get_timesteps(num_inference_steps, strength) 
        latent_timestep = timesteps[:1] 

        # ユーザーが指定しない限り、初期のランダムノイズを取得 
        latents, meta = self.prepare_latents(image, latent_timestep) 

        # すべてのスケジューラーが同じシグネチャーを持つわけではないので、スケジューラー・ステップに追加の kwargs を準備 
        # eta (η) は DDIMScheduler でのみ使用され、他のスケジューラーでは無視されます
        # eta は DDIM 論文の η に対応します: https://arxiv.org/abs/2010.02502 [0, 1] 
        # の範囲にある必要があります 
        accepts_eta = "eta" in set(inspect.signature(self.scheduler.step).parameters.keys()) 
        extra_step_kwargs = {} 
        if accepts_eta: 
            extra_step_kwargs["eta"] = eta 

        for t in self.progress_bar(timesteps):
            # 分類器のフリーガイダンスを行う場合は潜在変数を拡張 
            latent_model_input = np.concatenate([latents] * 2) if do_classifier_free_guidance else latents 
            latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) 

            # ノイズ残留を予測 
            noise_pred = self.unet([latent_model_input, np.array(t, dtype=np.float32), text_embeddings])[self._unet_output] 
            # ガイダンスを実行 
            if do_classifier_free_guidance: 
                noise_pred_uncond, noise_pred_text = noise_pred[0], noise_pred[1] 
                noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_text - noise_pred_uncond) 

            # 前のノイズサンプルを計算 x_t -> x_t-1 
            latents = self.scheduler.step(torch.from_numpy(noise_pred), t, torch.from_numpy(latents), **extra_step_kwargs)["prev_sample"].numpy() 
        # 画像の潜在変数を vae でスケールしてデコード 
        image = self.vae_decoder(latents * (1 / 0.18215))[self._vae_d_output] 

        image = self.postprocess_image(image, meta, output_type) 
        return {"sample": image} 

    def _encode_prompt( 
        self, 
        prompt: Union[str, List[str]], 
        num_images_per_prompt: int = 1, 
        do_classifier_free_guidance: bool = True, 
        negative_prompt: Union[str, List[str]] = None, 
    ): 
        """ 
        Encodes the prompt into text encoder hidden states.

        Parameters: 
            prompt (str or list(str)): prompt to be encoded 
            num_images_per_prompt (int): number of images that should be generated per prompt 
            do_classifier_free_guidance (bool): whether to use classifier free guidance or not 
            negative_prompt (str or list(str)): negative prompt to be encoded 
        Returns: 
            text_embeddings (np.ndarray): text encoder hidden states 
        """ 
        batch_size = len(prompt) if isinstance(prompt, list) else 1 

        # 入力プロンプトをトークン化 
        text_inputs = self.tokenizer( 
            prompt, 
            padding="max_length", 
            max_length=self.tokenizer.model_max_length, 
            truncation=True, 
            return_tensors="np", 
        ) 
        text_input_ids = text_inputs.input_ids 

        text_embeddings = self.text_encoder(text_input_ids)[self._text_encoder_output] 

        # プロンプトごとに各世代のテキスト埋め込みを複製 
        if num_images_per_prompt != 1: 
            bs_embed, seq_len, _ = text_embeddings.shape 
            text_embeddings = np.tile(text_embeddings, (1, num_images_per_prompt, 1)) 
            text_embeddings = np.reshape(text_embeddings, (bs_embed * num_images_per_prompt, seq_len, -1)) 

        # 分類器の無条件埋め込みを取得するフリーガイダンス 
        if do_classifier_free_guidance: 
            uncond_tokens: List[str] 
            max_length = text_input_ids.shape[-1] 
            if negative_prompt is None: 
                uncond_tokens = [""] * batch_size 
            elif isinstance(negative_prompt, str): 
                uncond_tokens = [negative_prompt] 
            else: 
                uncond_tokens = negative_prompt 
            uncond_input = self.tokenizer( 
                uncond_tokens, 
                padding="max_length", 
                max_length=max_length, 
                truncation=True, 
                return_tensors="np", 
            ) 

            uncond_embeddings = self.text_encoder(uncond_input.input_ids)[self.text_encoder_output] 

            # mps フレンドリーな方法を使用して、プロンプトごとに各世代の無条件埋め込みを複製 
            seq_len = uncond_embeddings.shape[1] 
            uncond_embeddings = np.tile(uncond_embeddings, (1, num_images_per_prompt, 1)) 
            uncond_embeddings = np.reshape(uncond_embeddings, (batch_size * num_images_per_prompt, seq_len, -1)) 

            # 分類器フリーのガイダンスでは、2 回のフォワードパスを実行する必要がある
            # ここでは、無条件埋め込みとテキスト埋め込みを 1 つのバッチに連結して、 
            # 2 回のフォワードパスを回避します 
            text_embeddings = np.concatenate([uncond_embeddings, text_embeddings]) 

        return text_embeddings 

    def prepare_latents(self, image: PIL.Image.Image = None, latent_timestep: torch.Tensor = None): 
        """ 
        Function for getting initial latents for starting generation 

        Parameters: 
            image (PIL.Image.Image, *optional*, None): Input image for generation, if not provided randon noise will be used as starting point 
            latent_timestep (torch.Tensor, *optional*, None): Predicted by scheduler initial step for image generation, required for latent image mixing with nosie 
        Returns: 
            latents (np.ndarray): Image encoded in latent space 
        """ 
        latents_shape = (1, 4, self.height // 8, self.width // 8) 
        noise = np.random.randn(*latents_shape).astype(np.float32) 
        if image is None:
            # LMSDiscreteScheduler を使用する場合は、潜在変数がシグマで乗算されていることを確認 
            if isinstance(self.scheduler, LMSDiscreteScheduler): 
                noise = noise * self.scheduler.sigmas[0].numpy() 
            return noise, {} 
        input_image, meta = preprocess(image) 
        latents = self.vae_encoder(input_image)[self._vae_e_output] 
        latents = latents * 0.18215 
        latents = self.scheduler.add_noise(torch.from_numpy(latents), torch.from_numpy(noise), latent_timestep).numpy() 
        return latents, meta 

    def postprocess_image(self, image: np.ndarray, meta: Dict, output_type: str = "pil"): 
        """ 
        Postprocessing for decoded image.Takes generated image decoded by VAE decoder, unpad it to initila image size (if required), 
        normalize and convert to [0, 255] pixels range.Optionally, convertes it from np.ndarray to PIL.Image format 

        Parameters: 
            image (np.ndarray): Generated image 
            meta (Dict): Metadata obtained on latents preparing step, can be empty 
            output_type (str, *optional*, pil): Output format for result, can be pil or numpy 
        Returns: 
            image (List of np.ndarray or PIL.Image.Image): Postprocessed images 
        """ 
        if "padding" in meta: 
            pad = meta["padding"] 
            (_, end_h), (_, end_w) = pad[1:3] 
            h, w = image.shape[2:] 
            unpad_h = h - end_h 
            unpad_w = w - end_w 
            image = image[:, :, :unpad_h, :unpad_w] 
        image = np.clip(image / 2 + 0.5, 0, 1) 
        image = np.transpose(image, (0, 2, 3, 1)) 
        # 9. PIL へ変換 
        if output_type == "pil": 
            image = self.numpy_to_pil(image) 
            if "src_height" in meta: 
                orig_height, orig_width = meta["src_height"], meta["src_width"] 
                image = [img.resize((orig_width, orig_height), PIL.Image.Resampling.LANCZOS) for img in image] 
        else: 
            if "src_height" in meta: 
                orig_height, orig_width = meta["src_height"], meta["src_width"] 
                image = [cv2.resize(img, (orig_width, orig_width)) for img in image] 
        return image 

    def get_timesteps(self, num_inference_steps: int, strength: float): 
        """ 
        Helper function for getting scheduler timesteps for generation 
        In case of image-to-image generation, it updates number of steps according to strength 

        Parameters: 
            num_inference_steps (int): 
                number of inference steps for generation 
            strength (float): 
                value between 0.0 and 1.0, that controls the amount of noise that is added to the input image.Values that approach 1.0 allow for lots of variations but will also produce images that are not semantically consistent with the input.
        """ 
        # init_timestep を使用して元のタイムステップを取得 
        init_timestep = min(int(num_inference_steps * strength), num_inference_steps) 

        t_start = max(num_inference_steps - init_timestep, 0) 
        timesteps = self.scheduler.timesteps[t_start:] 

        return timesteps, num_inference_steps - t_start

推論パイプラインの構成#

まず、OpenVINO モデルのインスタンスを作成する必要があります。

import ipywidgets as widgets 

core = ov.Core() 
device = widgets.Dropdown( 
    options=core.available_devices + ["AUTO"], 
    value="AUTO", 
    description="Device:", 
    disabled=False, 
) 

device
Dropdown(description='Device:', index=4, options=('CPU', 'GPU.0', 'GPU.1', 'GPU.2', 'AUTO'), value='AUTO')
ov_config = {"INFERENCE_PRECISION_HINT": "f32"} if device.value != "CPU" else {} 

text_enc = core.compile_model(TEXT_ENCODER_OV_PATH, device.value) 
unet_model = core.compile_model(UNET_OV_PATH, device.value) 
vae_decoder = core.compile_model(VAE_DECODER_OV_PATH, device.value, ov_config) 
vae_encoder = core.compile_model(VAE_ENCODER_OV_PATH, device.value, ov_config)

モデル・トークナイザーとスケジューラーもパイプラインの重要なパーツです。それらを定義して、すべてのコンポーネントをまとめてみましょう。

from transformers import CLIPTokenizer 

scheduler = DDIMScheduler.from_config(conf) # DDIMScheduler is used because UNet quantization produces better results with it 
tokenizer = CLIPTokenizer.from_pretrained("openai/clip-vit-large-patch14") 

ov_pipe = OVStableDiffusionPipeline( 
    tokenizer=tokenizer, 
    text_encoder=text_enc, 
    unet=unet_model, 
    vae_encoder=vae_encoder, 
    vae_decoder=vae_decoder, 
    scheduler=scheduler, 
)

量子化#

NNCF は、量子化レイヤーをモデルグラフに追加し、トレーニング・データセットのサブセットを使用してこれらの追加の量子化レイヤーのパラメーターを初期化することで、トレーニング後の量子化を可能にします。量子化操作は FP32/FP16 ではなく INT8 で実行されるため、モデル推論が高速化されます。

Stable Diffusion v2 構造によれば、UNet モデルはパイプライン全体の実行時間の大部分を占めます。ここでは、NNCFを使用して UNet 部分を最適化し、計算コストを削減してパイプラインを高速化する方法を説明します。パイプラインの残りのパーツを量子化しても、推論パフォーマンスは大幅に向上せず、精度が大幅に低下する可能性があります。

このモデルでは、ハイブリッド・モードで量子化を適用します。つまり、次の量子化を行います: (1) MatMul レイヤーと埋め込みレイヤーの重みと (2) 他のレイヤーの活性化。手順は次のとおりです:

  1. 量子化用のキャリブレーション・データセットを作成します。

  2. 重み付きの操作を収集します。

  3. nncf.compress_model() を実行して、モデルの重みだけを圧縮します。

  4. ignored_scope パラメーターを指定して重み付け操作を無視し、圧縮モデルで nncf.quantize() を実行します。

  5. openvino.save_model() 関数を使用して INT8 モデルを保存します。

モデルの推論速度を向上させるため量子化を実行するかどうかを以下で選択してください。

: 量子化は時間とメモリーを消費する操作です。以下の量子化コードの実行には時間がかかる場合があります。

to_quantize = widgets.Checkbox( 
    value=True, 
    description="Quantization", 
    disabled=False, 
) 

to_quantize
Checkbox(value=True, description='Quantization')
# `skip_kernel_extension` モジュールを取得 
import requests 

r = requests.get( 

url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py", 
) 
open("skip_kernel_extension.py", "w").write(r.text) 

int8_ov_pipe = None 

%load_ext skip_kernel_extension

キャリブレーション・データセットの準備#

Hugging Face の検証 conceptual_captions データセットの一部をキャリブレーション・データとして使用します。キャリブレーション用の中間モデル入力を収集するには、CompiledModel をカスタマイズする必要があります。

%%skip not $to_quantize.value 

import datasets 
import numpy as np 
from tqdm.notebook import tqdm 
from typing import Any, Dict, List 

def disable_progress_bar(pipeline, disable=True): 
    if not hasattr(pipeline, "_progress_bar_config"): 
        pipeline._progress_bar_config = {'disable': disable} 
    else: 
        pipeline._progress_bar_config['disable'] = disable 

class CompiledModelDecorator(ov.CompiledModel): 
    def __init__(self, compiled_model: ov.CompiledModel, data_cache: List[Any] = None, keep_prob: float = 0.5):   
        super().__init__(compiled_model) 
        self.data_cache = data_cache if data_cache is not None else [] 
        self.keep_prob = keep_prob 

    def __call__(self, *args, **kwargs): 
        if np.random.rand() <= self.keep_prob: 
            self.data_cache.append(*args) 
        return super().__call__(*args, **kwargs) 

def collect_calibration_data(ov_pipe, calibration_dataset_size: int, num_inference_steps: int) -> List[Dict]: 
    original_unet = ov_pipe.unet 
    calibration_data = [] 
    ov_pipe.unet = CompiledModelDecorator(original_unet, calibration_data, keep_prob=0.7) 
    disable_progress_bar(ov_pipe) 

    dataset = datasets.load_dataset("google-research-datasets/conceptual_captions", split="train", trust_remote_code=True).shuffle(seed=42) 

    # データ収集のため推論を実行 
    pbar = tqdm(total=calibration_dataset_size) 
    for batch in dataset: 
        prompt = batch["caption"] 
        if len(prompt) > ov_pipe.tokenizer.model_max_length: 
            continue 
        ov_pipe(prompt, num_inference_steps=num_inference_steps, seed=1) 
        pbar.update(len(calibration_data) - pbar.n) 
        if pbar.n >= calibration_dataset_size: 
            break 

    disable_progress_bar(ov_pipe, disable=False) 
    ov_pipe.unet = original_unet 
    return calibration_data

ハイブリッド・モデル量子化の実行#

%%skip not $to_quantize.value 

from collections import deque 
from transformers import set_seed 
import nncf 

def get_operation_const_op(operation, const_port_id: int): 
    node = operation.input_value(const_port_id).get_node() 
    queue = deque([node]) 
    constant_node = None 
    allowed_propagation_types_list = ["Convert", "FakeQuantize", "Reshape"] 

    while len(queue) != 0: 
        curr_node = queue.popleft() 
        if curr_node.get_type_name() == "Constant": 
            constant_node = curr_node 
            break 
        if len(curr_node.inputs()) == 0: 
            break 
        if curr_node.get_type_name() in allowed_propagation_types_list: 
            queue.append(curr_node.input_value(0).get_node()) 

    return constant_node 

def is_embedding(node) -> bool: 
    allowed_types_list = ["f16", "f32", "f64"] 
    const_port_id = 0 
    input_tensor = node.input_value(const_port_id) 
    if input_tensor.get_element_type().get_type_name() in allowed_types_list: 
        const_node = get_operation_const_op(node, const_port_id) 
        if const_node is not None: 
            return True 

        return False 

def collect_ops_with_weights(model): 
    ops_with_weights = [] 
    for op in model.get_ops(): 
        if op.get_type_name() == "MatMul": 
            constant_node_0 = get_operation_const_op(op, const_port_id=0) 
            constant_node_1 = get_operation_const_op(op, const_port_id=1) 
            if constant_node_0 or constant_node_1: 
                ops_with_weights.append(op.get_friendly_name()) 
        if op.get_type_name() == "Gather" and is_embedding(op): 
            ops_with_weights.append(op.get_friendly_name()) 

    return ops_with_weights 

UNET_INT8_OV_PATH = sd2_1_model_dir / 'unet_optimized.xml' 
if not UNET_INT8_OV_PATH.exists(): 
    calibration_dataset_size = 300 
    set_seed(1) 
    unet_calibration_data = collect_calibration_data(ov_pipe, calibration_dataset_size=calibration_dataset_size, num_inference_steps=50) 

    unet = core.read_model(UNET_OV_PATH) 

    # 重みが圧縮される操作を収集 
    unet_ignored_scope = collect_ops_with_weights(unet) 

    # モデルの重みを圧縮 
    compressed_unet = nncf.compress_weights(unet, ignored_scope=nncf.IgnoredScope(types=['Convolution'])) 

    # 畳み込み層の重みと活性化の両方を量子化 
    quantized_unet = nncf.quantize( 
        model=compressed_unet, 
        calibration_dataset=nncf.Dataset(unet_calibration_data), 
        subset_size=calibration_dataset_size, 
        model_type=nncf.ModelType.TRANSFORMER, 
        ignored_scope=nncf.IgnoredScope(names=unet_ignored_scope), 

advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alpha=-1) 
) 
ov.save_model(quantized_unet, UNET_INT8_OV_PATH)
INFO:nncf:NNCF initialized successfully.Supported frameworks detected: torch, onnx, openvino
%%skip not $to_quantize.value 

int8_unet_model = core.compile_model(UNET_INT8_OV_PATH, device.value) 
int8_ov_pipe = OVStableDiffusionPipeline( 
    tokenizer=tokenizer, 
    text_encoder=text_enc, 
    unet=int8_unet_model, 
    vae_encoder=vae_encoder, 
    vae_decoder=vae_decoder, 
    scheduler=scheduler 
)

UNet ファイルサイズを比較#

%%skip not $to_quantize.value 

fp16_ir_model_size = UNET_OV_PATH.with_suffix(".bin").stat().st_size / 1024 
quantized_model_size = UNET_INT8_OV_PATH.with_suffix(".bin").stat().st_size / 1024 

print(f"FP16 model size: {fp16_ir_model_size:.2f} KB") 
print(f"INT8 model size: {quantized_model_size:.2f} KB") 
print(f"Model compression rate: {fp16_ir_model_size / quantized_model_size:.3f}")
FP16 model size: 1691232.51 KB 
INT8 model size: 846918.58 KB 
Model compression rate: 1.997

FP16 モデルと INT8 パイプラインの推論時間を比較#

FP16 および INT8 パイプラインの推論パフォーマンスを測定するには、キャリブレーション・サブセットの推論時間の中央値を使用します。

: 最も正確なパフォーマンス推定を行うには、他のアプリケーションを閉じた後、ターミナル/コマンドプロンプトで benchmark_app を実行することを推奨します。

%%skip not $to_quantize.value 

import time 

def calculate_inference_time(pipeline, validation_data): 
    inference_time = [] 
    pipeline.set_progress_bar_config(disable=True) 
    for prompt in validation_data: 
        start = time.perf_counter() 
        _ = pipeline(prompt, num_inference_steps=10, seed=0) 
        end = time.perf_counter() 
        delta = end - start 
        inference_time.append(delta) 
    return np.median(inference_time)
%%skip not $to_quantize.value 

validation_size = 10 
validation_dataset = datasets.load_dataset("google-research-datasets/conceptual_captions", split="train", streaming=True, trust_remote_code=True).take(validation_size) 
validation_data = [batch["caption"] for batch in validation_dataset] 

fp_latency = calculate_inference_time(ov_pipe, validation_data) 
int8_latency = calculate_inference_time(int8_ov_pipe, validation_data) 
print(f"Performance speed-up: {fp_latency / int8_latency:.3f}")
/home/nsavel/venvs/ov_notebooks_tmp/lib/python3.8/site-packages/datasets/load.py:1429: FutureWarning: The repository for conceptual_captions contains custom code which must be executed to correctly load the dataset.You can inspect the repository content at https://hf.co/datasets/conceptual_captions 
You can avoid this message in future by passing the argument trust_remote_code=True.
Passing trust_remote_code=True will be mandatory to load this dataset from the next major release of datasets. 
  warnings.warn(
Performance speed-up: 1.232

Text-to-Image 生成を実行#

これで、画像生成用のテキストプロンプトを定義し、推論パイプラインを実行できるようになりました。オプションで、潜在状態の初期化とステップ数に対するランダム・ジェネレーターのシード値を変更することもできます。

: より正確な結果を得るため、steps を増やすことを検討してください。推奨値は 50 ですが、処理に時間がかかります。

インタラクティブなデモを起動するため量子化モデルを使用するかどうか以下で選択してください。

quantized_model_present = int8_ov_pipe.exists() 

use_quantized_model = widgets.Checkbox( 
    value=True if quantized_model_present else False, 
    description="Use quantized model", 
    disabled=False, 
) 

use_quantized_model
Checkbox(value=True, description='Use quantized model')
import gradio as gr 

pipeline = int8_ov_pipe if use_quantized_model.value else ov_pipe 

def generate(prompt, negative_prompt, seed, num_steps, _=gr.Progress(track_tqdm=True)): 
    result = pipeline( 
        prompt, 
        negative_prompt=negative_prompt, 
        num_inference_steps=num_steps, 
        seed=seed, 
    ) 
    return result["sample"][0] 

gr.close_all() 
demo = gr.Interface( 
    generate, 
    [ 
        gr.Textbox( 
            "valley in the Alps at sunset, epic vista, beautiful landscape, 4k, 8k", 
            label="Prompt", 
        ), 
        gr.Textbox( 
            "frames, borderline, text, charachter, duplicate, error, out of frame, watermark, low quality, ugly, deformed, blur", 
            label="Negative prompt", 
        ), 
        gr.Slider(value=42, label="Seed", maximum=10000000), 
        gr.Slider(value=25, label="Steps", minimum=1, maximum=50), 
    ], 
    "image", 
) 

try: 
    demo.queue().launch() 
except Exception: 
    demo.queue().launch(share=True)