File size: 4,189 Bytes
31f2f28 |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 |
# Prediction interface for Cog ⚙️
# https://github.com/replicate/cog/blob/main/docs/python.md
import os
import subprocess
from cog import BasePredictor, Input, Path
import inference
from time import time
from functools import wraps
import torch
def make_mem_efficient(cls: BasePredictor):
if not torch.cuda.is_available():
return cls
old_setup = cls.setup
old_predict = cls.predict
@wraps(old_setup)
def new_setup(self, *args, **kwargs):
ret = old_setup(self, *args, **kwargs)
_move_to(self, "cpu")
return ret
@wraps(old_predict)
def new_predict(self, *args, **kwargs):
_move_to(self, "cuda")
try:
ret = old_predict(self, *args, **kwargs)
finally:
_move_to(self, "cpu")
return ret
cls.setup = new_setup
cls.predict = new_predict
return cls
def _move_to(self, device):
try:
self = self.cached_models
except AttributeError:
pass
for attr, value in vars(self).items():
try:
value = value.to(device)
except AttributeError:
pass
else:
print(f"Moving {self.__name__}.{attr} to {device}")
setattr(self, attr, value)
torch.cuda.empty_cache()
@make_mem_efficient
class Predictor(BasePredictor):
cached_models = inference
def setup(self):
inference.do_load("checkpoints/wav2lip_gan.pth")
def predict(
self,
face: Path = Input(description="video/image that contains faces to use"),
audio: Path = Input(description="video/audio file to use as raw audio source"),
pads: str = Input(
description="Padding for the detected face bounding box.\n"
"Please adjust to include chin at least\n"
'Format: "top bottom left right"',
default="0 10 0 0",
),
smooth: bool = Input(
description="Smooth face detections over a short temporal window",
default=True,
),
fps: float = Input(
description="Can be specified only if input is a static image",
default=25.0,
),
out_height: int = Input(
description="Output video height. Best results are obtained at 480 or 720",
default=480,
),
) -> Path:
try:
os.remove("results/result_voice.mp4")
except FileNotFoundError:
pass
face_ext = os.path.splitext(face)[-1]
if face_ext not in [".mp4", ".mov", ".png" , ".jpg" , ".jpeg" , ".gif", ".mkv", ".webp"]:
raise ValueError(f'Unsupported face format {face_ext!r}')
audio_ext = os.path.splitext(audio)[-1]
if audio_ext not in [".wav", ".mp3"]:
raise ValueError(f'Unsupported audio format {audio_ext!r}')
args = [
"--checkpoint_path", "checkpoints/wav2lip_gan.pth",
"--face", str(face),
"--audio", str(audio),
"--pads", *pads.split(" "),
"--fps", str(fps),
"--out_height", str(out_height),
]
if not smooth:
args += ["--nosmooth"]
print("-> run:", " ".join(args))
inference.args = inference.parser.parse_args(args)
s = time()
try:
inference.main()
except ValueError as e:
print('-> Encountered error, skipping lipsync:', e)
args = [
"ffmpeg", "-y",
# "-vsync", "0", "-hwaccel", "cuda", "-hwaccel_output_format", "cuda",
"-stream_loop", "-1",
"-i", str(face),
"-i", str(audio),
"-shortest",
"-fflags", "+shortest",
"-max_interleave_delta", "100M",
"-map", "0:v:0",
"-map", "1:a:0",
# "-c", "copy",
# "-c:v", "h264_nvenc",
"results/result_voice.mp4",
]
print("-> run:", " ".join(args))
print(subprocess.check_output(args, encoding="utf-8"))
print(time() - s)
return Path("results/result_voice.mp4")
|