Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 38 additions & 42 deletions perturbation/check_video.py
Original file line number Diff line number Diff line change
@@ -1,68 +1,64 @@
import argparse
import os

import cv2


def parse_args(argv=None):
    """Parse the command-line arguments of the video checker.

    Args:
        argv (list[str] | None): Argument strings to parse. ``None`` (the
            default) lets argparse read ``sys.argv[1:]``, so existing
            ``parse_args()`` callers behave as before.

    Returns:
        argparse.Namespace: Namespace carrying ``vid_in`` and ``vid_out``.
    """
    parser = argparse.ArgumentParser(description='Check video information.')
    parser.add_argument('--vid_in', type=str, required=True,
                        help='Path to the input video')
    parser.add_argument('--vid_out', type=str, required=True,
                        help='Path to the output video')
    return parser.parse_args(argv)

def get_vid_info(vid):
    """Read the basic stream properties of an opened video capture.

    Args:
        vid: Object exposing ``get(prop_id)``; presumably a
            ``cv2.VideoCapture`` (that is what ``main`` passes in).

    Returns:
        tuple: ``(width, height, fps, fourcc, frame_count)`` exactly as
        returned by ``vid.get`` for the corresponding ``cv2.CAP_PROP_*`` ids.
    """
    w = vid.get(cv2.CAP_PROP_FRAME_WIDTH)
    h = vid.get(cv2.CAP_PROP_FRAME_HEIGHT)
    fps = vid.get(cv2.CAP_PROP_FPS)
    fourcc = vid.get(cv2.CAP_PROP_FOURCC)
    frame_count = vid.get(cv2.CAP_PROP_FRAME_COUNT)

    return w, h, fps, fourcc, frame_count


def _read_vid_info(path, label):
    """Open the video at ``path`` and return its ``get_vid_info`` tuple.

    The capture is released in ``finally`` so it is freed even when the file
    cannot be opened. ``label`` ('input' / 'output') only feeds the message.

    Raises:
        RuntimeError: If OpenCV cannot open the file.
    """
    vid = cv2.VideoCapture(path)
    try:
        if not vid.isOpened():
            raise RuntimeError(f'Could not open {label} video.')
        return get_vid_info(vid)
    finally:
        vid.release()


def main():
    """Check that the input and output videos share the same properties.

    Compares frame width, frame height, FPS, FOURCC and frame count, in that
    order, and prints ``No problem.`` when all of them match.

    Raises:
        FileNotFoundError: If either path does not exist.
        RuntimeError: If either video cannot be opened. The error is allowed
            to propagate (instead of being printed and swallowed) so that a
            failed check never ends with a successful exit status.
        ValueError: On the first property that differs between the videos.
    """
    args = parse_args()

    vid_in = args.vid_in
    vid_out = args.vid_out

    # Explicit raises instead of asserts: asserts vanish under ``python -O``.
    if not os.path.exists(vid_in):
        raise FileNotFoundError('Input video does not exist.')
    if not os.path.exists(vid_out):
        raise FileNotFoundError('Output video does not exist.')

    info_in = _read_vid_info(vid_in, 'input')
    info_out = _read_vid_info(vid_out, 'output')

    # Labels follow the order of the tuple returned by get_vid_info.
    labels = ('Frame width', 'Frame height', 'FPS', 'FOURCC', 'Frame count')
    for label, val_in, val_out in zip(labels, info_in, info_out):
        if val_in != val_out:
            raise ValueError(
                f'{label} mismatch: {val_in} in input vs {val_out} in output.')

    print('No problem.')


if __name__ == '__main__':
    main()
168 changes: 122 additions & 46 deletions perturbation/distortions.py
Original file line number Diff line number Diff line change
@@ -1,92 +1,168 @@
import math
import os
import random

import cv2
import numpy as np


