Stable Video Diffusion による画像からのビデオ生成#

この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。

GitHub

Stable Video Diffusion (SVD) の画像からビデオは、静止画像を条件フレームとして取り込み、そこからビデオを生成する拡散モデルです。このチュートリアルでは、OpenVINO を使用して Stable Video Diffusion を実行する方法を説明します。ここでは、stable-video-diffusion-img2video-xt モデルを使用します。さらに、ビデオ生成プロセスを高速化するため、AnimateLCM LoRA 重みを適用し、NNCF を使用して最適化を行います。

目次:

必要条件#

%pip install -q "torch>=2.1" "diffusers>=0.25" "peft==0.6.2" "transformers" "openvino>=2024.1.0" Pillow opencv-python tqdm "gradio>=4.19" safetensors --extra-index-url https://download.pytorch.org/whl/cpu 
%pip install -q datasets "nncf>=2.10.0"
Note: you may need to restart the kernel to use updated packages. 
Note: you may need to restart the kernel to use updated packages.

PyTorch モデルをダウンロード#

以下のコードは、Diffusers ライブラリーを使用して Stable Video Diffusion XT モデルを読み込み、Consistency Distilled AnimateLCM 重みを適用します。

import torch 
from pathlib import Path 
from diffusers import StableVideoDiffusionPipeline 
from diffusers.utils import load_image, export_to_video 
from diffusers.models.attention_processor import AttnProcessor 
from safetensors import safe_open 
import gc 
import requests 

lcm_scheduler_url = "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/raw/main/lcm_scheduler.py" 

r = requests.get(lcm_scheduler_url) 

with open("lcm_scheduler.py", "w") as f: 
    f.write(r.text) 

from lcm_scheduler import AnimateLCMSVDStochasticIterativeScheduler 
from huggingface_hub import hf_hub_download 

MODEL_DIR = Path("model") 

IMAGE_ENCODER_PATH = MODEL_DIR / "image_encoder.xml" 
VAE_ENCODER_PATH = MODEL_DIR / "vae_encoder.xml" 
VAE_DECODER_PATH = MODEL_DIR / "vae_decoder.xml" 
UNET_PATH = MODEL_DIR / "unet.xml" 

load_pt_pipeline = not (VAE_ENCODER_PATH.exists() and VAE_DECODER_PATH.exists() and UNET_PATH.exists() and IMAGE_ENCODER_PATH.exists()) 

unet, vae, image_encoder = None, None, None 
if load_pt_pipeline: 
    noise_scheduler = AnimateLCMSVDStochasticIterativeScheduler( 
        num_train_timesteps=40, 
        sigma_min=0.002, 
        sigma_max=700.0, 
        sigma_data=1.0, 
        s_noise=1.0, 
        rho=7, 
        clip_denoised=False, 
    ) 
    pipe = StableVideoDiffusionPipeline.from_pretrained( 
        "stabilityai/stable-video-diffusion-img2vid-xt", 
        variant="fp16", 
        scheduler=noise_scheduler, 
    ) 
    pipe.unet.set_attn_processor(AttnProcessor()) 
    hf_hub_download( 
        repo_id="wangfuyun/AnimateLCM-SVD-xt", 
        filename="AnimateLCM-SVD-xt.safetensors", 
        local_dir="./checkpoints", 
    ) 
    state_dict = {} 
    LCM_LORA_PATH = Path( 
        "checkpoints/AnimateLCM-SVD-xt.safetensors", 
    ) 
    with safe_open(LCM_LORA_PATH, framework="pt", device="cpu") as f: 
        for key in f.keys(): 
            state_dict[key] = f.get_tensor(key) 
    missing, unexpected = pipe.unet.load_state_dict(state_dict, strict=True) 

    pipe.scheduler.save_pretrained(MODEL_DIR / "scheduler") 
    pipe.feature_extractor.save_pretrained(MODEL_DIR / "feature_extractor") 
    unet = pipe.unet 
    unet.eval() 
    vae = pipe.vae 
    vae.eval() 
    image_encoder = pipe.image_encoder 
    image_encoder.eval() 
    del pipe 
    gc.collect() 

# コンディショニング画像を読み込み 
image = load_image("https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/svd/rocket.png?download=true") 
image = image.resize((512, 256))

モデルを OpenVINO 中間表現に変換#

OpenVINO は、中間表現 (IR) への変換により PyTorch モデルをサポートします。OpenVINO ov.Model オブジェクト・インスタンスを取得するには、モデル・オブジェクト、モデルトレース用の入力データを ov.convert_model 関数に提供する必要があります。ov.save_model 関数を使用して、次回のデプロイのためにモデルをディスクに保存できます。

Stable Video Diffusion は 3 つの部分で構成されます:

  • 入力画像から埋め込みを抽出する画像エンコーダー

  • 段階的にノイズを除去する潜像ビデオクリップの U-Net

  • 入力画像を潜在空間にエンコードし、生成されたビデオをデコードする VAE

各パーツを変換してみます。

画像エンコーダー#

import openvino as ov 

def cleanup_torchscript_cache(): 
    """ 
    Helper for removing cached model representation 
    """ 
    torch._C._jit_clear_class_registry() 
    torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore() 
    torch.jit._state._clear_class_state() 

if not IMAGE_ENCODER_PATH.exists(): 
    with torch.no_grad(): 
        ov_model = ov.convert_model( 
            image_encoder, 
            example_input=torch.zeros((1, 3, 224, 224)), 
            input=[-1, 3, 224, 224], 
        ) 
    ov.save_model(ov_model, IMAGE_ENCODER_PATH) 
    del ov_model 
    cleanup_torchscript_cache() 
    print(f"Image Encoder successfully converted to IR and saved to {IMAGE_ENCODER_PATH}") 
del image_encoder 
gc.collect();

U-net#

if not UNET_PATH.exists(): 
    unet_inputs = { 
        "sample": torch.ones([2, 2, 8, 32, 32]), 
        "timestep": torch.tensor(1.256), 
        "encoder_hidden_states": torch.zeros([2, 1, 1024]), 
        "added_time_ids": torch.ones([2, 3]), 
    } 
    with torch.no_grad(): 
        ov_model = ov.convert_model(unet, example_input=unet_inputs) 
    ov.save_model(ov_model, UNET_PATH) 
    del ov_model 
    cleanup_torchscript_cache() 
    print(f"UNet successfully converted to IR and saved to {UNET_PATH}") 

    del unet 
    gc.collect();

VAE エンコーダーとデコーダー#

前述したように、VAE モデルは初期画像のエンコードと生成されたビデオのデコードに使用されます。エンコードとデコードは異なるパイプライン・ステージで行われるため、使い勝手を考慮して VAE をエンコードとデコードの 2 つに分割します。

class VAEEncoderWrapper(torch.nn.Module): 
    def __init__(self, vae): 
        super().__init__() 
        self.vae = vae 

    def forward(self, image): 
        return self.vae.encode(x=image)["latent_dist"].sample() 

