PhotoMaker と OpenVINO を使用したテキストからの画像生成#
この Jupyter ノートブックは、ローカルへのインストール後にのみ起動できます。
PhotoMaker は効率的なパーソナライズされたテキストから画像への生成方法であり、主に ID 情報を保存するため任意の数の入力 ID 画像をスタック ID 埋め込みにエンコードします。このような埋め込みは、統一 ID 表現として機能し、同じ入力 ID の特性を包括的にカプセル化するだけでなく、その後統合するため異なる ID の特性にも対応できます。これにより、より興味深い、実用的なアプリケーションへの道が開かれます。ユーザーは、テキストプロンプトとともに 1 つまたは数枚の顔写真を入力して、カスタマイズされた写真または絵画を受け取ることができます (トレーニングは必要ありません)。さらに、このモデルは、SDXL
に基づく任意の基本モデルに適合させたり、他の LoRA
モジュールと組み合わせて使用することができます。PhotoMaker の詳細については、技術レポートを参照してください。
このノートブックでは、OpenVINO を使用して PhotoMaker パイプラインを高速化する方法を検討します。
目次:
PhotoMaker パイプラインの紹介#
提案されている PhotoMaker では、最初にテキスト・エンコーダーと画像 (ID) エンコーダーからそれぞれテキスト埋め込みと画像埋め込みを取得します。次に、対応するクラスの埋め込み (男性と女性など) と各画像の埋め込みを結合することにより、融合された埋め込みを抽出します。次に、すべての融合された埋め込みを長さの次元に沿って連結して、スタックされた ID 埋め込みを形成します。最後に、diffusion model
内の ID コンテンツを適応的にマージするため、スタックされた ID 埋め込みをすべてのクロスアテンション・レイヤーにフィードします。トレーニング中に背景がマスクされた同じ ID の画像を使用しますが、推論中に背景の歪みなしで異なる ID の画像を直接入力し、新しい ID を作成できることに注意してください。
必要条件#
PhotoMaker リポジトリーのクローンを作成
from pathlib import Path
if not Path("PhotoMaker").exists():
!git clone https://github.com/TencentARC/PhotoMaker.git
Cloning into 'PhotoMaker'...
remote: Enumerating objects: 236, done.[K
remote: Counting objects: 100% (145/145), done.[K
remote: Compressing objects: 100% (96/96), done.[K
remote: Total 236 (delta 114), reused 68 (delta 49), pack-reused 91[K
Receiving objects: 100% (236/236), 9.31 MiB | 28.72 MiB/s, done.Resolving deltas: 100% (120/120), done.
必要なパッケージをインストール
%pip install -q --extra-index-url https://download.pytorch.org/whl/cpu\ transformers "torch>=2.1" "diffusers>=0.26" "gradio>=4.19" "openvino>=2024.0.0" torchvision "peft==0.6.2" "nncf>=2.9.0" "protobuf==3.20.3"
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed.This behaviour is the source of the following dependency conflicts. descript-audiotools 0.7.2 requires protobuf<3.20,>=3.9.2, but you have protobuf 3.20.3 which is incompatible. Note: you may need to restart the kernel to use updated packages.
PyTorch モデルの準備
adapter_id = "TencentARC/PhotoMaker"
base_model_id = "SG161222/RealVisXL_V3.0"
TEXT_ENCODER_OV_PATH = Path("model/text_encoder.xml")
TEXT_ENCODER_2_OV_PATH = Path("model/text_encoder_2.xml")
UNET_OV_PATH = Path("model/unet.xml")
ID_ENCODER_OV_PATH = Path("model/id_encoder.xml")
VAE_DECODER_OV_PATH = Path("model/vae_decoder.xml")
元のパイプラインをロードして変換用のモデルを準備#
各 PyTorch モデルをエクスポートするには、HuggingFace ハブから ID エンコーダーの重み、LoRa
の重みをダウンロードし、PhotoMaker のリポジトリーから PhotoMakerStableDiffusionXLPipeline
オブジェクトを使用して、元の PhotoMaker パイプラインを生成します。
import torch
import numpy as np
import os
from PIL import Image
from pathlib import Path
from PhotoMaker.photomaker.model import PhotoMakerIDEncoder
from PhotoMaker.photomaker.pipeline import PhotoMakerStableDiffusionXLPipeline
from diffusers import EulerDiscreteScheduler
import gc
trigger_word = "img"
def load_original_pytorch_pipeline_components(photomaker_path: str, base_model_id: str):
# ベースモデルをロード
pipe = PhotoMakerStableDiffusionXLPipeline.from_pretrained(base_model_id, use_safetensors=True).to("cpu")
# PhotoMaker チェックポイントをロード
pipe.load_photomaker_adapter(
os.path.dirname(photomaker_path),
subfolder="",
weight_name=os.path.basename(photomaker_path),
trigger_word=trigger_word,
)
pipe.scheduler = EulerDiscreteScheduler.from_config(pipe.scheduler.config)
pipe.fuse_lora()
gc.collect()
return pipe
2024-07-13 01:28:21.659532: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on.You may see slightly different numerical results due to floating-point round-off errors from different computation orders.To turn them off, set the environment variable TF_ENABLE_ONEDNN_OPTS=0. 2024-07-13 01:28:21.694860: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags. 2024-07-13 01:28:22.366293: W tensorflow/compiler/tf2tensorrt/utils/py_utils.cc:38] TF-TRT Warning: Could not find TensorRT
from huggingface_hub import hf_hub_download
photomaker_path = hf_hub_download(repo_id=adapter_id, filename="photomaker-v1.bin", repo_type="model")
pipe = load_original_pytorch_pipeline_components(photomaker_path, base_model_id)
Loading pipeline components...: 0%| | 0/7 [00:00<?, ?it/s]
The installed version of bitsandbytes was compiled without GPU support.8-bit optimizers, 8-bit multiplication, and GPU quantization are unavailable.
Loading PhotoMaker components [1] id_encoder from [/opt/home/k8sworker/.cache/huggingface/hub/models--TencentARC--PhotoMaker/snapshots/d7ec3fc17290263135825194aeb3bc456da67cc5]... Loading PhotoMaker components [2] lora_weights from [/opt/home/k8sworker/.cache/huggingface/hub/models--TencentARC--PhotoMaker/snapshots/d7ec3fc17290263135825194aeb3bc456da67cc5]
モデルを OpenVINO 中間表現 (IR) 形式に変換#
2023.0 リリース以降、OpenVINO は PyTorch モデルの変換を直接サポートするようになりました。OpenVINO ov.Model
オブジェクト・インスタンスを取得するには、モデル・オブジェクト、モデルトレース用の入力データを ov.convert_model
関数に提供する必要があります。ov.save_model
関数を使用して、次回のデプロイのためにモデルをディスクに保存できます。
パイプラインは 5 つの重要なパーツで構成されます:
画像のアノテーションによって条件付けする画像埋め込みを生成する ID エンコーダー。
テキスト・エンコーダーは、テキストプロンプトから画像を生成するテキスト埋め込みを作成します。
段階的にノイズを除去する潜像表現のための Unet。
潜在空間を画像にデコードするオート・エンコーダー (VAE)。
メモリー消費を削減するため、NNCF を使用して重み圧縮を最適化できます。重み圧縮は、推論中に重みを保存する大量のメモリーを必要とするモデルのメモリー・フットプリントを削減することを目的としており、次の方法で重み圧縮の利点を得られます:
デバイスのメモリーに格納できない大規模なモデルの推論を可能にします。
線形レイヤーなどの重みを使用した演算を行う際のメモリーアクセス・レイテンシーを短縮することで、モデルの推論パフォーマンスを向上させます。
ニューラル・ネットワーク圧縮フレームワーク (NNCF) は、圧縮方法として、4 ビット / 8 ビット混合重み量子化を提供します。重み圧縮とフルモデル量子化 (トレーニング後の量子化) 違いは、重み圧縮のでは、活性化が浮動小数点のままであるため、精度が向上することです。
nncf.compress_weights
関数は重み圧縮の実行に使用できます。この関数は、OpenVINO モデルとその他の圧縮パラメーターを受け入れます。
重み圧縮の詳細については、OpenVINO のドキュメントを参照してください。
import openvino as ov
import nncf
def flattenize_inputs(inputs):
"""
Helper function for resolve nested input structure (e.g. lists or tuples of tensors)
"""
flatten_inputs = []
for input_data in inputs:
if input_data is None:
continue
if isinstance(input_data, (list, tuple)):
flatten_inputs.extend(flattenize_inputs(input_data))
else:
flatten_inputs.append(input_data)
return flatten_inputs
dtype_mapping = {
torch.float32: ov.Type.f32,
torch.float64: ov.Type.f64,
torch.int32: ov.Type.i32,
torch.int64: ov.Type.i64,
torch.bool: ov.Type.boolean,
}
def prepare_input_info(input_dict):
"""
Helper function for preparing input info (shapes and data types) for conversion based on example inputs
"""
flatten_inputs = flattenize_inputs(input_dict.values())
input_info = []
for input_data in flatten_inputs:
updated_shape = list(input_data.shape)
if input_data.ndim == 5:
updated_shape[1] = -1
input_info.append((dtype_mapping[input_data.dtype], updated_shape))
return input_info
def convert(model: torch.nn.Module, xml_path: str, example_input, input_info):
"""
Helper function for converting PyTorch model to OpenVINO IR
"""
xml_path = Path(xml_path)
if not xml_path.exists():
xml_path.parent.mkdir(parents=True, exist_ok=True)
with torch.no_grad():
ov_model = ov.convert_model(model, example_input=example_input, input=input_info)
ov_model = nncf.compress_weights(ov_model)
ov.save_model(ov_model, xml_path)
del ov_model
torch._C._jit_clear_class_registry()
torch.jit._recursive.concrete_type_store = torch.jit._recursive.ConcreteTypeStore()
torch.jit._state._clear_class_state()
INFO:nncf:NNCF initialized successfully.Supported frameworks detected: torch, tensorflow, onnx, openvino
ID エンコーダー#
PhotoMaker は、画像エンコーダーとフューズモジュールを統合して ID エンコーダーを作成しました。これは、U-Net モデルの入力となるテキスト・エンコーダーの出力 (テキスト埋め込み) を更新する画像埋め込みを生成するのに使用されます。
id_encoder = pipe.id_encoder
id_encoder.eval()
def create_bool_tensor(*size):
new_tensor = torch.zeros((size), dtype=torch.bool)
return new_tensor
inputs = {
"id_pixel_values": torch.randn((1, 1, 3, 224, 224)),
"prompt_embeds": torch.randn((1, 77, 2048)),
"class_tokens_mask": create_bool_tensor(1, 77),
}
input_info = prepare_input_info(inputs)
convert(id_encoder, ID_ENCODER_OV_PATH, inputs, input_info)
del id_encoder
gc.collect()
WARNING:tensorflow:Please fix your imports.Module tensorflow.python.training.tracking.base has been moved to tensorflow.python.trackable.base.The old module will be deleted in version 2.11.
[ WARNING ] Please fix your imports.Module %s has been moved to %s.The old module will be deleted in version %s./opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/modeling_utils.py:4371: FutureWarning: _is_quantized_training_enabled is going to be deprecated in transformers 4.39.0.Please use model.hf_quantizer.is_trainable instead warnings.warn( /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/models/clip/modeling_clip.py:279: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! if attn_weights.size() != (bsz * self.num_heads, tgt_len, src_len): /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/models/clip/modeling_clip.py:319: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! if attn_output.size() != (bsz * self.num_heads, tgt_len, self.head_dim): /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/notebooks/photo-maker/PhotoMaker/photomaker/model.py:84: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs! assert class_tokens_mask.sum() == stacked_id_embeds.shape[0], f"{class_tokens_mask.sum()} != {stacked_id_embeds.shape[0]}"
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 100% (151 / 151) │ 100% (151 / 151) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
15594
テキスト・エンコーダー#
テキスト・エンコーダーは、入力プロンプト (例えば、“馬に乗った宇宙飛行士の写真”) を、U-Net が理解できる埋め込みスペースに変換する役割を果たします。これは通常、入力トークンのシーケンスを潜在テキスト埋め込みのシーケンスにマッピングする単純なトランスフォーマー・ベースのエンコーダーです。
text_encoder = pipe.text_encoder
text_encoder.eval()
text_encoder_2 = pipe.text_encoder_2
text_encoder_2.eval()
text_encoder.config.output_hidden_states = True
text_encoder.config.return_dict = False
text_encoder_2.config.output_hidden_states = True
text_encoder_2.config.return_dict = False
inputs = {"input_ids": torch.ones((1, 77), dtype=torch.long)}
input_info = prepare_input_info(inputs)
convert(text_encoder, TEXT_ENCODER_OV_PATH, inputs, input_info)
convert(text_encoder_2, TEXT_ENCODER_2_OV_PATH, inputs, input_info)
del text_encoder
del text_encoder_2
gc.collect()
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/modeling_attn_mask_utils.py:86: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
if input_shape[-1] > 1 or self.sliding_window is not None: /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/modeling_attn_mask_utils.py:162: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
if past_key_values_length > 0: /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/transformers/models/clip/modeling_clip.py:287: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
if causal_attention_mask.size() != (bsz, 1, tgt_len, src_len):
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 100% (73 / 73) │ 100% (73 / 73) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 100% (194 / 194) │ 100% (194 / 194) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
32811
U-Net#
U-Net モデル変換のプロセスは、元の Stable Diffusion XL モデルと同様です。
unet = pipe.unet
unet.eval()
class UnetWrapper(torch.nn.Module):
def __init__(self, unet):
super().__init__()
self.unet = unet
def forward(
self,
sample=None,
timestep=None,
encoder_hidden_states=None,
text_embeds=None,
time_ids=None,
):
return self.unet.forward(
sample,
timestep,
encoder_hidden_states,
added_cond_kwargs={"text_embeds": text_embeds, "time_ids": time_ids},
)
inputs = {
"sample": torch.rand([2, 4, 128, 128], dtype=torch.float32),
"timestep": torch.from_numpy(np.array(1, dtype=float)),
"encoder_hidden_states": torch.rand([2, 77, 2048], dtype=torch.float32),
"text_embeds": torch.rand([2, 1280], dtype=torch.float32),
"time_ids": torch.rand([2, 6], dtype=torch.float32),
}
input_info = prepare_input_info(inputs)
w_unet = UnetWrapper(unet)
convert(w_unet, UNET_OV_PATH, inputs, input_info)
del w_unet, unet
gc.collect()
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/diffusers/models/unets/unet_2d_condition.py:1103: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
if dim % default_overall_up_factor != 0: /opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/diffusers/models/downsampling.py:136: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
assert hidden_states.shape[1] == self.channels
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/diffusers/models/downsampling.py:145: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
assert hidden_states.shape[1] == self.channels
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/diffusers/models/upsampling.py:146: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
assert hidden_states.shape[1] == self.channels
/opt/home/k8sworker/ci-ai/cibuilds/ov-notebook/OVNotebookOps-727/.workspace/scm/ov-notebook/.venv/lib/python3.8/site-packages/diffusers/models/upsampling.py:162: TracerWarning: Converting a tensor to a Python boolean might cause the trace to be incorrect.We can't record the data flow of Python values, so this value will be treated as a constant in the future.This means that the trace might not generalize to other inputs!
if hidden_states.shape[0] >= 64:
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 100% (794 / 794) │ 100% (794 / 794) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
101843
VAE デコーダー#
VAE モデルには、エンコーダーとデコーダーの 2 つのパーツがあります。エンコーダーは、画像を低次元の潜在表現に変換するのに使用され、これが U-Net モデルの入力となります。逆に、デコーダーは潜在表現を変換して画像に戻します。
Text-to-Image パイプラインを実行すると、VAE デコーダーのみが必要であることがわかります。
vae_decoder = pipe.vae
vae_decoder.eval()
class VAEDecoderWrapper(torch.nn.Module):
def __init__(self, vae_decoder):
super().__init__()
self.vae = vae_decoder
def forward(self, latents):
return self.vae.decode(latents)
w_vae_decoder = VAEDecoderWrapper(vae_decoder)
inputs = torch.zeros((1, 4, 128, 128))
convert(w_vae_decoder, VAE_DECODER_OV_PATH, inputs, input_info=[1, 4, 128, 128])
del w_vae_decoder, vae_decoder
gc.collect()
INFO:nncf:Statistics of the bitwidth distribution:
┍━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┯━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┑
│ Num bits (N) │ % all parameters (layers) │ % ratio-defining parameters (layers) │
┝━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┿━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┥
│ 8 │ 100% (40 / 40) │ 100% (40 / 40) │
┕━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┷━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━┙
Output()
5992
推論パイプラインの準備#
この例では、PhotoMakerStableDiffusionXLPipeline
パイプラインを再利用して OpenVINO で画像を生成するため、このパイプライン内の各モデルのオブジェクトを新しい OpenVINO モデル・オブジェクトに置き換える必要があります。
Stable Diffusion パイプライン用の推論デバイスの選択#
import ipywidgets as widgets
core = ov.Core()
device = widgets.Dropdown(
options=core.available_devices + ["AUTO"],
value="CPU",
description="Device:",
disabled=False,
)
device
Dropdown(description='Device:', options=('CPU', 'AUTO'), value='CPU')
モデルをコンパイルして推論用のラッパーを作成#
元の PhotoMaker ワークフローにアクセスするには、OpenVINO コンパイル済みモデルごとに新しいラッパーを作成する必要があります。元のパイプラインと一致させるには、OpenVINO モデルラッパーの属性の一部を元のモデル・オブジェクトから再利用し、推論出力を numpy から torch.tensor
に変換する必要があります。
compiled_id_encoder = core.compile_model(ID_ENCODER_OV_PATH, device.value)
compiled_unet = core.compile_model(UNET_OV_PATH, device.value)
compiled_text_encoder = core.compile_model(TEXT_ENCODER_OV_PATH, device.value)
compiled_text_encoder_2 = core.compile_model(TEXT_ENCODER_2_OV_PATH, device.value)
compiled_vae_decoder = core.compile_model(VAE_DECODER_OV_PATH, device.value)
from collections import namedtuple
class OVIDEncoderWrapper(PhotoMakerIDEncoder):
dtype = torch.float32 # 元のワークフローでアクセス
def __init__(self, id_encoder, orig_id_encoder):
super().__init__()
self.id_encoder = id_encoder
self.modules = orig_id_encoder.modules # 元のワークフローでアクセス
self.config = orig_id_encoder.config # 元のワークフローでアクセス
def __call__(
self,
*args,
):
id_pixel_values, prompt_embeds, class_tokens_mask = args
inputs = {
"id_pixel_values": id_pixel_values,
"prompt_embeds": prompt_embeds,
"class_tokens_mask": class_tokens_mask,
}
output = self.id_encoder(inputs)[0]
return torch.from_numpy(output)
class OVTextEncoderWrapper:
dtype = torch.float32 # 元のワークフローでアクセス
def __init__(self, text_encoder, orig_text_encoder):
self.text_encoder = text_encoder
self.modules = orig_text_encoder.modules # 元のワークフローでアクセス
self.config = orig_text_encoder.config # 元のワークフローでアクセス
def __call__(self, input_ids, **kwargs):
inputs = {"input_ids": input_ids}
output = self.text_encoder(inputs)
hidden_states = []
hidden_states_len = len(output)
for i in range(1, hidden_states_len):
hidden_states.append(torch.from_numpy(output[i]))
BaseModelOutputWithPooling = namedtuple("BaseModelOutputWithPooling", "last_hidden_state hidden_states")
output = BaseModelOutputWithPooling(torch.from_numpy(output[0]), hidden_states)
return output
class OVUnetWrapper:
def __init__(self, unet, unet_orig):
self.unet = unet
self.config = unet_orig.config # 元のワークフローでアクセス
self.add_embedding = unet_orig.add_embedding # 元のワークフローでアクセス
def __call__(self, *args, **kwargs):
latent_model_input, t = args
inputs = {
"sample": latent_model_input,
"timestep": t,
"encoder_hidden_states": kwargs["encoder_hidden_states"],
"text_embeds": kwargs["added_cond_kwargs"]["text_embeds"],
"time_ids": kwargs["added_cond_kwargs"]["time_ids"],
}
output = self.unet(inputs)
return [torch.from_numpy(output[0])]
class OVVAEDecoderWrapper:
dtype = torch.float32 # 元のワークフローでアクセス
def __init__(self, vae, vae_orig):
self.vae = vae
self.config = vae_orig.config # 元のワークフローでアクセス
def decode(self, latents, return_dict=False):
output = self.vae(latents)[0]
output = torch.from_numpy(output)
return [output]
元のパイプラインの PyTorch モデル・オブジェクトを OpenVINO モデルに置き換えます
pipe.id_encoder = OVIDEncoderWrapper(compiled_id_encoder, pipe.id_encoder)
pipe.unet = OVUnetWrapper(compiled_unet, pipe.unet)
pipe.text_encoder = OVTextEncoderWrapper(compiled_text_encoder, pipe.text_encoder)
pipe.text_encoder_2 = OVTextEncoderWrapper(compiled_text_encoder_2, pipe.text_encoder_2)
pipe.vae = OVVAEDecoderWrapper(compiled_vae_decoder, pipe.vae)
OpenVINO を使用してテキストから画像への生成を実行#
from diffusers.utils import load_image
prompt = "sci-fi, closeup portrait photo of a man img in Iron man suit, face"
negative_prompt = "(asymmetry, worst quality, low quality, illustration, 3d, 2d, painting, cartoons, sketch), open mouth"
generator = torch.Generator("cpu").manual_seed(42)
input_id_images = []
original_image = load_image("./PhotoMaker/examples/newton_man/newton_0.jpg")
input_id_images.append(original_image)
## パラメーター設定
num_steps = 20
style_strength_ratio = 20
start_merge_step = int(float(style_strength_ratio) / 100 * num_steps)
if start_merge_step > 30:
start_merge_step = 30
images = pipe(
prompt=prompt,
input_id_images=input_id_images,
negative_prompt=negative_prompt,
num_images_per_prompt=1,
num_inference_steps=num_steps,
start_merge_step=start_merge_step,
generator=generator,
).images
0%| | 0/20 [00:00<?, ?it/s]
import matplotlib.pyplot as plt
def visualize_results(orig_img: Image.Image, output_img: Image.Image):
"""
Helper function for pose estimationresults visualization
Parameters:
orig_img (Image.Image): original image
output_img (Image.Image): processed image with PhotoMaker
Returns:
fig (matplotlib.pyplot.Figure): matplotlib generated figure
"""
orig_img = orig_img.resize(output_img.size)
orig_title = "Original image"
output_title = "Output image"
im_w, im_h = orig_img.size
is_horizontal = im_h < im_w
fig, axs = plt.subplots(
2 if is_horizontal else 1,
1 if is_horizontal else 2,
sharex="all",
sharey="all",
)
fig.suptitle(f"Prompt: '{prompt}'", fontweight="bold")
fig.patch.set_facecolor("white")
list_axes = list(axs.flat)
for a in list_axes:
a.set_xticklabels([])
a.set_yticklabels([])
a.get_xaxis().set_visible(False)
a.get_yaxis().set_visible(False)
a.grid(False)
list_axes[0].imshow(np.array(orig_img))
list_axes[1].imshow(np.array(output_img))
list_axes[0].set_title(orig_title, fontsize=15)
list_axes[1].set_title(output_title, fontsize=15)
fig.subplots_adjust(wspace=0.01 if is_horizontal else 0.00, hspace=0.01 if is_horizontal else 0.1)
fig.tight_layout()
return fig
fig = visualize_results(original_image, images[0])

インタラクティブなデモ#
import gradio as gr
def generate_from_text(text_promt, input_image, neg_prompt, seed, num_steps, style_strength_ratio):
"""
Helper function for generating result image from prompt text
Parameters:
text_promt (String): positive prompt
input_image (Image.Image): original image
neg_prompt (String): negative prompt
seed (Int): seed for random generator state initialization
num_steps (Int): number of sampling steps
style_strength_ratio (Int): the percentage of step when merging the ID embedding to text embedding
Returns:
result (Image.Image): generation result
"""
start_merge_step = int(float(style_strength_ratio) / 100 * num_steps)
if start_merge_step > 30:
start_merge_step = 30
result = pipe(
text_promt,
input_id_images=input_image,
negative_prompt=neg_prompt,
num_inference_steps=num_steps,
num_images_per_prompt=1,
start_merge_step=start_merge_step,
generator=torch.Generator().manual_seed(seed),
height=1024,
width=1024,
).images[0]
return result
with gr.Blocks() as demo:
with gr.Column():
with gr.Row():
input_image = gr.Image(label="Your image", sources=["upload"], type="pil")
output_image = gr.Image(label="Generated Images", type="pil")
positive_input = gr.Textbox(label=f"Text prompt, Trigger words is '{trigger_word}'")
neg_input = gr.Textbox(label="Negative prompt")
with gr.Row():
seed_input = gr.Slider(0, 10_000_000, value=42, label="Seed")
steps_input = gr.Slider(label="Steps", value=10, minimum=5, maximum=50, step=1)
style_strength_ratio_input = gr.Slider(label="Style strength ratio", value=20, minimum=5, maximum=100, step=5)
btn = gr.Button()
btn.click(
generate_from_text,
[
positive_input,
input_image,
neg_input,
seed_input,
steps_input,
style_strength_ratio_input,
],
output_image,
)
gr.Examples(
[
[prompt, negative_prompt],
[
"A woman img wearing a Christmas hat",
negative_prompt,
],
[
"A man img in a helmet and vest riding a motorcycle",
negative_prompt,
],
[
"photo of a middle-aged man img sitting on a plush leather couch, and watching television show",
negative_prompt,
],
[
"photo of a skilled doctor img in a pristine white lab coat enjoying a delicious meal in a sophisticated dining room",
negative_prompt,
],
[
"photo of superman img flying through a vibrant sunset sky, with his cape billowing in the wind",
negative_prompt,
],
],
[positive_input, neg_input],
)
demo.queue().launch()
# リモートで起動する場合は、server_name と server_port を指定
# demo.launch(server_name='your server name', server_port='server port in int')
# 詳細はドキュメントをご覧ください: https://gradio.app/docs/
ローカル URL で実行中: http://127.0.0.1:7860 パブリックリンクを作成するには、launch() で share=True を設定します。
demo.close()
Closing server running on port: 7860