""" | |
Minimal implementation of matrix-vector multiplication using FlexChunk format. | |
Includes only direct (single-process) multiplication and chunking functions. | |
Ref: T9, T10, T13 | |
""" | |
import os | |
import math | |
import time | |
import numpy as np | |
import scipy.sparse as sparse | |
from typing import List, Optional | |
from flex_chunk import FlexChunk, save_chunk, load_chunk | |
def prepare_chunks(matrix: sparse.csr_matrix,
                   num_chunks: int,
                   storage_dir: str,
                   verbose: bool = False) -> List[str]:
    """
    Prepare chunks from a sparse matrix for processing.

    Ref: T4, T9

    Args:
        matrix: Sparse matrix to split into chunks
        num_chunks: Number of chunks to create
        storage_dir: Directory to store chunks
        verbose: Whether to print debug information

    Returns:
        List of paths to the created chunks
    """
    if not sparse.isspmatrix_csr(matrix):
        matrix = matrix.tocsr()

    # Ensure the storage directory exists
    os.makedirs(storage_dir, exist_ok=True)
    os.makedirs(os.path.join(storage_dir, "chunks"), exist_ok=True)

    # [T9] Divide data into independent processing units
    rows_per_chunk = max(1, math.ceil(matrix.shape[0] / num_chunks))
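    # Example (illustrative numbers): a 10-row matrix split into 3 chunks gives
    # rows_per_chunk = ceil(10 / 3) = 4, so the row ranges are [0, 4), [4, 8),
    # and [8, 10); the final chunk is simply shorter.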
    # Create and save chunks
    chunk_paths = []
    for i in range(num_chunks):
        start_row = i * rows_per_chunk
        end_row = min((i + 1) * rows_per_chunk, matrix.shape[0])
        if start_row >= matrix.shape[0]:
            break

        # Extract the submatrix for this chunk
        chunk_matrix = matrix[start_row:end_row, :]

        # [T4] Preserve data structure in chunks
        chunk = FlexChunk.from_csr_matrix(
            matrix=chunk_matrix,
            start_row=start_row,
            end_row=end_row
        )

        # Save chunk to file
        chunk_path = os.path.join(storage_dir, "chunks", f"chunk_{i}.bin")
        save_chunk(chunk, chunk_path)
        chunk_paths.append(chunk_path)

        if verbose:
            print(f"Created chunk {i}: rows {start_row}-{end_row}, "
                  f"nnz: {chunk.nnz}, saved to {chunk_path}")

    # Also save matrix dimensions for later use
    info_path = os.path.join(storage_dir, "matrix_info.npy")
    np.save(info_path, np.array([matrix.shape[0], matrix.shape[1]], dtype=np.int64))

    if verbose:
        print(f"Matrix chunks prepared and saved to {storage_dir}")
        print(f"Total chunks: {len(chunk_paths)}")
        print(f"Matrix shape: {matrix.shape}")

    return chunk_paths


def load_chunks(storage_dir: str, verbose: bool = False) -> List[FlexChunk]:
    """
    Load precomputed chunks from storage directory.

    Ref: T4, T13

    Args:
        storage_dir: Directory containing saved chunks
        verbose: Whether to print debug information

    Returns:
        List of loaded FlexChunk objects
    """
    chunks_dir = os.path.join(storage_dir, "chunks")
    if not os.path.exists(chunks_dir):
        raise ValueError(f"Chunks directory {chunks_dir} does not exist")

    # Find all chunk files, ordered by their numeric index
    chunk_files = sorted(
        [f for f in os.listdir(chunks_dir) if f.startswith("chunk_") and f.endswith(".bin")],
        key=lambda x: int(x.split('_')[1].split('.')[0])
    )
    if not chunk_files:
        raise ValueError(f"No chunk files found in {chunks_dir}")

    # [T4] Restore structural representation from storage
    chunks = []
    for chunk_file in chunk_files:
        chunk_path = os.path.join(chunks_dir, chunk_file)
        chunks.append(load_chunk(chunk_path))

    if verbose:
        print(f"Loaded {len(chunks)} chunks from {storage_dir}")
        print(f"Matrix shape: ({chunks[-1].end_row}, {chunks[0].n_cols})")

    return chunks


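# The matrix dimensions written by prepare_chunks (matrix_info.npy) are not read
# back anywhere in this module. A minimal sketch of a loader for them is below;
# the helper name is hypothetical and only assumes the int64 [n_rows, n_cols]
# layout saved above.
def load_matrix_shape(storage_dir: str) -> tuple:
    """Read the (n_rows, n_cols) pair saved by prepare_chunks."""
    info_path = os.path.join(storage_dir, "matrix_info.npy")
    shape = np.load(info_path)
    return int(shape[0]), int(shape[1])

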
def matrix_vector_multiply(chunks: List[FlexChunk],
                           vector: np.ndarray,
                           verbose: bool = False) -> np.ndarray:
    """
    Multiply a sparse matrix by a vector using direct mode and precomputed chunks.

    Ref: T5, T10, T13

    Args:
        chunks: List of FlexChunk objects representing the matrix
        vector: Vector to multiply with
        verbose: Whether to print debug information

    Returns:
        Result vector of the multiplication
    """
    start_time = time.time()
    if verbose:
        print("Starting matrix-vector multiplication (direct mode)")

    # Convert vector to a numpy array if needed
    vector = np.asarray(vector)

    # Validate chunks
    if not chunks:
        raise ValueError("No chunks provided for multiplication")

    # Check vector dimensions
    if vector.shape[0] != chunks[0].n_cols:
        raise ValueError(f"Vector length {vector.shape[0]} does not match "
                         f"matrix columns {chunks[0].n_cols}")

    # Calculate the result size from the largest end row across chunks
    result_size = max(chunk.end_row for chunk in chunks)

    # Initialize result vector
    result = np.zeros(result_size, dtype=vector.dtype)

    # [T13] Direct computation through optimized pathways
    for i, chunk in enumerate(chunks):
        if verbose:
            print(f"Processing chunk {i} with {chunk.nnz} non-zeros")

        # [T5] Skip processing for empty chunks
        if chunk.nnz == 0:
            continue

        # Multiply chunk with vector
        chunk_result = chunk.process_with_vector(vector)

        # [T10] Map results to output coordinates
        result[chunk.start_row:chunk.end_row] = chunk_result

    if verbose:
        elapsed = time.time() - start_time
        print(f"Direct multiplication completed in {elapsed:.4f}s")

    return result


def process_matrix_file(storage_dir: str,
                        vector: np.ndarray,
                        verbose: bool = False) -> np.ndarray:
    """
    Convenience function to load chunks from storage and multiply them with a vector.

    Ref: T13

    Args:
        storage_dir: Directory containing saved chunks
        vector: Vector to multiply with
        verbose: Whether to print debug information

    Returns:
        Result vector of the multiplication
    """
    # [T13] Optimize data processing flow
    chunks = load_chunks(storage_dir, verbose=verbose)

    # Perform multiplication
    return matrix_vector_multiply(chunks, vector, verbose=verbose)
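

# Minimal end-to-end sketch (assumptions: the FlexChunk API behaves as used in
# the functions above, and numpy/scipy are available). It chunks a small random
# CSR matrix into a temporary directory, multiplies via the chunked path, and
# checks the result against scipy's built-in sparse matrix-vector product.
if __name__ == "__main__":
    import tempfile

    rng = np.random.default_rng(0)
    matrix = sparse.random(1000, 1000, density=0.01, format="csr", random_state=0)
    vector = rng.random(1000)

    with tempfile.TemporaryDirectory() as storage_dir:
        prepare_chunks(matrix, num_chunks=4, storage_dir=storage_dir, verbose=True)
        result = process_matrix_file(storage_dir, vector, verbose=True)

    expected = matrix @ vector
    print("Match with scipy:", np.allclose(result, expected))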