class VAEDecoderWrapper(torch.nn.Module): 
    def __init__(self, vae): 
        super().__init__() 
        self.vae = vae 

    def forward(self, latents, num_frames: int): 
        return self.vae.decode(latents, num_frames=num_frames) 

if not VAE_ENCODER_PATH.exists(): 
    vae_encoder = VAEEncoderWrapper(vae) 
    with torch.no_grad(): 
        ov_model = ov.convert_model(vae_encoder, example_input=torch.zeros((1, 3, 576, 1024))) 
    ov.save_model(ov_model, VAE_ENCODER_PATH) 
    cleanup_torchscript_cache() 
    print(f"VAE Encoder successfully converted to IR and saved to {VAE_ENCODER_PATH}") 
    del vae_encoder 
    gc.collect() 

if not VAE_DECODER_PATH.exists(): 
    vae_decoder = VAEDecoderWrapper(vae) 
    with torch.no_grad(): 
        ov_model = ov.convert_model(vae_decoder, example_input=(torch.zeros((8, 4, 72, 128)), torch.tensor(8))) 
    ov.save_model(ov_model, VAE_DECODER_PATH) 
    cleanup_torchscript_cache() 
    print(f"VAE Decoder successfully converted to IR and saved to {VAE_ENCODER_PATH}") 
    del vae_decoder 
    gc.collect() 

del vae 
gc.collect();

推論パイプラインの準備#

以下のコードは、OpenVINO を使用してビデオ生成を実行する OVStableVideoDiffusionPipeline クラスを実装します。パイプラインは入力画像を受け入れ、生成されたフレームのシーケンスを返します。下図は、簡略化されたパイプライン・ワークフローを表しています。

svd

svd#

このパイプラインは、Stable Diffusion 画像から画像生成パイプラインに非常に似ていますが、テキスト・エンコーダーの代わりに画像エンコーダーが使用される点が異なります。モデルは入力画像とランダムシードを初期プロンプトとして受け取ります。次に、画像は画像エンコーダーを使用して埋め込み空間にエンコードされ、VAE Encoder を使用して潜在空間にエンコードされ、U-Net モデルへの入力として渡されます。次に、U-Net モデルは、画像埋め込みを条件として、ランダムな潜在ビデオ表現を繰り返しノイズ除去します。U-Net の出力はノイズ残差であり、生成サイクルの次の反復のためのスケジューラー・アルゴリズムによりノイズ除去された潜在画像表現を計算するのに使用されます。このプロセスは指定された回数繰り返され、最後に、VAE デコーダーはノイズ除去された潜在データをビデオフレームのシーケンスに変換します。

from diffusers.pipelines.pipeline_utils import DiffusionPipeline 
import PIL.Image 
from diffusers.image_processor import VaeImageProcessor 
from diffusers.utils.torch_utils import randn_tensor 
from typing import Callable, Dict, List, Optional, Union 
from diffusers.pipelines.stable_video_diffusion import ( 
    StableVideoDiffusionPipelineOutput, 
) 

def _append_dims(x, target_dims): 
    """Appends dimensions to the end of a tensor until it has target_dims dimensions.""" 
    dims_to_append = target_dims - x.ndim 
    if dims_to_append < 0: 
        raise ValueError(f"input has {x.ndim} dims but target_dims is {target_dims}, which is less") 
    return x[(...,) + (None,) * dims_to_append] 

def tensor2vid(video: torch.Tensor, processor, output_type="np"): 
    # Based on:
    # https://github.com/modelscope/modelscope/blob/1509fdb973e5871f37148a4b5e5964cafd43e64d/modelscope/pipelines/multi_modal/text_to_video_synthesis_pipeline.py#L78 

    batch_size, channels, num_frames, height, width = video.shape 
    outputs = [] 
    for batch_idx in range(batch_size): 
        batch_vid = video[batch_idx].permute(1, 0, 2, 3) 
        batch_output = processor.postprocess(batch_vid, output_type) 

        outputs.append(batch_output) 

    return outputs 

