AnimeVideoEditot/core/video_processor.py
2026-03-14 19:52:44 +03:00

418 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import subprocess
import tempfile
import warnings
from moviepy import VideoFileClip, concatenate_videoclips
from moviepy.video.fx import Resize
from datetime import datetime
from core.logger import Logger
from utils.file_utils import ensure_directory
# Метаданные в итоговое видео
APP_NAME = "Anime Video Editor"
APP_TAG = "Anime Video Editor"
# Этапы для отображения в окне выполнения
STAGE_PREPARE = 1 # Обработка фрагментов
STAGE_CONCAT = 2 # Объединение
STAGE_WRITE = 3 # Сохранение (кодирование)
try:
import proglog
class WriteProgressLogger(proglog.ProgressBarLogger):
"""Логгер прогресса записи видео для передачи в окно выполнения."""
def __init__(self, callback):
super().__init__()
self._callback = callback
def bars_callback(self, bar, attr, value, old_value=None):
if not self._callback or bar not in self.bars:
return
total = self.bars[bar].get("total")
if total and total > 0:
pct = value / total
self._callback(STAGE_WRITE, pct, f"Сохранение: {int(pct * 100)}%")
_HAS_PROGLOG = True
except Exception:
_HAS_PROGLOG = False
WriteProgressLogger = None
# При числе файлов выше порога обрабатываем по одному во временные файлы, затем склеиваем — меньше пиков по памяти
BATCH_THRESHOLD = 3
class CancelledError(Exception):
"""Остановка выполнения по запросу пользователя."""
pass
# Убираем шумные предупреждения о последних кадрах в некоторых MP4 (reader подставляет последний валидный кадр)
warnings.filterwarnings(
"ignore",
message=".*bytes wanted but 0 bytes read.*",
category=UserWarning,
)
def _get_ffmpeg_exe():
"""Путь к FFmpeg (тот же, что использует MoviePy/imageio)."""
try:
import imageio_ffmpeg
return imageio_ffmpeg.get_ffmpeg_exe()
except Exception:
pass
try:
from moviepy.config import get_setting
return get_setting("FFMPEG_BINARY") or "ffmpeg"
except Exception:
pass
return "ffmpeg"
def _write_video_metadata(file_path, metadata_dict, logger=None):
"""
Добавляет метаданные в видеофайл через FFmpeg (stream copy, без перекодирования).
metadata_dict: {"comment": "...", "title": "...", "encoder": "..."} и т.д.
"""
file_path = os.path.abspath(os.path.normpath(file_path))
if not metadata_dict or not os.path.isfile(file_path):
return
try:
ffmpeg_exe = _get_ffmpeg_exe()
out_dir = os.path.dirname(file_path)
if not out_dir:
out_dir = os.getcwd()
fd, temp_path = tempfile.mkstemp(suffix=os.path.splitext(file_path)[1], dir=out_dir)
os.close(fd)
cmd = [ffmpeg_exe, "-y", "-i", file_path, "-c", "copy"]
for key, value in metadata_dict.items():
if value is None or (isinstance(value, str) and not value.strip()):
continue
val = str(value).replace("\n", " ").replace("\r", "").strip()
if not val:
continue
cmd.extend(["-metadata", f"{key}={val}"])
cmd.append(temp_path)
result = subprocess.run(cmd, capture_output=True, text=True, timeout=120)
if result.returncode == 0 and os.path.isfile(temp_path):
os.replace(temp_path, file_path)
if logger:
logger.log("INFO", "Метаданные записаны в файл")
else:
if os.path.isfile(temp_path):
try:
os.remove(temp_path)
except OSError:
pass
if logger:
logger.log("WARNING", "Не удалось записать метаданные (FFmpeg): " + (result.stderr or "")[:200])
except Exception as e:
if logger:
logger.log("WARNING", f"Метаданные не записаны: {e}")
class VideoProcessor:
def __init__(self, logger=None):
self.logger = logger or Logger()
def process_videos(self, video_data, output_dir, output_format="mp4",
quality="high", progress_callback=None,
fps=None, preset="medium", audio_bitrate="192k",
target_height=None, filename_prefix="edited_video",
cancel_check=None):
"""
cancel_check: callable(), возвращает True если нужно остановить выполнение.
"""
self.logger.log("INFO", f"Начало обработки {len(video_data)} видео файлов")
check = cancel_check if callable(cancel_check) else lambda: False
use_batch = len(video_data) > BATCH_THRESHOLD
if use_batch:
self.logger.log("INFO", f"Режим пакетной обработки (файлов > {BATCH_THRESHOLD}), экономия памяти")
return self._process_videos_batch(
video_data, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, check,
)
return self._process_videos_memory(
video_data, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, check,
)
def _process_videos_memory(self, video_data, output_dir, output_format, quality,
progress_callback, fps, preset, audio_bitrate,
target_height, filename_prefix, cancel_check):
"""Обработка всех клипов в памяти (подходит для малого числа файлов)."""
clips = []
source_videos = []
all_keep_ranges = []
try:
for data in video_data:
if cancel_check():
raise CancelledError()
video_path = data['path']
remove_ranges = data['time_ranges']
self.logger.log("INFO", f"Файл: {os.path.basename(video_path)}")
video = VideoFileClip(video_path)
source_videos.append(video)
keep = self._ranges_to_keep(remove_ranges, video.duration)
all_keep_ranges.append(keep)
for s, e in keep:
self.logger.log("INFO", f" Оставляем: {self.format_time(s)}{self.format_time(e)}")
total_clip_steps = sum(len(k) for k in all_keep_ranges)
current_step = 0
for i in range(len(video_data)):
if cancel_check():
raise CancelledError()
video = source_videos[i]
keep_ranges = all_keep_ranges[i]
for start, end in keep_ranges:
if cancel_check():
raise CancelledError()
if progress_callback:
p = current_step / total_clip_steps if total_clip_steps else 1.0
progress_callback(STAGE_PREPARE, p, f"Обработка фрагмента {i + 1}/{len(video_data)}")
clip = video.subclipped(start, end)
clips.append(clip)
current_step += 1
if not clips:
raise ValueError("Нет видео фрагментов для объединения")
if cancel_check():
raise CancelledError()
if progress_callback:
progress_callback(STAGE_CONCAT, 1.0, "Объединение фрагментов")
self.logger.log("INFO", f"Объединение {len(clips)} фрагментов")
final_clip = concatenate_videoclips(clips)
if cancel_check():
raise CancelledError()
output_path = self._write_final(
final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check,
)
final_clip.close()
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
if progress_callback:
progress_callback(STAGE_WRITE, 1.0, "Готово")
self.logger.log("SUCCESS", f"Обработка завершена успешно: {output_path}")
return output_path
except CancelledError:
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
raise
except Exception as e:
self.logger.log("ERROR", f"Ошибка обработки: {str(e)}")
for c in clips:
try:
c.close()
except Exception:
pass
for v in source_videos:
try:
v.close()
except Exception:
pass
raise
def _process_videos_batch(self, video_data, output_dir, output_format, quality,
progress_callback, fps, preset, audio_bitrate,
target_height, filename_prefix, cancel_check):
"""Обработка по одному файлу во временные ролики, затем склейка — меньше пиков памяти."""
temp_paths = []
n_files = len(video_data)
try:
ensure_directory(output_dir)
with tempfile.TemporaryDirectory(prefix="ave_") as tmpdir:
for i, data in enumerate(video_data):
if cancel_check():
raise CancelledError()
if progress_callback:
p = (i + 1) / n_files if n_files else 1.0
progress_callback(STAGE_PREPARE, p, f"Файл {i + 1}/{n_files}")
video_path = data['path']
remove_ranges = data['time_ranges']
keep = None
video = VideoFileClip(video_path)
try:
keep = self._ranges_to_keep(remove_ranges, video.duration)
if not keep:
current_step += 1
continue
part_clips = []
for start, end in keep:
part_clips.append(video.subclipped(start, end))
if len(part_clips) == 1:
part = part_clips[0]
else:
part = concatenate_videoclips(part_clips)
for c in part_clips:
try:
c.close()
except Exception:
pass
temp_path = os.path.join(tmpdir, f"part_{i:04d}.mp4")
write_kw = {"codec": "libx264", "bitrate": self.get_bitrate(quality), "audio_codec": "aac"}
if fps and fps > 0:
write_kw["fps"] = fps
try:
part.write_videofile(temp_path, **write_kw)
except TypeError:
part.write_videofile(temp_path, codec="libx264", bitrate=self.get_bitrate(quality), audio_codec="aac")
part.close()
temp_paths.append(temp_path)
finally:
try:
video.close()
except Exception:
pass
if not temp_paths:
raise ValueError("Нет видео фрагментов для объединения")
if cancel_check():
raise CancelledError()
if progress_callback:
progress_callback(STAGE_CONCAT, 1.0, "Финальная склейка")
clip_list = [VideoFileClip(p) for p in temp_paths]
try:
final_clip = concatenate_videoclips(clip_list)
if cancel_check():
raise CancelledError()
output_path = self._write_final(
final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check,
)
final_clip.close()
finally:
for c in clip_list:
try:
c.close()
except Exception:
pass
if progress_callback:
progress_callback(STAGE_WRITE, 1.0, "Готово")
self.logger.log("SUCCESS", f"Обработка завершена успешно: {output_path}")
return output_path
except CancelledError:
raise
except Exception as e:
self.logger.log("ERROR", f"Ошибка обработки: {str(e)}")
raise
def _write_final(self, final_clip, output_dir, output_format, quality, progress_callback,
fps, preset, audio_bitrate, target_height, filename_prefix, cancel_check=None):
"""Масштабирование (если нужно) и запись итогового файла. final_clip закрывает вызывающий код."""
check = cancel_check if callable(cancel_check) else lambda: False
if check():
raise CancelledError()
to_write = final_clip
resized = None
if target_height and target_height > 0:
try:
if final_clip.h != target_height:
resized = final_clip.with_effects([Resize(height=target_height)])
to_write = resized
self.logger.log("INFO", f"Масштабирование до высоты {target_height}px")
except Exception as e:
self.logger.log("WARNING", f"Масштабирование пропущено: {e}")
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
safe_prefix = "".join(c for c in filename_prefix if c.isalnum() or c in " _-") or "edited_video"
output_filename = f"{safe_prefix}_{timestamp}.{output_format}"
output_path = os.path.join(output_dir, output_filename)
ensure_directory(output_dir)
if progress_callback:
progress_callback(STAGE_WRITE, 0.0, "Сохранение итогового файла...")
self.logger.log("INFO", f"Сохранение файла: {output_path}")
write_kw = {"codec": "libx264", "bitrate": self.get_bitrate(quality), "audio_codec": "aac"}
if fps is not None and fps > 0:
write_kw["fps"] = fps
write_kw["preset"] = preset
write_kw["audio_bitrate"] = audio_bitrate
if _HAS_PROGLOG and progress_callback and WriteProgressLogger is not None:
write_kw["logger"] = WriteProgressLogger(progress_callback)
try:
to_write.write_videofile(output_path, **write_kw)
except TypeError:
write_kw.pop("preset", None)
write_kw.pop("audio_bitrate", None)
write_kw.pop("logger", None)
to_write.write_videofile(output_path, **write_kw)
if resized is not None:
try:
resized.close()
except Exception:
pass
# Метаданные: программа, дата, название
meta = {
"comment": f"Отредактировано в {APP_NAME}. Дата: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}.",
"title": safe_prefix or "edited_video",
"encoder": APP_TAG,
"description": f"Создано в {APP_NAME} — объединение и нарезка видео.",
}
_write_video_metadata(output_path, meta, self.logger)
return output_path
def get_bitrate(self, quality):
"""Определение битрейта в зависимости от качества"""
quality_settings = {
'low': '1000k',
'medium': '3000k',
'high': '8000k'
}
return quality_settings.get(quality, '3000k')
def format_time(self, seconds):
"""Форматирование времени в читаемый вид"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
secs = int(seconds % 60)
return f"{hours:02d}:{minutes:02d}:{secs:02d}"
@staticmethod
def _ranges_to_keep(remove_ranges, duration):
"""
По диапазонам на удаление и длительности возвращает диапазоны для сохранения.
remove_ranges: список (start, end) — что вырезать.
Возвращает список (start, end) — что оставить и склеить.
"""
if not remove_ranges:
return [(0, duration)] if duration > 0 else []
# Сортируем и сливаем пересекающиеся
sorted_r = sorted((max(0, s), min(duration, e)) for s, e in remove_ranges if s < e)
merged = []
for s, e in sorted_r:
if merged and s <= merged[-1][1]:
merged[-1] = (merged[-1][0], max(merged[-1][1], e))
else:
merged.append((s, e))
# Обратное: оставляем всё, кроме merged
keep = []
t = 0
for rs, re in merged:
if t < rs:
keep.append((t, rs))
t = max(t, re)
if t < duration:
keep.append((t, duration))
return keep