From 22e7ed340f8e97f2e6e45756476e795df76ca1ca Mon Sep 17 00:00:00 2001
From: zhirnov-m
Date: Wed, 24 Jul 2024 10:34:29 +0300
Subject: [PATCH 01/16] feat: add structural dynamics filter

---
 .../videos/structural_dynamics_filter.py | 119 ++++++++++++++++++
 pyproject.toml                           |   5 +-
 2 files changed, 123 insertions(+), 1 deletion(-)
 create mode 100644 DPF/filters/videos/structural_dynamics_filter.py

diff --git a/DPF/filters/videos/structural_dynamics_filter.py b/DPF/filters/videos/structural_dynamics_filter.py
new file mode 100644
index 0000000..9df7c1a
--- /dev/null
+++ b/DPF/filters/videos/structural_dynamics_filter.py
@@ -0,0 +1,119 @@
+import io
+from typing import Any, Optional
+import cv2
+import imageio.v3 as iio
+import numpy as np
+import torch
+from cv2.typing import MatLike
+from torch import Tensor
+
+from DPF.types import ModalityToDataMapping
+
+from .video_filter import VideoFilter
+
+from pytorch_msssim import MS_SSIM
+from time import time
+
+
+class StructuralDynamicsFilter(VideoFilter):
+    """
+    Structural dynamics score from https://arxiv.org/pdf/2407.01094
+    MS-SSIM is computed between each pair of consecutive sampled frames;
+    the mean, maximum and minimum of these scores over the whole video are reported.
+
+    Parameters
+    ----------
+    pass_frames: int = 10
+        Process every pass_frames-th frame. Set pass_frames = 1 to process all frames.
+    min_frame_size: int = 512
+        The size of the minimum side of the video frame after resizing
+    frames_batch_size: int = 16
+        Batch size during one video processing
+    device: str = "cuda:0"
+        Device to use
+    workers: int = 16
+        Number of processes to use for reading data and calculating scores
+    pbar: bool = True
+        Whether to use a progress bar
+    """
+
+    def __init__(
+        self,
+        pass_frames: int = 10,
+        min_frame_size: int = 512,
+        frames_batch_size: int = 16,
+        device: str = "cuda:0",
+        workers: int = 16,
+        pbar: bool = True,
+        _pbar_position: int = 0
+    ):
+        super().__init__(pbar, _pbar_position)
+        self.num_workers = workers
+        self.device = device
+
+        assert pass_frames >= 1, "Number of pass_frames should be greater or equal to 1."
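+        # Keep every pass_frames-th frame; MS-SSIM is computed between each
+        # pair of consecutive kept frames (see preprocess_data / process_batch).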
+        self.pass_frames = pass_frames
+        self.min_frame_size = min_frame_size
+        self.frames_batch_size = frames_batch_size
+        self.model = MS_SSIM(data_range=255, size_average=False, channel=3, win_size=11)
+
+    @property
+    def result_columns(self) -> list[str]:
+        return ["structural_dynamics", "structural_dynamics_max", "structural_dynamics_min"]
+
+    @property
+    def dataloader_kwargs(self) -> dict[str, Any]:
+        return {
+            "num_workers": self.num_workers,
+            "batch_size": 1,
+            "drop_last": False,
+        }
+
+    def preprocess_data(
+        self,
+        modality2data: ModalityToDataMapping,
+        metadata: dict[str, Any]
+    ) -> Any:
+        key = metadata[self.key_column]
+        video_file = modality2data['video']
+
+        frames = iio.imread(io.BytesIO(video_file), plugin="pyav")
+        frames_transformed = []
+        frames_transformed = [
+            torch.from_numpy(frames[i]).permute(2, 0, 1).float()[None]
+            for i in range(self.pass_frames, len(frames), self.pass_frames)
+        ]
+        return key, frames_transformed
+
+    def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
+        df_batch_labels = self._get_dict_from_schema()
+
+        values: list[float] = []
+        for data in batch:
+            key, frames = data
+            with torch.no_grad():
+                for i in range(0, len(frames)-1, self.frames_batch_size):
+                    end = min(i+self.frames_batch_size, len(frames)-1)
+                    current_frame = torch.cat(frames[i:end], dim=0)
+                    next_frame = torch.cat(frames[i+1:i+self.frames_batch_size+1], dim=0)
+
+                    current_frame_cuda = current_frame.to(self.device)
+                    next_frame_cuda = next_frame.to(self.device)
+
+                    t0 = time()
+                    ssim = self.model(
+                        current_frame_cuda,
+                        next_frame_cuda
+                    )
+                    values.extend(ssim.detach().cpu().numpy())
+                    # print('SSIM time=', time() - t0)
+        mean_value = np.mean(values)
+        mn = np.min(values)
+        mx = np.max(values)
+
+        df_batch_labels[self.key_column].append(key)
+        df_batch_labels[self.schema[1]].append(round(mean_value, 6))
+        df_batch_labels[self.schema[2]].append(round(mx, 6))
+        df_batch_labels[self.schema[3]].append(round(mn, 6))
+        return df_batch_labels
diff --git a/pyproject.toml b/pyproject.toml
index 1627740..ce445b7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -37,7 +37,10 @@ filters = [
     'salesforce-lavis',
     'py3langid',
     'deep_translator',
-    'huggingface_hub'
+    'huggingface_hub',
+    'opencv-contrib-python',
+    'protobuf==3.20.0',
+    'pytorch_msssim'
 ]
 nsfw_detector = ['tensorflow', 'autokeras']
 llava = [

From 8f26e61334b17158ae91f23ece55021446cdd53f Mon Sep 17 00:00:00 2001
From: zhirnov-m
Date: Wed, 24 Jul 2024 10:45:25 +0300
Subject: [PATCH 02/16] feat: add dover filter

---
 DPF/filters/videos/dover_filter.py | 194 +++++++++++++++++++++++++++++
 pyproject.toml                     |   3 +
 2 files changed, 197 insertions(+)
 create mode 100644 DPF/filters/videos/dover_filter.py

diff --git a/DPF/filters/videos/dover_filter.py b/DPF/filters/videos/dover_filter.py
new file mode 100644
index 0000000..b1855bd
--- /dev/null
+++ b/DPF/filters/videos/dover_filter.py
@@ -0,0 +1,194 @@
+import os
+from typing import Any
+from urllib.request import urlretrieve
+from io import BytesIO
+import numpy as np
+import torch
+
+from DPF.types import ModalityToDataMapping
+from .video_filter import VideoFilter
+import yaml
+
+from dover.datasets import (
+    UnifiedFrameSampler,
+    get_single_view,
+)
+import decord
+from decord import VideoReader
+from dover.models import DOVER
+
+WEIGHTS_URL = {'dover': 'https://github.com/QualityAssessment/DOVER/releases/download/v0.1.0/DOVER.pth',
+               'dover_plus_plus': 'https://huggingface.co/teowu/DOVER/resolve/main/DOVER_plus_plus.pth',
+               'dover-mobile': 
'https://github.com/QualityAssessment/DOVER/releases/download/v0.5.0/DOVER-Mobile.pth'} + +CONFIGS_URL = {'dover': 'https://raw.githubusercontent.com/teowu/DOVER-Dev/master/dover.yml', + 'dover_plus_plus': 'https://raw.githubusercontent.com/teowu/DOVER-Dev/master/dover.yml', + 'dover-mobile': 'https://raw.githubusercontent.com/teowu/DOVER-Dev/master/dover-mobile.yml'} + + +def fuse_results(results: list): + t, a = (results[0] + 0.0758) / 0.0129, (results[1] - 0.1253) / 0.0318 + # t, a = (results[0] - 0.1107) / 0.07355, (results[1] + 0.08285) / 0.03774 + x = t * 0.6104 + a * 0.3896 + return { + "aesthetic": 1 / (1 + np.exp(-a)), + "technical": 1 / (1 + np.exp(-t)), + "overall": 1 / (1 + np.exp(-x)), + } + + +def spatial_temporal_view_decomposition( + video_path, sample_types, samplers, is_train=False, augment=False, +): + video = {} + decord.bridge.set_bridge("torch") + vreader = VideoReader(video_path) + ### Avoid duplicated video decoding!!! Important!!!! + all_frame_inds = [] + frame_inds = {} + for stype in samplers: + frame_inds[stype] = samplers[stype](len(vreader), is_train) + all_frame_inds.append(frame_inds[stype]) + + ### Each frame is only decoded one time!!! + all_frame_inds = np.concatenate(all_frame_inds, 0) + frame_dict = {idx: vreader[idx] for idx in np.unique(all_frame_inds)} + + for stype in samplers: + imgs = [frame_dict[idx] for idx in frame_inds[stype]] + video[stype] = torch.stack(imgs, 0).permute(3, 0, 1, 2) + + sampled_video = {} + for stype, sopt in sample_types.items(): + sampled_video[stype] = get_single_view(video[stype], stype, **sopt) + return sampled_video, frame_inds + +class DOVERFilter(VideoFilter): + """ + DOVER model inference class to get video quality scores. + More info about the model here: https://github.com/teowu/DOVER/ + + Parameters + ---------- + weights_folder: str + Path to the folder where the weights are located. 
+ If there are no weights, they will be downloaded automatically + model_name: str = "dover" + "dover_plus_plus", "dover" or "dover-mobile" version of the model + device: str = "cuda:0" + Device to use + workers: int = 16 + Number of processes to use for reading data and calculating flow scores + pbar: bool = True + Whether to use a progress bar + """ + + def __init__( + self, + weights_folder: str, + model_name: str = 'dover_plus_plus', + device: str = "cuda:0", + workers: int = 16, + pbar: bool = True, + _pbar_position: int = 0 + ): + super().__init__(pbar, _pbar_position) + self.num_workers = workers + self.device = device + + self.model_name = model_name + self.weights_folder = weights_folder + + # Download checkpoints and configs + path_to_model = os.path.join(self.weights_folder, self.model_name + '.pth') + if not os.path.exists(path_to_model): + os.makedirs(self.weights_folder, exist_ok=True) + urlretrieve(WEIGHTS_URL[self.model_name], path_to_model) + path_to_config = os.path.join(self.weights_folder, self.model_name + '.yml') + if not os.path.exists(path_to_config): + os.makedirs(self.weights_folder, exist_ok=True) + urlretrieve(CONFIGS_URL[self.model_name], path_to_config) + + # Load model + with open(path_to_config, "r") as f: + opt = yaml.safe_load(f) + self.model = DOVER(**opt["model"]["args"]).to(self.device) + state_dict = torch.load(path_to_model, map_location=self.device) + if self.model_name == 'dover_plus_plus': + state_dict = state_dict['state_dict'] + self.model.load_state_dict(state_dict) + + self.dopt = opt["data"]["val-l1080p"]["args"] + + @property + def result_columns(self) -> list[str]: + return [f"dover_aesthetic", f"dover_technical", f"dover_overall"] + + @property + def dataloader_kwargs(self) -> dict[str, Any]: + return { + "num_workers": self.num_workers, + "batch_size": 1, + "drop_last": False, + } + + def preprocess_data( + self, + modality2data: ModalityToDataMapping, + metadata: dict[str, Any] + ) -> Any: + key = metadata[self.key_column] + video_file = BytesIO(modality2data['video']) + + mean, std = ( + torch.FloatTensor([123.675, 116.28, 103.53]), + torch.FloatTensor([58.395, 57.12, 57.375]) + ) + + temporal_samplers = {} + for stype, sopt in self.dopt["sample_types"].items(): + if "t_frag" not in sopt: + # resized temporal sampling for TQE in DOVER + temporal_samplers[stype] = UnifiedFrameSampler( + sopt["clip_len"], sopt["num_clips"], sopt["frame_interval"] + ) + else: + # temporal sampling for AQE in DOVER + temporal_samplers[stype] = UnifiedFrameSampler( + sopt["clip_len"] // sopt["t_frag"], + sopt["t_frag"], + sopt["frame_interval"], + sopt["num_clips"], + ) + + ### View Decomposition + views, _ = spatial_temporal_view_decomposition( + video_file, self.dopt["sample_types"], temporal_samplers + ) + + for k, v in views.items(): + num_clips = self.dopt["sample_types"][k].get("num_clips", 1) + views[k] = ( + ((v.permute(1, 2, 3, 0) - mean) / std) + .permute(3, 0, 1, 2) + .reshape(v.shape[0], num_clips, -1, *v.shape[2:]) + .transpose(0, 1) + ) + + return key, views + + def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: + df_batch_labels = self._get_dict_from_schema() + + key, views = batch[0] + for k, v in views.items(): + views[k] = v.to(self.device) + + with torch.no_grad(): + results = [r.mean().item() for r in self.model(views)] + rescaled_results = fuse_results(results) + df_batch_labels[self.key_column].append(key) + df_batch_labels[self.schema[1]].append(rescaled_results['aesthetic']) + 
df_batch_labels[self.schema[2]].append(rescaled_results['technical']) + df_batch_labels[self.schema[3]].append(rescaled_results['overall']) + return df_batch_labels diff --git a/pyproject.toml b/pyproject.toml index ce445b7..6234f7d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,6 +72,9 @@ grounding_gpt = [ 'torchvision==0.16.2', 'torchaudio==2.1.2' ] +dover = [ + 'DOVER @ git+https://github.com/teowu/DOVER-Dev' +] [tool.hatch.version] path = "DPF/__init__.py" From 4dd24a6a1c19db62b29a80f5dfc0643941faa1f4 Mon Sep 17 00:00:00 2001 From: zhirnov-m Date: Wed, 24 Jul 2024 10:52:04 +0300 Subject: [PATCH 03/16] feat: add rpknet filter --- DPF/filters/videos/rpknet_filter.py | 182 ++++++++++++++++++++++++++++ pyproject.toml | 3 +- 2 files changed, 184 insertions(+), 1 deletion(-) create mode 100644 DPF/filters/videos/rpknet_filter.py diff --git a/DPF/filters/videos/rpknet_filter.py b/DPF/filters/videos/rpknet_filter.py new file mode 100644 index 0000000..df3aaec --- /dev/null +++ b/DPF/filters/videos/rpknet_filter.py @@ -0,0 +1,182 @@ +import io +from typing import Any, Optional +import cv2 +import imageio.v3 as iio +import numpy as np +import torch +import torch.nn.functional as F +from cv2.typing import MatLike +from torch import Tensor + +from DPF.types import ModalityToDataMapping +from .video_filter import VideoFilter + +import ptlflow + +WEIGHTS_URL = 'https://dl.dropboxusercontent.com/s/4j4z58wuv8o0mfz/models.zip' + + +def transform_frame(frame: MatLike, target_size: tuple[int, int]) -> Tensor: + frame = cv2.resize(frame, dsize=(target_size[0], target_size[1]), interpolation=cv2.INTER_LINEAR) + frame_tensor = torch.from_numpy(frame).permute(2, 0, 1).float()[None] + + padder = InputPadder(frame_tensor.shape) # type: ignore + frame_tensor = padder.pad(frame_tensor)[0] + return frame_tensor + + +def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: + h, w = frame.shape[:2] + aspect_ratio = w / h + if h <= w: + new_height = min_side_size + new_width = int(aspect_ratio * new_height) + else: + new_width = min_side_size + new_height = int(new_width / aspect_ratio) + + frame = cv2.resize(frame, dsize=(new_width, new_height), interpolation=cv2.INTER_LINEAR) + frame_tensor = torch.from_numpy(frame).permute(2, 0, 1).float()[None] + + padder = InputPadder(frame_tensor.shape) # type: ignore + frame_tensor = padder.pad(frame_tensor)[0] + return frame_tensor + + +class InputPadder: + """ Pads images such that dimensions are divisible by 8 """ + + def __init__(self, dims: list[int], mode: str = 'sintel'): + self.ht, self.wd = dims[-2:] + pad_ht = (((self.ht // 8) + 1) * 8 - self.ht) % 8 + pad_wd = (((self.wd // 8) + 1) * 8 - self.wd) % 8 + if mode == 'sintel': + self._pad = [pad_wd // 2, pad_wd - pad_wd // 2, + pad_ht // 2, pad_ht - pad_ht // 2] + else: + self._pad = [pad_wd // 2, pad_wd - pad_wd // 2, + 0, pad_ht] + + def pad(self, *inputs) -> list[Tensor]: # type: ignore + return [F.pad(x, self._pad, mode='replicate') for x in inputs] + + def unpad(self, x: Tensor) -> Tensor: + ht, wd = x.shape[-2:] + c = [self._pad[2], ht - self._pad[3], self._pad[0], wd - self._pad[1]] + return x[..., c[0]:c[1], c[2]:c[3]] + + +class RPKnetOpticalFlowFilter(VideoFilter): + """ + RPKnet model inference class to get mean optical flow each video. + The video's current and next frame are used for optical flow calculation between them. + After, the mean value of optical flow for the entire video is calculated on the array of optical flow between two frames. 
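+    When norm is True, the flow components are divided by the frame width and height, so the magnitudes do not depend on resolution.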
More info about the model here: https://github.com/hmorimitsu/ptlflow
+
+    Parameters
+    ----------
+    pass_frames: int = 10
+        Process every pass_frames-th frame. Set pass_frames = 1 to process all frames.
+    num_passes: Optional[int] = None
+        Number of flow score calculations per video. Set to None to calculate flow scores for the entire video.
+    min_frame_size: int = 512
+        The size of the minimum side of the video frame after resizing
+    norm: bool = True
+        Whether to normalize the flow by the frame width and height
+    frames_batch_size: int = 16
+        Batch size during one video processing
+    device: str = "cuda:0"
+        Device to use
+    workers: int = 16
+        Number of processes to use for reading data and calculating flow scores
+    pbar: bool = True
+        Whether to use a progress bar
+    """
+
+    def __init__(
+        self,
+        pass_frames: int = 10,
+        num_passes: Optional[int] = None,
+        min_frame_size: int = 512,
+        norm: bool = True,
+        frames_batch_size: int = 16,
+        device: str = "cuda:0",
+        workers: int = 16,
+        pbar: bool = True,
+        _pbar_position: int = 0
+    ):
+        super().__init__(pbar, _pbar_position)
+        self.num_workers = workers
+        self.device = device
+
+        assert pass_frames >= 1, "Number of pass_frames should be greater or equal to 1."
+        self.pass_frames = pass_frames
+        self.num_passes = num_passes
+        self.min_frame_size = min_frame_size
+        self.frames_batch_size = frames_batch_size
+        self.norm = norm
+
+        self.model = ptlflow.get_model('rpknet', pretrained_ckpt='things')
+        self.model.to(self.device)
+        self.model.eval()
+
+    @property
+    def result_columns(self) -> list[str]:
+        return [f"optical_flow_rpk_mean", f"optical_flow_rpk_std"]
+
+    @property
+    def dataloader_kwargs(self) -> dict[str, Any]:
+        return {
+            "num_workers": self.num_workers,
+            "batch_size": 1,
+            "drop_last": False,
+        }
+
+    def preprocess_data(
+        self,
+        modality2data: ModalityToDataMapping,
+        metadata: dict[str, Any]
+    ) -> Any:
+        key = metadata[self.key_column]
+        video_file = modality2data['video']
+
+        frames = iio.imread(io.BytesIO(video_file), plugin="pyav")
+        max_frame_to_process = self.num_passes*self.pass_frames if self.num_passes else len(frames)
+        frames_transformed = []
+        frames_transformed = [
+            transform_keep_ar(frames[i], self.min_frame_size)
+            for i in range(self.pass_frames, min(max_frame_to_process+1, len(frames)), self.pass_frames)
+        ]
+        return key, frames_transformed
+
+    def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
+        df_batch_labels = self._get_dict_from_schema()
+
+        magnitudes: list[float] = []
+        for data in batch:
+            key, frames = data
+            with torch.no_grad():
+                for i in range(0, len(frames)-1, self.frames_batch_size):
+                    end = min(i+self.frames_batch_size, len(frames)-1)
+                    current_frame = torch.cat(frames[i:end], dim=0)
+                    next_frame = torch.cat(frames[i+1:i+self.frames_batch_size+1], dim=0)
+
+                    current_frame_cuda = current_frame.to(self.device)
+                    next_frame_cuda = next_frame.to(self.device)
+
+                    inputs = torch.stack([current_frame_cuda, next_frame_cuda], dim=1)
+
+                    flow = self.model({'images': inputs})['flows'][:, 0]
+                    if self.norm:
+                        h, w = current_frame.shape[-2:]
+                        flow[:, 0] = flow[:, 0] / w
+                        flow[:, 1] = flow[:, 1] / h
+                    magnitude = ((flow[:,0]**2+flow[:,1]**2)**0.5).detach().cpu().numpy()
+                    magnitudes.extend(magnitude)
+        mean_value = np.mean(magnitudes)
+        std_value = np.std(magnitudes)
+
+        df_batch_labels[self.key_column].append(key)
+        df_batch_labels[self.schema[1]].append(round(mean_value, 6))
+        df_batch_labels[self.schema[2]].append(round(std_value, 6))
+        return df_batch_labels
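
A minimal usage sketch for the filter above. Only APIs that appear elsewhere in this series are used (DatasetReader, read_files, apply_data_filter); the dataset config construction is elided and `config` is assumed to describe a dataset with a video modality:

    from DPF.dataset_reader import DatasetReader
    from DPF.filters.videos.rpknet_filter import RPKnetOpticalFlowFilter

    reader = DatasetReader()  # no connector given, falls back to LocalConnector
    processor = reader.read_files(config)  # `config` is an assumed files-dataset config
    datafilter = RPKnetOpticalFlowFilter(pass_frames=10, min_frame_size=512, device="cuda:0", workers=8)
    processor.apply_data_filter(datafilter)
    print(processor.df["optical_flow_rpk_mean"].describe())

diff --git a/pyproject.toml b/pyproject.toml
index 6234f7d..21686ba 100644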
--- a/pyproject.toml +++ b/pyproject.toml @@ -40,7 +40,8 @@ filters = [ 'huggingface_hub', 'opencv-contrib-python', 'protobuf==3.20.0', - 'pytorch_msssim' + 'pytorch_msssim', + 'ptlflow' ] nsfw_detector = ['tensorflow', 'autokeras'] llava = [ From 9321a780ccc43375f75583bd03632be544e28a27 Mon Sep 17 00:00:00 2001 From: zhirnov-m Date: Wed, 24 Jul 2024 16:14:19 +0300 Subject: [PATCH 04/16] feat: add cogvlm2 filter --- DPF/filters/videos/cogvlm2_filter.py | 184 +++++++++++++++++++++++++++ pyproject.toml | 23 ++++ 2 files changed, 207 insertions(+) create mode 100644 DPF/filters/videos/cogvlm2_filter.py diff --git a/DPF/filters/videos/cogvlm2_filter.py b/DPF/filters/videos/cogvlm2_filter.py new file mode 100644 index 0000000..449d5e2 --- /dev/null +++ b/DPF/filters/videos/cogvlm2_filter.py @@ -0,0 +1,184 @@ +from io import BytesIO +from typing import Any + +from DPF.types import ModalityToDataMapping + +from .video_filter import VideoFilter +import numpy as np +import torch +from decord import VideoReader, bridge +from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig + + +prompt_templates = { + 'detailed_video': 'Describe this video and its style in a very detailed manner', + 'short_video': 'Describe this video and its style briefly', + '1_sentance': "Describe this video very shortly in 1 sentence." + } +MODEL_PATH = "THUDM/cogvlm2-video-llama3-chat" +TORCH_TYPE = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.get_device_capability()[ + 0] >= 8 else torch.float16 + + +class CogVLM2Filter(VideoFilter): + """ + CogVLM2 inference class to get captions for auto-labeling videos. + More info about the model here: https://github.com/THUDM/CogVLM2 + + Parameters + ---------- + prompt: str = '1_sentance' + Prompt for the model. 
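+        One of the prompt_templates keys defined above: 'detailed_video', 'short_video' or '1_sentance'.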
+ quant: int = 16 + Model quantization mode: 4, 8 or 16 + num_frames: int = 24 + Number of frames to sample from the video + device: str = "cuda:0" + Device to use + workers: int = 16 + Number of processes to use for reading data and calculating flow scores + pbar: bool = True + Whether to use a progress bar + """ + def __init__( + self, + prompt: str = '1_sentance', + quant: int = 16, + num_frames: int = 24, + temperature: float = 0.05, + max_new_tokens: int = 1024, + device: str = "cuda:0", + workers: int = 16, + pbar: bool = True, + _pbar_position: int = 0 + ): + super().__init__(pbar, _pbar_position) + self.strategy = 'chat' + self.prompt = prompt + self.tokenizer = AutoTokenizer.from_pretrained( + MODEL_PATH, + trust_remote_code=True, + # padding_side="left" + ) + self.num_frames = num_frames + + if quant == 4: + self.model = AutoModelForCausalLM.from_pretrained( + MODEL_PATH, + torch_dtype=TORCH_TYPE, + trust_remote_code=True, + quantization_config=BitsAndBytesConfig( + load_in_4bit=True, + bnb_4bit_compute_dtype=TORCH_TYPE, + ), + low_cpu_mem_usage=True, + revision='ca14f13b05f5ead425188aae3e5e725bf4905cd1' + ).eval() + elif quant == 8: + self.model = AutoModelForCausalLM.from_pretrained( + MODEL_PATH, + torch_dtype=TORCH_TYPE, + trust_remote_code=True, + quantization_config=BitsAndBytesConfig( + load_in_8bit=True, + bnb_4bit_compute_dtype=TORCH_TYPE, + ), + low_cpu_mem_usage=True, + revision='ca14f13b05f5ead425188aae3e5e725bf4905cd1' + ).eval() + else: + self.model = AutoModelForCausalLM.from_pretrained( + MODEL_PATH, + torch_dtype=TORCH_TYPE, + trust_remote_code=True, + revision='ca14f13b05f5ead425188aae3e5e725bf4905cd1' + ).eval().to(device) + + self.query = prompt_templates[prompt] + + self.num_workers = workers + self.device = device + + self.temperature = temperature + self.max_new_tokens = max_new_tokens + + @property + def result_columns(self) -> list[str]: + return [f"caption_cogvlm"] + + @property + def dataloader_kwargs(self) -> dict[str, Any]: + return { + "num_workers": self.num_workers, + "batch_size": 1, + "drop_last": False, + } + + def preprocess_data( + self, + modality2data: ModalityToDataMapping, + metadata: dict[str, Any] + ) -> Any: + key = metadata[self.key_column] + video_file = BytesIO(modality2data['video']) + video_file = self.load_video(video_file, strategy=self.strategy) + return key, video_file + + def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: + df_batch_labels = self._get_dict_from_schema() + + key, video = batch[0] + inputs = self.model.build_conversation_input_ids( + tokenizer=self.tokenizer, + query=self.query, + images=[video], + history=[], + template_version=self.strategy + ) + + inputs = { + 'input_ids': inputs['input_ids'].unsqueeze(0).to(self.device), + 'token_type_ids': inputs['token_type_ids'].unsqueeze(0).to(self.device), + 'attention_mask': inputs['attention_mask'].unsqueeze(0).to(self.device), + 'images': [[inputs['images'][0].to(self.device).to(TORCH_TYPE)]], + } + gen_kwargs = { + "max_new_tokens": self.max_new_tokens, + "pad_token_id": 128002, + "top_k": 1, + "do_sample": True, + "top_p": 0.1, + "temperature": self.temperature, + } + with torch.no_grad(): + outputs = self.model.generate(**inputs, **gen_kwargs) + outputs = outputs[:, inputs['input_ids'].shape[1]:] + response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) + df_batch_labels[self.schema[1]].extend([response]) + df_batch_labels[self.key_column].extend([key]) + return df_batch_labels + + + def load_video(self, video_path, strategy='chat'): 
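+        # Two sampling strategies are implemented below:
+        #   'base': num_frames indices spaced uniformly over the whole video;
+        #   'chat': for every second of the video, the frame whose timestamp
+        #           is closest to that second, capped at num_frames frames.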
+ bridge.set_bridge('torch') + num_frames = self.num_frames + + decord_vr = VideoReader(uri=video_path) + frame_id_list = None + total_frames = len(decord_vr) + if strategy == 'base': + frame_id_list = np.linspace(0, total_frames - 1, num_frames, dtype=int) + elif strategy == 'chat': + timestamps = decord_vr.get_frame_timestamp(np.arange(total_frames)) + timestamps = [i[0] for i in timestamps] + max_second = round(max(timestamps)) + 1 + frame_id_list = [] + for second in range(max_second): + closest_num = min(timestamps, key=lambda x: abs(x - second)) + index = timestamps.index(closest_num) + frame_id_list.append(index) + if len(frame_id_list) >= num_frames: + break + video_data = decord_vr.get_batch(frame_id_list) + video_data = video_data.permute(3, 0, 1, 2) + return video_data diff --git a/pyproject.toml b/pyproject.toml index 21686ba..190918a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,29 @@ grounding_gpt = [ dover = [ 'DOVER @ git+https://github.com/teowu/DOVER-Dev' ] +cogvlm = ['pydantic==1.10.14', + 'opencv-python==4.5.5.64', + 'decord>=0.6.0', + 'torch==2.1.0', + 'torchvision== 0.16.0', + 'pytorchvideo==0.1.5', + 'transformers==4.40', + 'pillow', + 'chainlit>=1.0', + 'pydantic>=2.7.1', + 'timm>=0.9.16', + 'openai>=1.30.1', + 'loguru>=0.7.2', + 'einops', + 'sse-starlette>=2.1.0', + 'bitsandbytes>=0.43.1', + 'flask', + 'gunicorn', + 'gevent', + 'requests', + 'xformers', + 'huggingface-hub>=0.23.0', + ] [tool.hatch.version] path = "DPF/__init__.py" From a8a0baf631e104ce33cd3b59106da98e055e5bbd Mon Sep 17 00:00:00 2001 From: zhirnov-m Date: Wed, 24 Jul 2024 16:43:17 +0300 Subject: [PATCH 05/16] feat: add complexity filter --- DPF/filters/images/complexity_filter.py | 120 ++++++++++++++++++++++++ pyproject.toml | 3 + 2 files changed, 123 insertions(+) create mode 100644 DPF/filters/images/complexity_filter.py diff --git a/DPF/filters/images/complexity_filter.py b/DPF/filters/images/complexity_filter.py new file mode 100644 index 0000000..00f9543 --- /dev/null +++ b/DPF/filters/images/complexity_filter.py @@ -0,0 +1,120 @@ +import os +from typing import Any +from urllib.request import urlretrieve +import numpy as np +import torch + +from ...types import ModalityToDataMapping +from DPF.utils import read_image_rgb_from_bytes +from .img_filter import ImageFilter + +from segment_anything import SamAutomaticMaskGenerator, sam_model_registry + + +WEIGHTS_URL = {'vit_h': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth', + 'vit_l': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth', + 'vit_b': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth'} + + +class ComplexityFilter(ImageFilter): + """ + Image complexity filter based on SAM: https://github.com/facebookresearch/segment-anything + + Parameters + ---------- + weights_folder: str + Folder where the weights will be stored + model_name: str = 'vit_h' + Model version to use: vit_h - huge, vit_l - large, vit_b - big + points_per_side: int = 32 + Parameter that regulates granularity of automatic segmentation + batch_size: int = 1 + Batch size during mask calculation for one image + device: str = "cuda:0" + Device to use + workers: int = 16 + Number of processes to use for reading data and calculating flow scores + pbar: bool = True + Whether to use a progress bar + """ + + def __init__( + self, + weights_folder: str, + model_name: str = 'vit_h', + points_per_side: int = 32, + workers: int = 16, + batch_size: int = 1, + device: str = "cuda:0", + pbar: bool = True, + 
_pbar_position: int = 0 + ): + super().__init__(pbar, _pbar_position) + self.num_workers = workers + self.batch_size = batch_size + self.device = device + + self.model_name = model_name + self.weights_folder = weights_folder + self.points_per_side = points_per_side + + # Download checkpoints + path_to_model = os.path.join(self.weights_folder, self.model_name + '.pth') + if not os.path.exists(path_to_model): + os.makedirs(self.weights_folder, exist_ok=True) + urlretrieve(WEIGHTS_URL[self.model_name], path_to_model) + + sam = sam_model_registry[self.model_name](checkpoint=path_to_model) + sam = sam.to(torch.device(self.device)) + self.mask_generator = SamAutomaticMaskGenerator( + sam, points_per_batch=batch_size, + points_per_side=points_per_side + ) + + @property + def result_columns(self) -> list[str]: + return ["complexity_num_segments", "complexity_max_segment_area", "complexity_mean_segment_area"] + + @property + def dataloader_kwargs(self) -> dict[str, Any]: + return { + "num_workers": self.num_workers, + "batch_size": 1, + "drop_last": False, + } + + def preprocess_data( + self, + modality2data: ModalityToDataMapping, + metadata: dict[str, Any] + ) -> Any: + key = metadata[self.key_column] + pil_img = read_image_rgb_from_bytes(modality2data['image']) + img = np.array(pil_img) + return key, img + + def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: + df_batch_labels = self._get_dict_from_schema() + + for data in batch: + key, img = data + h, w = img.shape[:2] + hw = h * w + with torch.no_grad(): + outputs = self.mask_generator.generate(img) + num_segments = len(outputs) + if num_segments > 0: + areas = [x['area'] for x in outputs] + bg_area = hw - np.sum(areas) + areas.append(bg_area) + max_area = np.max(areas) / hw + mean_area = np.mean(areas) / hw + else: + max_area = mean_area = 0 + + df_batch_labels["complexity_num_segments"].extend([num_segments]) + df_batch_labels["complexity_max_segment_area"].extend([max_area]) + df_batch_labels["complexity_mean_segment_area"].extend([mean_area]) + df_batch_labels[self.key_column].extend([key]) + + return df_batch_labels diff --git a/pyproject.toml b/pyproject.toml index 190918a..99a7555 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,9 @@ grounding_gpt = [ dover = [ 'DOVER @ git+https://github.com/teowu/DOVER-Dev' ] +complexity = [ + 'segment-anything @ git+https://github.com/facebookresearch/segment-anything.git' +] cogvlm = ['pydantic==1.10.14', 'opencv-python==4.5.5.64', 'decord>=0.6.0', From 4ffe584357b14dff5ed6393208cdd83046bdabb2 Mon Sep 17 00:00:00 2001 From: zhirnov-m Date: Wed, 24 Jul 2024 16:56:19 +0300 Subject: [PATCH 06/16] feat: add min_frame_size argument to structural dynamics filter --- DPF/filters/videos/rpknet_filter.py | 10 +--- .../videos/structural_dynamics_filter.py | 49 +++++++++++++++++-- 2 files changed, 45 insertions(+), 14 deletions(-) diff --git a/DPF/filters/videos/rpknet_filter.py b/DPF/filters/videos/rpknet_filter.py index df3aaec..688715e 100644 --- a/DPF/filters/videos/rpknet_filter.py +++ b/DPF/filters/videos/rpknet_filter.py @@ -13,16 +13,8 @@ import ptlflow -WEIGHTS_URL = 'https://dl.dropboxusercontent.com/s/4j4z58wuv8o0mfz/models.zip' - - -def transform_frame(frame: MatLike, target_size: tuple[int, int]) -> Tensor: - frame = cv2.resize(frame, dsize=(target_size[0], target_size[1]), interpolation=cv2.INTER_LINEAR) - frame_tensor = torch.from_numpy(frame).permute(2, 0, 1).float()[None] - padder = InputPadder(frame_tensor.shape) # type: ignore - frame_tensor = 
padder.pad(frame_tensor)[0] - return frame_tensor +WEIGHTS_URL = 'https://dl.dropboxusercontent.com/s/4j4z58wuv8o0mfz/models.zip' def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: diff --git a/DPF/filters/videos/structural_dynamics_filter.py b/DPF/filters/videos/structural_dynamics_filter.py index 9df7c1a..b83aca3 100644 --- a/DPF/filters/videos/structural_dynamics_filter.py +++ b/DPF/filters/videos/structural_dynamics_filter.py @@ -1,20 +1,61 @@ import io -from typing import Any, Optional +from typing import Any import cv2 import imageio.v3 as iio import numpy as np import torch from cv2.typing import MatLike from torch import Tensor +import torch.nn.functional as F from DPF.types import ModalityToDataMapping from .video_filter import VideoFilter from pytorch_msssim import MS_SSIM -from time import time +def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: + h, w = frame.shape[:2] + aspect_ratio = w / h + if h <= w: + new_height = min_side_size + new_width = int(aspect_ratio * new_height) + else: + new_width = min_side_size + new_height = int(new_width / aspect_ratio) + + frame = cv2.resize(frame, dsize=(new_width, new_height), interpolation=cv2.INTER_LINEAR) + frame_tensor = torch.from_numpy(frame).permute(2, 0, 1).float()[None] + + padder = InputPadder(frame_tensor.shape) # type: ignore + frame_tensor = padder.pad(frame_tensor)[0] + return frame_tensor + + +class InputPadder: + """ Pads images such that dimensions are divisible by 8 """ + + def __init__(self, dims: list[int], mode: str = 'sintel'): + self.ht, self.wd = dims[-2:] + pad_ht = (((self.ht // 8) + 1) * 8 - self.ht) % 8 + pad_wd = (((self.wd // 8) + 1) * 8 - self.wd) % 8 + if mode == 'sintel': + self._pad = [pad_wd // 2, pad_wd - pad_wd // 2, + pad_ht // 2, pad_ht - pad_ht // 2] + else: + self._pad = [pad_wd // 2, pad_wd - pad_wd // 2, + 0, pad_ht] + + def pad(self, *inputs) -> list[Tensor]: # type: ignore + return [F.pad(x, self._pad, mode='replicate') for x in inputs] + + def unpad(self, x: Tensor) -> Tensor: + ht, wd = x.shape[-2:] + c = [self._pad[2], ht - self._pad[3], self._pad[0], wd - self._pad[1]] + return x[..., c[0]:c[1], c[2]:c[3]] + + class StructuralDynamicsFilter(VideoFilter): """ @@ -81,7 +122,7 @@ def preprocess_data( frames = iio.imread(io.BytesIO(video_file), plugin="pyav") frames_transformed = [] frames_transformed = [ - torch.from_numpy(frames[i]).permute(2, 0, 1).float()[None] + transform_keep_ar(frames[i], self.min_frame_size) for i in range(self.pass_frames, len(frames), self.pass_frames) ] return key, frames_transformed @@ -101,13 +142,11 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: current_frame_cuda = current_frame.to(self.device) next_frame_cuda = next_frame.to(self.device) - t0 = time() ssim = self.model( current_frame_cuda, next_frame_cuda ) values.extend(ssim.detach().cpu().numpy()) - # print('SSIM time=', time() - t0) mean_value = np.mean(values) mn = np.min(values) mx = np.max(values) From fc4aacf1cb3af322cf460622d0522eb16c3d598a Mon Sep 17 00:00:00 2001 From: zhirnov-m Date: Wed, 24 Jul 2024 17:44:10 +0300 Subject: [PATCH 07/16] feat: regex caption cleaner for cogvlm2 filter --- DPF/filters/videos/cogvlm2_filter.py | 39 +++++++++++++++++++++++++++- 1 file changed, 38 insertions(+), 1 deletion(-) diff --git a/DPF/filters/videos/cogvlm2_filter.py b/DPF/filters/videos/cogvlm2_filter.py index 449d5e2..70a750e 100644 --- a/DPF/filters/videos/cogvlm2_filter.py +++ b/DPF/filters/videos/cogvlm2_filter.py @@ -8,6 +8,7 @@ import torch from 
decord import VideoReader, bridge from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig +import re prompt_templates = { @@ -19,6 +20,40 @@ TORCH_TYPE = torch.bfloat16 if torch.cuda.is_available() and torch.cuda.get_device_capability()[ 0] >= 8 else torch.float16 +compiled_regexs = [ + (re.compile(r'the video (also )?is '), ''), + (re.compile(r'the video (also )?features '), ''), + (re.compile(r'the video (also )?shows '), ''), + (re.compile(r'the video (also )?depicts '), ''), + (re.compile(r'the video (also )?showcases '), ''), + (re.compile(r'the video (also )?captures '), ''), + (re.compile(r'the video (also )?provides '), ''), + (re.compile(r'the video (also )?showcases '), ''), + (re.compile(r'throughout the video, '), ''), +] + + +def clean_with_regex(caption): + lower_caption = str(caption).lower().strip() + for re_compiled, replacement in compiled_regexs: + iterator = reversed(list(re_compiled.finditer(lower_caption))) + for match in iterator: + pos = list(match.span()) + caption = caption[:pos[0]] + replacement + caption[pos[1]:] + lower_caption = str(caption).lower().strip() + + if caption.count('-') > 2: + split_captions = [] + for split_caption in caption.split(): + if split_caption.count('-') > 2: + split_caption = re.sub(r'-', ' ', split_caption) + split_captions.append(split_caption) + caption = ' '.join(split_captions) + + caption = caption.strip('—-:/+=|@#&*') + + return caption.strip() + class CogVLM2Filter(VideoFilter): """ @@ -104,7 +139,7 @@ def __init__( @property def result_columns(self) -> list[str]: - return [f"caption_cogvlm"] + return ["caption_cogvlm", "caption_cogvlm_clean"] @property def dataloader_kwargs(self) -> dict[str, Any]: @@ -154,7 +189,9 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: outputs = self.model.generate(**inputs, **gen_kwargs) outputs = outputs[:, inputs['input_ids'].shape[1]:] response = self.tokenizer.decode(outputs[0], skip_special_tokens=True) + response_clean = clean_with_regex(response) df_batch_labels[self.schema[1]].extend([response]) + df_batch_labels[self.schema[2]].extend([response_clean]) df_batch_labels[self.key_column].extend([key]) return df_batch_labels From 563cc092bb8ce23ff0661b3d3e9aaf7a942c513e Mon Sep 17 00:00:00 2001 From: funnylittleman Date: Fri, 26 Jul 2024 11:25:28 +0300 Subject: [PATCH 08/16] feat: reading videos from .tar files --- DPF/connectors/s3_connector.py | 27 ++++++++++++++++++++++----- DPF/dataset_reader.py | 13 ++++++++++++- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/DPF/connectors/s3_connector.py b/DPF/connectors/s3_connector.py index fdb9c8d..e176aa4 100644 --- a/DPF/connectors/s3_connector.py +++ b/DPF/connectors/s3_connector.py @@ -37,12 +37,29 @@ def _preprocess_filepath(path: str) -> str: def read_file(self, filepath: str, binary: bool) -> io.BytesIO: mode = "rb" if binary else "rt" - with self.s3client.open(self._preprocess_filepath(filepath), mode=mode) as f: + if '.tar' in filepath and '?tar_offset=' in filepath and '?size=' in filepath: + filepath = self._preprocess_filepath(filepath) + offset = filepath.split('?tar_offset=')[1].split('?size=')[0] + size = filepath.split('?size=')[1] + filepath = filepath.split('?')[0] + offset = int(offset) + size = int(size) + s3 = self.s3client._get_client() + range_header = "bytes=%d-%d" % (offset, offset + size - 1) + bucket_name = filepath.split('/')[0] + tar_key = filepath.replace(bucket_name, '')[1:] + video_obj = s3.get_object(Bucket=bucket_name, Key=tar_key, Range=range_header) + 
res = video_obj["Body"].read() if mode == "rb": - res = io.BytesIO(f.read()) + res = io.BytesIO(res) res.seek(0) - else: - res = f.read() + else: + with self.s3client.open(self._preprocess_filepath(filepath), mode=mode) as f: + if mode == "rb": + res = io.BytesIO(f.read()) + res.seek(0) + else: + res = f.read() return res def save_file( @@ -78,4 +95,4 @@ def join(self, *args: str) -> str: path += arg else: path += arg+'/' - return path[:-1] + return path[:-1] \ No newline at end of file diff --git a/DPF/dataset_reader.py b/DPF/dataset_reader.py index 3025248..0d43652 100644 --- a/DPF/dataset_reader.py +++ b/DPF/dataset_reader.py @@ -2,6 +2,7 @@ from typing import Optional, Union import pandas as pd +import numpy as np from tqdm.contrib.concurrent import process_map from DPF.configs import ( @@ -42,6 +43,7 @@ def __init__(self, connector: Optional[Connector] = None): if connector is None: connector = LocalConnector() self.connector = connector + self.local_connector = LocalConnector() def _read_and_validate_dataframes( self, @@ -270,7 +272,10 @@ def read_files( Instance of FilesDatasetProcessor dataset """ table_path = config.table_path.rstrip("/") - df = self.connector.read_dataframe(table_path) + try: + df = self.connector.read_dataframe(table_path) + except: + df = self.local_connector.read_dataframe(table_path) required_columns = list(config.user_column2default_column.keys()) column_set = set(df.columns.tolist()) @@ -288,6 +293,12 @@ def read_files( path_col = datatype.modality.path_column df[path_col] = df[path_col].apply(lambda x: self.connector.join(config.base_path, x)) + # process .tar files with offsets + for i, row in df.iterrows(): + if isinstance(df.at[i,'tar_offset'], np.int64) and isinstance(df.at[i,'size'], np.int64): + df.at[i, path_col] += f'?tar_offset={df.at[i,"tar_offset"]}?size={df.at[i,"size"]}' + + return FilesDatasetProcessor( connector=self.connector, df=df, From ff44ea7f60a5ff9997b163889f635be6c7d58bb4 Mon Sep 17 00:00:00 2001 From: funnylittleman Date: Fri, 26 Jul 2024 11:38:11 +0300 Subject: [PATCH 09/16] fix complex filter, multigpu filter and image filter adapter to run in pipeline --- DPF/filters/complex_filter.py | 13 +++- DPF/filters/multigpu_filter.py | 79 ++++++++-------------- DPF/filters/videos/image_filter_adapter.py | 12 ++-- 3 files changed, 48 insertions(+), 56 deletions(-) diff --git a/DPF/filters/complex_filter.py b/DPF/filters/complex_filter.py index eed1086..f5024b3 100644 --- a/DPF/filters/complex_filter.py +++ b/DPF/filters/complex_filter.py @@ -16,14 +16,21 @@ class ComplexDataFilter(DataFilter): def __init__( self, - datafilters: list[DataFilter], + datafilters, + kwargs, workers: int, pbar: bool = True, - _pbar_position: int = 0 + _pbar_position: int = 0, + device = 'cuda:0' ): super().__init__(pbar, _pbar_position) - self.datafilters = datafilters + self.datafilters = [] self.workers = workers + self.device = device + + for filter, kwarg in zip(datafilters, kwargs): + kwarg['device'] = self.device + self.datafilters.append(filter(**kwarg)) assert len(self.datafilters) > 0 assert all( diff --git a/DPF/filters/multigpu_filter.py b/DPF/filters/multigpu_filter.py index 50af5e3..e833e6f 100644 --- a/DPF/filters/multigpu_filter.py +++ b/DPF/filters/multigpu_filter.py @@ -1,6 +1,6 @@ import multiprocessing from multiprocessing import Manager -from typing import Any, Callable, Optional, Union +from typing import Any, Union import numpy as np import pandas as pd @@ -13,34 +13,6 @@ from .data_filter import DataFilter -# TODO(review) - один вызов в 
MultiGPUFilter, нужно перенести его внутрь класса -def run_one_process( - config: DatasetConfig, - connector: Connector, - df: pd.DataFrame, - i: int, - index: pd.Series, - results: list[pd.DataFrame], - filter_class: Optional[type[DataFilter]], - filter_kwargs: Optional[dict[str, Any]], - datafilter_init_fn: Optional[Callable[[int, Union[str, torch.device]], DataFilter]], - device: Union[str, torch.device], - filter_run_kwargs: dict[str, Any] -) -> None: - reader = DatasetReader(connector=connector) - processor = reader.from_df(config, df) - if datafilter_init_fn: - datafilter = datafilter_init_fn(i, device) - else: - datafilter = filter_class(**filter_kwargs, _pbar_position=i, device=device) # type: ignore - - datafilter._created_by_multigpu_data_filter = True - processor.apply_data_filter(datafilter, **filter_run_kwargs) - res = processor.df - res.set_index(index, inplace=True) - results.append(res) - - class MultiGPUDataFilter: """ Class for multi-gpu inference @@ -49,36 +21,31 @@ class MultiGPUDataFilter: def __init__( self, devices: list[Union[torch.device, str]], - datafilter_class: Optional[type[DataFilter]] = None, - datafilter_params: Optional[dict[str, Any]] = None, - datafilter_init_fn: Optional[Callable[[int, Union[str, torch.device]], DataFilter]] = None + datafilter_class: type[DataFilter], + datafilter_params: dict[str, Any] ): """ Parameters ---------- devices: list[Union[torch.device, str]] List of devices to run datafilter on - datafilter_class: Optional[type[DataFilter]] = None + datafilter_class: type[DataFilter] Class of datafilter to use - datafilter_params: Optional[dict[str, Any]] = None + datafilter_params: dict[str, Any] Parameters for datafilter_class initialization - datafilter_init_fn: Optional[Callable[[int, Union[str, torch.device]], DataFilter]] = None - Initialization function for a datafilter. 
Takes _pbar_position as first arg and device as a second arg """ self.filter_class = datafilter_class self.filter_params = datafilter_params - self.datafilter_init_fn = datafilter_init_fn - assert self.datafilter_init_fn or self.filter_class, "One method of filter initialization should be specified" self.devices = devices self.num_parts = len(devices) + self.filters = [] + for i in range(self.num_parts): + self.filters.append(datafilter_class(**datafilter_params, _pbar_position=i, device=devices[i])) + self.filters[i]._created_by_multigpu_data_filter = True + # getting result columns names - if self.datafilter_init_fn: - datafilter = self.datafilter_init_fn(0, devices[0]) - else: - datafilter = self.filter_class(**self.filter_params, device=devices[0]) # type: ignore - self._result_columns = datafilter.result_columns - del datafilter + self._result_columns = self.filters[0].result_columns torch.cuda.empty_cache() @property @@ -124,10 +91,6 @@ def run( i, df_splits[i].index, # type: ignore shared_results, - self.filter_class, - self.filter_params, - self.datafilter_init_fn, - self.devices[i], filter_run_kwargs ) ) @@ -135,7 +98,7 @@ def run( processes = [] context = multiprocessing.get_context('spawn') for param in params: - p = context.Process(target=run_one_process, args=param) + p = context.Process(target=self.run_one_process, args=param) p.start() processes.append(p) @@ -145,3 +108,21 @@ def run( res_df = pd.concat(shared_results) res_df.sort_index(inplace=True) return res_df + + + def run_one_process( + self, + config: DatasetConfig, + connector: Connector, + df: pd.DataFrame, + i: int, + index: pd.Series, + results: list[pd.DataFrame], + filter_run_kwargs: dict[str, Any] + ) -> None: + reader = DatasetReader(connector=connector) + processor = reader.from_df(config, df) + processor.apply_data_filter(self.filters[i], **filter_run_kwargs) + res = processor.df + res.set_index(index, inplace=True) + results.append(res) diff --git a/DPF/filters/videos/image_filter_adapter.py b/DPF/filters/videos/image_filter_adapter.py index 3684412..d5c376f 100644 --- a/DPF/filters/videos/image_filter_adapter.py +++ b/DPF/filters/videos/image_filter_adapter.py @@ -30,14 +30,16 @@ class ImageFilterAdapter(VideoFilter): def __init__( self, - image_filter: ImageFilter, + imagefilter_class: type[ImageFilter], + imagefilter_kwargs: dict[str, Any], video_frame: float, + device: str = "cuda:0", workers: int = 8, pbar: bool = True, _pbar_position: int = 0 ): super().__init__(pbar, _pbar_position) - self.image_filter = image_filter + self.image_filter = imagefilter_class(**imagefilter_kwargs, device=device) self.video_frame = video_frame self.num_workers = workers @@ -115,16 +117,18 @@ class MultiFrameImageFilterAdapter(VideoFilter): def __init__( self, - image_filter: ImageFilter, + imagefilter_class: type[ImageFilter], + imagefilter_kwargs: dict[str, Any], video_frames: list[float], reduce_results_fn: Callable[[str, list[Any]], Any], + device: str = "cuda:0", batch_size: int = 8, workers: int = 8, pbar: bool = True, _pbar_position: int = 0 ): super().__init__(pbar, _pbar_position) - self.image_filter = image_filter + self.image_filter = imagefilter_class(**imagefilter_kwargs, device=device) self.video_frames = video_frames self.reduce_results_fn = reduce_results_fn self.batch_size = batch_size From 2a7fb4d39a5ad85bbc464c16ffe48006f453f1a6 Mon Sep 17 00:00:00 2001 From: funnylittleman Date: Fri, 26 Jul 2024 11:42:59 +0300 Subject: [PATCH 10/16] feat: constant_gpu parameter to avoid filter reloading --- 
DPF/pipelines/filter_pipeline.py | 19 +++++++++++-------- DPF/pipelines/pipeline_stages.py | 13 +++++++++++-- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/DPF/pipelines/filter_pipeline.py b/DPF/pipelines/filter_pipeline.py index 91d2825..b174b2d 100644 --- a/DPF/pipelines/filter_pipeline.py +++ b/DPF/pipelines/filter_pipeline.py @@ -1,7 +1,8 @@ from dataclasses import dataclass -from typing import Any, Callable, Optional +from typing import Any, Callable, Optional, Union import pandas as pd +import torch from DPF.filters import ColumnFilter, DataFilter from DPF.filters.multigpu_filter import MultiGPUDataFilter @@ -38,12 +39,13 @@ def __init__(self, pipeline_name: str, logs_dir: Optional[str] = None): def add_datafilter( self, - datafilter: type[DataFilter], - datafilter_kwargs: dict[str, Any], + datafilter: Optional[type[DataFilter]] = None, + datafilter_kwargs: dict[str, Any] = None, devices: Optional[list[str]] = None, processor_apply_kwargs: Optional[dict[str, Any]] = None, on_error: OnErrorOptions = "stop", - skip_if_columns_exist: bool = True + skip_if_columns_exist: bool = True, + constant_gpu: bool = False ) -> None: if processor_apply_kwargs is None: processor_apply_kwargs = {} @@ -52,7 +54,7 @@ def add_datafilter( stage = FilterPipelineStage( 'datafilter', filter_class=datafilter, filter_kwargs=datafilter_kwargs, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist + skip_if_columns_exist=skip_if_columns_exist, constant_gpu=constant_gpu ) elif len(devices) == 0: new_kwargs = datafilter_kwargs.copy() @@ -60,7 +62,7 @@ def add_datafilter( stage = FilterPipelineStage( 'datafilter', filter_class=datafilter, filter_kwargs=new_kwargs, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist + skip_if_columns_exist=skip_if_columns_exist, constant_gpu=constant_gpu ) else: stage = FilterPipelineStage( @@ -69,9 +71,10 @@ def add_datafilter( "devices": devices, "datafilter_class": datafilter, "datafilter_params": datafilter_kwargs - }, + }, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist + skip_if_columns_exist=skip_if_columns_exist, + constant_gpu=constant_gpu ) self.stages.append( diff --git a/DPF/pipelines/pipeline_stages.py b/DPF/pipelines/pipeline_stages.py index dc1f01b..c2068ea 100644 --- a/DPF/pipelines/pipeline_stages.py +++ b/DPF/pipelines/pipeline_stages.py @@ -68,7 +68,8 @@ def __init__( filter_class: Union[type[DataFilter], type[ColumnFilter], type[MultiGPUDataFilter]], filter_kwargs: dict[str, Any], processor_apply_kwargs: Optional[dict[str, Any]] = None, - skip_if_columns_exist: bool = True + skip_if_columns_exist: bool = True, + constant_gpu: bool = False ): self.filter_type = filter_type self.filter_class = filter_class @@ -79,13 +80,21 @@ def __init__( self.processor_apply_kwargs = {} self.skip_if_columns_exist = skip_if_columns_exist + + self.constant_gpu = constant_gpu + if constant_gpu: + self.filter_obj = self.filter_class(**self.filter_kwargs) + @property def stage_name(self) -> str: return f"FilterPipelineStage(filter_class={self.filter_class}, filter_kwargs={self.filter_kwargs})" def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None: - filter_obj = self.filter_class(**self.filter_kwargs) + if self.constant_gpu: + filter_obj = self.filter_obj + else: + filter_obj = self.filter_class(**self.filter_kwargs) columns_to_be_added = filter_obj.result_columns columns_intersection = 
set(processor.columns).intersection(set(columns_to_be_added)) From fba9bb8adac72f63a75cded7b1bb74ab2d77ad08 Mon Sep 17 00:00:00 2001 From: funnylittleman Date: Fri, 26 Jul 2024 11:50:41 +0300 Subject: [PATCH 11/16] fix: remove rounding in motion filters --- DPF/filters/videos/farneback_filter.py | 2 +- DPF/filters/videos/raft_filter.py | 2 +- DPF/filters/videos/rpknet_filter.py | 8 +++----- DPF/filters/videos/structural_dynamics_filter.py | 6 +++--- 4 files changed, 8 insertions(+), 10 deletions(-) diff --git a/DPF/filters/videos/farneback_filter.py b/DPF/filters/videos/farneback_filter.py index 760abc4..3ef975a 100644 --- a/DPF/filters/videos/farneback_filter.py +++ b/DPF/filters/videos/farneback_filter.py @@ -155,5 +155,5 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: for data in batch: key, mean_optical_flow = data df_batch_labels[self.key_column].append(key) - df_batch_labels[self.result_columns[0]].append(round(mean_optical_flow, 3)) + df_batch_labels[self.result_columns[0]].append(mean_optical_flow) return df_batch_labels diff --git a/DPF/filters/videos/raft_filter.py b/DPF/filters/videos/raft_filter.py index 6193e7b..72e38c8 100644 --- a/DPF/filters/videos/raft_filter.py +++ b/DPF/filters/videos/raft_filter.py @@ -196,5 +196,5 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: mean_value = np.mean(mean_magnitudes) df_batch_labels[self.key_column].append(key) - df_batch_labels[self.schema[1]].append(round(mean_value, 3)) + df_batch_labels[self.schema[1]].append(mean_value) return df_batch_labels diff --git a/DPF/filters/videos/rpknet_filter.py b/DPF/filters/videos/rpknet_filter.py index 688715e..f6c96fb 100644 --- a/DPF/filters/videos/rpknet_filter.py +++ b/DPF/filters/videos/rpknet_filter.py @@ -114,7 +114,7 @@ def __init__( @property def result_columns(self) -> list[str]: - return [f"optical_flow_rpk_mean", f"optical_flow_rpk_std"] + return [f"optical_flow_rpk_mean"] @property def dataloader_kwargs(self) -> dict[str, Any]: @@ -144,9 +144,9 @@ def preprocess_data( def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: df_batch_labels = self._get_dict_from_schema() - magnitudes: list[float] = [] for data in batch: key, frames = data + magnitudes: list[float] = [] with torch.no_grad(): for i in range(0, len(frames)-1, self.frames_batch_size): end = min(i+self.frames_batch_size, len(frames)-1) @@ -166,9 +166,7 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: magnitude = ((flow[:,0]**2+flow[:,1]**2)**0.5).detach().cpu().numpy() magnitudes.extend(magnitude) mean_value = np.mean(magnitudes) - std_value = np.std(magnitudes) df_batch_labels[self.key_column].append(key) - df_batch_labels[self.schema[1]].append(round(mean_value, 6)) - df_batch_labels[self.schema[2]].append(round(std_value, 6)) + df_batch_labels[self.schema[1]].append(mean_value) return df_batch_labels diff --git a/DPF/filters/videos/structural_dynamics_filter.py b/DPF/filters/videos/structural_dynamics_filter.py index b83aca3..e791b80 100644 --- a/DPF/filters/videos/structural_dynamics_filter.py +++ b/DPF/filters/videos/structural_dynamics_filter.py @@ -152,7 +152,7 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: mx = np.max(values) df_batch_labels[self.key_column].append(key) - df_batch_labels[self.schema[1]].append(round(mean_value, 6)) - df_batch_labels[self.schema[2]].append(round(mx, 6)) - df_batch_labels[self.schema[3]].append(round(mn, 6)) + df_batch_labels[self.schema[1]].append(mean_value) + 
df_batch_labels[self.schema[2]].append(mx) + df_batch_labels[self.schema[3]].append(mn) return df_batch_labels From 5d778f97c4d247be2dfa2a2d5a1c6091f09da88a Mon Sep 17 00:00:00 2001 From: boomb0om Date: Mon, 29 Jul 2024 18:06:35 +0300 Subject: [PATCH 12/16] update pyproject --- pyproject.toml | 49 +++++++++++++++++++++++++------------------------ 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 640f803..26f899c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,7 +41,7 @@ filters = [ 'opencv-contrib-python', 'protobuf==3.20.0', 'pytorch_msssim', - 'ptlflow' + 'ptlflow', 'videohash' ] nsfw_detector = ['tensorflow', 'autokeras'] @@ -80,29 +80,30 @@ dover = [ complexity = [ 'segment-anything @ git+https://github.com/facebookresearch/segment-anything.git' ] -cogvlm = ['pydantic==1.10.14', - 'opencv-python==4.5.5.64', - 'decord>=0.6.0', - 'torch==2.1.0', - 'torchvision== 0.16.0', - 'pytorchvideo==0.1.5', - 'transformers==4.40', - 'pillow', - 'chainlit>=1.0', - 'pydantic>=2.7.1', - 'timm>=0.9.16', - 'openai>=1.30.1', - 'loguru>=0.7.2', - 'einops', - 'sse-starlette>=2.1.0', - 'bitsandbytes>=0.43.1', - 'flask', - 'gunicorn', - 'gevent', - 'requests', - 'xformers', - 'huggingface-hub>=0.23.0', - ] +cogvlm = [ + 'pydantic==1.10.14', + 'opencv-python==4.5.5.64', + 'decord>=0.6.0', + 'torch==2.1.0', + 'torchvision== 0.16.0', + 'pytorchvideo==0.1.5', + 'transformers==4.40', + 'pillow', + 'chainlit>=1.0', + 'pydantic>=2.7.1', + 'timm>=0.9.16', + 'openai>=1.30.1', + 'loguru>=0.7.2', + 'einops', + 'sse-starlette>=2.1.0', + 'bitsandbytes>=0.43.1', + 'flask', + 'gunicorn', + 'gevent', + 'requests', + 'xformers', + 'huggingface-hub>=0.23.0', +] [tool.hatch.version] path = "DPF/__init__.py" From c6281f470f13bd0e72ef8bb1d27cd86cfd88a08f Mon Sep 17 00:00:00 2001 From: boomb0om Date: Mon, 29 Jul 2024 18:38:14 +0300 Subject: [PATCH 13/16] fix: fix code style --- DPF/connectors/s3_connector.py | 27 +++--------- DPF/dataset_reader.py | 13 +----- DPF/filters/complex_filter.py | 13 ++---- DPF/filters/images/complexity_filter.py | 14 +++---- DPF/filters/multigpu_filter.py | 31 ++------------ DPF/filters/videos/cogvlm2_filter.py | 41 ++++++++++--------- DPF/filters/videos/dover_filter.py | 37 ++++++++--------- DPF/filters/videos/farneback_filter.py | 2 +- DPF/filters/videos/image_filter_adapter.py | 12 ++---- DPF/filters/videos/rpknet_filter.py | 12 +++--- .../videos/structural_dynamics_filter.py | 10 ++--- DPF/pipelines/filter_pipeline.py | 19 ++++----- DPF/pipelines/pipeline_stages.py | 2 +- 13 files changed, 84 insertions(+), 149 deletions(-) diff --git a/DPF/connectors/s3_connector.py b/DPF/connectors/s3_connector.py index e176aa4..fdb9c8d 100644 --- a/DPF/connectors/s3_connector.py +++ b/DPF/connectors/s3_connector.py @@ -37,29 +37,12 @@ def _preprocess_filepath(path: str) -> str: def read_file(self, filepath: str, binary: bool) -> io.BytesIO: mode = "rb" if binary else "rt" - if '.tar' in filepath and '?tar_offset=' in filepath and '?size=' in filepath: - filepath = self._preprocess_filepath(filepath) - offset = filepath.split('?tar_offset=')[1].split('?size=')[0] - size = filepath.split('?size=')[1] - filepath = filepath.split('?')[0] - offset = int(offset) - size = int(size) - s3 = self.s3client._get_client() - range_header = "bytes=%d-%d" % (offset, offset + size - 1) - bucket_name = filepath.split('/')[0] - tar_key = filepath.replace(bucket_name, '')[1:] - video_obj = s3.get_object(Bucket=bucket_name, Key=tar_key, Range=range_header) - res = 
video_obj["Body"].read() + with self.s3client.open(self._preprocess_filepath(filepath), mode=mode) as f: if mode == "rb": - res = io.BytesIO(res) + res = io.BytesIO(f.read()) res.seek(0) - else: - with self.s3client.open(self._preprocess_filepath(filepath), mode=mode) as f: - if mode == "rb": - res = io.BytesIO(f.read()) - res.seek(0) - else: - res = f.read() + else: + res = f.read() return res def save_file( @@ -95,4 +78,4 @@ def join(self, *args: str) -> str: path += arg else: path += arg+'/' - return path[:-1] \ No newline at end of file + return path[:-1] diff --git a/DPF/dataset_reader.py b/DPF/dataset_reader.py index 0d43652..3025248 100644 --- a/DPF/dataset_reader.py +++ b/DPF/dataset_reader.py @@ -2,7 +2,6 @@ from typing import Optional, Union import pandas as pd -import numpy as np from tqdm.contrib.concurrent import process_map from DPF.configs import ( @@ -43,7 +42,6 @@ def __init__(self, connector: Optional[Connector] = None): if connector is None: connector = LocalConnector() self.connector = connector - self.local_connector = LocalConnector() def _read_and_validate_dataframes( self, @@ -272,10 +270,7 @@ def read_files( Instance of FilesDatasetProcessor dataset """ table_path = config.table_path.rstrip("/") - try: - df = self.connector.read_dataframe(table_path) - except: - df = self.local_connector.read_dataframe(table_path) + df = self.connector.read_dataframe(table_path) required_columns = list(config.user_column2default_column.keys()) column_set = set(df.columns.tolist()) @@ -293,12 +288,6 @@ def read_files( path_col = datatype.modality.path_column df[path_col] = df[path_col].apply(lambda x: self.connector.join(config.base_path, x)) - # process .tar files with offsets - for i, row in df.iterrows(): - if isinstance(df.at[i,'tar_offset'], np.int64) and isinstance(df.at[i,'size'], np.int64): - df.at[i, path_col] += f'?tar_offset={df.at[i,"tar_offset"]}?size={df.at[i,"size"]}' - - return FilesDatasetProcessor( connector=self.connector, df=df, diff --git a/DPF/filters/complex_filter.py b/DPF/filters/complex_filter.py index f5024b3..eed1086 100644 --- a/DPF/filters/complex_filter.py +++ b/DPF/filters/complex_filter.py @@ -16,21 +16,14 @@ class ComplexDataFilter(DataFilter): def __init__( self, - datafilters, - kwargs, + datafilters: list[DataFilter], workers: int, pbar: bool = True, - _pbar_position: int = 0, - device = 'cuda:0' + _pbar_position: int = 0 ): super().__init__(pbar, _pbar_position) - self.datafilters = [] + self.datafilters = datafilters self.workers = workers - self.device = device - - for filter, kwarg in zip(datafilters, kwargs): - kwarg['device'] = self.device - self.datafilters.append(filter(**kwarg)) assert len(self.datafilters) > 0 assert all( diff --git a/DPF/filters/images/complexity_filter.py b/DPF/filters/images/complexity_filter.py index 00f9543..fda60f5 100644 --- a/DPF/filters/images/complexity_filter.py +++ b/DPF/filters/images/complexity_filter.py @@ -1,15 +1,15 @@ import os from typing import Any from urllib.request import urlretrieve + import numpy as np import torch +from segment_anything import SamAutomaticMaskGenerator, sam_model_registry -from ...types import ModalityToDataMapping from DPF.utils import read_image_rgb_from_bytes -from .img_filter import ImageFilter - -from segment_anything import SamAutomaticMaskGenerator, sam_model_registry +from ...types import ModalityToDataMapping +from .img_filter import ImageFilter WEIGHTS_URL = {'vit_h': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth', 'vit_l': 
'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth', @@ -57,7 +57,7 @@ def __init__( self.model_name = model_name self.weights_folder = weights_folder self.points_per_side = points_per_side - + # Download checkpoints path_to_model = os.path.join(self.weights_folder, self.model_name + '.pth') if not os.path.exists(path_to_model): @@ -67,7 +67,7 @@ def __init__( sam = sam_model_registry[self.model_name](checkpoint=path_to_model) sam = sam.to(torch.device(self.device)) self.mask_generator = SamAutomaticMaskGenerator( - sam, points_per_batch=batch_size, + sam, points_per_batch=batch_size, points_per_side=points_per_side ) @@ -111,7 +111,7 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: mean_area = np.mean(areas) / hw else: max_area = mean_area = 0 - + df_batch_labels["complexity_num_segments"].extend([num_segments]) df_batch_labels["complexity_max_segment_area"].extend([max_area]) df_batch_labels["complexity_mean_segment_area"].extend([mean_area]) diff --git a/DPF/filters/multigpu_filter.py b/DPF/filters/multigpu_filter.py index acee0a4..14ae364 100644 --- a/DPF/filters/multigpu_filter.py +++ b/DPF/filters/multigpu_filter.py @@ -1,6 +1,6 @@ import multiprocessing from multiprocessing import Manager -from typing import Any, Union, Optional, Callable +from typing import Any, Callable, Optional, Union import numpy as np import pandas as pd @@ -60,9 +60,9 @@ def __init__( ---------- devices: list[Union[torch.device, str]] List of devices to run datafilter on - datafilter_class: type[DataFilter] + datafilter_class: Optional[type[DataFilter]] = None Class of datafilter to use - datafilter_params: dict[str, Any] + datafilter_params: Optional[dict[str, Any]] = None Parameters for datafilter_class initialization datafilter_init_fn: Optional[Callable[[int, Union[str, torch.device], dict[str, Any]], DataFilter]] = None Initialization function for a datafilter. 
Takes _pbar_position as first arg and device as a second arg @@ -77,11 +77,6 @@ def __init__( self.devices = devices self.num_parts = len(devices) - self.filters = [] - for i in range(self.num_parts): - self.filters.append(datafilter_class(**datafilter_params, _pbar_position=i, device=devices[i])) - self.filters[i]._created_by_multigpu_data_filter = True - # getting result columns names if self.datafilter_init_fn: datafilter = self.datafilter_init_fn(0, devices[0], self.datafilter_init_fn_kwargs) @@ -146,7 +141,7 @@ def run( processes = [] context = multiprocessing.get_context('spawn') for param in params: - p = context.Process(target=self.run_one_process, args=param) + p = context.Process(target=run_one_process, args=param) p.start() processes.append(p) @@ -156,21 +151,3 @@ def run( res_df = pd.concat(shared_results) res_df.sort_index(inplace=True) return res_df - - - def run_one_process( - self, - config: DatasetConfig, - connector: Connector, - df: pd.DataFrame, - i: int, - index: pd.Series, - results: list[pd.DataFrame], - filter_run_kwargs: dict[str, Any] - ) -> None: - reader = DatasetReader(connector=connector) - processor = reader.from_df(config, df) - processor.apply_data_filter(self.filters[i], **filter_run_kwargs) - res = processor.df - res.set_index(index, inplace=True) - results.append(res) diff --git a/DPF/filters/videos/cogvlm2_filter.py b/DPF/filters/videos/cogvlm2_filter.py index 70a750e..76d9016 100644 --- a/DPF/filters/videos/cogvlm2_filter.py +++ b/DPF/filters/videos/cogvlm2_filter.py @@ -1,15 +1,15 @@ +import re from io import BytesIO from typing import Any -from DPF.types import ModalityToDataMapping - -from .video_filter import VideoFilter import numpy as np import torch from decord import VideoReader, bridge from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig -import re +from DPF.types import ModalityToDataMapping + +from .video_filter import VideoFilter prompt_templates = { 'detailed_video': 'Describe this video and its style in a very detailed manner', @@ -33,15 +33,15 @@ ] -def clean_with_regex(caption): - lower_caption = str(caption).lower().strip() - for re_compiled, replacement in compiled_regexs: - iterator = reversed(list(re_compiled.finditer(lower_caption))) - for match in iterator: - pos = list(match.span()) +def clean_with_regex(caption: str) -> str: + lower_caption = str(caption).lower().strip() + for re_compiled, replacement in compiled_regexs: + iterator = reversed(list(re_compiled.finditer(lower_caption))) + for match in iterator: + pos = list(match.span()) caption = caption[:pos[0]] + replacement + caption[pos[1]:] lower_caption = str(caption).lower().strip() - + if caption.count('-') > 2: split_captions = [] for split_caption in caption.split(): @@ -49,9 +49,9 @@ def clean_with_regex(caption): split_caption = re.sub(r'-', ' ', split_caption) split_captions.append(split_caption) caption = ' '.join(split_captions) - + caption = caption.strip('—-:/+=|@#&*') - + return caption.strip() @@ -156,8 +156,8 @@ def preprocess_data( ) -> Any: key = metadata[self.key_column] video_file = BytesIO(modality2data['video']) - video_file = self.load_video(video_file, strategy=self.strategy) - return key, video_file + loaded_video_file = self.load_video(video_file, strategy=self.strategy) + return key, loaded_video_file def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: df_batch_labels = self._get_dict_from_schema() @@ -196,12 +196,11 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: return 
df_batch_labels - def load_video(self, video_path, strategy='chat'): + def load_video(self, video_path: BytesIO, strategy: str = 'chat') -> torch.Tensor: bridge.set_bridge('torch') num_frames = self.num_frames decord_vr = VideoReader(uri=video_path) - frame_id_list = None total_frames = len(decord_vr) if strategy == 'base': frame_id_list = np.linspace(0, total_frames - 1, num_frames, dtype=int) @@ -209,13 +208,15 @@ def load_video(self, video_path, strategy='chat'): timestamps = decord_vr.get_frame_timestamp(np.arange(total_frames)) timestamps = [i[0] for i in timestamps] max_second = round(max(timestamps)) + 1 - frame_id_list = [] + frame_id_list = [] # type: ignore for second in range(max_second): closest_num = min(timestamps, key=lambda x: abs(x - second)) index = timestamps.index(closest_num) - frame_id_list.append(index) + frame_id_list.append(index) # type: ignore if len(frame_id_list) >= num_frames: break - video_data = decord_vr.get_batch(frame_id_list) + else: + frame_id_list = None + video_data: torch.Tensor = decord_vr.get_batch(frame_id_list) video_data = video_data.permute(3, 0, 1, 2) return video_data diff --git a/DPF/filters/videos/dover_filter.py b/DPF/filters/videos/dover_filter.py index b1855bd..758353e 100644 --- a/DPF/filters/videos/dover_filter.py +++ b/DPF/filters/videos/dover_filter.py @@ -1,21 +1,19 @@ import os +from io import BytesIO from typing import Any from urllib.request import urlretrieve -from io import BytesIO + +import decord import numpy as np import torch +import yaml +from decord import VideoReader +from dover.datasets import UnifiedFrameSampler, get_single_view # type: ignore +from dover.models import DOVER # type: ignore from DPF.types import ModalityToDataMapping -from .video_filter import VideoFilter -import yaml -from dover.datasets import ( - UnifiedFrameSampler, - get_single_view, -) -import decord -from decord import VideoReader -from dover.models import DOVER +from .video_filter import VideoFilter WEIGHTS_URL = {'dover': 'https://github.com/QualityAssessment/DOVER/releases/download/v0.1.0/DOVER.pth', 'dover_plus_plus': 'https://huggingface.co/teowu/DOVER/resolve/main/DOVER_plus_plus.pth', @@ -26,7 +24,7 @@ 'dover-mobile': 'https://raw.githubusercontent.com/teowu/DOVER-Dev/master/dover-mobile.yml'} -def fuse_results(results: list): +def fuse_results(results: list[float]) -> dict[str, np.ndarray]: # type: ignore t, a = (results[0] + 0.0758) / 0.0129, (results[1] - 0.1253) / 0.0318 # t, a = (results[0] - 0.1107) / 0.07355, (results[1] + 0.08285) / 0.03774 x = t * 0.6104 + a * 0.3896 @@ -37,8 +35,8 @@ def fuse_results(results: list): } -def spatial_temporal_view_decomposition( - video_path, sample_types, samplers, is_train=False, augment=False, +def spatial_temporal_view_decomposition( # type: ignore + video_path: str | BytesIO, sample_types: dict, samplers: dict, is_train: bool = False, augment: bool = False, # type: ignore ): video = {} decord.bridge.set_bridge("torch") @@ -63,6 +61,7 @@ def spatial_temporal_view_decomposition( sampled_video[stype] = get_single_view(video[stype], stype, **sopt) return sampled_video, frame_inds + class DOVERFilter(VideoFilter): """ DOVER model inference class to get video quality scores. 
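# For orientation: reading `t` and `a` in fuse_results above as the technical
# and aesthetic branch means (per the variable names), the fusion z-normalises
# each branch with fixed dataset constants and takes a weighted sum. A minimal
# standalone sketch of that arithmetic follows. The final sigmoid squashing and
# the sample inputs are assumptions -- the body of the returned dict is elided
# by the hunk context above -- so treat this as an illustration of the scoring
# arithmetic, not the exact upstream code.
import numpy as np

def squash(x: float) -> float:
    # Assumed final step: map a fused z-score into [0, 1].
    return float(1.0 / (1.0 + np.exp(-x)))

# Hypothetical raw branch means, standing in for
# `results = [r.mean().item() for r in self.model(views)]`.
technical_raw, aesthetic_raw = 0.05, -0.02

# Normalisation constants and fusion weights copied from fuse_results above.
t = (technical_raw + 0.0758) / 0.0129
a = (aesthetic_raw - 0.1253) / 0.0318
overall = t * 0.6104 + a * 0.3896

print({"technical": squash(t), "aesthetic": squash(a), "overall": squash(overall)})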
@@ -98,7 +97,7 @@ def __init__( self.model_name = model_name self.weights_folder = weights_folder - + # Download checkpoints and configs path_to_model = os.path.join(self.weights_folder, self.model_name + '.pth') if not os.path.exists(path_to_model): @@ -108,9 +107,9 @@ def __init__( if not os.path.exists(path_to_config): os.makedirs(self.weights_folder, exist_ok=True) urlretrieve(CONFIGS_URL[self.model_name], path_to_config) - + # Load model - with open(path_to_config, "r") as f: + with open(path_to_config) as f: opt = yaml.safe_load(f) self.model = DOVER(**opt["model"]["args"]).to(self.device) state_dict = torch.load(path_to_model, map_location=self.device) @@ -122,7 +121,7 @@ def __init__( @property def result_columns(self) -> list[str]: - return [f"dover_aesthetic", f"dover_technical", f"dover_overall"] + return ["dover_aesthetic", "dover_technical", "dover_overall"] @property def dataloader_kwargs(self) -> dict[str, Any]: @@ -144,7 +143,7 @@ def preprocess_data( torch.FloatTensor([123.675, 116.28, 103.53]), torch.FloatTensor([58.395, 57.12, 57.375]) ) - + temporal_samplers = {} for stype, sopt in self.dopt["sample_types"].items(): if "t_frag" not in sopt: @@ -183,7 +182,7 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: key, views = batch[0] for k, v in views.items(): views[k] = v.to(self.device) - + with torch.no_grad(): results = [r.mean().item() for r in self.model(views)] rescaled_results = fuse_results(results) diff --git a/DPF/filters/videos/farneback_filter.py b/DPF/filters/videos/farneback_filter.py index 3ef975a..0e876d0 100644 --- a/DPF/filters/videos/farneback_filter.py +++ b/DPF/filters/videos/farneback_filter.py @@ -18,7 +18,7 @@ def transform_frame(frame: MatLike, target_size: tuple[int, int]) -> MatLike: def transform_keep_ar(frame: MatLike, min_side_size: int) -> MatLike: - h, w = frame.shape[:2] + h, w = frame.shape[:2] # type: ignore aspect_ratio = w / h if h <= w: new_height = min_side_size diff --git a/DPF/filters/videos/image_filter_adapter.py b/DPF/filters/videos/image_filter_adapter.py index d5c376f..3684412 100644 --- a/DPF/filters/videos/image_filter_adapter.py +++ b/DPF/filters/videos/image_filter_adapter.py @@ -30,16 +30,14 @@ class ImageFilterAdapter(VideoFilter): def __init__( self, - imagefilter_class: type[ImageFilter], - imagefilter_kwargs: dict[str, Any], + image_filter: ImageFilter, video_frame: float, - device: str = "cuda:0", workers: int = 8, pbar: bool = True, _pbar_position: int = 0 ): super().__init__(pbar, _pbar_position) - self.image_filter = imagefilter_class(**imagefilter_kwargs, device=device) + self.image_filter = image_filter self.video_frame = video_frame self.num_workers = workers @@ -117,18 +115,16 @@ class MultiFrameImageFilterAdapter(VideoFilter): def __init__( self, - imagefilter_class: type[ImageFilter], - imagefilter_kwargs: dict[str, Any], + image_filter: ImageFilter, video_frames: list[float], reduce_results_fn: Callable[[str, list[Any]], Any], - device: str = "cuda:0", batch_size: int = 8, workers: int = 8, pbar: bool = True, _pbar_position: int = 0 ): super().__init__(pbar, _pbar_position) - self.image_filter = imagefilter_class(**imagefilter_kwargs, device=device) + self.image_filter = image_filter self.video_frames = video_frames self.reduce_results_fn = reduce_results_fn self.batch_size = batch_size diff --git a/DPF/filters/videos/rpknet_filter.py b/DPF/filters/videos/rpknet_filter.py index f6c96fb..ea5dea8 100644 --- a/DPF/filters/videos/rpknet_filter.py +++ b/DPF/filters/videos/rpknet_filter.py @@ 
-1,24 +1,24 @@ import io from typing import Any, Optional + import cv2 import imageio.v3 as iio import numpy as np +import ptlflow import torch import torch.nn.functional as F from cv2.typing import MatLike from torch import Tensor from DPF.types import ModalityToDataMapping -from .video_filter import VideoFilter - -import ptlflow +from .video_filter import VideoFilter WEIGHTS_URL = 'https://dl.dropboxusercontent.com/s/4j4z58wuv8o0mfz/models.zip' def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: - h, w = frame.shape[:2] + h, w = frame.shape[:2] # type: ignore aspect_ratio = w / h if h <= w: new_height = min_side_size @@ -114,7 +114,7 @@ def __init__( @property def result_columns(self) -> list[str]: - return [f"optical_flow_rpk_mean"] + return ["optical_flow_rpk_mean"] @property def dataloader_kwargs(self) -> dict[str, Any]: @@ -157,7 +157,7 @@ def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]: next_frame_cuda = next_frame.to(self.device) inputs = torch.stack([current_frame_cuda, next_frame_cuda], dim=1) - + flow = self.model({'images': inputs})['flows'][:, 0] if self.norm: h, w = current_frame.shape[-2:] diff --git a/DPF/filters/videos/structural_dynamics_filter.py b/DPF/filters/videos/structural_dynamics_filter.py index e791b80..d9cfd56 100644 --- a/DPF/filters/videos/structural_dynamics_filter.py +++ b/DPF/filters/videos/structural_dynamics_filter.py @@ -1,19 +1,19 @@ import io from typing import Any + import cv2 import imageio.v3 as iio import numpy as np import torch +import torch.nn.functional as F from cv2.typing import MatLike +from pytorch_msssim import MS_SSIM from torch import Tensor -import torch.nn.functional as F from DPF.types import ModalityToDataMapping from .video_filter import VideoFilter -from pytorch_msssim import MS_SSIM - def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: h, w = frame.shape[:2] @@ -54,7 +54,7 @@ def unpad(self, x: Tensor) -> Tensor: ht, wd = x.shape[-2:] c = [self._pad[2], ht - self._pad[3], self._pad[0], wd - self._pad[1]] return x[..., c[0]:c[1], c[2]:c[3]] - + class StructuralDynamicsFilter(VideoFilter): """ @@ -101,7 +101,7 @@ def __init__( @property def result_columns(self) -> list[str]: - return [f"structural_dynamics", 'structural_dynamics_max', 'structural_dynamics_min'] + return ["structural_dynamics", 'structural_dynamics_max', 'structural_dynamics_min'] @property def dataloader_kwargs(self) -> dict[str, Any]: diff --git a/DPF/pipelines/filter_pipeline.py b/DPF/pipelines/filter_pipeline.py index b174b2d..91d2825 100644 --- a/DPF/pipelines/filter_pipeline.py +++ b/DPF/pipelines/filter_pipeline.py @@ -1,8 +1,7 @@ from dataclasses import dataclass -from typing import Any, Callable, Optional, Union +from typing import Any, Callable, Optional import pandas as pd -import torch from DPF.filters import ColumnFilter, DataFilter from DPF.filters.multigpu_filter import MultiGPUDataFilter @@ -39,13 +38,12 @@ def __init__(self, pipeline_name: str, logs_dir: Optional[str] = None): def add_datafilter( self, - datafilter: Optional[type[DataFilter]] = None, - datafilter_kwargs: dict[str, Any] = None, + datafilter: type[DataFilter], + datafilter_kwargs: dict[str, Any], devices: Optional[list[str]] = None, processor_apply_kwargs: Optional[dict[str, Any]] = None, on_error: OnErrorOptions = "stop", - skip_if_columns_exist: bool = True, - constant_gpu: bool = False + skip_if_columns_exist: bool = True ) -> None: if processor_apply_kwargs is None: processor_apply_kwargs = {} @@ -54,7 +52,7 @@ def add_datafilter( 
stage = FilterPipelineStage( 'datafilter', filter_class=datafilter, filter_kwargs=datafilter_kwargs, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist, constant_gpu=constant_gpu + skip_if_columns_exist=skip_if_columns_exist ) elif len(devices) == 0: new_kwargs = datafilter_kwargs.copy() @@ -62,7 +60,7 @@ def add_datafilter( stage = FilterPipelineStage( 'datafilter', filter_class=datafilter, filter_kwargs=new_kwargs, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist, constant_gpu=constant_gpu + skip_if_columns_exist=skip_if_columns_exist ) else: stage = FilterPipelineStage( @@ -71,10 +69,9 @@ def add_datafilter( "devices": devices, "datafilter_class": datafilter, "datafilter_params": datafilter_kwargs - }, + }, processor_apply_kwargs=processor_apply_kwargs, - skip_if_columns_exist=skip_if_columns_exist, - constant_gpu=constant_gpu + skip_if_columns_exist=skip_if_columns_exist ) self.stages.append( diff --git a/DPF/pipelines/pipeline_stages.py b/DPF/pipelines/pipeline_stages.py index c2068ea..347e9b0 100644 --- a/DPF/pipelines/pipeline_stages.py +++ b/DPF/pipelines/pipeline_stages.py @@ -80,7 +80,7 @@ def __init__( self.processor_apply_kwargs = {} self.skip_if_columns_exist = skip_if_columns_exist - + self.constant_gpu = constant_gpu if constant_gpu: self.filter_obj = self.filter_class(**self.filter_kwargs) From 20baea4f60a60b1d158b457f354bc7ee10fd0572 Mon Sep 17 00:00:00 2001 From: boomb0om Date: Mon, 29 Jul 2024 18:43:16 +0300 Subject: [PATCH 14/16] fix: back to original version --- DPF/pipelines/pipeline_stages.py | 15 +++------------ 1 file changed, 3 insertions(+), 12 deletions(-) diff --git a/DPF/pipelines/pipeline_stages.py b/DPF/pipelines/pipeline_stages.py index 347e9b0..4de7968 100644 --- a/DPF/pipelines/pipeline_stages.py +++ b/DPF/pipelines/pipeline_stages.py @@ -68,8 +68,7 @@ def __init__( filter_class: Union[type[DataFilter], type[ColumnFilter], type[MultiGPUDataFilter]], filter_kwargs: dict[str, Any], processor_apply_kwargs: Optional[dict[str, Any]] = None, - skip_if_columns_exist: bool = True, - constant_gpu: bool = False + skip_if_columns_exist: bool = True ): self.filter_type = filter_type self.filter_class = filter_class @@ -81,20 +80,12 @@ def __init__( self.skip_if_columns_exist = skip_if_columns_exist - self.constant_gpu = constant_gpu - if constant_gpu: - self.filter_obj = self.filter_class(**self.filter_kwargs) - - @property def stage_name(self) -> str: return f"FilterPipelineStage(filter_class={self.filter_class}, filter_kwargs={self.filter_kwargs})" def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None: - if self.constant_gpu: - filter_obj = self.filter_obj - else: - filter_obj = self.filter_class(**self.filter_kwargs) + filter_obj = self.filter_class(**self.filter_kwargs) columns_to_be_added = filter_obj.result_columns columns_intersection = set(processor.columns).intersection(set(columns_to_be_added)) @@ -141,4 +132,4 @@ def stage_name(self) -> str: def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None: transforms = self.transforms_class(**self.transforms_kwargs) - processor.apply_transform(transforms, **self.processor_apply_kwargs) # type: ignore + processor.apply_transform(transforms, **self.processor_apply_kwargs) # type: ignore \ No newline at end of file From 836c67e13a253126f7614d4bba3f7709b77eef58 Mon Sep 17 00:00:00 2001 From: boomb0om Date: Mon, 29 Jul 2024 18:50:30 +0300 Subject: [PATCH 15/16] fix: fix code style errors --- 
DPF/filters/images/complexity_filter.py | 3 ++- DPF/filters/videos/raft_filter.py | 2 +- DPF/filters/videos/structural_dynamics_filter.py | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/DPF/filters/images/complexity_filter.py b/DPF/filters/images/complexity_filter.py index fda60f5..9ddffbb 100644 --- a/DPF/filters/images/complexity_filter.py +++ b/DPF/filters/images/complexity_filter.py @@ -4,13 +4,14 @@ import numpy as np import torch -from segment_anything import SamAutomaticMaskGenerator, sam_model_registry +from segment_anything import SamAutomaticMaskGenerator, sam_model_registry # type: ignore from DPF.utils import read_image_rgb_from_bytes from ...types import ModalityToDataMapping from .img_filter import ImageFilter + WEIGHTS_URL = {'vit_h': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth', 'vit_l': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth', 'vit_b': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth'} diff --git a/DPF/filters/videos/raft_filter.py b/DPF/filters/videos/raft_filter.py index 72e38c8..0a44b8c 100644 --- a/DPF/filters/videos/raft_filter.py +++ b/DPF/filters/videos/raft_filter.py @@ -30,7 +30,7 @@ def transform_frame(frame: MatLike, target_size: tuple[int, int]) -> Tensor: def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: - h, w = frame.shape[:2] + h, w = frame.shape[:2] # type: ignore aspect_ratio = w / h if h <= w: new_height = min_side_size diff --git a/DPF/filters/videos/structural_dynamics_filter.py b/DPF/filters/videos/structural_dynamics_filter.py index d9cfd56..4dc70d1 100644 --- a/DPF/filters/videos/structural_dynamics_filter.py +++ b/DPF/filters/videos/structural_dynamics_filter.py @@ -16,7 +16,7 @@ def transform_keep_ar(frame: MatLike, min_side_size: int) -> Tensor: - h, w = frame.shape[:2] + h, w = frame.shape[:2] # type: ignore aspect_ratio = w / h if h <= w: new_height = min_side_size From 1597aad80fdf9ff7f99d7ada1ed19507f6cdbb59 Mon Sep 17 00:00:00 2001 From: boomb0om Date: Mon, 29 Jul 2024 19:09:05 +0300 Subject: [PATCH 16/16] fix code style --- DPF/filters/images/complexity_filter.py | 6 ++++-- DPF/pipelines/pipeline_stages.py | 2 +- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/DPF/filters/images/complexity_filter.py b/DPF/filters/images/complexity_filter.py index 9ddffbb..d79c2c2 100644 --- a/DPF/filters/images/complexity_filter.py +++ b/DPF/filters/images/complexity_filter.py @@ -4,14 +4,16 @@ import numpy as np import torch -from segment_anything import SamAutomaticMaskGenerator, sam_model_registry # type: ignore +from segment_anything import ( # type: ignore + SamAutomaticMaskGenerator, + sam_model_registry, +) from DPF.utils import read_image_rgb_from_bytes from ...types import ModalityToDataMapping from .img_filter import ImageFilter - WEIGHTS_URL = {'vit_h': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_h_4b8939.pth', 'vit_l': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_l_0b3195.pth', 'vit_b': 'https://dl.fbaipublicfiles.com/segment_anything/sam_vit_b_01ec64.pth'} diff --git a/DPF/pipelines/pipeline_stages.py b/DPF/pipelines/pipeline_stages.py index 4de7968..dc1f01b 100644 --- a/DPF/pipelines/pipeline_stages.py +++ b/DPF/pipelines/pipeline_stages.py @@ -132,4 +132,4 @@ def stage_name(self) -> str: def run(self, processor: DatasetProcessor, logger: logging.Logger) -> None: transforms = self.transforms_class(**self.transforms_kwargs) - processor.apply_transform(transforms, 
**self.processor_apply_kwargs) # type: ignore \ No newline at end of file + processor.apply_transform(transforms, **self.processor_apply_kwargs) # type: ignore
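
A closing note on PATCH 11: moving the magnitudes accumulator inside the per-video loop of rpknet_filter.py matters whenever process_batch receives more than one video. With the filter's dataloader batch_size of 1 the old placement happened to be harmless, but a list shared across the loop would leak one video's frame magnitudes into the next video's mean. A small self-contained sketch of that failure mode, with synthetic numbers chosen purely for illustration:

import numpy as np

# Synthetic per-frame flow magnitudes for two videos in one batch.
batch = {"calm_video": [0.1, 0.2, 0.1], "action_video": [5.0, 6.0, 7.0]}

# Old placement: one accumulator shared across the whole batch, so the
# second video's mean is contaminated by the first video's frames.
shared: list[float] = []
leaky_means = {}
for key, mags in batch.items():
    shared.extend(mags)
    leaky_means[key] = float(np.mean(shared))

# New placement: the accumulator is reset for every video.
clean_means = {key: float(np.mean(mags)) for key, mags in batch.items()}

print(leaky_means)  # {'calm_video': 0.133..., 'action_video': 3.067...}
print(clean_means)  # {'calm_video': 0.133..., 'action_video': 6.0}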