class OVStableVideoDiffusionPipeline(DiffusionPipeline): 
    r""" 
    Pipeline to generate video from an input image using Stable Video Diffusion.
    This model inherits from [`DiffusionPipeline`].Check the superclass documentation for the generic methods 
    implemented for all pipelines (downloading, saving, running on a particular device, etc.).
     Args: 
        vae ([`AutoencoderKL`]):
            Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations. 
        image_encoder ([`~transformers.CLIPVisionModelWithProjection`]):
            Frozen CLIP image-encoder ([laion/CLIP-ViT-H-14-laion2B-s32B-b79K](https://huggingface.co/laion/CLIP-ViT-H-14-laion2B-s32B-b79K)). 
        unet ([`UNetSpatioTemporalConditionModel`]):
            A `UNetSpatioTemporalConditionModel` to denoise the encoded image latents. 
        scheduler ([`EulerDiscreteScheduler`]):
            A scheduler to be used in combination with `unet` to denoise the encoded image latents. 
        feature_extractor ([`~transformers.CLIPImageProcessor`]): 
            A `CLIPImageProcessor` to extract features from generated images.
     """ 

    def __init__( 
        self, 
        vae_encoder, 
        image_encoder, 
        unet, 
        vae_decoder, 
        scheduler, 
        feature_extractor, 
    ): 
        super().__init__() 
        self.vae_encoder = vae_encoder 
        self.vae_decoder = vae_decoder 
        self.image_encoder = image_encoder 
        self.register_to_config(unet=unet) 
        self.scheduler = scheduler 
        self.feature_extractor = feature_extractor 
        self.vae_scale_factor = 2 ** (4 - 1) 
        self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor) 

    def _encode_image(self, image, device, num_videos_per_prompt, do_classifier_free_guidance): 
        dtype = torch.float32 

        if not isinstance(image, torch.Tensor): 
            image = self.image_processor.pil_to_numpy(image) 
            image = self.image_processor.numpy_to_pt(image) 

            # 元の実装に合わせてサイズを変更する前に画像を正規化
            # 次に、サイズを変更した後に非正規化 
            image = image * 2.0 - 1.0 
            image = _resize_with_antialiasing(image, (224, 224)) 
            image = (image + 1.0) / 2.0 

            # CLIP 入力用に画像を正規化 
            image = self.feature_extractor( 
                images=image, 
                do_normalize=True, 
                do_center_crop=False, 
                do_resize=False, 
                do_rescale=False, 
                return_tensors="pt", 
            ).pixel_values 

        image = image.to(device=device, dtype=dtype) 
        image_embeddings = torch.from_numpy(self.image_encoder(image)[0]) 
        image_embeddings = image_embeddings.unsqueeze(1) 

        # mps フレンドリーな方法を使用して、プロンプトごとに各世代の画像埋め込みを複製 
        bs_embed, seq_len, _ = image_embeddings.shape 
        image_embeddings = image_embeddings.repeat(1, num_videos_per_prompt, 1) 
        image_embeddings = image_embeddings.view(bs_embed * num_videos_per_prompt, seq_len, -1) 

        if do_classifier_free_guidance: 
            negative_image_embeddings = torch.zeros_like(image_embeddings) 

            # 分類器のフリーガイダンスでは、2 回のフォワードパスを実行する必要があります。
            # ここでは、無条件埋め込みとテキスト埋め込みを 1 つのバッチに連結して、 
            # 2 回のフォワードパスを回避します 
            image_embeddings = torch.cat([negative_image_embeddings, image_embeddings]) 
        return image_embeddings 

    def _encode_vae_image( 
        self, 
        image: torch.Tensor, 
        device, 
        num_videos_per_prompt, 
        do_classifier_free_guidance, 
    ): 
        image_latents = torch.from_numpy(self.vae_encoder(image)[0]) 

        if do_classifier_free_guidance: 
            negative_image_latents = torch.zeros_like(image_latents) 

            # 分類器のフリーガイダンスでは、2 回のフォワードパスが必要です
            # ここでは、無条件埋め込みとテキスト埋め込みを 1 つのバッチに連結して、 
            # 2 回のフォワードパスを回避 
            image_latents = torch.cat([negative_image_latents, image_latents]) 

        # mps フレンドリーな方法を使用して、プロンプトごとに各世代の image_latents を複製 
        image_latents = image_latents.repeat(num_videos_per_prompt, 1, 1, 1) 

        return image_latents 

    def _get_add_time_ids( 
        self, 
        fps, 
        motion_bucket_id, 
        noise_aug_strength, 
        dtype, 
        batch_size, 
        num_videos_per_prompt, 
        do_classifier_free_guidance, 
    ): 
        add_time_ids = [fps, motion_bucket_id, noise_aug_strength] 

        passed_add_embed_dim = 256 * len(add_time_ids) 
        expected_add_embed_dim = 3 * 256 

        if expected_add_embed_dim != passed_add_embed_dim: 
            raise ValueError( 
                f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`."             ) 

        add_time_ids = torch.tensor([add_time_ids], dtype=dtype) 
        add_time_ids = add_time_ids.repeat(batch_size * num_videos_per_prompt, 1) 

        if do_classifier_free_guidance: 
            add_time_ids = torch.cat([add_time_ids, add_time_ids]) 

        return add_time_ids 

    def decode_latents(self, latents, num_frames, decode_chunk_size=14):
        # [batch, frames, channels, height, width] -> [batch*frames, channels, height, width] 
        latents = latents.flatten(0, 1) 

        latents = 1 / 0.18215 * latents 

        # OOM を回避するため、decode_chunk_size フレームを一度にデコード 
        frames = [] 
        for i in range(0, latents.shape[0], decode_chunk_size): 
            frame = torch.from_numpy(self.vae_decoder([latents[i : i + decode_chunk_size], num_frames])[0]) 
            frames.append(frame) 
        frames = torch.cat(frames, dim=0) 

        # [batch*frames, channels, height, width] -> [batch, channels, frames, height, width] 
        frames = frames.reshape(-1, num_frames, *frames.shape[1:]).permute(0, 2, 1, 3, 4) 

        # 常に float32 にキャストします。これは大きなオーバーヘッドを発生せず、bfloat16 と互換性があります 
        frames = frames.float() 
        return frames 

    def check_inputs(self, image, height, width): 
        if not isinstance(image, torch.Tensor) and not isinstance(image, PIL.Image.Image) and not isinstance(image, list): 
            raise ValueError("`image` has to be of type `torch.FloatTensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is" f" {type(image)}") 

        if height % 8 != 0 or width % 8 != 0: 
            raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.") 

    def prepare_latents( 
        self, 
        batch_size, 
        num_frames, 
        num_channels_latents, 
        height, 
        width, 
        dtype, 
        device, 
        generator, 
        latents=None, 
    ): 
        shape = ( 
            batch_size, 
            num_frames, 
            num_channels_latents // 2, 
            height // self.vae_scale_factor, 
            width // self.vae_scale_factor, 
        ) 
        if isinstance(generator, list) and len(generator) != batch_size: 
            raise ValueError( 
                f"You have passed a list of generators of length {len(generator)}, but requested an effective batch" 
                f" size of {batch_size}.Make sure the batch size matches the length of the generators."             ) 

        if latents is None: 
            latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype) 
        else: 
            latents = latents.to(device) 

        # スケジューラーが要求する標準偏差で初期ノイズをスケール 
        latents = latents * self.scheduler.init_noise_sigma 
        return latents 

    @torch.no_grad() 
    def __call__( 
        self, 
        image: Union[PIL.Image.Image, List[PIL.Image.Image], torch.FloatTensor], 
        height: int = 320, 
        width: int = 512, 
        num_frames: Optional[int] = 8, 
        num_inference_steps: int = 4, 
        min_guidance_scale: float = 1.0, 
        max_guidance_scale: float = 1.2, 
        fps: int = 7, 
        motion_bucket_id: int = 80, 
        noise_aug_strength: int = 0.01, 
        decode_chunk_size: Optional[int] = None, 
        num_videos_per_prompt: Optional[int] = 1, 
        generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None, 
        latents: Optional[torch.FloatTensor] = None, 
        output_type: Optional[str] = "pil", 
        callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None, 
        callback_on_step_end_tensor_inputs: List[str] = ["latents"], 
        return_dict: bool = True, 
    ): 
        r""" 
        The call function to the pipeline for generation.

        Args:discussed 
            image (`PIL.Image.Image` or `List[PIL.Image.Image]` or `torch.FloatTensor`): 
                Image or images to guide image generation.If you provide a tensor, it needs to be compatible with 
                [`CLIPImageProcessor`](https://huggingface.co/lambdalabs/sd-image-variations-diffusers/blob/main/feature_extractor/preprocessor_config.json). 
            height (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
                The height in pixels of the generated image. 
            width (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
                The width in pixels of the generated image. 
            num_frames (`int`, *optional*): 
                The number of video frames to generate.Defaults to 14 for `stable-video-diffusion-img2vid` and to 25 for `stable-video-diffusion-img2vid-xt` 
            num_inference_steps (`int`, *optional*, defaults to 25): 
                The number of denoising steps.More denoising steps usually lead to a higher quality image at the 
                expense of slower inference.This parameter is modulated by `strength`. 
            min_guidance_scale (`float`, *optional*, defaults to 1.0): 
                The minimum guidance scale.Used for the classifier free guidance with first frame. 
            max_guidance_scale (`float`, *optional*, defaults to 3.0): 
                The maximum guidance scale.Used for the classifier free guidance with last frame. 
            fps (`int`, *optional*, defaults to 7): 
                Frames per second. The rate at which the generated images shall be exported to a video after generation.
                Note that Stable Diffusion Video's UNet was micro-conditioned on fps-1 during training. 
            motion_bucket_id (`int`, *optional*, defaults to 127): 
                The motion bucket ID. Used as conditioning for the generation. The higher the number the more motion will be in the video. 
            noise_aug_strength (`int`, *optional*, defaults to 0.02): 
                The amount of noise added to the init image, the higher it is the less the video will look like the init image.
                Increase it for more motion. 
            decode_chunk_size (`int`, *optional*): 
                The number of frames to decode at a time.The higher the chunk size, the higher the temporal consistency 
                between frames, but also the higher the memory consumption.By default, the decoder will decode all frames at once 
                for maximal quality.Reduce `decode_chunk_size` to reduce memory usage. 
            num_videos_per_prompt (`int`, *optional*, defaults to 1):
                The number of images to generate per prompt. 
            generator (`torch.Generator` or `List[torch.Generator]`, *optional*): 
                A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make 
                generation deterministic. 
            latents (`torch.FloatTensor`, *optional*):
                Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image 
                generation.Can be used to tweak the same generation with different prompts.If not provided, a latents 
                tensor is generated by sampling using the supplied random `generator`. 
            output_type (`str`, *optional*, defaults to `"pil"`): 
                 The output format of the generated image.Choose between `PIL.Image` or `np.array`. 
            callback_on_step_end (`Callable`, *optional*): 
                A function that calls at the end of each denoising steps during the inference.The function is called 
            with the following arguments:
                `callback_on_step_end(self: DiffusionPipeline, step: int, timestep: int, 
            callback_kwargs: Dict)`. `callback_kwargs` will include a list of all tensors as specified by 
                `callback_on_step_end_tensor_inputs`. 
            callback_on_step_end_tensor_inputs (`List`, *optional*): 
                The list of tensor inputs for the `callback_on_step_end` function.The tensors specified in the list 
                will be passed as `callback_kwargs` argument.You will only be able to include variables listed in the 
                `._callback_tensor_inputs` attribute of your pipeline class. 
            return_dict (`bool`, *optional*, defaults to `True`):
                 Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a 
                plain tuple.
 
        Returns: 
                [`~pipelines.stable_diffusion.StableVideoDiffusionPipelineOutput`] or `tuple`:
                    If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableVideoDiffusionPipelineOutput`] is returned, 
                otherwise a `tuple` is returned where the first element is a list of list with the generated frames.
 
        Examples:

        ```py 
        from diffusers import StableVideoDiffusionPipeline 
        from diffusers.utils import load_image, export_to_video 

        pipe = StableVideoDiffusionPipeline.from_pretrained("stabilityai/stable-video-diffusion-img2vid-xt", torch_dtype=torch.float16, variant="fp16") 
        pipe.to("cuda") 

        image = load_image("https://lh3.googleusercontent.com/y-iFOHfLTwkuQSUegpwDdgKmOjRSTvPxat63dQLB25xkTs4lhIbRUFeNBWZzYf370g=s1200") 
        image = image.resize((1024, 576)) 

        frames = pipe(image, num_frames=25, decode_chunk_size=8).frames[0] 
        export_to_video(frames, "generated.mp4", fps=7) 
        ``` 
        """ 
        # 0. Unet のデフォルトの高さと幅 
        height = height or 96 * self.vae_scale_factor 
        width = width or 96 * self.vae_scale_factor 

        num_frames = num_frames if num_frames is not None else 25 
        decode_chunk_size = decode_chunk_size if decode_chunk_size is not None else num_frames 

        # 1. 入力を確認。正しくない場合はエラーを発生 
        self.check_inputs(image, height, width) 

        # 2. 呼び出しパラメーターを定義 
        if isinstance(image, PIL.Image.Image): 
            batch_size = 1 
        elif isinstance(image, list): 
            batch_size = len(image) 
        else: 
            batch_size = image.shape[0] 
        device = torch.device("cpu") 

        # ここで、`guidance_scale` は、Imagen 論文 (https://arxiv.org/pdf/2205.11487.pdf)  
        # の式 (2) のガイダンス重み`w` と同様に定義されます。`guidance_scale = 1` 
        # は、分類器フリーのガイダンスを行わないことに相当 
        do_classifier_free_guidance = max_guidance_scale > 1.0
 
        # 3. 入力画像をエンコード 
        image_embeddings = self._encode_image(image, device, num_videos_per_prompt, do_classifier_free_guidance) 

        # 注: Stable Diffusion Video は fps - 1 で条件付けされていたため、 
        # ここでは削減されています
        # https://github.com/Stability-AI/generative-models/blob/ed0997173f98eaf8f4edf7ba5fe8f15c6b877fd3/scripts/sampling/simple_video_sample.py#L188 を参照 
        fps = fps - 1 

        # 4. VAE を使用して入力画像をエンコード 
        image = self.image_processor.preprocess(image, height=height, width=width) 
        noise = randn_tensor(image.shape, generator=generator, device=image.device, dtype=image.dtype) 
        image = image + noise_aug_strength * noise 

        image_latents = self._encode_vae_image(image, device, num_videos_per_prompt, do_classifier_free_guidance) 
        image_latents = image_latents.to(image_embeddings.dtype) 

        # 各フレームの画像潜在変数を繰り返してノイズと連結 
        # image_latents [batch, channels, height, width] ->[batch, num_frames, channels, height, width] 
        image_latents = image_latents.unsqueeze(1).repeat(1, num_frames, 1, 1, 1) 

        # 5. 追加された時間 ID を取得 
        added_time_ids = self._get_add_time_ids( 
            fps, 
            motion_bucket_id, 
            noise_aug_strength, 
            image_embeddings.dtype, 
            batch_size, 
            num_videos_per_prompt, 
            do_classifier_free_guidance, 
        ) 
        added_time_ids = added_time_ids 

        # 4. タイムステップを準備 
        self.scheduler.set_timesteps(num_inference_steps, device=device) 
        timesteps = self.scheduler.timesteps 

        # 5. 潜在変数を準備 
        num_channels_latents = 8 
        latents = self.prepare_latents( 
            batch_size * num_videos_per_prompt, 
            num_frames, 
            num_channels_latents, 
            height, 
            width, 
            image_embeddings.dtype, 
            device, 
            generator, 
            latents, 
        ) 

        # 7. ガイダンススケールを準備 
        guidance_scale = torch.linspace(min_guidance_scale, max_guidance_scale, num_frames).unsqueeze(0) 
        guidance_scale = guidance_scale.to(device, latents.dtype) 
        guidance_scale = guidance_scale.repeat(batch_size * num_videos_per_prompt, 1) 
        guidance_scale = _append_dims(guidance_scale, latents.ndim) 

        # 8. ノイズ除去ループ 
        num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order 
        num_timesteps = len(timesteps) 
        with self.progress_bar(total=num_inference_steps) as progress_bar: 
            for i, t in enumerate(timesteps):
                # 分類器のフリーガイダンスを行う場合は潜在変数を拡張 
                latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents 
                latent_model_input = self.scheduler.scale_model_input(latent_model_input, t) 

                # チャネル次元にわたって画像潜在変数を連結 
                latent_model_input = torch.cat([latent_model_input, image_latents], dim=2) 
                # ノイズ残留を予測 
                noise_pred = torch.from_numpy( 
                    self.unet( 
                        [ 
                            latent_model_input, 
                            t, 
                            image_embeddings, 
                            added_time_ids, 
                        ] 
                    )[0] 
                )
                # ガイダンスを実行 
                if do_classifier_free_guidance: 
                    noise_pred_uncond, noise_pred_cond = noise_pred.chunk(2) 
                    noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond) 

                # 前のノイズサンプルを計算 x_t -> x_t-1 
                latents = self.scheduler.step(noise_pred, t, latents).prev_sample 

                if callback_on_step_end is not None: 
                    callback_kwargs = {} 
                    for k in callback_on_step_end_tensor_inputs: 
                        callback_kwargs[k] = locals()[k] 
                    callback_outputs = callback_on_step_end(self, i, t, callback_kwargs) 

                    latents = callback_outputs.pop("latents", latents) 

                if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0): 
                    progress_bar.update() 

        if not output_type == "latent": 
            frames = self.decode_latents(latents, num_frames, decode_chunk_size) 
            frames = tensor2vid(frames, self.image_processor, output_type=output_type) 
        else: 
            frames = latents 

        if not return_dict: 
            return frames 

        return StableVideoDiffusionPipelineOutput(frames=frames) 

