diff --git a/perturbation/check_video.py b/perturbation/check_video.py
index f5da18a..54e923a 100644
--- a/perturbation/check_video.py
+++ b/perturbation/check_video.py
@@ -1,23 +1,12 @@
 import argparse
 import os
-
 import cv2
-

 def parse_args():
     parser = argparse.ArgumentParser(description='Check video information.')
-    parser.add_argument('--vid_in',
-                        type=str,
-                        required=True,
-                        help='path to the input video')
-    parser.add_argument('--vid_out',
-                        type=str,
-                        required=True,
-                        help='path to the output video')
-    args = parser.parse_args()
-
-    return args
-
+    parser.add_argument('--vid_in', type=str, required=True, help='Path to the input video')
+    parser.add_argument('--vid_out', type=str, required=True, help='Path to the output video')
+    return parser.parse_args()

 def get_vid_info(vid):
     w = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
@@ -25,44 +14,51 @@ def get_vid_info(vid):
     fps = vid.get(cv2.CAP_PROP_FPS)
     fourcc = vid.get(cv2.CAP_PROP_FOURCC)
     frame_count = vid.get(cv2.CAP_PROP_FRAME_COUNT)
-
     return w, h, fps, fourcc, frame_count
-

 def main():
     args = parse_args()
-
     vid_in = args.vid_in
     vid_out = args.vid_out
-    # check input args
-    assert os.path.exists(vid_in), 'Input video does not exist.'
-    assert os.path.exists(vid_out), 'Output video does not exist.'
+    if not os.path.exists(vid_in):
+        raise FileNotFoundError('Input video does not exist.')
+
+    if not os.path.exists(vid_out):
+        raise FileNotFoundError('Output video does not exist.')

-    # check video info
-    vid1 = cv2.VideoCapture(vid_in)
-    w1, h1, fps1, fourcc1, frame_count1 = get_vid_info(vid1)
-    vid1.release()
-    vid2 = cv2.VideoCapture(vid_out)
-    w2, h2, fps2, fourcc2, frame_count2 = get_vid_info(vid2)
-    vid2.release()
-    assert w1 == w2, ('Frame width should be the same, '
-                      f'but got {w1} in input video, {w2} in output video.')
-    assert h1 == h2, ('Frame height should be the same, '
-                      f'but got {h1} in input video, {h2} in output video.')
-    assert fps1 == fps2, ('Video fps should be the same, but got '
-                          f'{fps1} in input video, {fps2} in output video.')
-    assert fourcc1 == fourcc2, ('Video fourcc should be the same, but got '
-                                f'{fourcc1} in input video, {fourcc2} in '
-                                'output video.')
-    assert frame_count1 == frame_count2, ('Frame count should be the same, '
-                                          f'but got {frame_count1} in '
-                                          f'input video, {frame_count2} in '
-                                          'output video.')
+    try:
+        vid1 = cv2.VideoCapture(vid_in)
+        if not vid1.isOpened():
+            raise RuntimeError('Could not open input video.')

-    # pass all assertions, succeed
-    print('No problem.')
+        w1, h1, fps1, fourcc1, frame_count1 = get_vid_info(vid1)
+        vid1.release()
+
+        vid2 = cv2.VideoCapture(vid_out)
+        if not vid2.isOpened():
+            raise RuntimeError('Could not open output video.')
+
+        w2, h2, fps2, fourcc2, frame_count2 = get_vid_info(vid2)
+        vid2.release()
+    except Exception as e:
+        print(f"Error occurred: {e}")
+        return
+
+    # Check video properties
+    if w1 != w2:
+        raise ValueError(f'Frame width mismatch: {w1} in input vs {w2} in output.')
+    if h1 != h2:
+        raise ValueError(f'Frame height mismatch: {h1} in input vs {h2} in output.')
+    if fps1 != fps2:
+        raise ValueError(f'FPS mismatch: {fps1} in input vs {fps2} in output.')
+    if fourcc1 != fourcc2:
+        raise ValueError(f'FOURCC mismatch: {fourcc1} in input vs {fourcc2} in output.')
+    if frame_count1 != frame_count2:
+        raise ValueError(f'Frame count mismatch: {frame_count1} in input vs {frame_count2} in output.')
+
+    print('No problem.')


 if __name__ == '__main__':
     main()