def bgr2ycbcr(img_bgr):
    """Convert a BGR image to a range-compressed YCbCr representation.

    The image is cast to float32, converted with OpenCV's BGR->YCrCb
    transform, reordered to (Y, Cb, Cr), and then each channel is compressed
    with ``(v * span + 16) / 255`` where ``span`` is 219 for Y and 224 for
    Cb/Cr. For a value in [0, 1] this yields [16/255, 235/255] (Y) and
    [16/255, 240/255] (Cb, Cr). No normalisation of the input is performed
    here, so other input ranges are scaled by the same formula.

    Args:
        img_bgr (np.ndarray): Image in BGR channel order, shape (H, W, 3).

    Returns:
        np.ndarray: New array in (Y, Cb, Cr) channel order; the input is not
        modified.
    """
    img_bgr = img_bgr.astype(np.float32)
    img_ycrcb = cv2.cvtColor(img_bgr, cv2.COLOR_BGR2YCR_CB)
    # Fancy indexing yields a fresh array, so the in-place scaling below
    # cannot leak back into img_ycrcb. OpenCV orders channels Y, Cr, Cb.
    img_ycbcr = img_ycrcb[:, :, (0, 2, 1)]

    # Luma: v -> (v * 219 + 16) / 255
    img_ycbcr[:, :, 0] = (img_ycbcr[:, :, 0] * (235 - 16) + 16) / 255.0
    # Chroma: v -> (v * 224 + 16) / 255
    img_ycbcr[:, :, 1:] = (img_ycbcr[:, :, 1:] * (240 - 16) + 16) / 255.0

    return img_ycbcr


def ycbcr2bgr(img_ycbcr):
    """Convert a range-compressed YCbCr image back to BGR.

    Inverts the per-channel compression ``(v * span + 16) / 255`` (span 219
    for Y, 224 for Cb/Cr), reorders the channels to OpenCV's (Y, Cr, Cb)
    layout and applies OpenCV's YCrCb->BGR transform.

    Args:
        img_ycbcr (np.ndarray): Image in (Y, Cb, Cr) channel order,
            shape (H, W, 3).

    Returns:
        np.ndarray: Image in BGR channel order. The input array is not
        modified because the work is done on the ``astype`` copy.
    """
    img_ycbcr = img_ycbcr.astype(np.float32)

    # Undo the luma / chroma range compression.
    img_ycbcr[:, :, 0] = (img_ycbcr[:, :, 0] * 255.0 - 16) / (235 - 16)
    img_ycbcr[:, :, 1:] = (img_ycbcr[:, :, 1:] * 255.0 - 16) / (240 - 16)

    # Swap Cb and Cr back into the order cvtColor expects.
    img_ycrcb = img_ycbcr[:, :, (0, 2, 1)]
    img_bgr = cv2.cvtColor(img_ycrcb, cv2.COLOR_YCR_CB2BGR)

    return img_bgr


def color_saturation(img, param):
    """Adjust the colour saturation of a BGR image.

    Both chroma channels of the YCbCr representation are scaled about 0.5 by
    ``param``; luma is left untouched.

    Args:
        img (np.ndarray): Image in BGR channel order, shape (H, W, 3).
        param (float): Chroma scale factor (1.0 leaves the chroma unchanged).

    Returns:
        np.ndarray: uint8 image in BGR channel order.
    """
    ycbcr = bgr2ycbcr(img)

    # The chroma is deliberately NOT clamped to [0, 1]: bgr2ycbcr does not
    # normalise its input, so for an 8-bit image the chroma values lie far
    # outside that interval and clamping them would collapse the picture to
    # near gray regardless of ``param``.
    ycbcr[:, :, 1:] = 0.5 + (ycbcr[:, :, 1:] - 0.5) * param

    # Clamp in the BGR domain instead, so that out-of-gamut results saturate
    # rather than wrap around in the uint8 cast.
    bgr = ycbcr2bgr(ycbcr)
    return np.clip(bgr, 0, 255).astype(np.uint8)


def color_contrast(img, param):
    """Scale every pixel value of the image by a constant factor.

    Args:
        img (np.ndarray): Input image (any shape; BGR in this module).
        param (float): Multiplicative factor (1.0 leaves the image unchanged).

    Returns:
        np.ndarray: uint8 image of the same shape. Products are clamped to
        [0, 255] before the cast so bright pixels saturate instead of
        wrapping around.
    """
    scaled = img.astype(np.float32) * param
    return np.clip(scaled, 0, 255).astype(np.uint8)