# サイズ変更ユーティリティー 
def _resize_with_antialiasing(input, size, interpolation="bicubic", align_corners=True): 
    h, w = input.shape[-2:] 
    factors = (h / size[0], w / size[1])
 
    # 最初に、シグマを決定します 
    # skimage から取得: https://github.com/scikit-image/scikit-image/blob/v0.19.2/skimage/transform/_warps.py#L171 
    sigmas = ( 
        max((factors[0] - 1.0) / 2.0, 0.001), 
        max((factors[1] - 1.0) / 2.0, 0.001), 
    ) 
    # 次はカーネルサイズ。3 シグマでは良い結果が得られますが少し低速です。Pillow は 1 シグマを使用します 
    # https://github.com/python-pillow/Pillow/blob/master/src/libImaging/Resample.c#L206 
    # しかし、2 回のパスで実行することで、より良い結果が得られます。とりあえず 2 シグマを試します 
    ks = int(max(2.0 * 2 * sigmas[0], 3)), int(max(2.0 * 2 * sigmas[1], 3)) 

    # 奇数であることを確認 
    if (ks[0] % 2) == 0: 
        ks = ks[0] + 1, ks[1] 

    if (ks[1] % 2) == 0: 
        ks = ks[0], ks[1] + 1 

    input = _gaussian_blur2d(input, ks, sigmas) 

    output = torch.nn.functional.interpolate(input, size=size, mode=interpolation, align_corners=align_corners) 
    return output 