diff --git a/perturbation/distortions.py b/perturbation/distortions.py
index 4a7faaf..458e4b5 100644
--- a/perturbation/distortions.py
+++ b/perturbation/distortions.py
@@ -1,92 +1,168 @@
 import math
 import os
 import random
-
 import cv2
 import numpy as np
-

 def bgr2ycbcr(img_bgr):
+    """
+    Convert BGR image to YCbCr color space.
+
+    Args:
+        img_bgr (np.ndarray): Input image in BGR format.
+
+    Returns:
+        np.ndarray: Image converted to YCbCr format.
+    """
     img_bgr = img_bgr.astype(np.float32)
     img_ycrcb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YCR_CB)
-    img_ycbcr = img_ycrcb[:, :, (0, 2, 1)].astype(np.float32)
-    # to [16/255, 235/255]
+    img_ycbcr = img_ycrcb[:, :, (0, 2, 1)]
+
+    # Scale to [16/255, 235/255] for Y and [16/255, 240/255] for Cb and Cr
     img_ycbcr[:, :, 0] = (img_ycbcr[:, :, 0] * (235 - 16) + 16) / 255.0
-    # to [16/255, 240/255]
     img_ycbcr[:, :, 1:] = (img_ycbcr[:, :, 1:] * (240 - 16) + 16) / 255.0
-
+
     return img_ycbcr
-

 def ycbcr2bgr(img_ycbcr):
+    """
+    Convert YCbCr image back to BGR color space.
+
+    Args:
+        img_ycbcr (np.ndarray): Input image in YCbCr format.
+
+    Returns:
+        np.ndarray: Image converted to BGR format.
+    """
     img_ycbcr = img_ycbcr.astype(np.float32)
-    # to [0, 1]
+
+    # Scale back to [0, 1]
     img_ycbcr[:, :, 0] = (img_ycbcr[:, :, 0] * 255.0 - 16) / (235 - 16)
-    # to [0, 1]
     img_ycbcr[:, :, 1:] = (img_ycbcr[:, :, 1:] * 255.0 - 16) / (240 - 16)
-    img_ycrcb = img_ycbcr[:, :, (0, 2, 1)].astype(np.float32)
+
+    img_ycrcb = img_ycbcr[:, :, (0, 2, 1)]
     img_bgr = cv2.cvtColor(img_ycrcb, cv2.COLOR_YCR_CB2BGR)
-
+
     return img_bgr
-

 def color_saturation(img, param):
+    """
+    Adjust the color saturation of the image.
+
+    Args:
+        img (np.ndarray): Input image in BGR format.
+        param (float): Saturation adjustment factor (1.0 = no change).
+
+    Returns:
+        np.ndarray: Saturated image in BGR format.
+    """
     ycbcr = bgr2ycbcr(img)
-    ycbcr[:, :, 1] = 0.5 + (ycbcr[:, :, 1] - 0.5) * param
-    ycbcr[:, :, 2] = 0.5 + (ycbcr[:, :, 2] - 0.5) * param
+    ycbcr[:, :, 1] = np.clip(0.5 + (ycbcr[:, :, 1] - 0.5) * param, 0, 1)
+    ycbcr[:, :, 2] = np.clip(0.5 + (ycbcr[:, :, 2] - 0.5) * param, 0, 1)
+
     img = ycbcr2bgr(ycbcr).astype(np.uint8)
-
     return img
-

 def color_contrast(img, param):
