feat: Video Trimmer and more

This commit is contained in:
Markos Gogoulos
2025-06-11 14:48:30 +03:00
committed by GitHub
parent d34fc328bf
commit b28c2d8271
124 changed files with 15696 additions and 586 deletions

View File

@@ -3,6 +3,7 @@
import hashlib
import json
import logging
import os
import random
import shutil
@@ -15,6 +16,9 @@ from django.conf import settings
CHARS = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
logger = logging.getLogger(__name__)
CRF_ENCODING_NUM_SECONDS = 2 # 0 * 60 # videos with greater duration will get
# CRF encoding and not two-pass
# Encoding individual chunks may yield quality variations if you use a
@@ -787,6 +791,179 @@ def clean_query(query):
return query.lower()
def timestamp_to_seconds(timestamp):
"""Convert a timestamp in format HH:MM:SS.mmm to seconds
Args:
timestamp (str): Timestamp in format HH:MM:SS.mmm
Returns:
float: Timestamp in seconds
"""
h, m, s = timestamp.split(':')
s, ms = s.split('.')
return int(h) * 3600 + int(m) * 60 + int(s) + float('0.' + ms)
def seconds_to_timestamp(seconds):
"""Convert seconds to timestamp in format HH:MM:SS.mmm
Args:
seconds (float): Time in seconds
Returns:
str: Timestamp in format HH:MM:SS.mmm
"""
hours = int(seconds // 3600)
minutes = int((seconds % 3600) // 60)
seconds_remainder = seconds % 60
seconds_int = int(seconds_remainder)
milliseconds = int((seconds_remainder - seconds_int) * 1000)
return f"{hours:02d}:{minutes:02d}:{seconds_int:02d}.{milliseconds:03d}" # noqa
def get_trim_timestamps(media_file_path, timestamps_list, run_ffprobe=False):
"""Process a list of timestamps to align start times with I-frames for better video trimming
Args:
media_file_path (str): Path to the media file
timestamps_list (list): List of dictionaries with startTime and endTime
Returns:
list: Processed timestamps with adjusted startTime values
"""
if not isinstance(timestamps_list, list):
return []
timestamps_results = []
timestamps_to_process = []
for item in timestamps_list:
if isinstance(item, dict) and 'startTime' in item and 'endTime' in item:
timestamps_to_process.append(item)
if not timestamps_to_process:
return []
# just a single timestamp with no startTime, no need to process
if len(timestamps_to_process) == 1 and timestamps_to_process[0]['startTime'] == "00:00:00.000":
return timestamps_list
# Process each timestamp
for item in timestamps_to_process:
startTime = item['startTime']
endTime = item['endTime']
# with ffmpeg -ss -i that is getting run, there is no need to call ffprobe to find the I-frame,
# as ffmpeg will do that. Keeping this for now in case it is needed
i_frames = []
if run_ffprobe:
SEC_TO_SUBTRACT = 10
start_seconds = timestamp_to_seconds(startTime)
search_start = max(0, start_seconds - SEC_TO_SUBTRACT)
# Create ffprobe command to find nearest I-frame
cmd = [
settings.FFPROBE_COMMAND,
"-v",
"error",
"-select_streams",
"v:0",
"-show_entries",
"frame=pts_time,pict_type",
"-of",
"csv=p=0",
"-read_intervals",
f"{search_start}%{startTime}",
media_file_path,
]
cmd = [str(s) for s in cmd]
logger.info(f"trim cmd: {cmd}")
stdout = run_command(cmd).get("out")
if stdout:
for line in stdout.strip().split('\n'):
if line and line.endswith(',I'):
i_frames.append(line.replace(',I', ''))
if i_frames:
adjusted_startTime = seconds_to_timestamp(float(i_frames[-1]))
if not i_frames:
adjusted_startTime = startTime
timestamps_results.append({'startTime': adjusted_startTime, 'endTime': endTime})
return timestamps_results
def trim_video_method(media_file_path, timestamps_list):
"""Trim a video file based on a list of timestamps
Args:
media_file_path (str): Path to the media file
timestamps_list (list): List of dictionaries with startTime and endTime
Returns:
bool: True if successful, False otherwise
"""
if not isinstance(timestamps_list, list) or not timestamps_list:
return False
if not os.path.exists(media_file_path):
return False
with tempfile.TemporaryDirectory(dir=settings.TEMP_DIRECTORY) as temp_dir:
output_file = os.path.join(temp_dir, "output.mp4")
segment_files = []
for i, item in enumerate(timestamps_list):
start_time = timestamp_to_seconds(item['startTime'])
end_time = timestamp_to_seconds(item['endTime'])
duration = end_time - start_time
# For single timestamp, we can use the output file directly
# For multiple timestamps, we need to create segment files
segment_file = output_file if len(timestamps_list) == 1 else os.path.join(temp_dir, f"segment_{i}.mp4")
cmd = [settings.FFMPEG_COMMAND, "-y", "-ss", str(item['startTime']), "-i", media_file_path, "-t", str(duration), "-c", "copy", "-avoid_negative_ts", "1", segment_file]
result = run_command(cmd) # noqa
if os.path.exists(segment_file) and os.path.getsize(segment_file) > 0:
if len(timestamps_list) > 1:
segment_files.append(segment_file)
else:
return False
if len(timestamps_list) > 1:
if not segment_files:
return False
concat_list_path = os.path.join(temp_dir, "concat_list.txt")
with open(concat_list_path, "w") as f:
for segment in segment_files:
f.write(f"file '{segment}'\n")
concat_cmd = [settings.FFMPEG_COMMAND, "-y", "-f", "concat", "-safe", "0", "-i", concat_list_path, "-c", "copy", output_file]
concat_result = run_command(concat_cmd) # noqa
if not os.path.exists(output_file) or os.path.getsize(output_file) == 0:
return False
# Replace the original file with the trimmed version
try:
rm_file(media_file_path)
shutil.copy2(output_file, media_file_path)
return True
except Exception as e:
logger.info(f"Failed to replace original file: {str(e)}")
return False
def get_alphanumeric_only(string):
"""Returns a query that contains only alphanumeric characters
This include characters other than the English alphabet too