def _compute_padding(kernel_size): 
    """Compute padding tuple.""" 
    # 4 または 6 ints: (padding_left, padding_right,padding_top,padding_bottom) 
    # https://pytorch.org/docs/stable/nn.html#torch.nn.functional.pad 
    if len(kernel_size) < 2: 
        raise AssertionError(kernel_size) 
    computed = [k - 1 for k in kernel_size] 

    # 偶数カーネルの場合は非対称パディングを行う必要があります :( 
    out_padding = 2 * len(kernel_size) * [0] 

    for i in range(len(kernel_size)): 
        computed_tmp = computed[-(i + 1)] 

        pad_front = computed_tmp // 2 
        pad_rear = computed_tmp - pad_front 

        out_padding[2 * i + 0] = pad_front 
        out_padding[2 * i + 1] = pad_rear 

    return out_padding 

def _filter2d(input, kernel):
    # カーネルを準備 
    b, c, h, w = input.shape 
    tmp_kernel = kernel[:, None, ...].to(device=input.device, dtype=input.dtype) 

    tmp_kernel = tmp_kernel.expand(-1, c, -1, -1) 

    height, width = tmp_kernel.shape[-2:] 

    padding_shape: list[int] = _compute_padding([height, width]) 
    input = torch.nn.functional.pad(input, padding_shape, mode="reflect") 

    # カーネルと入力テンソルを要素ごとまたはバッチごとにパラメーターを揃えるために再形成 
    tmp_kernel = tmp_kernel.reshape(-1, 1, height, width) 
    input = input.view(-1, tmp_kernel.size(0), input.size(-2), input.size(-1)) 

    # テンソルをカーネルと畳み込み 
    output = torch.nn.functional.conv2d(input, tmp_kernel, groups=tmp_kernel.size(0), padding=0, stride=1) 

    out = output.view(b, c, h, w) 
    return out 