-    img = img.astype(np.float32) * param
-    img = img.astype(np.uint8)
-    return img
+    """
+    Adjust the contrast of the image.
+
+    Args:
+        img (np.ndarray): Input image in BGR format.
+        param (float): Contrast adjustment factor (1.0 = no change).
+
+    Returns:
+        np.ndarray: Contrast-adjusted image in BGR format.
+    """
+    img = np.clip(img.astype(np.float32) * param, 0, 255).astype(np.uint8)
+    return img

 def block_wise(img, param):
+    """
+    Apply block-wise masking on the image.
+
+    Args:
+        img (np.ndarray): Input image in BGR format.
+        param (int): Number of blocks to mask.
+
+    Returns:
+        np.ndarray: Image with block masking applied.
+ """ width = 8 - block = np.ones((width, width, 3)).astype(int) * 128 - param = min(img.shape[0], img.shape[1]) // 256 * param - for i in range(param): - r_w = random.randint(0, img.shape[1] - 1 - width) - r_h = random.randint(0, img.shape[0] - 1 - width) + block = np.ones((width, width, 3), dtype=np.uint8) * 128 + num_blocks = min(img.shape[0] * img.shape[1] // (width * width), param) + + for _ in range(num_blocks): + r_w = random.randint(0, img.shape[1] - width) + r_h = random.randint(0, img.shape[0] - width) img[r_h:r_h + width, r_w:r_w + width, :] = block - + return img - def gaussian_noise_color(img, param): - ycbcr = bgr2ycbcr(img) / 255 - size_a = ycbcr.shape - b = (ycbcr + math.sqrt(param) * - np.random.randn(size_a[0], size_a[1], size_a[2])) * 255 - b = ycbcr2bgr(b) - img = np.clip(b, 0, 255).astype(np.uint8) + """ + Add Gaussian noise to the image. - return img + Args: + img (np.ndarray): Input image in BGR format. + param (float): Standard deviation of the Gaussian noise. + Returns: + np.ndarray: Noisy image in BGR format. + """ + noise = np.random.normal(0, math.sqrt(param), img.shape).astype(np.float32) + noisy_img = np.clip(img.astype(np.float32) + noise, 0, 255).astype(np.uint8) + return noisy_img def gaussian_blur(img, param): - img = cv2.GaussianBlur(img, (param, param), param * 1.0 / 6) - + """ + Apply Gaussian blur to the image. + + Args: + img (np.ndarray): Input image in BGR format. + param (int): Kernel size for the blur (should be odd). + + Returns: + np.ndarray: Blurred image in BGR format. + """ + if param % 2 == 0: + param += 1 # Ensure kernel size is odd + img = cv2.GaussianBlur(img, (param, param), param / 6) return img - def jpeg_compression(img, param): - h, w, _ = img.shape - s_h = h // param - s_w = w // param - img = cv2.resize(img, (s_w, s_h)) - img = cv2.resize(img, (w, h)) + """ + Compress the image using JPEG compression. + + Args: + img (np.ndarray): Input image in BGR format. + param (int): Quality factor (1-100, where 100 is highest quality). + + Returns: + np.ndarray: JPEG-compressed image in BGR format. + """ + if not (1 <= param <= 100): + raise ValueError("JPEG quality parameter must be between 1 and 100.") + + encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), param] + _, img_encoded = cv2.imencode('.jpg', img, encode_param) + img_decoded = cv2.imdecode(img_encoded, cv2.IMREAD_COLOR) + + return img_decoded - return img +def video_compression(vid_in, vid_out, param): + """ + Compress a video using FFmpeg. + Args: + vid_in (str): Input video file path. + vid_out (str): Output video file path. + param (int): Constant Rate Factor (CRF) for quality control. -def video_compression(vid_in, vid_out, param): - cmd = f'ffmpeg -i {vid_in} -crf {param} -y {vid_out}' - os.system(cmd) + Raises: + RuntimeError: If the compression command fails. + """ + cmd = f'ffmpeg -i "{vid_in}" -crf {param} -y "{vid_out}"' + if os.system(cmd) != 0: + raise RuntimeError(f'Video compression failed for {vid_in}.') - return