# --- START OF FILE preprocess.py ---
import pandas as pd
import numpy as np
import json
import ast
from tqdm.auto import tqdm
import time
import os
import duckdb
import re # Import re for the manual regex check in debug
# --- Constants ---
PROCESSED_PARQUET_FILE_PATH = "models_processed.parquet"
HF_PARQUET_URL = 'https://huggingface.co/datasets/cfahlgren1/hub-stats/resolve/main/models.parquet'
MODEL_SIZE_RANGES = {
"Small (<1GB)": (0, 1),
"Medium (1-5GB)": (1, 5),
"Large (5-20GB)": (5, 20),
"X-Large (20-50GB)": (20, 50),
"XX-Large (>50GB)": (50, float('inf'))
}
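# Bucket boundaries are in GB and mirror the thresholds hard-coded in
# get_file_size_category(); the dict itself is not referenced in this script.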
# --- Debugging Constant ---
# <<<<<<< SET THE MODEL ID YOU WANT TO DEBUG HERE >>>>>>>
MODEL_ID_TO_DEBUG = "openvla/openvla-7b"
# Example: MODEL_ID_TO_DEBUG = "openai-community/gpt2"
# If you don't have a specific ID, the debug block will just report it's not found.
# --- Utility Functions: extract_model_file_size_gb, extract_org_from_id, process_tags_for_series, get_file_size_category ---
def extract_model_file_size_gb(safetensors_data):
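    """Best-effort extraction of a model's file size in GB from its 'safetensors' field.

    Accepts a dict or its string form (Python repr or JSON), reads the 'total'
    key, and converts that value from bytes to GB (1024**3). Any parse or type
    failure returns 0.0. Illustrative payload (field layout assumed, not
    verified against the live dump): {'total': 7650000000}.
    """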
try:
if pd.isna(safetensors_data): return 0.0
data_to_parse = safetensors_data
if isinstance(safetensors_data, str):
try:
if (safetensors_data.startswith('{') and safetensors_data.endswith('}')) or \
(safetensors_data.startswith('[') and safetensors_data.endswith(']')):
data_to_parse = ast.literal_eval(safetensors_data)
else: data_to_parse = json.loads(safetensors_data)
except Exception: return 0.0
if isinstance(data_to_parse, dict) and 'total' in data_to_parse:
total_bytes_val = data_to_parse['total']
try:
size_bytes = float(total_bytes_val)
return size_bytes / (1024 * 1024 * 1024)
except (ValueError, TypeError): return 0.0
return 0.0
except Exception: return 0.0
def extract_org_from_id(model_id):
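    """Return the namespace before the first '/' of a model id, else 'unaffiliated'."""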
if pd.isna(model_id): return "unaffiliated"
model_id_str = str(model_id)
return model_id_str.split("/")[0] if "/" in model_id_str else "unaffiliated"
def process_tags_for_series(series_of_tags_values):
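    """Normalize each raw 'tags' value into a flat list of stripped tag strings.

    Handles, in order: Python lists, NumPy arrays, None/NaN scalars, strings
    (list/tuple reprs via ast.literal_eval, then JSON arrays, then a
    comma-split fallback), and finally any other scalar. Rows that raise
    unexpectedly are logged and replaced with []. Illustrative mappings
    (examples assumed, not taken from the dataset):
        "['nlp', ' llm ']"  ->  ['nlp', 'llm']
        'audio, speech'     ->  ['audio', 'speech']
    """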
processed_tags_accumulator = []
for i, tags_value_from_series in enumerate(tqdm(series_of_tags_values, desc="Standardizing Tags", leave=False, unit="row")):
temp_processed_list_for_row = []
current_value_for_error_msg = str(tags_value_from_series)[:200] # Truncate for long error messages
try:
# Order of checks is important!
# 1. Handle explicit Python lists first
if isinstance(tags_value_from_series, list):
current_tags_in_list = []
for idx_tag, tag_item in enumerate(tags_value_from_series):
try:
# Ensure item is not NaN before string conversion if it might be a float NaN in a list
if pd.isna(tag_item): continue
str_tag = str(tag_item)
stripped_tag = str_tag.strip()
if stripped_tag:
current_tags_in_list.append(stripped_tag)
except Exception as e_inner_list_proc:
print(f"ERROR processing item '{tag_item}' (type: {type(tag_item)}) within a list for row {i}. Error: {e_inner_list_proc}. Original list: {current_value_for_error_msg}")
temp_processed_list_for_row = current_tags_in_list
# 2. Handle NumPy arrays
elif isinstance(tags_value_from_series, np.ndarray):
# Convert to list, then process elements, handling potential NaNs within the array
current_tags_in_list = []
for idx_tag, tag_item in enumerate(tags_value_from_series.tolist()): # .tolist() is crucial
try:
if pd.isna(tag_item): continue # Check for NaN after converting to Python type
str_tag = str(tag_item)
stripped_tag = str_tag.strip()
if stripped_tag:
current_tags_in_list.append(stripped_tag)
except Exception as e_inner_array_proc:
print(f"ERROR processing item '{tag_item}' (type: {type(tag_item)}) within a NumPy array for row {i}. Error: {e_inner_array_proc}. Original array: {current_value_for_error_msg}")
temp_processed_list_for_row = current_tags_in_list
# 3. Handle simple None or pd.NA after lists and arrays (which might contain pd.NA elements handled above)
elif tags_value_from_series is None or pd.isna(tags_value_from_series): # Now pd.isna is safe for scalars
temp_processed_list_for_row = []
# 4. Handle strings (could be JSON-like, list-like, or comma-separated)
elif isinstance(tags_value_from_series, str):
processed_str_tags = []
# Attempt ast.literal_eval for strings that look like lists/tuples
if (tags_value_from_series.startswith('[') and tags_value_from_series.endswith(']')) or \
(tags_value_from_series.startswith('(') and tags_value_from_series.endswith(')')):
try:
evaluated_tags = ast.literal_eval(tags_value_from_series)
if isinstance(evaluated_tags, (list, tuple)): # Check if eval result is a list/tuple
# Recursively process this evaluated list/tuple, as its elements could be complex
# For simplicity here, assume elements are simple strings after eval
current_eval_list = []
for tag_item in evaluated_tags:
if pd.isna(tag_item): continue
str_tag = str(tag_item).strip()
if str_tag: current_eval_list.append(str_tag)
processed_str_tags = current_eval_list
except (ValueError, SyntaxError):
pass # If ast.literal_eval fails, let it fall to JSON or comma split
# If ast.literal_eval didn't populate, try JSON
if not processed_str_tags:
try:
json_tags = json.loads(tags_value_from_series)
if isinstance(json_tags, list):
# Similar to above, assume elements are simple strings after JSON parsing
current_json_list = []
for tag_item in json_tags:
if pd.isna(tag_item): continue
str_tag = str(tag_item).strip()
if str_tag: current_json_list.append(str_tag)
processed_str_tags = current_json_list
except json.JSONDecodeError:
# If not a valid JSON list, fall back to comma splitting as the final string strategy
processed_str_tags = [tag.strip() for tag in tags_value_from_series.split(',') if tag.strip()]
except Exception as e_json_other:
print(f"ERROR during JSON processing for string '{current_value_for_error_msg}' for row {i}. Error: {e_json_other}")
processed_str_tags = [tag.strip() for tag in tags_value_from_series.split(',') if tag.strip()] # Fallback
temp_processed_list_for_row = processed_str_tags
# 5. Fallback for other scalar types (e.g., int, float that are not NaN)
else:
# This path is for non-list, non-ndarray, non-None/NaN, non-string types.
# Or for NaNs that slipped through if they are not None or pd.NA (e.g. float('nan'))
if pd.isna(tags_value_from_series): # Catch any remaining NaNs like float('nan')
temp_processed_list_for_row = []
else:
str_val = str(tags_value_from_series).strip()
temp_processed_list_for_row = [str_val] if str_val else []
processed_tags_accumulator.append(temp_processed_list_for_row)
except Exception as e_outer_tag_proc:
print(f"CRITICAL UNHANDLED ERROR processing row {i}: value '{current_value_for_error_msg}' (type: {type(tags_value_from_series)}). Error: {e_outer_tag_proc}. Appending [].")
processed_tags_accumulator.append([])
return processed_tags_accumulator
def get_file_size_category(file_size_gb_val):
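    """Map a size in GB onto the MODEL_SIZE_RANGES labels.

    Non-numeric, NaN or negative input falls back to "Small (<1GB)".
    """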
try:
numeric_file_size_gb = float(file_size_gb_val)
if pd.isna(numeric_file_size_gb): numeric_file_size_gb = 0.0
except (ValueError, TypeError): numeric_file_size_gb = 0.0
if 0 <= numeric_file_size_gb < 1: return "Small (<1GB)"
elif 1 <= numeric_file_size_gb < 5: return "Medium (1-5GB)"
elif 5 <= numeric_file_size_gb < 20: return "Large (5-20GB)"
elif 20 <= numeric_file_size_gb < 50: return "X-Large (20-50GB)"
elif numeric_file_size_gb >= 50: return "XX-Large (>50GB)"
else: return "Small (<1GB)"
def main_preprocessor():
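    """Fetch the hub-stats models parquet via DuckDB, derive size, tag-flag and
    organization columns, and write the result to PROCESSED_PARQUET_FILE_PATH."""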
print(f"Starting pre-processing script. Output: '{PROCESSED_PARQUET_FILE_PATH}'.")
overall_start_time = time.time()
print(f"Fetching fresh data from Hugging Face: {HF_PARQUET_URL}")
try:
fetch_start_time = time.time()
query = f"SELECT * FROM read_parquet('{HF_PARQUET_URL}')"
df_raw = duckdb.sql(query).df()
data_download_timestamp = pd.Timestamp.now(tz='UTC')
if df_raw is None or df_raw.empty: raise ValueError("Fetched data is empty or None.")
if 'id' not in df_raw.columns: raise ValueError("Fetched data must contain 'id' column.")
print(f"Fetched data in {time.time() - fetch_start_time:.2f}s. Rows: {len(df_raw)}. Downloaded at: {data_download_timestamp.strftime('%Y-%m-%d %H:%M:%S %Z')}")
except Exception as e_fetch:
print(f"ERROR: Could not fetch data from Hugging Face: {e_fetch}.")
return
df = pd.DataFrame()
print("Processing raw data...")
proc_start = time.time()
expected_cols_setup = {
'id': str, 'downloads': float, 'downloadsAllTime': float, 'likes': float,
'pipeline_tag': str, 'tags': object, 'safetensors': object
}
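    # Copy each expected column from the raw frame, coercing numeric columns to
    # float (missing -> 0.0) and string columns to str; when a column is absent
    # from the download, synthesize a sensible default instead.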
for col_name, target_dtype in expected_cols_setup.items():
if col_name in df_raw.columns:
df[col_name] = df_raw[col_name]
if target_dtype == float: df[col_name] = pd.to_numeric(df[col_name], errors='coerce').fillna(0.0)
            elif target_dtype == str: df[col_name] = df[col_name].fillna('').astype(str)
else:
if col_name in ['downloads', 'downloadsAllTime', 'likes']: df[col_name] = 0.0
elif col_name == 'pipeline_tag': df[col_name] = ''
elif col_name == 'tags': df[col_name] = pd.Series([[] for _ in range(len(df_raw))]) # Initialize with empty lists
elif col_name == 'safetensors': df[col_name] = None # Initialize with None
elif col_name == 'id': print("CRITICAL ERROR: 'id' column missing."); return
output_filesize_col_name = 'params'
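    # Note: the output column is named 'params' but holds a file size in GB,
    # taken directly from a numeric 'params' column when the source provides one,
    # otherwise derived from the 'safetensors' metadata below.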
if output_filesize_col_name in df_raw.columns and pd.api.types.is_numeric_dtype(df_raw[output_filesize_col_name]):
print(f"Using pre-existing '{output_filesize_col_name}' column as file size in GB.")
df[output_filesize_col_name] = pd.to_numeric(df_raw[output_filesize_col_name], errors='coerce').fillna(0.0)
elif 'safetensors' in df.columns:
print(f"Calculating '{output_filesize_col_name}' (file size in GB) from 'safetensors' data...")
df[output_filesize_col_name] = df['safetensors'].apply(extract_model_file_size_gb)
df[output_filesize_col_name] = pd.to_numeric(df[output_filesize_col_name], errors='coerce').fillna(0.0)
else:
print(f"Cannot determine file size. Setting '{output_filesize_col_name}' to 0.0.")
df[output_filesize_col_name] = 0.0
df['data_download_timestamp'] = data_download_timestamp
print(f"Added 'data_download_timestamp' column.")
print("Categorizing models by file size...")
df['size_category'] = df[output_filesize_col_name].apply(get_file_size_category)
print("Standardizing 'tags' column...")
    df['tags'] = process_tags_for_series(df['tags'])  # displays a tqdm progress bar internally
# --- START DEBUGGING BLOCK ---
    # Runs after process_tags_for_series and before the vectorized tag-flag creation below
if MODEL_ID_TO_DEBUG and MODEL_ID_TO_DEBUG in df['id'].values: # Check if ID exists
print(f"\n--- Pre-Loop Debugging for Model ID: {MODEL_ID_TO_DEBUG} ---")
# 1. Check the 'tags' column content after process_tags_for_series
model_specific_tags_list = df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'tags'].iloc[0]
print(f"1. Tags from df['tags'] (after process_tags_for_series): {model_specific_tags_list}")
print(f" Type of tags: {type(model_specific_tags_list)}")
if isinstance(model_specific_tags_list, list):
for i, tag_item in enumerate(model_specific_tags_list):
print(f" Tag item {i}: '{tag_item}' (type: {type(tag_item)}, len: {len(str(tag_item))})")
# Detailed check for 'robotics' specifically
if 'robotics' in str(tag_item).lower():
print(f" DEBUG: Found 'robotics' substring in '{tag_item}'")
print(f" - str(tag_item).lower().strip(): '{str(tag_item).lower().strip()}'")
print(f" - Is it exactly 'robotics'?: {str(tag_item).lower().strip() == 'robotics'}")
print(f" - Ordinals: {[ord(c) for c in str(tag_item)]}")
# 2. Simulate temp_tags_joined for this specific model
if isinstance(model_specific_tags_list, list):
simulated_temp_tags_joined = '~~~'.join(str(t).lower().strip() for t in model_specific_tags_list if pd.notna(t) and str(t).strip())
else:
simulated_temp_tags_joined = ''
print(f"2. Simulated 'temp_tags_joined' for this model: '{simulated_temp_tags_joined}'")
# 3. Simulate 'has_robot' check for this model
robot_keywords = ['robot', 'robotics']
robot_pattern = '|'.join(robot_keywords)
manual_robot_check = bool(re.search(robot_pattern, simulated_temp_tags_joined, flags=re.IGNORECASE))
print(f"3. Manual regex check for 'has_robot' ('{robot_pattern}' in '{simulated_temp_tags_joined}'): {manual_robot_check}")
print(f"--- End Pre-Loop Debugging for Model ID: {MODEL_ID_TO_DEBUG} ---\n")
elif MODEL_ID_TO_DEBUG:
print(f"DEBUG: Model ID '{MODEL_ID_TO_DEBUG}' not found in DataFrame for pre-loop debugging.")
# --- END DEBUGGING BLOCK ---
print("Vectorized creation of cached tag columns...")
tag_time = time.time()
    # Join each row's tags into a single lowercased, '~~~'-delimited string so the
    # flag columns can be computed with vectorized str.contains:
df['temp_tags_joined'] = df['tags'].apply(
lambda tl: '~~~'.join(str(t).lower().strip() for t in tl if pd.notna(t) and str(t).strip()) if isinstance(tl, list) else ''
)
tag_map = {
'has_audio': ['audio'], 'has_speech': ['speech'], 'has_music': ['music'],
        'has_robot': ['robot', 'robotics', 'openvla', 'vla'],
'has_bio': ['bio'], 'has_med': ['medic', 'medical'],
'has_series': ['series', 'time-series', 'timeseries'],
'has_video': ['video'], 'has_image': ['image', 'vision'],
'has_text': ['text', 'nlp', 'llm']
}
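    # Each flag below is a case-insensitive regex substring match over the joined
    # tag string, so 'robot' already matches 'robotics' and 'vla' matches 'openvla'.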
for col, kws in tag_map.items():
pattern = '|'.join(kws)
df[col] = df['temp_tags_joined'].str.contains(pattern, na=False, case=False, regex=True)
df['has_science'] = (
df['temp_tags_joined'].str.contains('science', na=False, case=False, regex=True) &
~df['temp_tags_joined'].str.contains('bigscience', na=False, case=False, regex=True)
)
del df['temp_tags_joined'] # Clean up temporary column
df['is_audio_speech'] = (df['has_audio'] | df['has_speech'] |
df['pipeline_tag'].str.contains('audio|speech', case=False, na=False, regex=True))
df['is_biomed'] = df['has_bio'] | df['has_med']
print(f"Vectorized tag columns created in {time.time() - tag_time:.2f}s.")
# --- POST-LOOP DIAGNOSTIC for has_robot & a specific model ---
if 'has_robot' in df.columns:
print("\n--- 'has_robot' Diagnostics (Preprocessor - Post-Loop) ---")
print(df['has_robot'].value_counts(dropna=False))
if MODEL_ID_TO_DEBUG and MODEL_ID_TO_DEBUG in df['id'].values:
model_has_robot_val = df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'has_robot'].iloc[0]
print(f"Value of 'has_robot' for model '{MODEL_ID_TO_DEBUG}': {model_has_robot_val}")
if model_has_robot_val:
print(f" Original tags for '{MODEL_ID_TO_DEBUG}': {df.loc[df['id'] == MODEL_ID_TO_DEBUG, 'tags'].iloc[0]}")
if df['has_robot'].any():
print("Sample models flagged as 'has_robot':")
print(df[df['has_robot']][['id', 'tags', 'has_robot']].head(5))
else:
print("No models were flagged as 'has_robot' after processing.")
print("--------------------------------------------------------\n")
# --- END POST-LOOP DIAGNOSTIC ---
print("Adding organization column...")
df['organization'] = df['id'].apply(extract_org_from_id)
# Drop safetensors if params was calculated from it, and params didn't pre-exist as numeric
if 'safetensors' in df.columns and \
not (output_filesize_col_name in df_raw.columns and pd.api.types.is_numeric_dtype(df_raw[output_filesize_col_name])):
df = df.drop(columns=['safetensors'], errors='ignore')
final_expected_cols = [
'id', 'downloads', 'downloadsAllTime', 'likes', 'pipeline_tag', 'tags',
'params', 'size_category', 'organization',
'has_audio', 'has_speech', 'has_music', 'has_robot', 'has_bio', 'has_med',
'has_series', 'has_video', 'has_image', 'has_text', 'has_science',
'is_audio_speech', 'is_biomed',
'data_download_timestamp'
]
# Ensure all final columns exist, adding defaults if necessary
for col in final_expected_cols:
if col not in df.columns:
print(f"Warning: Final expected column '{col}' is missing! Defaulting appropriately.")
if col == 'params': df[col] = 0.0
elif col == 'size_category': df[col] = "Small (<1GB)" # Default size category
            elif 'has_' in col or 'is_' in col: df[col] = False # Default boolean flags to False
elif col == 'data_download_timestamp': df[col] = pd.NaT # Default timestamp to NaT
print(f"Data processing completed in {time.time() - proc_start:.2f}s.")
try:
print(f"Saving processed data to: {PROCESSED_PARQUET_FILE_PATH}")
df_to_save = df[final_expected_cols].copy() # Ensure only expected columns are saved
df_to_save.to_parquet(PROCESSED_PARQUET_FILE_PATH, index=False, engine='pyarrow')
print(f"Successfully saved processed data.")
except Exception as e_save:
print(f"ERROR: Could not save processed data: {e_save}")
return
total_elapsed_script = time.time() - overall_start_time
print(f"Pre-processing finished. Total time: {total_elapsed_script:.2f}s. Final Parquet shape: {df_to_save.shape}")
if __name__ == "__main__":
if os.path.exists(PROCESSED_PARQUET_FILE_PATH):
print(f"Deleting existing '{PROCESSED_PARQUET_FILE_PATH}' to ensure fresh processing...")
try: os.remove(PROCESSED_PARQUET_FILE_PATH)
        except OSError as e: print(f"Error deleting file: {e}. Please delete manually and rerun."); raise SystemExit(1)
main_preprocessor()
if os.path.exists(PROCESSED_PARQUET_FILE_PATH):
print(f"\nTo verify, load parquet and check 'has_robot' and its 'tags':")
print(f"import pandas as pd; df_chk = pd.read_parquet('{PROCESSED_PARQUET_FILE_PATH}')")
print(f"print(df_chk['has_robot'].value_counts())")
if MODEL_ID_TO_DEBUG:
print(f"print(df_chk[df_chk['id'] == '{MODEL_ID_TO_DEBUG}'][['id', 'tags', 'has_robot']])")
else:
print(f"print(df_chk[df_chk['has_robot']][['id', 'tags', 'has_robot']].head())") |