def _gaussian(window_size: int, sigma): 
    if isinstance(sigma, float): 
        sigma = torch.tensor([[sigma]]) 

    batch_size = sigma.shape[0] 

    x = (torch.arange(window_size, device=sigma.device, dtype=sigma.dtype) - window_size // 2).expand(batch_size, -1) 

    if window_size % 2 == 0: 
        x = x + 0.5 

    gauss = torch.exp(-x.pow(2.0) / (2 * sigma.pow(2.0))) 

    return gauss / gauss.sum(-1, keepdim=True) 

def _gaussian_blur2d(input, kernel_size, sigma): 
    if isinstance(sigma, tuple): 
        sigma = torch.tensor([sigma], dtype=input.dtype) 
    else: 
        sigma = sigma.to(dtype=input.dtype) 

    ky, kx = int(kernel_size[0]), int(kernel_size[1]) 
    bs = sigma.shape[0] 
    kernel_x = _gaussian(kx, sigma[:, 1].view(bs, 1)) 
    kernel_y = _gaussian(ky, sigma[:, 0].view(bs, 1)) 
    out_x = _filter2d(input, kernel_x[..., None, :]) 
    out = _filter2d(out_x, kernel_y[..., None]) 

    return out

ビデオ生成を実行#

推論デバイスの選択#

import ipywidgets as widgets 

core = ov.Core() 
device = widgets.Dropdown( 
    options=core.available_devices + ["AUTO"], 
    value="AUTO", 
    description="Device:", 
    disabled=False, 
) 

device
Dropdown(description='Device:', index=4, options=('CPU', 'GPU.0', 'GPU.1', 'GPU.2', 'AUTO'), value='AUTO')
from transformers import CLIPImageProcessor 

vae_encoder = core.compile_model(VAE_ENCODER_PATH, device.value) 
image_encoder = core.compile_model(IMAGE_ENCODER_PATH, device.value) 
unet = core.compile_model(UNET_PATH, device.value) 
vae_decoder = core.compile_model(VAE_DECODER_PATH, device.value) 
scheduler = AnimateLCMSVDStochasticIterativeScheduler.from_pretrained(MODEL_DIR / "scheduler") 
feature_extractor = CLIPImageProcessor.from_pretrained(MODEL_DIR / "feature_extractor")

実際のモデルを見てみます。ビデオ生成はメモリーと時間のかかるプロセスであることに注意してください。メモリー消費量を削減するため、入力ビデオの解像度を 576x320 に減らし、生成されるビデオの品質に影響を与える可能性のある生成フレームの数を減らしました。パイプラインに heightwidthnum_frames パラメーターを指定して、これらの設定を手動で変更できます。

ov_pipe = OVStableVideoDiffusionPipeline(vae_encoder, image_encoder, unet, vae_decoder, scheduler, feature_extractor)
frames = ov_pipe( 
    image, 
    num_inference_steps=4, 
    motion_bucket_id=60, 
    num_frames=8, 
    height=320, 
    width=512, 
    generator=torch.manual_seed(12342), 
).frames[0]
0%|          | 0/4 [00:00<?, ?it/s]
denoise currently 
tensor(128.5637) 
denoise currently 
tensor(13.6784) 
denoise currently 
tensor(0.4969) 
denoise currently 
tensor(0.)
out_path = Path("generated.mp4") 

export_to_video(frames, str(out_path), fps=7) 
frames[0].save( 
    "generated.gif", 
    save_all=True, 
    append_images=frames[1:], 
    optimize=False, 
    duration=120, 
    loop=0, 
)
from IPython.display import HTML 

HTML('<img src="generated.gif">')

量子化#

NNCF は、量子化レイヤーをモデルグラフに追加し、トレーニング・データセットのサブセットを使用してこれらの追加の量子化レイヤーのパラメーターを初期化することで、トレーニング後の量子化を可能にします。量子化操作は FP32/FP16 ではなく INT8 で実行されるため、モデル推論が高速化されます。

OVStableVideoDiffusionPipeline 構造によれば、拡散モデルはパイプライン全体の実行時間の大部分を占めます。ここでは、NNCFを使用して UNet 部分を最適化し、計算コストを削減してパイプラインを高速化する方法を説明します。パイプラインの残りのパーツを量子化しても、推論パフォーマンスは大幅に向上せず、精度が大幅に低下する可能性があります。メモリー・フットプリントを削減するため、vae encodervae decoder には重み圧縮のみを使用します。

Unet モデルでは、ハイブリッド・モードで量子化を適用します。つまり、次の量子化を行います: (1) MatMul レイヤーと埋め込みレイヤーの重みと (2) 他のレイヤーの活性化。手順は次のとおりです:

  1. 量子化用のキャリブレーション・データセットを作成します。

  2. 重み付きの操作を収集します。

  3. nncf.compress_model() を実行して、モデルの重みだけを圧縮します。

  4. ignored_scope パラメーターを指定して重み付け操作を無視し、圧縮モデルで nncf.quantize() を実行します。

  5. openvino.save_model() 関数を使用して INT8 モデルを保存します。

モデルの推論速度を向上させるため量子化を実行するかどうかを以下で選択してください。

: 量子化は時間とメモリーを消費する操作です。以下の量子化コードの実行には時間がかかる場合があります。

to_quantize = widgets.Checkbox( 
    value=True, 
    description="Quantization", 
    disabled=False, 
) 

to_quantize
Checkbox(value=True, description='Quantization')
# `skip_kernel_extension` モジュールを取得 
import requests 

r = requests.get( 

url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py", 
) 
open("skip_kernel_extension.py", "w").write(r.text) 

ov_int8_pipeline = None 
OV_INT8_UNET_PATH = MODEL_DIR / "unet_int8.xml" 
OV_INT8_VAE_ENCODER_PATH = MODEL_DIR / "vae_encoder_int8.xml" 
OV_INT8_VAE_DECODER_PATH = MODEL_DIR / "vae_decoder_int8.xml" 

%load_ext skip_kernel_extension

キャリブレーション・データセットの準備#

Hugging Face の fusing/instructpix2pix-1000-samples データセットの一部をキャリブレーション・データとして使用します。UNet 最適化用の中間モデル入力を収集するには、CompiledModel をカスタマイズする必要があります。

%%skip not $to_quantize.value 

from typing import Any 

import datasets 
import numpy as np 
from tqdm.notebook import tqdm 
from IPython.utils import io 

class CompiledModelDecorator(ov.CompiledModel): 
    def __init__(self, compiled_model: ov.CompiledModel, data_cache: List[Any] = None, keep_prob: float = 0.5): 
        super().__init__(compiled_model) 
        self.data_cache = data_cache if data_cache is not None else [] 
        self.keep_prob = keep_prob 

    def __call__(self, *args, **kwargs): 
        if np.random.rand() <= self.keep_prob: 
            self.data_cache.append(*args) 
        return super().__call__(*args, **kwargs) 

def collect_calibration_data(ov_pipe, calibration_dataset_size: int, num_inference_steps: int = 50) -> List[Dict]: 
    original_unet = ov_pipe.unet 
    calibration_data = [] 
    ov_pipe.unet = CompiledModelDecorator(original_unet, calibration_data, keep_prob=1) 

    dataset = datasets.load_dataset("fusing/instructpix2pix-1000-samples", split="train", streaming=False).shuffle(seed=42) 
    # Run inference for data collection 
    pbar = tqdm(total=calibration_dataset_size) 
    for batch in dataset: 
        image = batch["input_image"] 

        with io.capture_output() as captured: 
            ov_pipe( 
                image, 
                num_inference_steps=4, 
                motion_bucket_id=60, 
                num_frames=8, 
                height=256, 
                width=256, 
                generator=torch.manual_seed(12342), 
            ) 
        pbar.update(len(calibration_data) - pbar.n) 
        if len(calibration_data) >= calibration_dataset_size: 
            break 

    ov_pipe.unet = original_unet 
    return calibration_data[:calibration_dataset_size]
%%skip not $to_quantize.value 

if not OV_INT8_UNET_PATH.exists(): 
    subset_size = 200 
    calibration_data = collect_calibration_data(ov_pipe, calibration_dataset_size=subset_size)

ハイブリッド・モデル量子化の実行#

%%skip not $to_quantize.value 

from collections import deque 

def get_operation_const_op(operation, const_port_id: int): 
    node = operation.input_value(const_port_id).get_node() 
    queue = deque([node]) 
    constant_node = None 
    allowed_propagation_types_list = ["Convert", "FakeQuantize", "Reshape"] 

    while len(queue) != 0: 
        curr_node = queue.popleft() 
        if curr_node.get_type_name() == "Constant": 
            constant_node = curr_node 
            break 
        if len(curr_node.inputs()) == 0: 
            break 
        if curr_node.get_type_name() in allowed_propagation_types_list: 
            queue.append(curr_node.input_value(0).get_node())

    return constant_node 

def is_embedding(node) -> bool: 
    allowed_types_list = ["f16", "f32", "f64"] 
    const_port_id = 0 
    input_tensor = node.input_value(const_port_id) 
    if input_tensor.get_element_type().get_type_name() in allowed_types_list: 
        const_node = get_operation_const_op(node, const_port_id) 
        if const_node is not None: 
            return True 

    return False 

def collect_ops_with_weights(model): 
    ops_with_weights = [] 
    for op in model.get_ops(): 
        if op.get_type_name() == "MatMul": 
            constant_node_0 = get_operation_const_op(op, const_port_id=0) 
            constant_node_1 = get_operation_const_op(op, const_port_id=1) 
            if constant_node_0 or constant_node_1: 
                ops_with_weights.append(op.get_friendly_name()) 
        if op.get_type_name() == "Gather" and is_embedding(op): 
            ops_with_weights.append(op.get_friendly_name()) 

    return ops_with_weights
%%skip not $to_quantize.value 

import nncf 
import logging 
from nncf.quantization.advanced_parameters import AdvancedSmoothQuantParameters 

nncf.set_log_level(logging.ERROR) 

if not OV_INT8_UNET_PATH.exists(): 
    diffusion_model = core.read_model(UNET_PATH) 
    unet_ignored_scope = collect_ops_with_weights(diffusion_model) 
    compressed_diffusion_model = nncf.compress_weights(diffusion_model, ignored_scope=nncf.IgnoredScope(types=['Convolution'])) 
    quantized_diffusion_model = nncf.quantize( 
        model=diffusion_model, 
        calibration_dataset=nncf.Dataset(calibration_data), 
        subset_size=subset_size, 
        model_type=nncf.ModelType.TRANSFORMER, 
        # さらに、世代の質を向上させるため最初の畳み込みを無視 
        ignored_scope=nncf.IgnoredScope(names=unet_ignored_scope + ["__module.conv_in/aten::_convolution/Convolution"]), 

advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alphas=AdvancedSmoothQuantParameters(matmul=-1)) 
    ) 
    ov.save_model(quantized_diffusion_model, OV_INT8_UNET_PATH)

