|
1 | | -import sys |
2 | | -import py_common.utils |
3 | 1 | import re |
4 | 2 | import json |
5 | 3 | import shutil |
6 | 4 | import zipfile |
7 | 5 | import fnmatch |
8 | | -import urllib3 |
9 | 6 | import requests |
10 | | -import unicodedata |
11 | 7 | import contextlib |
12 | 8 | import urllib |
13 | | -from urllib.request import urlopen |
14 | | -import imghdr |
15 | 9 | from PIL import Image |
16 | 10 |
|
17 | 11 | import os |
18 | 12 | from os import listdir |
19 | | -from os.path import isfile, join |
20 | 13 |
|
21 | | -from bs4 import BeautifulSoup |
22 | 14 | from unidecode import unidecode |
23 | 15 |
|
24 | 16 | from py_common.Logger import Logger |
25 | | -from py_common.Production import Production |
| 17 | +import py7zr |
26 | 18 |
|
27 | 19 | ########################### |
28 | 20 | ### GLOBAL VAR AND CONS ### |
@@ -115,139 +107,124 @@ def fetch_prod_name(prod, suffix, filepath): |
115 | 107 | return path |
116 | 108 |
|
117 | 109 |
|
118 | | -def build(prod: Production, entrypath: str, desired_extentions: list): |
| 110 | + |
| 111 | +def build(prod, entrypath: str, desired_extensions: list): |
119 | 112 | ''' |
120 | | - given a prod "Production" object containing |
121 | | - all production's data, create a proper named folder, fetches all files (screenshot + rom) |
122 | | - and properly organize everything |
| 113 | + Given a prod "Production" object containing |
| 114 | + all production's data, create a properly named folder, fetch all files (screenshot + ROM), |
| 115 | + and organize everything. |
123 | 116 | ''' |
124 | | - if not os.path.exists(entrypath + prod.slug): |
125 | | - ############# |
126 | | - # PROD FILE # |
127 | | - ############# |
128 | | - # make its own folder |
129 | | - os.mkdir(entrypath + prod.slug, 0o777) |
130 | | - |
131 | | - # figuring out the suffix |
132 | | - suffix = str.lower(prod.url.split(".")[-1]) |
133 | | - if suffix not in desired_extentions: |
134 | | - suffix = "gb" |
135 | | - |
136 | | - # building the filepath |
137 | | - filepath = entrypath + prod.slug + "/" |
138 | | - |
139 | | - # download the file |
140 | | - # in case of http |
141 | | - if prod.url.startswith("http"): |
142 | | - try: |
143 | | - r = requests.get(prod.url, allow_redirects=True, |
144 | | - timeout=None, verify=False, headers=headers) |
145 | | - if r.status_code != 200: |
146 | | - logger.write("[ERR]:", str(r.status_code) + |
147 | | - ": " + prod.slug + " - " + prod.url) |
148 | | - |
149 | | - # cleaning in case of error |
150 | | - shutil.rmtree(entrypath + prod.slug) |
151 | | - return 1 |
152 | | - except ConnectionError as e: |
153 | | - logger.write("[ERR]:", str(r.status_code) + |
154 | | - ": " + prod.slug + " - " + prod.url) |
155 | | - logger.write("[ERR]:", "REASON: " + e) |
156 | | - |
157 | | - # cleaning in case of error |
158 | | - shutil.rmtree(entrypath + prod.slug) |
159 | | - return 1 |
160 | | - open(filepath + prod.slug + "." + suffix, 'wb').write(r.content) |
161 | | - else: |
162 | | - with contextlib.closing(urllib.request.urlopen(prod.url)) as r: |
163 | | - with open(filepath + prod.slug + "." + suffix, 'wb') as f: |
164 | | - shutil.copyfileobj(r, f) |
165 | | - |
166 | | - # unzip in case of zip |
167 | | - if prod.url.endswith(".zip") or prod.url.endswith(".ZIP"): |
168 | | - # download and unzip |
169 | | - try: |
170 | | - with zipfile.ZipFile(filepath + prod.slug + "." + suffix, "r") as zip_ref: |
171 | | - zip_ref.extractall(filepath + "unzippedfolder") |
| 117 | + # Create folder if not already present |
| 118 | + target_folder = os.path.join(entrypath, prod.slug) |
| 119 | + if not os.path.exists(target_folder): |
| 120 | + os.mkdir(target_folder, 0o777) |
172 | 121 |
|
173 | | - # manage all extensions, and it doesn't matter if they have uppercase or lowercase |
174 | | - path = [] # eventually the file |
| 122 | + # Extract file extension |
| 123 | + suffix = prod.url.split(".")[-1].lower() |
| 124 | + |
| 125 | + if suffix not in desired_extensions and suffix not in ["zip", "7z", "mp4"]: |
| 126 | + print(f"ERROR: {prod.slug} extension is not in {desired_extensions}") |
| 127 | + suffix = "gb" # Fallback extension |
175 | 128 |
|
176 | | - extentions = fix_extentions(desired_extentions) |
177 | | - for extension in extentions: |
178 | | - path = fetch_prod_name(prod, extension, filepath) |
179 | | - if path != []: |
180 | | - break |
| 129 | + # Build the file path |
| 130 | + filepath = os.path.join(target_folder, f"{prod.slug}.{suffix}") |
181 | 131 |
|
182 | | - # proper renaming and moving the file |
183 | | - if path != []: |
184 | | - os.rename(path[0], filepath + prod.slug + |
185 | | - "." + extension.lower()) |
| 132 | + # Download the file |
| 133 | + try: |
| 134 | + if prod.url.startswith("http"): |
| 135 | + r = requests.get(prod.url, allow_redirects=True, timeout=None, verify=False) |
| 136 | + if r.status_code != 200: |
| 137 | + raise Exception(f"HTTP Error {r.status_code}") |
| 138 | + with open(filepath, 'wb') as f: |
| 139 | + f.write(r.content) |
| 140 | + else: |
| 141 | + with contextlib.closing(urllib.request.urlopen(prod.url)) as r: |
| 142 | + with open(filepath, 'wb') as f: |
| 143 | + shutil.copyfileobj(r, f) |
| 144 | + except Exception as e: |
| 145 | + logger.write("[ERR]:", f"Error downloading {prod.slug}: {e}") |
| 146 | + shutil.rmtree(target_folder) |
| 147 | + return 1 |
| 148 | + |
| 149 | + # Unzip and handle files |
| 150 | + if suffix in ["zip", "7z"]: |
| 151 | + unzipped_path = os.path.join(target_folder, "unzippedfolder") |
| 152 | + os.makedirs(unzipped_path, exist_ok=True) |
186 | 153 |
|
187 | | - # update production object file |
188 | | - prod.files.append(prod.slug + "." + extension.lower()) |
189 | | - else: |
190 | | - logger.write( |
191 | | - "[WARN]", prod.title + " extension is not a " + prod.platform + " file.") |
192 | | - shutil.rmtree(entrypath + prod.slug) |
193 | | - return 1 |
194 | | - |
195 | | - # cleaning up unneeded files |
196 | | - shutil.rmtree(filepath + "unzippedfolder") |
197 | | - if CLEANZIP: |
198 | | - os.remove(filepath + prod.slug + "." + "zip") |
199 | | - except zipfile.BadZipFile as e: |
200 | | - logger.write("[ERR] ", str(e) + " bad zip file") |
201 | | - shutil.rmtree(entrypath + prod.slug) |
| 154 | + try: |
| 155 | + if suffix == "zip": |
| 156 | + with zipfile.ZipFile(filepath, "r") as zip_ref: |
| 157 | + zip_ref.extractall(unzipped_path) |
| 158 | + elif suffix == "7z": |
| 159 | + with py7zr.SevenZipFile(filepath, mode='r') as z: |
| 160 | + z.extractall(unzipped_path) |
| 161 | + except Exception as e: |
| 162 | + logger.write("[ERR]:", f"Failed to extract {suffix} file: {e}") |
| 163 | + shutil.rmtree(target_folder) |
202 | 164 | return 1 |
203 | | - else: |
204 | | - # it is a proper gb file -> just write the filename in its own structure field |
205 | | - pass |
206 | | - |
207 | | - # download the screenshot |
208 | | - if prod.screenshots != None and prod.screenshots != [] and prod.screenshots[0] != "None": |
209 | | - r = requests.get( |
210 | | - prod.screenshots[0], allow_redirects=True, timeout=None) |
211 | | - |
212 | | - # figuring out what kind of screenshots I am dealing with |
213 | | - screen_file_path = filepath + prod.slug + "." |
214 | | - |
215 | | - # screenshot fileext |
216 | | - screen_ext = prod.screenshots[0].split(".")[-1] |
217 | | - logger.write("[INFO]", " The screenshot is in " + |
218 | | - screen_ext + " format") |
219 | 165 |
|
220 | | - if screen_ext.lower() == "png": |
221 | | - screen_file_path += "png" |
222 | | - else: |
223 | | - screen_file_path += screen_ext |
224 | | - |
225 | | - open(screen_file_path, 'wb').write(r.content) |
| 166 | + # Search for desired extensions in the extracted folder |
| 167 | + valid_file_found = False |
| 168 | + |
| 169 | + # Recursively search all files under the unzipped path |
| 170 | + for root, _, files in os.walk(unzipped_path): |
| 171 | + for file in files: |
| 172 | + ext = file.split(".")[-1].lower() |
| 173 | + if ext in desired_extensions: |
| 174 | + extracted_file = os.path.join(root, file) |
| 175 | + final_file = os.path.join(target_folder, f"{prod.slug}.{ext}") |
| 176 | + |
| 177 | + # Move the valid file to the target folder |
| 178 | + shutil.move(extracted_file, final_file) |
| 179 | + prod.files.append(f"{prod.slug}.{ext}") |
| 180 | + |
| 181 | + valid_file_found = True |
| 182 | + break |
| 183 | + |
| 184 | + if valid_file_found: |
| 185 | + break |
226 | 186 |
|
227 | | - if screen_ext != "png": |
228 | | - im = Image.open(screen_file_path).convert("RGB") |
229 | | - im.save(filepath + prod.slug + ".png", "png") |
| 187 | + if not valid_file_found: |
| 188 | + logger.write("[WARN]:", f"No valid files with extensions {desired_extensions} found.") |
| 189 | + shutil.rmtree(target_folder) |
| 190 | + return 1 |
230 | 191 |
|
231 | | - logger.write( |
232 | | - "[INFO]", " Screenshot has been converted into a PNG file.") |
233 | | - logger.write("[INFO]", " Removing screenshot " + |
234 | | - screen_ext + " file...") |
| 192 | + # Clean up unzipped files and original archive |
| 193 | + shutil.rmtree(unzipped_path) |
| 194 | + if CLEANZIP: |
| 195 | + os.remove(filepath) |
| 196 | + else: |
| 197 | + prod.files.append(f"{prod.slug}.{suffix}") |
235 | 198 |
|
236 | | - os.remove(screen_file_path) |
| 199 | + # Handle screenshots |
| 200 | + if prod.screenshots and prod.screenshots[0] != "None": |
| 201 | + try: |
| 202 | + r = requests.get(prod.screenshots[0], allow_redirects=True, timeout=None) |
| 203 | + screen_ext = prod.screenshots[0].split(".")[-1].lower() |
| 204 | + screen_file = os.path.join(target_folder, f"{prod.slug}.{screen_ext}") |
| 205 | + with open(screen_file, 'wb') as f: |
| 206 | + f.write(r.content) |
| 207 | + |
| 208 | + # Convert to PNG if necessary |
| 209 | + if screen_ext != "png": |
| 210 | + img = Image.open(screen_file).convert("RGB") |
| 211 | + png_file = os.path.join(target_folder, f"{prod.slug}.png") |
| 212 | + img.save(png_file, "PNG") |
| 213 | + os.remove(screen_file) |
| 214 | + prod.screenshots[0] = f"{prod.slug}.png" |
| 215 | + else: |
| 216 | + prod.screenshots[0] = f"{prod.slug}.png" |
| 217 | + except Exception as e: |
| 218 | + logger.write("[ERR]:", f"Failed to download screenshot for {prod.slug}: {e}") |
| 219 | + prod.screenshots = [] |
237 | 220 |
|
238 | | - open(filepath + prod.slug + "." + "png", 'wb').write(r.content) |
239 | | - prod.screenshots[0] = prod.slug + "." + "png" |
240 | | - else: |
241 | | - prod.screenshots = [] |
242 | | - logger.write( |
243 | | - "[INFO]", "Screenshot not present for this production") |
244 | 221 | else: |
245 | | - logger.write( |
246 | | - "[WARN]", "directory already present. Skipping " + prod.slug + "...") |
| 222 | + logger.write("[WARN]:", f"Directory already exists for {prod.slug}. Skipping...") |
247 | 223 | return 1 |
248 | 224 | return 0 |
249 | 225 |
|
250 | 226 |
|
| 227 | + |
251 | 228 | def fix_extentions(desired_extentions): |
252 | 229 | ''' |
253 | 230 | given a theoretical list of extensions, it returns a list containing additional correct extensions (like CGB, AGB)
|
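A minimal usage sketch of the reworked build() shown above (not part of the diff). The SimpleNamespace stands in for the real Production object; its field names (slug, url, screenshots, files) mirror the attributes build() reads in the new code, and the URLs, folder name, and extension list are placeholders. It also assumes the module-level logger and CLEANZIP globals are configured as elsewhere in this script.

    from types import SimpleNamespace

    # Hypothetical entry: a zipped GB ROM plus one screenshot URL (both placeholders).
    prod = SimpleNamespace(
        slug="my-demo",
        url="https://example.com/files/my-demo.zip",
        screenshots=["https://example.com/shots/my-demo.png"],
        files=[],
    )

    # build() returns 0 on success, 1 if the entry was skipped or failed;
    # on success the downloaded ROM and screenshot land in entries/my-demo/.
    ret = build(prod, entrypath="entries/", desired_extensions=["gb", "gbc"])
    if ret != 0:
        print(f"Skipped or failed: {prod.slug}")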