def block_wise(img, param):
    """Overwrite randomly placed 8x8 blocks of the image with gray (128).

    The image is modified in place and also returned.

    Args:
        img (np.ndarray): Image with shape (H, W) or (H, W, C).
        param (int): Requested number of blocks. It is capped at the number
            of non-overlapping 8x8 tiles that fit into the image.

    Returns:
        np.ndarray: The same array object as ``img``.
    """
    width = 8
    rows, cols = img.shape[:2]
    num_blocks = min(rows * cols // (width * width), param)

    # randint is inclusive on both ends; the largest valid offset is
    # ``size - width``. Clamp at 0 so images smaller than one block along an
    # axis do not produce an empty (and therefore invalid) range.
    max_col = max(cols - width, 0)
    max_row = max(rows - width, 0)

    for _ in range(num_blocks):
        r_w = random.randint(0, max_col)
        r_h = random.randint(0, max_row)
        # A scalar fill broadcasts to whatever the slice covers, including
        # slices truncated at the image border.
        img[r_h:r_h + width, r_w:r_w + width] = 128

    return img


def gaussian_noise_color(img, param):
    """Add zero-mean Gaussian noise independently to every channel value.

    Args:
        img (np.ndarray): Input image with values on the 0-255 scale.
        param (float): Noise variance expressed on a normalised [0, 1]
            intensity scale. The standard deviation applied to the 0-255
            pixel values is therefore ``sqrt(param) * 255``; without the 255
            factor the noise would be negligible for small ``param``.

    Returns:
        np.ndarray: uint8 image of the same shape, clamped to [0, 255].

    Raises:
        ValueError: If ``param`` is negative (raised by ``math.sqrt``).
    """
    sigma = math.sqrt(param) * 255.0
    noise = np.random.normal(0.0, sigma, img.shape)
    noisy_img = img.astype(np.float32) + noise
    return np.clip(noisy_img, 0, 255).astype(np.uint8)

def gaussian_blur(img, param):
    """Blur the image with a square Gaussian kernel.

    Args:
        img (np.ndarray): Input image.
        param (int): Requested kernel size. An even value is bumped to the
            next odd number because the kernel size passed to
            ``cv2.GaussianBlur`` is expected to be odd.

    Returns:
        np.ndarray: Blurred image as returned by ``cv2.GaussianBlur``.
    """
    ksize = param + 1 if param % 2 == 0 else param
    # Sigma is derived from the effective (odd) kernel size.
    return cv2.GaussianBlur(img, (ksize, ksize), ksize / 6)


def jpeg_compression(img, param):
    """Round-trip the image through an in-memory JPEG encode/decode.

    Args:
        img (np.ndarray): Input image in BGR channel order.
        param (int): JPEG quality factor in [1, 100] (100 is the highest
            quality).

    Returns:
        np.ndarray: Decoded 3-channel image after JPEG compression.

    Raises:
        ValueError: If ``param`` is outside [1, 100].
        RuntimeError: If OpenCV reports that encoding failed.
    """
    if not (1 <= param <= 100):
        raise ValueError("JPEG quality parameter must be between 1 and 100.")

    # OpenCV expects a flat list of integer (flag, value) pairs.
    encode_param = [int(cv2.IMWRITE_JPEG_QUALITY), int(param)]
    ok, img_encoded = cv2.imencode('.jpg', img, encode_param)
    if not ok:
        raise RuntimeError('JPEG encoding failed.')

    return cv2.imdecode(img_encoded, cv2.IMREAD_COLOR)
def video_compression(vid_in, vid_out, param):
    """Re-encode a video with FFmpeg at the given constant rate factor.

    Runs ``ffmpeg -i <vid_in> -crf <param> -y <vid_out>``; ``-y`` overwrites
    an existing output file.

    Args:
        vid_in (str): Input video file path.
        vid_out (str): Output video file path.
        param (int): Constant Rate Factor (CRF) passed to FFmpeg.

    Raises:
        RuntimeError: If FFmpeg cannot be started or exits with a non-zero
            status.
    """
    # Local import keeps this block self-contained.
    import subprocess

    # An argument list (no shell) keeps paths containing spaces, quotes or
    # shell metacharacters intact and prevents them from being interpreted.
    cmd = ['ffmpeg', '-i', str(vid_in), '-crf', str(param), '-y', str(vid_out)]
    try:
        result = subprocess.run(cmd, check=False)
    except OSError as err:
        # E.g. the ffmpeg executable is not installed / not on PATH.
        raise RuntimeError(f'Video compression failed for {vid_in}.') from err

    if result.returncode != 0:
        raise RuntimeError(f'Video compression failed for {vid_in}.')