重み圧縮の実行#

vae encoder と VAE Decoder を量子化しても推論パフォーマンスは大幅に向上せず、精度が大幅に低下する可能性があります。重み圧縮テンソルごとに 1 つのスケールのみが許されます。フットプリントの削減に適用されます。

%%skip not $to_quantize.value 

nncf.set_log_level(logging.INFO) 

if not OV_INT8_VAE_ENCODER_PATH.exists(): 
    text_encoder_model = core.read_model(VAE_ENCODER_PATH) 
    compressed_text_encoder_model = nncf.compress_weights(text_encoder_model, mode=nncf.CompressWeightsMode.INT4_SYM, group_size=64) 
    ov.save_model(compressed_text_encoder_model, OV_INT8_VAE_ENCODER_PATH) 

if not OV_INT8_VAE_DECODER_PATH.exists(): 
    decoder_model = core.read_model(VAE_DECODER_PATH) 
    compressed_decoder_model = nncf.compress_weights(decoder_model, mode=nncf.CompressWeightsMode.INT4_SYM, group_size=64) 
    ov.save_model(compressed_decoder_model, OV_INT8_VAE_DECODER_PATH)
INFO:nncf:Statistics of the bitwidth distribution: 
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑ 
│   Num bits (N) │ % all parameters (layers)   │ % ratio-defining parameters (layers)   │ 
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥ 
│              8 │ 98% (29 / 32)               │ 0% (0 / 3)                             │ 
├────────────────┼─────────────────────────────┼────────────────────────────────────────┤ 
│              4 │ 2% (3 / 32)                 │ 100% (3 / 3)                           │ 
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
INFO:nncf:Statistics of the bitwidth distribution: 
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑ 
│   Num bits (N) │ % all parameters (layers)   │ % ratio-defining parameters (layers)   │ 
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥ 
│              8 │ 99% (65 / 68)               │ 0% (0 / 3)                             │ 
├────────────────┼─────────────────────────────┼────────────────────────────────────────┤ 
│              4 │ 1% (3 / 68)                 │ 100% (3 / 3)                           │ 
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()

元のパイプラインと最適化されたパイプラインによって生成されたビデオを比較します。

%%skip not $to_quantize.value 

ov_int8_vae_encoder = core.compile_model(OV_INT8_VAE_ENCODER_PATH, device.value) 
ov_int8_unet = core.compile_model(OV_INT8_UNET_PATH, device.value) 
ov_int8_decoder = core.compile_model(OV_INT8_VAE_DECODER_PATH, device.value) 

ov_int8_pipeline = OVStableVideoDiffusionPipeline( 
    ov_int8_vae_encoder, image_encoder, ov_int8_unet, ov_int8_decoder, scheduler, feature_extractor 
) 

int8_frames = ov_int8_pipeline( 
    image, 
    num_inference_steps=4, 
    motion_bucket_id=60, 
    num_frames=8, 
    height=320, 
    width=512, 
    generator=torch.manual_seed(12342), 
).frames[0]
0%|          | 0/4 [00:00<?, ?it/s]
/home/ltalamanova/env_ci/lib/python3.8/site-packages/diffusers/configuration_utils.py:139: FutureWarning: Accessing config attribute unet directly via 'OVStableVideoDiffusionPipeline' object attribute is deprecated. Please access 'unet' over 'OVStableVideoDiffusionPipeline's config object instead, e.g. 'scheduler.config.unet'. 
  deprecate("direct config name access", "1.0.0", deprecation_message, standard_warn=False)
denoise currently 
tensor(128.5637) 
denoise currently 
tensor(13.6784) 
denoise currently 
tensor(0.4969) 
denoise currently 
tensor(0.)
int8_out_path = Path("generated_int8.mp4") 

export_to_video(frames, str(out_path), fps=7) 
int8_frames[0].save( 
    "generated_int8.gif", 
    save_all=True, 
    append_images=int8_frames[1:], 
    optimize=False, 
    duration=120, 
    loop=0, 
) 
HTML('<img src="generated_int8.gif">')

モデルのファイルサイズを比較#

%%skip not $to_quantize.value 

fp16_model_paths = [VAE_ENCODER_PATH, UNET_PATH, VAE_DECODER_PATH] 
int8_model_paths = [OV_INT8_VAE_ENCODER_PATH, OV_INT8_UNET_PATH, OV_INT8_VAE_DECODER_PATH] 

for fp16_path, int8_path in zip(fp16_model_paths, int8_model_paths): 
    fp16_ir_model_size = fp16_path.with_suffix(".bin").stat().st_size 
    int8_model_size = int8_path.with_suffix(".bin").stat().st_size 
    print(f"{fp16_path.stem} compression rate: {fp16_ir_model_size / int8_model_size:.3f}")
vae_encoder compression rate: 2.018 
unet compression rate: 1.996 
vae_decoder compression rate: 2.007

FP16 モデルと INT8 パイプラインの推論時間を比較#

FP16 および INT8 パイプラインの推論パフォーマンスを測定するには、キャリブレーション・サブセットの推論時間の中央値を使用します。

: 最も正確なパフォーマンス推定を行うには、他のアプリケーションを閉じた後、ターミナル/コマンドプロンプトで benchmark_app を実行することを推奨します。

%%skip not $to_quantize.value 

import time 

def calculate_inference_time(pipeline, validation_data): 
    inference_time = [] 
    for prompt in validation_data: 
        start = time.perf_counter() 
        with io.capture_output() as captured:
             _ = pipeline( 
                image, 
                num_inference_steps=4, 
                motion_bucket_id=60, 
                num_frames=8, 
                height=320, 
                width=512, 
                generator=torch.manual_seed(12342), 
            ) 
        end = time.perf_counter() 
        delta = end - start 
        inference_time.append(delta) 
    return np.median(inference_time)
%%skip not $to_quantize.value 

validation_size = 3 
validation_dataset = datasets.load_dataset("fusing/instructpix2pix-1000-samples", split="train", streaming=True).shuffle(seed=42).take(validation_size) 
validation_data = [data["input_image"] for data in validation_dataset] 

