AnimeVideoEditot/core/video_processor.py
stirelshka8 25782333e2 uppd
2026-03-24 18:34:32 +03:00

537 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import shutil
import subprocess
import sys
import tempfile
import warnings
import glob
from moviepy import VideoFileClip, concatenate_videoclips
from moviepy.video.fx import Resize
from datetime import datetime
from core.logger import Logger
from utils.file_utils import ensure_directory
# Метаданные в итоговое видео
APP_NAME = "Anime Video Editor"
APP_TAG = "Anime Video Editor"
# Этапы для отображения в окне выполнения
STAGE_PREPARE = 1 # Обработка фрагментов
STAGE_CONCAT = 2 # Объединение
STAGE_WRITE = 3 # Сохранение (кодирование)
try:
import proglog
class WriteProgressLogger(proglog.ProgressBarLogger):
"""Логгер прогресса записи видео для передачи в окно выполнения."""
def __init__(self, callback=None, cancel_check=None, stage=STAGE_WRITE):
super().__init__()
self._callback = callback
self._cancel_check = cancel_check if callable(cancel_check) else (lambda: False)
self._stage = stage
def bars_callback(self, bar, attr, value, old_value=None):
if self._cancel_check():
raise CancelledError()
if not self._callback or bar not in self.bars:
return
total = self.bars[bar].get("total")
if total and total > 0:
pct = value / total
self._callback(self._stage, pct, f"Сохранение: {int(pct * 100)}%")
_HAS_PROGLOG = True
except Exception:
_HAS_PROGLOG = False
WriteProgressLogger = None
# При числе файлов выше порога обрабатываем по одному во временные файлы, затем склеиваем — меньше пиков по памяти
BATCH_THRESHOLD = 3
class CancelledError(Exception):
"""Остановка выполнения по запросу пользователя."""
pass
# Убираем шумные предупреждения о последних кадрах в некоторых MP4 (reader подставляет последний валидный кадр)
warnings.filterwarnings(
"ignore",
message=".*bytes wanted but 0 bytes read.*",
category=UserWarning,
)
def _get_ffmpeg_exe():
"""Путь к FFmpeg (тот же, что использует MoviePy/imageio). В exe (PyInstaller) — из папки сборки."""
if getattr(sys, "frozen", False) and getattr(sys, "_MEIPASS", None):
base = sys._MEIPASS
for folder in ("imageio_ffmpeg", "."):
search = os.path.join(base, folder, "*.exe")
for path in glob.glob(search):
if "ffmpeg" in os.path.basename(path).lower():
return path
try:
import imageio_ffmpeg
return imageio_ffmpeg.get_ffmpeg_exe()
except Exception:
pass
try:
from moviepy.config import get_setting
return get_setting("FFMPEG_BINARY") or "ffmpeg"
except Exception:
pass
return "ffmpeg"
def _write_video_metadata(file_path, metadata_dict, logger=None):
"""
Добавляет метаданные в видеофайл через FFmpeg (stream copy, без перекодирования).
metadata_dict: {"comment": "...", "title": "...", "encoder": "..."} и т.д.
"""
file_path = os.path.abspath(os.path.normpath(file_path))
if not metadata_dict or not os.path.isfile(file_path):
return
try:
ffmpeg_exe = _get_ffmpeg_exe()
out_dir = os.path.dirname(file_path)
if not out_dir:
out_dir = os.getcwd()
fd, temp_path = tempfile.mkstemp(suffix=os.path.splitext(file_path)[1], dir=out_dir)
os.close(fd)
cmd = [ffmpeg_exe, "-y", "-i", file_path, "-c", "copy"]
for key, value in metadata_dict.items():
if value is None or (isinstance(value, str) and not value.strip()):
continue
val = str(value).replace("\n", " ").replace("\r", "").strip()
if not val:
continue
cmd.extend(["-metadata", f"{key}={val}"])
cmd.append(temp_path)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and os.path.isfile(temp_path):
os.replace(temp_path, file_path)
if logger:
logger.log("INFO", "Метаданные записаны в файл")
else:
if os.path.isfile(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
if logger:
logger.log("WARNING", "Не удалось записать метаданные (FFmpeg): " + (result.stderr or "")[:200])
except Exception as e:
if logger:
logger.log("WARNING", f"Метаданные не записаны: {e}")
class VideoProcessor:
def __init__(self, logger=None):
self.logger = logger or Logger()
def check_disk_space_for_job(self, output_dir, video_data, quality):
"""Публичная проверка места на диске до старта обработки."""
use_batch = len(video_data) > BATCH_THRESHOLD
self._check_disk_space_or_raise(output_dir, video_data, quality, use_batch)
def process_videos(self, video_data, output_dir, output_format="mp4",
quality="high", progress_callback=None,
fps=None, preset="medium", audio_bitrate="192k",
target_height=None, filename_prefix="edited_video",
cancel_check=None):
"""
cancel_check: callable(), возвращает True если нужно остановить выполнение.
"""
self.logger.log("INFO", f"Начало обработки {len(video_data)} видео файлов")
check = cancel_check if callable(cancel_check) else lambda: False
use_batch = len(video_data) > BATCH_THRESHOLD
self._check_disk_space_or_raise(output_dir, video_data, quality, use_batch)
if use_batch:
self.logger.log("INFO", f"Режим пакетной обработки (файлов > {BATCH_THRESHOLD}), экономия памяти")
return self._process_videos_batch(
video_data, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, check,
)
return self._process_videos_memory(
video_data, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, check,
)
def _process_videos_memory(self, video_data, output_dir, output_format, quality,
progress_callback, fps, preset, audio_bitrate,
target_height, filename_prefix, cancel_check):
"""Обработка всех клипов в памяти (подходит для малого числа файлов)."""
clips = []
source_videos = []
all_keep_ranges = []
try:
for data in video_data:
if cancel_check():
raise CancelledError()
video_path = data['path']
remove_ranges = data['time_ranges']
self.logger.log("INFO", f"Файл: {os.path.basename(video_path)}")
video = VideoFileClip(video_path)
source_videos.append(video)
keep = self._ranges_to_keep(remove_ranges, video.duration)
all_keep_ranges.append(keep)
for s, e in keep:
self.logger.log("INFO", f" Оставляем: {self.format_time(s)}{self.format_time(e)}")
total_clip_steps = sum(len(k) for k in all_keep_ranges)
current_step = 0
for i in range(len(video_data)):
if cancel_check():
raise CancelledError()
video = source_videos[i]
keep_ranges = all_keep_ranges[i]
for start, end in keep_ranges:
if cancel_check():
raise CancelledError()
if progress_callback:
p = current_step / total_clip_steps if total_clip_steps else 1.0
progress_callback(STAGE_PREPARE, p, f"Обработка фрагмента {i + 1}/{len(video_data)}")
clip = video.subclipped(start, end)
clips.append(clip)
current_step += 1
if not clips:
raise ValueError("Нет видео фрагментов для объединения")
if cancel_check():
raise CancelledError()
if progress_callback:
progress_callback(STAGE_CONCAT, 1.0, "Объединение фрагментов")
self.logger.log("INFO", f"Объединение {len(clips)} фрагментов")
final_clip = concatenate_videoclips(clips)
if cancel_check():
raise CancelledError()
output_path = self._write_final(
final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check,
)
final_clip.close()
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
if progress_callback:
progress_callback(STAGE_WRITE, 1.0, "Готово")
self.logger.log("SUCCESS", f"Обработка завершена успешно: {output_path}")
return output_path
except CancelledError:
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
raise
except Exception as e:
self.logger.log("ERROR", f"Ошибка обработки: {str(e)}")
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
raise
def _process_videos_batch(self, video_data, output_dir, output_format, quality,
progress_callback, fps, preset, audio_bitrate,
target_height, filename_prefix, cancel_check):
"""Обработка по одному файлу во временные ролики, затем склейка — меньше пиков памяти."""
temp_paths = []
n_files = len(video_data)
try:
ensure_directory(output_dir)
# Временные части создаём рядом с выходным каталогом:
# это снижает риск переполнения системного диска (обычно C:\Temp).
with tempfile.TemporaryDirectory(prefix="ave_", dir=output_dir) as tmpdir:
for i, data in enumerate(video_data):
if cancel_check():
raise CancelledError()
if progress_callback:
p = (i + 1) / n_files if n_files else 1.0
progress_callback(STAGE_PREPARE, p, f"Файл {i + 1}/{n_files}")
video_path = data['path']
remove_ranges = data['time_ranges']
keep = None
video = VideoFileClip(video_path)
try:
keep = self._ranges_to_keep(remove_ranges, video.duration)
if not keep:
continue
part_clips = []
for start, end in keep:
part_clips.append(video.subclipped(start, end))
if len(part_clips) == 1:
part = part_clips[0]
else:
part = concatenate_videoclips(part_clips)
for c in part_clips:
try:
c.close()
except Exception:
pass
temp_path = os.path.join(tmpdir, f"part_{i:04d}.mp4")
# В оконной сборке stdout/stderr могут быть None; отключаем дефолтный логгер MoviePy.
write_kw = {"codec": "libx264", "bitrate": self.get_bitrate(quality), "audio_codec": "aac", "logger": None}
if fps and fps > 0:
write_kw["fps"] = fps
if _HAS_PROGLOG and WriteProgressLogger is not None:
write_kw["logger"] = WriteProgressLogger(cancel_check=cancel_check, stage=STAGE_PREPARE)
try:
part.write_videofile(temp_path, **write_kw)
except TypeError:
part.write_videofile(
temp_path,
codec="libx264",
bitrate=self.get_bitrate(quality),
audio_codec="aac",
logger=None,
)
if cancel_check():
raise CancelledError()
part.close()
temp_paths.append(temp_path)
finally:
try:
video.close()
except Exception:
pass
if not temp_paths:
raise ValueError("Нет видео фрагментов для объединения")
if cancel_check():
raise CancelledError()
if progress_callback:
progress_callback(STAGE_CONCAT, 1.0, "Финальная склейка")
clip_list = [VideoFileClip(p) for p in temp_paths]
try:
final_clip = concatenate_videoclips(clip_list)
if cancel_check():
raise CancelledError()
output_path = self._write_final(
final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check,
)
final_clip.close()
finally:
for c in clip_list:
try:
c.close()
except Exception:
pass
if progress_callback:
progress_callback(STAGE_WRITE, 1.0, "Готово")
self.logger.log("SUCCESS", f"Обработка завершена успешно: {output_path}")
return output_path
except CancelledError:
raise
except Exception as e:
self.logger.log("ERROR", f"Ошибка обработки: {str(e)}")
raise
def _write_final(self, final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check=None):
"""Масштабирование (если нужно) и запись итогового файла. final_clip закрывает вызывающий код."""
check = cancel_check if callable(cancel_check) else lambda: False
if check():
raise CancelledError()
to_write = final_clip
resized = None
if target_height and target_height > 0:
try:
if final_clip.h != target_height:
resized = final_clip.with_effects([Resize(height=target_height)])
to_write = resized
self.logger.log("INFO", f"Масштабирование до высоты {target_height}px")
except Exception as e:
self.logger.log("WARNING", f"Масштабирование пропущено: {e}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_prefix = "".join(c for c in filename_prefix if c.isalnum() or c in " _-") or "edited_video"
output_filename = f"{safe_prefix}_{timestamp}.{output_format}"
output_path = os.path.join(output_dir, output_filename)
ensure_directory(output_dir)
if progress_callback:
progress_callback(STAGE_WRITE, 0.0, "Сохранение итогового файла...")
self.logger.log("INFO", f"Сохранение файла: {output_path}")
write_kw = {"codec": "libx264", "bitrate": self.get_bitrate(quality), "audio_codec": "aac", "logger": None}
if fps is not None and fps > 0:
write_kw["fps"] = fps
write_kw["preset"] = preset
write_kw["audio_bitrate"] = audio_bitrate
if _HAS_PROGLOG and WriteProgressLogger is not None:
write_kw["logger"] = WriteProgressLogger(
callback=progress_callback,
cancel_check=check,
stage=STAGE_WRITE,
)
try:
to_write.write_videofile(output_path, **write_kw)
except TypeError:
write_kw.pop("preset", None)
write_kw.pop("audio_bitrate", None)
write_kw["logger"] = None
to_write.write_videofile(output_path, **write_kw)
if check():
raise CancelledError()
except CancelledError:
self._remove_if_exists(output_path)
raise
except Exception as e:
self._remove_if_exists(output_path)
raise RuntimeError(self._friendly_write_error(e, output_dir)) from e
if resized is not None:
try:
resized.close()
except Exception:
pass
# Метаданные: программа, дата, название
meta = {
"comment": f"Отредактировано в {APP_NAME}. Дата: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.",
"title": safe_prefix or "edited_video",
"encoder": APP_TAG,
"description": f"Создано в {APP_NAME} — объединение и нарезка видео.",
}
_write_video_metadata(output_path, meta, self.logger)
return output_path
@staticmethod
def _remove_if_exists(path):
try:
if path and os.path.isfile(path):
os.remove(path)
except OSError:
pass
@staticmethod
def _friendly_write_error(error, output_dir):
msg = str(error or "")
low = msg.lower()
if "no space left on device" in low or "errno 32" in low or "broken pipe" in low:
return (
"Недостаточно свободного места на диске для сохранения видео. "
f"Освободите место в папке назначения ({output_dir}) и повторите попытку."
)
return msg
def _check_disk_space_or_raise(self, output_dir, video_data, quality, use_batch):
try:
free_bytes = shutil.disk_usage(output_dir).free
except Exception:
# Если не получилось определить свободное место, не блокируем обработку.
return
required_bytes = self._estimate_required_space_bytes(video_data, quality, use_batch)
if free_bytes >= required_bytes:
self.logger.log(
"INFO",
f"Проверка места: требуется ~{self._format_size(required_bytes)}, доступно {self._format_size(free_bytes)}"
)
return
raise RuntimeError(
"Недостаточно свободного места для обработки. "
f"Нужно примерно {self._format_size(required_bytes)}, доступно {self._format_size(free_bytes)}. "
"Выберите другой диск или освободите место."
)
@staticmethod
def _estimate_required_space_bytes(video_data, quality, use_batch):
input_total = 0
for data in video_data:
path = data.get("path")
if path and os.path.isfile(path):
try:
input_total += os.path.getsize(path)
except OSError:
pass
# Грубая оценка размера результата с запасом в зависимости от качества.
quality_factor = {
"low": 0.7,
"medium": 1.0,
"high": 1.4,
}.get(quality, 1.0)
output_estimate = int(input_total * quality_factor)
# В пакетном режиме временные части могут занимать объём, близкий к итогу.
temp_estimate = output_estimate if use_batch else int(output_estimate * 0.2)
# Дополнительный запас на контейнер/служебные файлы и колебания битрейта.
safety_margin = int((output_estimate + temp_estimate) * 0.25)
return output_estimate + temp_estimate + safety_margin
@staticmethod
def _format_size(num_bytes):
units = ["B", "KB", "MB", "GB", "TB"]
value = float(max(0, num_bytes))
for unit in units:
if value < 1024 or unit == units[-1]:
return f"{value:.1f} {unit}"
value /= 1024.0
def get_bitrate(self, quality):
"""Определение битрейта в зависимости от качества"""
quality_settings = {
'low': '1000k',
'medium': '3000k',
'high': '8000k'
}
return quality_settings.get(quality, '3000k')
def format_time(self, seconds):
"""Форматирование времени в читаемый вид"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
@staticmethod
def _ranges_to_keep(remove_ranges, duration):
"""
По диапазонам на удаление и длительности возвращает диапазоны для сохранения.
remove_ranges: список (start, end) — что вырезать.
Возвращает список (start, end) — что оставить и склеить.
"""
if not remove_ranges:
return [(0, duration)] if duration > 0 else []
# Сортируем и сливаем пересекающиеся
sorted_r = sorted((max(0, s), min(duration, e)) for s, e in remove_ranges if s < e)
merged = []
for s, e in sorted_r:
if merged and s <= merged[-1][1]:
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
else:
merged.append((s, e))
# Обратное: оставляем всё, кроме merged
keep = []
t = 0
for rs, re in merged:
if t < rs:
keep.append((t, rs))
t = max(t, re)
if t < duration:
keep.append((t, duration))
return keep