Spaces:
Sleeping
Sleeping
File size: 6,441 Bytes
30b652f |
|
"""
Minimal implementation of matrix-vector multiplication using FlexChunk format.
Includes only direct (single-process) multiplication and chunking functions.
Ref: T9, T10, T13
"""
import os
import math
import time
import numpy as np
import scipy.sparse as sparse
from typing import List, Optional
from flex_chunk import FlexChunk, save_chunk, load_chunk
def prepare_chunks(matrix: sparse.csr_matrix,
num_chunks: int,
storage_dir: str,
verbose: bool = False) -> List[str]:
"""
Prepare chunks from a sparse matrix for processing.
Ref: T4, T9
Args:
matrix: Sparse matrix to split into chunks
num_chunks: Number of chunks to create
storage_dir: Directory to store chunks
verbose: Whether to print debug information
Returns:
List of paths to the created chunks
"""
if not sparse.isspmatrix_csr(matrix):
matrix = matrix.tocsr()
# Ensure the storage directory exists
os.makedirs(storage_dir, exist_ok=True)
os.makedirs(os.path.join(storage_dir, "chunks"), exist_ok=True)
# [T9] Divide data into independent processing units
rows_per_chunk = max(1, math.ceil(matrix.shape[0] / num_chunks))
# Create and save chunks
chunk_paths = []
for i in range(num_chunks):
start_row = i * rows_per_chunk
end_row = min((i + 1) * rows_per_chunk, matrix.shape[0])
if start_row >= matrix.shape[0]:
break
# Extract the submatrix for this chunk
chunk_matrix = matrix[start_row:end_row, :]
# [T4] Preserve data structure in chunks
chunk = FlexChunk.from_csr_matrix(
matrix=chunk_matrix,
start_row=start_row,
end_row=end_row
)
# Save chunk to file
chunk_path = os.path.join(storage_dir, "chunks", f"chunk_{i}.bin")
save_chunk(chunk, chunk_path)
chunk_paths.append(chunk_path)
if verbose:
print(f"Created chunk {i}: rows {start_row}-{end_row}, nnz: {chunk.nnz}, saved to {chunk_path}")
# Also save matrix dimensions for later use
info_path = os.path.join(storage_dir, "matrix_info.npy")
np.save(info_path, np.array([matrix.shape[0], matrix.shape[1]], dtype=np.int64))
if verbose:
print(f"Matrix chunks prepared and saved to {storage_dir}")
print(f"Total chunks: {len(chunk_paths)}")
print(f"Matrix shape: {matrix.shape}")
return chunk_paths
def load_chunks(storage_dir: str, verbose: bool = False) -> List[FlexChunk]:
"""
Load precomputed chunks from storage directory.
Ref: T4, T13
Args:
storage_dir: Directory containing saved chunks
verbose: Whether to print debug information
Returns:
List of loaded FlexChunk objects
"""
chunks_dir = os.path.join(storage_dir, "chunks")
if not os.path.exists(chunks_dir):
raise ValueError(f"Chunks directory {chunks_dir} does not exist")
# Find all chunk files
chunk_files = sorted([f for f in os.listdir(chunks_dir) if f.startswith("chunk_") and f.endswith(".bin")],
key=lambda x: int(x.split('_')[1].split('.')[0]))
if not chunk_files:
raise ValueError(f"No chunk files found in {chunks_dir}")
# [T4] Restore structural representation from storage
chunks = []
for chunk_file in chunk_files:
chunk_path = os.path.join(chunks_dir, chunk_file)
chunks.append(load_chunk(chunk_path))
if verbose:
print(f"Loaded {len(chunks)} chunks from {storage_dir}")
print(f"Matrix shape: ({chunks[-1].end_row}, {chunks[0].n_cols})")
return chunks
def matrix_vector_multiply(chunks: List[FlexChunk],
vector: np.ndarray,
verbose: bool = False) -> np.ndarray:
"""
Multiply a sparse matrix with a vector using direct mode and precomputed chunks.
Ref: T5, T10, T13
Args:
chunks: List of FlexChunk objects representing the matrix
vector: Vector to multiply with
verbose: Whether to print debug information
Returns:
Result vector from the multiplication
"""
start_time = time.time()
if verbose:
print("Starting matrix-vector multiplication (direct mode)")
# Convert vector to numpy array if needed
vector = np.asarray(vector)
# Validate chunks
if not chunks:
raise ValueError("No chunks provided for multiplication")
# Check vector dimensions
if vector.shape[0] != chunks[0].n_cols:
raise ValueError(f"Vector length {vector.shape[0]} does not match matrix columns {chunks[0].n_cols}")
# Calculate result size based on the end row of the last chunk
result_size = max(chunk.end_row for chunk in chunks)
# Initialize result vector
result = np.zeros(result_size, dtype=vector.dtype)
# [T13] Direct computation through optimized pathways
for i, chunk in enumerate(chunks):
if verbose:
print(f"Processing chunk {i} with {chunk.nnz} non-zeros")
# [T5] Skip processing for empty chunks
if chunk.nnz == 0:
continue
# Multiply chunk with vector
chunk_result = chunk.process_with_vector(vector)
# [T10] Map results to output coordinates
result[chunk.start_row:chunk.end_row] = chunk_result
if verbose:
elapsed = time.time() - start_time
print(f"Direct multiplication completed in {elapsed:.4f}s")
return result
def process_matrix_file(storage_dir: str,
vector: np.ndarray,
verbose: bool = False) -> np.ndarray:
"""
Convenience function to load chunks from storage and multiply with vector.
Ref: T13
Args:
storage_dir: Directory containing saved chunks
vector: Vector to multiply with
verbose: Whether to print debug information
Returns:
Result vector from the multiplication
"""
# [T13] Optimize data processing flow
chunks = load_chunks(storage_dir, verbose=verbose)
# Perform multiplication
return matrix_vector_multiply(chunks, vector, verbose=verbose) |