fp_latency = calculate_inference_time(ov_pipe, validation_data) 
int8_latency = calculate_inference_time(ov_int8_pipeline, validation_data) 
print(f"Performance speed-up: {fp_latency / int8_latency:.3f}")
Performance speed-up: 1.243

インタラクティブなデモ#

インタラクティブなデモを起動するため量子化モデルを使用するかどうか以下で選択してください。

quantized_model_present = ov_int8_pipeline is not None 

use_quantized_model = widgets.Checkbox( 
    value=quantized_model_present, 
    description="Use quantized model", 
    disabled=not quantized_model_present, 
) 

use_quantized_model
Checkbox(value=True, description='Use quantized model')
import gradio as gr 
import random 

max_64_bit_int = 2**63 - 1 
pipeline = ov_int8_pipeline if use_quantized_model.value else ov_pipe 

example_images_urls = [ 
    "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ship-7833921_1280.jpg?download=true", 
    "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ai-generated-8476858_1280.png?download=true", 
    "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ai-generated-8481641_1280.jpg?download=true", 
    "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/dog-7396912_1280.jpg?download=true", 
    "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/cupcakes-380178_1280.jpg?download=true", 
] 

example_images_dir = Path("example_images") 
example_images_dir.mkdir(exist_ok=True) 
example_imgs = [] 

for image_id, url in enumerate(example_images_urls): 
    img = load_image(url) 
    image_path = example_images_dir / f"{image_id}.png" 
    img.save(image_path) 
    example_imgs.append([image_path]) 

def sample( 
    image: PIL.Image, 
    seed: Optional[int] = 42, 
    randomize_seed: bool = True, 
    motion_bucket_id: int = 127, 
    fps_id: int = 6, 
    num_inference_steps: int = 15, 
    num_frames: int = 4, 
    max_guidance_scale=1.0, 
    min_guidance_scale=1.0, 
    decoding_t: int = 8, # 一度にデコードされるフレームの数。これはほとんどの VRAM を消費します。必要に応じて減らします 
    output_folder: str = "outputs", 
    progress=gr.Progress(track_tqdm=True), 
): 
    if image.mode == "RGBA": 
        image = image.convert("RGB") 

    if randomize_seed: 
        seed = random.randint(0, max_64_bit_int) 
    generator = torch.manual_seed(seed) 

    output_folder = Path(output_folder) 
    output_folder.mkdir(exist_ok=True) 
    base_count = len(list(output_folder.glob("*.mp4"))) 
    video_path = output_folder / f"{base_count:06d}.mp4" 

    frames = pipeline( 
        image, 
        decode_chunk_size=decoding_t, 
        generator=generator, 
        motion_bucket_id=motion_bucket_id, 
        noise_aug_strength=0.1, 
        num_frames=num_frames, 
        num_inference_steps=num_inference_steps, 
        max_guidance_scale=max_guidance_scale, 
        min_guidance_scale=min_guidance_scale, 
    ).frames[0] 
    export_to_video(frames, str(video_path), fps=fps_id) 

    return video_path, seed 

def resize_image(image, output_size=(512, 320)):
    # アスペクト比を計算 
    target_aspect = output_size[0] / output_size[1] # 希望するサイズのアスペクト比 
    image_aspect = image.width / image.height # 元の画像のアスペクト比 

    # 元の画像が大きい場合はサイズを変更して切り取り 
    if image_aspect > target_aspect:
        # アスペクト比を維持しながら、ターゲットの高さに合わせて画像のサイズを変更 
        new_height = output_size[1] 
        new_width = int(new_height * image_aspect) 
        resized_image = image.resize((new_width, new_height), PIL.Image.LANCZOS) 
        # 切り抜きの座標を計算 
        left = (new_width - output_size[0]) / 2 
        top = 0 
        right = (new_width + output_size[0]) / 2 
        bottom = output_size[1] 
    else:
        # 縦横比を維持しながら、ターゲットの幅に合わせて画像のサイズを変更 
        new_width = output_size[0] 
        new_height = int(new_width / image_aspect) 
        resized_image = image.resize((new_width, new_height), PIL.Image.LANCZOS) 
        # 切り抜きの座標を計算 
        left = 0 
        top = (new_height - output_size[1]) / 2 
        right = output_size[0] 
        bottom = (new_height + output_size[1]) / 2 

    # 画像を切り取り 
    cropped_image = resized_image.crop((left, top, right, bottom)) 
    return cropped_image 

with gr.Blocks() as demo: 
    gr.Markdown( 
        """# Stable Video Diffusion: Image to Video Generation with OpenVINO.
    """ 
    ) 
    with gr.Row(): 
        with gr.Column(): 
            image_in = gr.Image(label="Upload your image", type="pil") 
            generate_btn = gr.Button("Generate") 
        video = gr.Video() 
    with gr.Accordion("Advanced options", open=False): 
        seed = gr.Slider( 
            label="Seed", 
            value=42, 
            randomize=True, 
            minimum=0, 
            maximum=max_64_bit_int, 
            step=1, 
        ) 
        randomize_seed = gr.Checkbox(label="Randomize seed", value=True) 
        motion_bucket_id = gr.Slider( 
            label="Motion bucket id", 
            info="Controls how much motion to add/remove from the image", 
            value=127, 
            minimum=1, 
            maximum=255, 
        ) 
        fps_id = gr.Slider( 
            label="Frames per second", 
            info="The length of your video in seconds will be num_frames / fps", 
            value=6, 
            minimum=5, 
            maximum=30, 
            step=1, 
        ) 
        num_frames = gr.Slider(label="Number of Frames", value=8, minimum=2, maximum=25, step=1) 
        num_steps = gr.Slider(label="Number of generation steps", value=4, minimum=1, maximum=8, step=1) 
        max_guidance_scale = gr.Slider( 
            label="Max guidance scale", 
            info="classifier-free guidance strength", 
            value=1.2, 
            minimum=1, 
            maximum=2, 
        ) 
        min_guidance_scale = gr.Slider( 
            label="Min guidance scale", 
            info="classifier-free guidance strength", 
            value=1, 
            minimum=1, 
            maximum=1.5, 
        ) 
    examples = gr.Examples( 
        examples=example_imgs, 
        inputs=[image_in], 
        outputs=[video, seed], 
    ) 

    image_in.upload(fn=resize_image, inputs=image_in, outputs=image_in) 
    generate_btn.click( 
        fn=sample, 
        inputs=[ 
            image_in, 
            seed, 
            randomize_seed, 
            motion_bucket_id, 
            fps_id, 
            num_steps, 
            num_frames, 
            max_guidance_scale, 
            min_guidance_scale, 
        ], 
        outputs=[video, seed], 
        api_name="video", 
    ) 

try: 
    demo.queue().launch(debug=False) 
except Exception: 
    demo.queue().launch(debug=False, share=True) 
# リモートで起動する場合は、server_name と server_port を指定 
# demo.launch(server_name='your server name', server_port='server port in int') 
# 詳細はドキュメントをご覧ください: https://gradio.app/docs/