Stable Video Diffusion による画像からのビデオ生成#
この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。
Stable Video Diffusion (SVD) の画像からビデオは、静止画像を条件フレームとして取り込み、そこからビデオを生成する拡散モデルです。このチュートリアルでは、OpenVINO を使用して Stable Video Diffusion を実行する方法を説明します。ここでは、stable-video-diffusion-img2video-xt モデルを使用します。さらに、ビデオ生成プロセスを高速化するため、AnimateLCM LoRA 重みを適用し、NNCF を使用して最適化を行います。
目次:
必要条件#
%pip install -q "torch>=2.1" "diffusers>=0.25" "peft==0.6.2" "transformers" "openvino>=2024.1.0" Pillow opencv-python tqdm "gradio>=4.19" safetensors --extra-index-url https://download.pytorch.org/whl/cpu
%pip install -q datasets "nncf>=2.10.0"
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
PyTorch モデルをダウンロード#
以下のコードは、Diffusers ライブラリーを使用して Stable Video Diffusion XT モデルを読み込み、Consistency Distilled AnimateLCM 重みを適用します。
import torch
from pathlib import Path
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
from diffusers.models.attention_processor import AttnProcessor
from safetensors import safe_open
import gc
import requests
lcm_scheduler_url = "https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/raw/main/lcm_scheduler.py"
r = requests.get(lcm_scheduler_url)
with open("lcm_scheduler.py", "w") as f:
f.write(r.text)
from lcm_scheduler import AnimateLCMSVDStochasticIterativeScheduler
from huggingface_hub import hf_hub_download
MODEL_DIR = Path("model")
IMAGE_ENCODER_PATH = MODEL_DIR / "image_encoder.xml"
VAE_ENCODER_PATH = MODEL_DIR / "vae_encoder.xml"
VAE_DECODER_PATH = MODEL_DIR / "vae_decoder.xml"
UNET_PATH = MODEL_DIR / "unet.xml"
load_pt_pipeline = not (VAE_ENCODER_PATH.exists() and VAE_DECODER_PATH.exists() and UNET_PATH.exists() and IMAGE_ENCODER_PATH.exists())
unet, vae, image_encoder = None, None, None
if load_pt_pipeline:
noise_scheduler = AnimateLCMSVDStochasticIterativeScheduler(
num_train_timesteps=40,
sigma_min=0.002,
sigma_max=700.0,
sigma_data=1.0,
s_noise=1.0,
rho=7,
clip_denoised=False,
)
pipe = StableVideoDiffusionPipeline.from_pretrained(
"stabilityai/stable-video-diffusion-img2vid-xt",
variant="fp16",
scheduler=noise_scheduler,
)
pipe.unet.set_attn_processor(AttnProcessor())
hf_hub_download(
repo_id="wangfuyun/AnimateLCM-SVD-xt",
filename="AnimateLCM-SVD-xt.safetensors",
local_dir="./checkpoints",
)
state_dict = {}
LCM_LORA_PATH = Path(
"checkpoints/AnimateLCM-SVD-xt.safetensors",
)
with safe_open(LCM_LORA_PATH, framework="pt", device="cpu") as f:
for key in f.keys():
state_dict[key] = f.get_tensor(key)
missing, unexpected = pipe.unet.load_state_dict(state_dict, strict=True)
pipe.scheduler.save_pretrained(MODEL_DIR / "scheduler")
pipe.feature_extractor.save_pretrained(MODEL_DIR / "feature_extractor")
unet = pipe.unet
unet.eval()
vae = pipe.vae
vae.eval()
image_encoder = pipe.image_encoder
image_encoder.eval()
del pipe
gc.collect()
# コンディショニング画像を読み込み
image = load_image("https://huggingface.co/datasets/huggingface/documentation-images/resolve/main/diffusers/svd/rocket.png?download=true")
image = image.resize((512, 256))
モデルを OpenVINO 中間表現に変換#
OpenVINO は、中間表現 (IR) への変換により PyTorch モデルをサポートします。OpenVINO ov.Model
オブジェクト・インスタンスを取得するには、モデル・オブジェクト、モデルトレース用の入力データを ov.convert_model
関数に提供する必要があります。ov.save_model
関数を使用して、次回のデプロイのためにモデルをディスクに保存できます。
Stable Video Diffusion は 3 つの部分で構成されます:
入力画像から埋め込みを抽出する画像エンコーダー。
段階的にノイズを除去する潜像ビデオクリップの U-Net。
入力画像を潜在空間にエンコードし、生成されたビデオをデコードする VAE。
各パーツを変換してみます。
画像エンコーダー#
import openvino as ov
def cleanup_torchscript_cache():
"""
Helper for removing cached model representation
"""
torch._C._jit_clear_class_registry()
torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
torch.jit._state._clear_class_state()
if not IMAGE_ENCODER_PATH.exists():
with torch.no_grad():
ov_model = ov.convert_model(
image_encoder,
example_input=torch.zeros((1, 3, 224, 224)),
input=[-1, 3, 224, 224],
)
ov.save_model(ov_model, IMAGE_ENCODER_PATH)
del ov_model
cleanup_torchscript_cache()
print(f"Image Encoder successfully converted to IR and saved to {IMAGE_ENCODER_PATH}")
del image_encoder
gc.collect();
U-net#
if not UNET_PATH.exists():
unet_inputs = {
"sample": torch.ones([2, 2, 8, 32, 32]),
"timestep": torch.tensor(1.256),
"encoder_hidden_states": torch.zeros([2, 1, 1024]),
"added_time_ids": torch.ones([2, 3]),
}
with torch.no_grad():
ov_model = ov.convert_model(unet, example_input=unet_inputs)
ov.save_model(ov_model, UNET_PATH)
del ov_model
cleanup_torchscript_cache()
print(f"UNet successfully converted to IR and saved to {UNET_PATH}")
del unet
gc.collect();
VAE エンコーダーとデコーダー#
前述したように、VAE モデルは初期画像のエンコードと生成されたビデオのデコードに使用されます。エンコードとデコードは異なるパイプライン・ステージで行われるため、使い勝手を考慮して VAE をエンコードとデコードの 2 つに分割します。
class VAEEncoderWrapper(torch.nn.Module):
def __init__(self, vae):
super().__init__()
self.vae = vae
def forward(self, image):
return self.vae.encode(x=image)["latent_dist"].sample()
class VAEDecoderWrapper(torch.nn.Module):
def __init__(self, vae):
super().__init__()
self.vae = vae
def forward(self, latents, num_frames: int):
return self.vae.decode(latents, num_frames=num_frames)
if not VAE_ENCODER_PATH.exists():
vae_encoder = VAEEncoderWrapper(vae)
with torch.no_grad():
ov_model = ov.convert_model(vae_encoder, example_input=torch.zeros((1, 3, 576, 1024)))
ov.save_model(ov_model, VAE_ENCODER_PATH)
cleanup_torchscript_cache()
print(f"VAE Encoder successfully converted to IR and saved to {VAE_ENCODER_PATH}")
del vae_encoder
gc.collect()
if not VAE_DECODER_PATH.exists():
vae_decoder = VAEDecoderWrapper(vae)
with torch.no_grad():
ov_model = ov.convert_model(vae_decoder, example_input=(torch.zeros((8, 4, 72, 128)), torch.tensor(8)))
ov.save_model(ov_model, VAE_DECODER_PATH)
cleanup_torchscript_cache()
print(f"VAE Decoder successfully converted to IR and saved to {VAE_ENCODER_PATH}")
del vae_decoder
gc.collect()
del vae
gc.collect();
推論パイプラインの準備#
以下のコードは、OpenVINO を使用してビデオ生成を実行する OVStableVideoDiffusionPipeline
クラスを実装します。パイプラインは入力画像を受け入れ、生成されたフレームのシーケンスを返します。下図は、簡略化されたパイプライン・ワークフローを表しています。
svd#
このパイプラインは、Stable Diffusion 画像から画像生成パイプラインに非常に似ていますが、テキスト・エンコーダーの代わりに画像エンコーダーが使用される点が異なります。モデルは入力画像とランダムシードを初期プロンプトとして受け取ります。次に、画像は画像エンコーダーを使用して埋め込み空間にエンコードされ、VAE Encoder を使用して潜在空間にエンコードされ、U-Net モデルへの入力として渡されます。次に、U-Net モデルは、画像埋め込みを条件として、ランダムな潜在ビデオ表現を繰り返しノイズ除去します。U-Net の出力はノイズ残差であり、生成サイクルの次の反復のためのスケジューラー・アルゴリズムによりノイズ除去された潜在画像表現を計算するのに使用されます。このプロセスは指定された回数繰り返され、最後に、VAE デコーダーはノイズ除去された潜在データをビデオフレームのシーケンスに変換します。
from diffusers.pipelines.pipeline_utils import DiffusionPipeline
import PIL.Image
from diffusers.image_processor import VaeImageProcessor
from diffusers.utils.torch_utils import randn_tensor
from typing import Callable, Dict, List, Optional, Union
from diffusers.pipelines.stable_video_diffusion import (
StableVideoDiffusionPipelineOutput,
)
def _append_dims(x, target_dims):
"""Appends dimensions to the end of a tensor until it has target_dims dimensions."""
dims_to_append = target_dims - x.ndim
if dims_to_append < 0:
raise ValueError(f"input has {x.ndim} dims but target_dims is {target_dims}, which is less")
return x[(...,) + (None,) * dims_to_append]
def tensor2vid(video: torch.Tensor, processor, output_type="np"):
# Based on:
# https://github.com/modelscope/modelscope/blob/1509fdb973e5871f37148a4b5e5964cafd43e64d/modelscope/pipelines/multi_modal/text_to_video_synthesis_pipeline.py#L78
batch_size, channels, num_frames, height, width = video.shape
outputs = []
for batch_idx in range(batch_size):
batch_vid = video[batch_idx].permute(1, 0, 2, 3)
batch_output = processor.postprocess(batch_vid, output_type)
outputs.append(batch_output)
return outputs
class OVStableVideoDiffusionPipeline(DiffusionPipeline):
r"""
Pipeline to generate video from an input image using Stable Video Diffusion.
This model inherits from [`DiffusionPipeline`].Check the superclass documentation for the generic methods
implemented for all pipelines (downloading, saving, running on a particular device, etc.).
Args:
vae ([`AutoencoderKL`]):
Variational Auto-Encoder (VAE) model to encode and decode images to and from latent representations.
image_encoder ([`~transformers.CLIPVisionModelWithProjection`]):
Frozen CLIP image-encoder ([laion/CLIP-ViT-H-14-laion2B-s32B-b79K](https://huggingface.co/laion/CLIP-ViT-H-14-laion2B-s32B-b79K)).
unet ([`UNetSpatioTemporalConditionModel`]):
A `UNetSpatioTemporalConditionModel` to denoise the encoded image latents.
scheduler ([`EulerDiscreteScheduler`]):
A scheduler to be used in combination with `unet` to denoise the encoded image latents.
feature_extractor ([`~transformers.CLIPImageProcessor`]):
A `CLIPImageProcessor` to extract features from generated images.
"""
def __init__(
self,
vae_encoder,
image_encoder,
unet,
vae_decoder,
scheduler,
feature_extractor,
):
super().__init__()
self.vae_encoder = vae_encoder
self.vae_decoder = vae_decoder
self.image_encoder = image_encoder
self.register_to_config(unet=unet)
self.scheduler = scheduler
self.feature_extractor = feature_extractor
self.vae_scale_factor = 2 ** (4 - 1)
self.image_processor = VaeImageProcessor(vae_scale_factor=self.vae_scale_factor)
def _encode_image(self, image, device, num_videos_per_prompt, do_classifier_free_guidance):
dtype = torch.float32
if not isinstance(image, torch.Tensor):
image = self.image_processor.pil_to_numpy(image)
image = self.image_processor.numpy_to_pt(image)
# 元の実装に合わせてサイズを変更する前に画像を正規化
# 次に、サイズを変更した後に非正規化
image = image * 2.0 - 1.0
image = _resize_with_antialiasing(image, (224, 224))
image = (image + 1.0) / 2.0
# CLIP 入力用に画像を正規化
image = self.feature_extractor(
images=image,
do_normalize=True,
do_center_crop=False,
do_resize=False,
do_rescale=False,
return_tensors="pt",
).pixel_values
image = image.to(device=device, dtype=dtype)
image_embeddings = torch.from_numpy(self.image_encoder(image)[0])
image_embeddings = image_embeddings.unsqueeze(1)
# mps フレンドリーな方法を使用して、プロンプトごとに各世代の画像埋め込みを複製
bs_embed, seq_len, _ = image_embeddings.shape
image_embeddings = image_embeddings.repeat(1, num_videos_per_prompt, 1)
image_embeddings = image_embeddings.view(bs_embed * num_videos_per_prompt, seq_len, -1)
if do_classifier_free_guidance:
negative_image_embeddings = torch.zeros_like(image_embeddings)
# 分類器のフリーガイダンスでは、2 回のフォワードパスを実行する必要があります。
# ここでは、無条件埋め込みとテキスト埋め込みを 1 つのバッチに連結して、
# 2 回のフォワードパスを回避します
image_embeddings = torch.cat([negative_image_embeddings, image_embeddings])
return image_embeddings
def _encode_vae_image(
self,
image: torch.Tensor,
device,
num_videos_per_prompt,
do_classifier_free_guidance,
):
image_latents = torch.from_numpy(self.vae_encoder(image)[0])
if do_classifier_free_guidance:
negative_image_latents = torch.zeros_like(image_latents)
# 分類器のフリーガイダンスでは、2 回のフォワードパスが必要です
# ここでは、無条件埋め込みとテキスト埋め込みを 1 つのバッチに連結して、
# 2 回のフォワードパスを回避
image_latents = torch.cat([negative_image_latents, image_latents])
# mps フレンドリーな方法を使用して、プロンプトごとに各世代の image_latents を複製
image_latents = image_latents.repeat(num_videos_per_prompt, 1, 1, 1)
return image_latents
def _get_add_time_ids(
self,
fps,
motion_bucket_id,
noise_aug_strength,
dtype,
batch_size,
num_videos_per_prompt,
do_classifier_free_guidance,
):
add_time_ids = [fps, motion_bucket_id, noise_aug_strength]
passed_add_embed_dim = 256 * len(add_time_ids)
expected_add_embed_dim = 3 * 256
if expected_add_embed_dim != passed_add_embed_dim:
raise ValueError(
f"Model expects an added time embedding vector of length {expected_add_embed_dim}, but a vector of {passed_add_embed_dim} was created. The model has an incorrect config. Please check `unet.config.time_embedding_type` and `text_encoder_2.config.projection_dim`." )
add_time_ids = torch.tensor([add_time_ids], dtype=dtype)
add_time_ids = add_time_ids.repeat(batch_size * num_videos_per_prompt, 1)
if do_classifier_free_guidance:
add_time_ids = torch.cat([add_time_ids, add_time_ids])
return add_time_ids
def decode_latents(self, latents, num_frames, decode_chunk_size=14):
# [batch, frames, channels, height, width] -> [batch*frames, channels, height, width]
latents = latents.flatten(0, 1)
latents = 1 / 0.18215 * latents
# OOM を回避するため、decode_chunk_size フレームを一度にデコード
frames = []
for i in range(0, latents.shape[0], decode_chunk_size):
frame = torch.from_numpy(self.vae_decoder([latents[i : i + decode_chunk_size], num_frames])[0])
frames.append(frame)
frames = torch.cat(frames, dim=0)
# [batch*frames, channels, height, width] -> [batch, channels, frames, height, width]
frames = frames.reshape(-1, num_frames, *frames.shape[1:]).permute(0, 2, 1, 3, 4)
# 常に float32 にキャストします。これは大きなオーバーヘッドを発生せず、bfloat16 と互換性があります
frames = frames.float()
return frames
def check_inputs(self, image, height, width):
if not isinstance(image, torch.Tensor) and not isinstance(image, PIL.Image.Image) and not isinstance(image, list):
raise ValueError("`image` has to be of type `torch.FloatTensor` or `PIL.Image.Image` or `List[PIL.Image.Image]` but is" f" {type(image)}")
if height % 8 != 0 or width % 8 != 0:
raise ValueError(f"`height` and `width` have to be divisible by 8 but are {height} and {width}.")
def prepare_latents(
self,
batch_size,
num_frames,
num_channels_latents,
height,
width,
dtype,
device,
generator,
latents=None,
):
shape = (
batch_size,
num_frames,
num_channels_latents // 2,
height // self.vae_scale_factor,
width // self.vae_scale_factor,
)
if isinstance(generator, list) and len(generator) != batch_size:
raise ValueError(
f"You have passed a list of generators of length {len(generator)}, but requested an effective batch"
f" size of {batch_size}.Make sure the batch size matches the length of the generators." )
if latents is None:
latents = randn_tensor(shape, generator=generator, device=device, dtype=dtype)
else:
latents = latents.to(device)
# スケジューラーが要求する標準偏差で初期ノイズをスケール
latents = latents * self.scheduler.init_noise_sigma
return latents
@torch.no_grad()
def __call__(
self,
image: Union[PIL.Image.Image, List[PIL.Image.Image], torch.FloatTensor],
height: int = 320,
width: int = 512,
num_frames: Optional[int] = 8,
num_inference_steps: int = 4,
min_guidance_scale: float = 1.0,
max_guidance_scale: float = 1.2,
fps: int = 7,
motion_bucket_id: int = 80,
noise_aug_strength: int = 0.01,
decode_chunk_size: Optional[int] = None,
num_videos_per_prompt: Optional[int] = 1,
generator: Optional[Union[torch.Generator, List[torch.Generator]]] = None,
latents: Optional[torch.FloatTensor] = None,
output_type: Optional[str] = "pil",
callback_on_step_end: Optional[Callable[[int, int, Dict], None]] = None,
callback_on_step_end_tensor_inputs: List[str] = ["latents"],
return_dict: bool = True,
):
r"""
The call function to the pipeline for generation.
Args:discussed
image (`PIL.Image.Image` or `List[PIL.Image.Image]` or `torch.FloatTensor`):
Image or images to guide image generation.If you provide a tensor, it needs to be compatible with
[`CLIPImageProcessor`](https://huggingface.co/lambdalabs/sd-image-variations-diffusers/blob/main/feature_extractor/preprocessor_config.json).
height (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
The height in pixels of the generated image.
width (`int`, *optional*, defaults to `self.unet.config.sample_size * self.vae_scale_factor`):
The width in pixels of the generated image.
num_frames (`int`, *optional*):
The number of video frames to generate.Defaults to 14 for `stable-video-diffusion-img2vid` and to 25 for `stable-video-diffusion-img2vid-xt`
num_inference_steps (`int`, *optional*, defaults to 25):
The number of denoising steps.More denoising steps usually lead to a higher quality image at the
expense of slower inference.This parameter is modulated by `strength`.
min_guidance_scale (`float`, *optional*, defaults to 1.0):
The minimum guidance scale.Used for the classifier free guidance with first frame.
max_guidance_scale (`float`, *optional*, defaults to 3.0):
The maximum guidance scale.Used for the classifier free guidance with last frame.
fps (`int`, *optional*, defaults to 7):
Frames per second. The rate at which the generated images shall be exported to a video after generation.
Note that Stable Diffusion Video's UNet was micro-conditioned on fps-1 during training.
motion_bucket_id (`int`, *optional*, defaults to 127):
The motion bucket ID. Used as conditioning for the generation. The higher the number the more motion will be in the video.
noise_aug_strength (`int`, *optional*, defaults to 0.02):
The amount of noise added to the init image, the higher it is the less the video will look like the init image.
Increase it for more motion.
decode_chunk_size (`int`, *optional*):
The number of frames to decode at a time.The higher the chunk size, the higher the temporal consistency
between frames, but also the higher the memory consumption.By default, the decoder will decode all frames at once
for maximal quality.Reduce `decode_chunk_size` to reduce memory usage.
num_videos_per_prompt (`int`, *optional*, defaults to 1):
The number of images to generate per prompt.
generator (`torch.Generator` or `List[torch.Generator]`, *optional*):
A [`torch.Generator`](https://pytorch.org/docs/stable/generated/torch.Generator.html) to make
generation deterministic.
latents (`torch.FloatTensor`, *optional*):
Pre-generated noisy latents sampled from a Gaussian distribution, to be used as inputs for image
generation.Can be used to tweak the same generation with different prompts.If not provided, a latents
tensor is generated by sampling using the supplied random `generator`.
output_type (`str`, *optional*, defaults to `"pil"`):
The output format of the generated image.Choose between `PIL.Image` or `np.array`.
callback_on_step_end (`Callable`, *optional*):
A function that calls at the end of each denoising steps during the inference.The function is called
with the following arguments:
`callback_on_step_end(self: DiffusionPipeline, step: int, timestep: int,
callback_kwargs: Dict)`. `callback_kwargs` will include a list of all tensors as specified by
`callback_on_step_end_tensor_inputs`.
callback_on_step_end_tensor_inputs (`List`, *optional*):
The list of tensor inputs for the `callback_on_step_end` function.The tensors specified in the list
will be passed as `callback_kwargs` argument.You will only be able to include variables listed in the
`._callback_tensor_inputs` attribute of your pipeline class.
return_dict (`bool`, *optional*, defaults to `True`):
Whether or not to return a [`~pipelines.stable_diffusion.StableDiffusionPipelineOutput`] instead of a
plain tuple.
Returns:
[`~pipelines.stable_diffusion.StableVideoDiffusionPipelineOutput`] or `tuple`:
If `return_dict` is `True`, [`~pipelines.stable_diffusion.StableVideoDiffusionPipelineOutput`] is returned,
otherwise a `tuple` is returned where the first element is a list of list with the generated frames.
Examples:
```py
from diffusers import StableVideoDiffusionPipeline
from diffusers.utils import load_image, export_to_video
pipe = StableVideoDiffusionPipeline.from_pretrained("stabilityai/stable-video-diffusion-img2vid-xt", torch_dtype=torch.float16, variant="fp16")
pipe.to("cuda")
image = load_image("https://lh3.googleusercontent.com/y-iFOHfLTwkuQSUegpwDdgKmOjRSTvPxat63dQLB25xkTs4lhIbRUFeNBWZzYf370g=s1200")
image = image.resize((1024, 576))
frames = pipe(image, num_frames=25, decode_chunk_size=8).frames[0]
export_to_video(frames, "generated.mp4", fps=7)
```
"""
# 0. Unet のデフォルトの高さと幅
height = height or 96 * self.vae_scale_factor
width = width or 96 * self.vae_scale_factor
num_frames = num_frames if num_frames is not None else 25
decode_chunk_size = decode_chunk_size if decode_chunk_size is not None else num_frames
# 1. 入力を確認。正しくない場合はエラーを発生
self.check_inputs(image, height, width)
# 2. 呼び出しパラメーターを定義
if isinstance(image, PIL.Image.Image):
batch_size = 1
elif isinstance(image, list):
batch_size = len(image)
else:
batch_size = image.shape[0]
device = torch.device("cpu")
# ここで、`guidance_scale` は、Imagen 論文 (https://arxiv.org/pdf/2205.11487.pdf)
# の式 (2) のガイダンス重み`w` と同様に定義されます。`guidance_scale = 1`
# は、分類器フリーのガイダンスを行わないことに相当
do_classifier_free_guidance = max_guidance_scale > 1.0
# 3. 入力画像をエンコード
image_embeddings = self._encode_image(image, device, num_videos_per_prompt, do_classifier_free_guidance)
# 注: Stable Diffusion Video は fps - 1 で条件付けされていたため、
# ここでは削減されています
# https://github.com/Stability-AI/generative-models/blob/ed0997173f98eaf8f4edf7ba5fe8f15c6b877fd3/scripts/sampling/simple_video_sample.py#L188 を参照
fps = fps - 1
# 4. VAE を使用して入力画像をエンコード
image = self.image_processor.preprocess(image, height=height, width=width)
noise = randn_tensor(image.shape, generator=generator, device=image.device, dtype=image.dtype)
image = image + noise_aug_strength * noise
image_latents = self._encode_vae_image(image, device, num_videos_per_prompt, do_classifier_free_guidance)
image_latents = image_latents.to(image_embeddings.dtype)
# 各フレームの画像潜在変数を繰り返してノイズと連結
# image_latents [batch, channels, height, width] ->[batch, num_frames, channels, height, width]
image_latents = image_latents.unsqueeze(1).repeat(1, num_frames, 1, 1, 1)
# 5. 追加された時間 ID を取得
added_time_ids = self._get_add_time_ids(
fps,
motion_bucket_id,
noise_aug_strength,
image_embeddings.dtype,
batch_size,
num_videos_per_prompt,
do_classifier_free_guidance,
)
added_time_ids = added_time_ids
# 4. タイムステップを準備
self.scheduler.set_timesteps(num_inference_steps, device=device)
timesteps = self.scheduler.timesteps
# 5. 潜在変数を準備
num_channels_latents = 8
latents = self.prepare_latents(
batch_size * num_videos_per_prompt,
num_frames,
num_channels_latents,
height,
width,
image_embeddings.dtype,
device,
generator,
latents,
)
# 7. ガイダンススケールを準備
guidance_scale = torch.linspace(min_guidance_scale, max_guidance_scale, num_frames).unsqueeze(0)
guidance_scale = guidance_scale.to(device, latents.dtype)
guidance_scale = guidance_scale.repeat(batch_size * num_videos_per_prompt, 1)
guidance_scale = _append_dims(guidance_scale, latents.ndim)
# 8. ノイズ除去ループ
num_warmup_steps = len(timesteps) - num_inference_steps * self.scheduler.order
num_timesteps = len(timesteps)
with self.progress_bar(total=num_inference_steps) as progress_bar:
for i, t in enumerate(timesteps):
# 分類器のフリーガイダンスを行う場合は潜在変数を拡張
latent_model_input = torch.cat([latents] * 2) if do_classifier_free_guidance else latents
latent_model_input = self.scheduler.scale_model_input(latent_model_input, t)
# チャネル次元にわたって画像潜在変数を連結
latent_model_input = torch.cat([latent_model_input, image_latents], dim=2)
# ノイズ残留を予測
noise_pred = torch.from_numpy(
self.unet(
[
latent_model_input,
t,
image_embeddings,
added_time_ids,
]
)[0]
)
# ガイダンスを実行
if do_classifier_free_guidance:
noise_pred_uncond, noise_pred_cond = noise_pred.chunk(2)
noise_pred = noise_pred_uncond + guidance_scale * (noise_pred_cond - noise_pred_uncond)
# 前のノイズサンプルを計算 x_t -> x_t-1
latents = self.scheduler.step(noise_pred, t, latents).prev_sample
if callback_on_step_end is not None:
callback_kwargs = {}
for k in callback_on_step_end_tensor_inputs:
callback_kwargs[k] = locals()[k]
callback_outputs = callback_on_step_end(self, i, t, callback_kwargs)
latents = callback_outputs.pop("latents", latents)
if i == len(timesteps) - 1 or ((i + 1) > num_warmup_steps and (i + 1) % self.scheduler.order == 0):
progress_bar.update()
if not output_type == "latent":
frames = self.decode_latents(latents, num_frames, decode_chunk_size)
frames = tensor2vid(frames, self.image_processor, output_type=output_type)
else:
frames = latents
if not return_dict:
return frames
return StableVideoDiffusionPipelineOutput(frames=frames)
# サイズ変更ユーティリティー
def _resize_with_antialiasing(input, size, interpolation="bicubic", align_corners=True):
h, w = input.shape[-2:]
factors = (h / size[0], w / size[1])
# 最初に、シグマを決定します
# skimage から取得: https://github.com/scikit-image/scikit-image/blob/v0.19.2/skimage/transform/_warps.py#L171
sigmas = (
max((factors[0] - 1.0) / 2.0, 0.001),
max((factors[1] - 1.0) / 2.0, 0.001),
)
# 次はカーネルサイズ。3 シグマでは良い結果が得られますが少し低速です。Pillow は 1 シグマを使用します
# https://github.com/python-pillow/Pillow/blob/master/src/libImaging/Resample.c#L206
# しかし、2 回のパスで実行することで、より良い結果が得られます。とりあえず 2 シグマを試します
ks = int(max(2.0 * 2 * sigmas[0], 3)), int(max(2.0 * 2 * sigmas[1], 3))
# 奇数であることを確認
if (ks[0] % 2) == 0:
ks = ks[0] + 1, ks[1]
if (ks[1] % 2) == 0:
ks = ks[0], ks[1] + 1
input = _gaussian_blur2d(input, ks, sigmas)
output = torch.nn.functional.interpolate(input, size=size, mode=interpolation, align_corners=align_corners)
return output
def _compute_padding(kernel_size):
"""Compute padding tuple."""
# 4 または 6 ints: (padding_left, padding_right,padding_top,padding_bottom)
# https://pytorch.org/docs/stable/nn.html#torch.nn.functional.pad
if len(kernel_size) < 2:
raise AssertionError(kernel_size)
computed = [k - 1 for k in kernel_size]
# 偶数カーネルの場合は非対称パディングを行う必要があります :(
out_padding = 2 * len(kernel_size) * [0]
for i in range(len(kernel_size)):
computed_tmp = computed[-(i + 1)]
pad_front = computed_tmp // 2
pad_rear = computed_tmp - pad_front
out_padding[2 * i + 0] = pad_front
out_padding[2 * i + 1] = pad_rear
return out_padding
def _filter2d(input, kernel):
# カーネルを準備
b, c, h, w = input.shape
tmp_kernel = kernel[:, None, ...].to(device=input.device, dtype=input.dtype)
tmp_kernel = tmp_kernel.expand(-1, c, -1, -1)
height, width = tmp_kernel.shape[-2:]
padding_shape: list[int] = _compute_padding([height, width])
input = torch.nn.functional.pad(input, padding_shape, mode="reflect")
# カーネルと入力テンソルを要素ごとまたはバッチごとにパラメーターを揃えるために再形成
tmp_kernel = tmp_kernel.reshape(-1, 1, height, width)
input = input.view(-1, tmp_kernel.size(0), input.size(-2), input.size(-1))
# テンソルをカーネルと畳み込み
output = torch.nn.functional.conv2d(input, tmp_kernel, groups=tmp_kernel.size(0), padding=0, stride=1)
out = output.view(b, c, h, w)
return out
def _gaussian(window_size: int, sigma):
if isinstance(sigma, float):
sigma = torch.tensor([[sigma]])
batch_size = sigma.shape[0]
x = (torch.arange(window_size, device=sigma.device, dtype=sigma.dtype) - window_size // 2).expand(batch_size, -1)
if window_size % 2 == 0:
x = x + 0.5
gauss = torch.exp(-x.pow(2.0) / (2 * sigma.pow(2.0)))
return gauss / gauss.sum(-1, keepdim=True)
def _gaussian_blur2d(input, kernel_size, sigma):
if isinstance(sigma, tuple):
sigma = torch.tensor([sigma], dtype=input.dtype)
else:
sigma = sigma.to(dtype=input.dtype)
ky, kx = int(kernel_size[0]), int(kernel_size[1])
bs = sigma.shape[0]
kernel_x = _gaussian(kx, sigma[:, 1].view(bs, 1))
kernel_y = _gaussian(ky, sigma[:, 0].view(bs, 1))
out_x = _filter2d(input, kernel_x[..., None, :])
out = _filter2d(out_x, kernel_y[..., None])
return out
ビデオ生成を実行#
推論デバイスの選択#
import ipywidgets as widgets
core = ov.Core()
device = widgets.Dropdown(
options=core.available_devices + ["AUTO"],
value="AUTO",
description="Device:",
disabled=False,
)
device
Dropdown(description='Device:', index=4, options=('CPU', 'GPU.0', 'GPU.1', 'GPU.2', 'AUTO'), value='AUTO')
from transformers import CLIPImageProcessor
vae_encoder = core.compile_model(VAE_ENCODER_PATH, device.value)
image_encoder = core.compile_model(IMAGE_ENCODER_PATH, device.value)
unet = core.compile_model(UNET_PATH, device.value)
vae_decoder = core.compile_model(VAE_DECODER_PATH, device.value)
scheduler = AnimateLCMSVDStochasticIterativeScheduler.from_pretrained(MODEL_DIR / "scheduler")
feature_extractor = CLIPImageProcessor.from_pretrained(MODEL_DIR / "feature_extractor")
実際のモデルを見てみます。ビデオ生成はメモリーと時間のかかるプロセスであることに注意してください。メモリー消費量を削減するため、入力ビデオの解像度を 576x320 に減らし、生成されるビデオの品質に影響を与える可能性のある生成フレームの数を減らしました。パイプラインに height
、width
、num_frames
パラメーターを指定して、これらの設定を手動で変更できます。
ov_pipe = OVStableVideoDiffusionPipeline(vae_encoder, image_encoder, unet, vae_decoder, scheduler, feature_extractor)
frames = ov_pipe(
image,
num_inference_steps=4,
motion_bucket_id=60,
num_frames=8,
height=320,
width=512,
generator=torch.manual_seed(12342),
).frames[0]
0%| | 0/4 [00:00<?, ?it/s]
denoise currently
tensor(128.5637)
denoise currently
tensor(13.6784)
denoise currently
tensor(0.4969)
denoise currently
tensor(0.)
out_path = Path("generated.mp4")
export_to_video(frames, str(out_path), fps=7)
frames[0].save(
"generated.gif",
save_all=True,
append_images=frames[1:],
optimize=False,
duration=120,
loop=0,
)
from IPython.display import HTML
HTML('<img src="generated.gif">')

量子化#
NNCF は、量子化レイヤーをモデルグラフに追加し、トレーニング・データセットのサブセットを使用してこれらの追加の量子化レイヤーのパラメーターを初期化することで、トレーニング後の量子化を可能にします。量子化操作は FP32
/FP16
ではなく INT8
で実行されるため、モデル推論が高速化されます。
OVStableVideoDiffusionPipeline
構造によれば、拡散モデルはパイプライン全体の実行時間の大部分を占めます。ここでは、NNCFを使用して UNet 部分を最適化し、計算コストを削減してパイプラインを高速化する方法を説明します。パイプラインの残りのパーツを量子化しても、推論パフォーマンスは大幅に向上せず、精度が大幅に低下する可能性があります。メモリー・フットプリントを削減するため、vae encoder
と vae decoder
には重み圧縮のみを使用します。
Unet モデルでは、ハイブリッド・モードで量子化を適用します。つまり、次の量子化を行います: (1) MatMul レイヤーと埋め込みレイヤーの重みと (2) 他のレイヤーの活性化。手順は次のとおりです:
量子化用のキャリブレーション・データセットを作成します。
重み付きの操作を収集します。
nncf.compress_model()
を実行して、モデルの重みだけを圧縮します。ignored_scope
パラメーターを指定して重み付け操作を無視し、圧縮モデルでnncf.quantize()
を実行します。openvino.save_model()
関数を使用してINT8
モデルを保存します。
モデルの推論速度を向上させるため量子化を実行するかどうかを以下で選択してください。
注: 量子化は時間とメモリーを消費する操作です。以下の量子化コードの実行には時間がかかる場合があります。
to_quantize = widgets.Checkbox(
value=True,
description="Quantization",
disabled=False,
)
to_quantize
Checkbox(value=True, description='Quantization')
# `skip_kernel_extension` モジュールを取得
import requests
r = requests.get(
url="https://raw.githubusercontent.com/openvinotoolkit/openvino_notebooks/latest/utils/skip_kernel_extension.py",
)
open("skip_kernel_extension.py", "w").write(r.text)
ov_int8_pipeline = None
OV_INT8_UNET_PATH = MODEL_DIR / "unet_int8.xml"
OV_INT8_VAE_ENCODER_PATH = MODEL_DIR / "vae_encoder_int8.xml"
OV_INT8_VAE_DECODER_PATH = MODEL_DIR / "vae_decoder_int8.xml"
%load_ext skip_kernel_extension
キャリブレーション・データセットの準備#
Hugging Face の fusing/instructpix2pix-1000-samples データセットの一部をキャリブレーション・データとして使用します。UNet 最適化用の中間モデル入力を収集するには、CompiledModel
をカスタマイズする必要があります。
%%skip not $to_quantize.value
from typing import Any
import datasets
import numpy as np
from tqdm.notebook import tqdm
from IPython.utils import io
class CompiledModelDecorator(ov.CompiledModel):
def __init__(self, compiled_model: ov.CompiledModel, data_cache: List[Any] = None, keep_prob: float = 0.5):
super().__init__(compiled_model)
self.data_cache = data_cache if data_cache is not None else []
self.keep_prob = keep_prob
def __call__(self, *args, **kwargs):
if np.random.rand() <= self.keep_prob:
self.data_cache.append(*args)
return super().__call__(*args, **kwargs)
def collect_calibration_data(ov_pipe, calibration_dataset_size: int, num_inference_steps: int = 50) -> List[Dict]:
original_unet = ov_pipe.unet
calibration_data = []
ov_pipe.unet = CompiledModelDecorator(original_unet, calibration_data, keep_prob=1)
dataset = datasets.load_dataset("fusing/instructpix2pix-1000-samples", split="train", streaming=False).shuffle(seed=42)
# Run inference for data collection
pbar = tqdm(total=calibration_dataset_size)
for batch in dataset:
image = batch["input_image"]
with io.capture_output() as captured:
ov_pipe(
image,
num_inference_steps=4,
motion_bucket_id=60,
num_frames=8,
height=256,
width=256,
generator=torch.manual_seed(12342),
)
pbar.update(len(calibration_data) - pbar.n)
if len(calibration_data) >= calibration_dataset_size:
break
ov_pipe.unet = original_unet
return calibration_data[:calibration_dataset_size]
%%skip not $to_quantize.value
if not OV_INT8_UNET_PATH.exists():
subset_size = 200
calibration_data = collect_calibration_data(ov_pipe, calibration_dataset_size=subset_size)
ハイブリッド・モデル量子化の実行#
%%skip not $to_quantize.value
from collections import deque
def get_operation_const_op(operation, const_port_id: int):
node = operation.input_value(const_port_id).get_node()
queue = deque([node])
constant_node = None
allowed_propagation_types_list = ["Convert", "FakeQuantize", "Reshape"]
while len(queue) != 0:
curr_node = queue.popleft()
if curr_node.get_type_name() == "Constant":
constant_node = curr_node
break
if len(curr_node.inputs()) == 0:
break
if curr_node.get_type_name() in allowed_propagation_types_list:
queue.append(curr_node.input_value(0).get_node())
return constant_node
def is_embedding(node) -> bool:
allowed_types_list = ["f16", "f32", "f64"]
const_port_id = 0
input_tensor = node.input_value(const_port_id)
if input_tensor.get_element_type().get_type_name() in allowed_types_list:
const_node = get_operation_const_op(node, const_port_id)
if const_node is not None:
return True
return False
def collect_ops_with_weights(model):
ops_with_weights = []
for op in model.get_ops():
if op.get_type_name() == "MatMul":
constant_node_0 = get_operation_const_op(op, const_port_id=0)
constant_node_1 = get_operation_const_op(op, const_port_id=1)
if constant_node_0 or constant_node_1:
ops_with_weights.append(op.get_friendly_name())
if op.get_type_name() == "Gather" and is_embedding(op):
ops_with_weights.append(op.get_friendly_name())
return ops_with_weights
%%skip not $to_quantize.value
import nncf
import logging
from nncf.quantization.advanced_parameters import AdvancedSmoothQuantParameters
nncf.set_log_level(logging.ERROR)
if not OV_INT8_UNET_PATH.exists():
diffusion_model = core.read_model(UNET_PATH)
unet_ignored_scope = collect_ops_with_weights(diffusion_model)
compressed_diffusion_model = nncf.compress_weights(diffusion_model, ignored_scope=nncf.IgnoredScope(types=['Convolution']))
quantized_diffusion_model = nncf.quantize(
model=diffusion_model,
calibration_dataset=nncf.Dataset(calibration_data),
subset_size=subset_size,
model_type=nncf.ModelType.TRANSFORMER,
# さらに、世代の質を向上させるため最初の畳み込みを無視
ignored_scope=nncf.IgnoredScope(names=unet_ignored_scope + ["__module.conv_in/aten::_convolution/Convolution"]),
advanced_parameters=nncf.AdvancedQuantizationParameters(smooth_quant_alphas=AdvancedSmoothQuantParameters(matmul=-1))
)
ov.save_model(quantized_diffusion_model, OV_INT8_UNET_PATH)
重み圧縮の実行#
vae encoder
と VAE Decoder を量子化しても推論パフォーマンスは大幅に向上せず、精度が大幅に低下する可能性があります。重み圧縮テンソルごとに 1 つのスケールのみが許されます。フットプリントの削減に適用されます。
%%skip not $to_quantize.value
nncf.set_log_level(logging.INFO)
if not OV_INT8_VAE_ENCODER_PATH.exists():
text_encoder_model = core.read_model(VAE_ENCODER_PATH)
compressed_text_encoder_model = nncf.compress_weights(text_encoder_model, mode=nncf.CompressWeightsMode.INT4_SYM, group_size=64)
ov.save_model(compressed_text_encoder_model, OV_INT8_VAE_ENCODER_PATH)
if not OV_INT8_VAE_DECODER_PATH.exists():
decoder_model = core.read_model(VAE_DECODER_PATH)
compressed_decoder_model = nncf.compress_weights(decoder_model, mode=nncf.CompressWeightsMode.INT4_SYM, group_size=64)
ov.save_model(compressed_decoder_model, OV_INT8_VAE_DECODER_PATH)
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 98% (29 / 32) │ 0% (0 / 3) │
├────────────────┼─────────────────────────────┼────────────────────────────────────────┤
│ 4 │ 2% (3 / 32) │ 100% (3 / 3) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 99% (65 / 68) │ 0% (0 / 3) │
├────────────────┼─────────────────────────────┼────────────────────────────────────────┤
│ 4 │ 1% (3 / 68) │ 100% (3 / 3) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
元のパイプラインと最適化されたパイプラインによって生成されたビデオを比較します。
%%skip not $to_quantize.value
ov_int8_vae_encoder = core.compile_model(OV_INT8_VAE_ENCODER_PATH, device.value)
ov_int8_unet = core.compile_model(OV_INT8_UNET_PATH, device.value)
ov_int8_decoder = core.compile_model(OV_INT8_VAE_DECODER_PATH, device.value)
ov_int8_pipeline = OVStableVideoDiffusionPipeline(
ov_int8_vae_encoder, image_encoder, ov_int8_unet, ov_int8_decoder, scheduler, feature_extractor
)
int8_frames = ov_int8_pipeline(
image,
num_inference_steps=4,
motion_bucket_id=60,
num_frames=8,
height=320,
width=512,
generator=torch.manual_seed(12342),
).frames[0]
0%| | 0/4 [00:00<?, ?it/s]
/home/ltalamanova/env_ci/lib/python3.8/site-packages/diffusers/configuration_utils.py:139: FutureWarning: Accessing config attribute unet directly via 'OVStableVideoDiffusionPipeline' object attribute is deprecated. Please access 'unet' over 'OVStableVideoDiffusionPipeline's config object instead, e.g. 'scheduler.config.unet'. deprecate("direct config name access", "1.0.0", deprecation_message, standard_warn=False)
denoise currently
tensor(128.5637)
denoise currently
tensor(13.6784)
denoise currently
tensor(0.4969)
denoise currently
tensor(0.)
int8_out_path = Path("generated_int8.mp4")
export_to_video(frames, str(out_path), fps=7)
int8_frames[0].save(
"generated_int8.gif",
save_all=True,
append_images=int8_frames[1:],
optimize=False,
duration=120,
loop=0,
)
HTML('<img src="generated_int8.gif">')

モデルのファイルサイズを比較#
%%skip not $to_quantize.value
fp16_model_paths = [VAE_ENCODER_PATH, UNET_PATH, VAE_DECODER_PATH]
int8_model_paths = [OV_INT8_VAE_ENCODER_PATH, OV_INT8_UNET_PATH, OV_INT8_VAE_DECODER_PATH]
for fp16_path, int8_path in zip(fp16_model_paths, int8_model_paths):
fp16_ir_model_size = fp16_path.with_suffix(".bin").stat().st_size
int8_model_size = int8_path.with_suffix(".bin").stat().st_size
print(f"{fp16_path.stem} compression rate: {fp16_ir_model_size / int8_model_size:.3f}")
vae_encoder compression rate: 2.018
unet compression rate: 1.996
vae_decoder compression rate: 2.007
FP16 モデルと INT8 パイプラインの推論時間を比較#
FP16
および INT8
パイプラインの推論パフォーマンスを測定するには、キャリブレーション・サブセットの推論時間の中央値を使用します。
注: 最も正確なパフォーマンス推定を行うには、他のアプリケーションを閉じた後、ターミナル/コマンドプロンプトで
benchmark_app
を実行することを推奨します。
%%skip not $to_quantize.value
import time
def calculate_inference_time(pipeline, validation_data):
inference_time = []
for prompt in validation_data:
start = time.perf_counter()
with io.capture_output() as captured:
_ = pipeline(
image,
num_inference_steps=4,
motion_bucket_id=60,
num_frames=8,
height=320,
width=512,
generator=torch.manual_seed(12342),
)
end = time.perf_counter()
delta = end - start
inference_time.append(delta)
return np.median(inference_time)
%%skip not $to_quantize.value
validation_size = 3
validation_dataset = datasets.load_dataset("fusing/instructpix2pix-1000-samples", split="train", streaming=True).shuffle(seed=42).take(validation_size)
validation_data = [data["input_image"] for data in validation_dataset]
fp_latency = calculate_inference_time(ov_pipe, validation_data)
int8_latency = calculate_inference_time(ov_int8_pipeline, validation_data)
print(f"Performance speed-up: {fp_latency / int8_latency:.3f}")
Performance speed-up: 1.243
インタラクティブなデモ#
インタラクティブなデモを起動するため量子化モデルを使用するかどうか以下で選択してください。
quantized_model_present = ov_int8_pipeline is not None
use_quantized_model = widgets.Checkbox(
value=quantized_model_present,
description="Use quantized model",
disabled=not quantized_model_present,
)
use_quantized_model
Checkbox(value=True, description='Use quantized model')
import gradio as gr
import random
max_64_bit_int = 2**63 - 1
pipeline = ov_int8_pipeline if use_quantized_model.value else ov_pipe
example_images_urls = [
"https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ship-7833921_1280.jpg?download=true",
"https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ai-generated-8476858_1280.png?download=true",
"https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/ai-generated-8481641_1280.jpg?download=true",
"https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/dog-7396912_1280.jpg?download=true",
"https://huggingface.co/spaces/wangfuyun/AnimateLCM-SVD/resolve/main/test_imgs/cupcakes-380178_1280.jpg?download=true",
]
example_images_dir = Path("example_images")
example_images_dir.mkdir(exist_ok=True)
example_imgs = []
for image_id, url in enumerate(example_images_urls):
img = load_image(url)
image_path = example_images_dir / f"{image_id}.png"
img.save(image_path)
example_imgs.append([image_path])
def sample(
image: PIL.Image,
seed: Optional[int] = 42,
randomize_seed: bool = True,
motion_bucket_id: int = 127,
fps_id: int = 6,
num_inference_steps: int = 15,
num_frames: int = 4,
max_guidance_scale=1.0,
min_guidance_scale=1.0,
decoding_t: int = 8, # 一度にデコードされるフレームの数。これはほとんどの VRAM を消費します。必要に応じて減らします
output_folder: str = "outputs",
progress=gr.Progress(track_tqdm=True),
):
if image.mode == "RGBA":
image = image.convert("RGB")
if randomize_seed:
seed = random.randint(0, max_64_bit_int)
generator = torch.manual_seed(seed)
output_folder = Path(output_folder)
output_folder.mkdir(exist_ok=True)
base_count = len(list(output_folder.glob("*.mp4")))
video_path = output_folder / f"{base_count:06d}.mp4"
frames = pipeline(
image,
decode_chunk_size=decoding_t,
generator=generator,
motion_bucket_id=motion_bucket_id,
noise_aug_strength=0.1,
num_frames=num_frames,
num_inference_steps=num_inference_steps,
max_guidance_scale=max_guidance_scale,
min_guidance_scale=min_guidance_scale,
).frames[0]
export_to_video(frames, str(video_path), fps=fps_id)
return video_path, seed
def resize_image(image, output_size=(512, 320)):
# アスペクト比を計算
target_aspect = output_size[0] / output_size[1] # 希望するサイズのアスペクト比
image_aspect = image.width / image.height # 元の画像のアスペクト比
# 元の画像が大きい場合はサイズを変更して切り取り
if image_aspect > target_aspect:
# アスペクト比を維持しながら、ターゲットの高さに合わせて画像のサイズを変更
new_height = output_size[1]
new_width = int(new_height * image_aspect)
resized_image = image.resize((new_width, new_height), PIL.Image.LANCZOS)
# 切り抜きの座標を計算
left = (new_width - output_size[0]) / 2
top = 0
right = (new_width + output_size[0]) / 2
bottom = output_size[1]
else:
# 縦横比を維持しながら、ターゲットの幅に合わせて画像のサイズを変更
new_width = output_size[0]
new_height = int(new_width / image_aspect)
resized_image = image.resize((new_width, new_height), PIL.Image.LANCZOS)
# 切り抜きの座標を計算
left = 0
top = (new_height - output_size[1]) / 2
right = output_size[0]
bottom = (new_height + output_size[1]) / 2
# 画像を切り取り
cropped_image = resized_image.crop((left, top, right, bottom))
return cropped_image
with gr.Blocks() as demo:
gr.Markdown(
"""# Stable Video Diffusion: Image to Video Generation with OpenVINO.
"""
)
with gr.Row():
with gr.Column():
image_in = gr.Image(label="Upload your image", type="pil")
generate_btn = gr.Button("Generate")
video = gr.Video()
with gr.Accordion("Advanced options", open=False):
seed = gr.Slider(
label="Seed",
value=42,
randomize=True,
minimum=0,
maximum=max_64_bit_int,
step=1,
)
randomize_seed = gr.Checkbox(label="Randomize seed", value=True)
motion_bucket_id = gr.Slider(
label="Motion bucket id",
info="Controls how much motion to add/remove from the image",
value=127,
minimum=1,
maximum=255,
)
fps_id = gr.Slider(
label="Frames per second",
info="The length of your video in seconds will be num_frames / fps",
value=6,
minimum=5,
maximum=30,
step=1,
)
num_frames = gr.Slider(label="Number of Frames", value=8, minimum=2, maximum=25, step=1)
num_steps = gr.Slider(label="Number of generation steps", value=4, minimum=1, maximum=8, step=1)
max_guidance_scale = gr.Slider(
label="Max guidance scale",
info="classifier-free guidance strength",
value=1.2,
minimum=1,
maximum=2,
)
min_guidance_scale = gr.Slider(
label="Min guidance scale",
info="classifier-free guidance strength",
value=1,
minimum=1,
maximum=1.5,
)
examples = gr.Examples(
examples=example_imgs,
inputs=[image_in],
outputs=[video, seed],
)
image_in.upload(fn=resize_image, inputs=image_in, outputs=image_in)
generate_btn.click(
fn=sample,
inputs=[
image_in,
seed,
randomize_seed,
motion_bucket_id,
fps_id,
num_steps,
num_frames,
max_guidance_scale,
min_guidance_scale,
],
outputs=[video, seed],
api_name="video",
)
try:
demo.queue().launch(debug=False)
except Exception:
demo.queue().launch(debug=False, share=True)
# リモートで起動する場合は、server_name と server_port を指定
# demo.launch(server_name='your server name', server_port='server port in int')
# 詳細はドキュメントをご覧ください: https://gradio.app/docs/