# Source snapshot: file size 6,441 bytes, revision 30b652f.
"""
Minimal implementation of matrix-vector multiplication using FlexChunk format.
Includes only direct (single-process) multiplication and chunking functions.
Ref: T9, T10, T13
"""
import os
import math
import time
import numpy as np
import scipy.sparse as sparse
from typing import List, Optional
from flex_chunk import FlexChunk, save_chunk, load_chunk
def prepare_chunks(matrix: sparse.csr_matrix,
                   num_chunks: int,
                   storage_dir: str,
                   verbose: bool = False) -> List[str]:
    """
    Split a sparse matrix into row blocks and save each block as a chunk file.

    Ref: T4, T9

    Rows are cut into contiguous blocks of ``ceil(n_rows / num_chunks)`` rows
    (at least one row per block), so fewer than ``num_chunks`` files are
    written when the matrix has fewer rows than requested chunks. Block ``i``
    is saved as ``<storage_dir>/chunks/chunk_<i>.bin`` and the matrix shape is
    saved as ``<storage_dir>/matrix_info.npy``. Numbered chunk files left in
    the chunks directory by an earlier call and not rewritten by this one are
    deleted, because ``load_chunks`` loads every chunk file it finds there.

    Args:
        matrix: Sparse matrix to split into chunks (converted to CSR if needed)
        num_chunks: Maximum number of chunks to create; must be at least 1
        storage_dir: Directory to store chunks
        verbose: Whether to print debug information

    Returns:
        List of paths to the created chunks, in row order

    Raises:
        ValueError: If ``num_chunks`` is smaller than 1
    """
    # Validate before touching the filesystem so a bad call leaves no traces
    if num_chunks < 1:
        raise ValueError(f"num_chunks must be a positive integer, got {num_chunks}")

    if not sparse.isspmatrix_csr(matrix):
        matrix = matrix.tocsr()

    # Ensure the storage directory exists (makedirs also creates storage_dir)
    chunks_dir = os.path.join(storage_dir, "chunks")
    os.makedirs(chunks_dir, exist_ok=True)

    # [T9] Divide data into independent processing units
    n_rows = matrix.shape[0]
    rows_per_chunk = max(1, math.ceil(n_rows / num_chunks))

    # Create and save chunks
    chunk_paths = []
    for i in range(num_chunks):
        start_row = i * rows_per_chunk
        if start_row >= n_rows:
            # Fewer rows than requested chunks: nothing left to write
            break
        end_row = min(start_row + rows_per_chunk, n_rows)

        # Extract the submatrix for this chunk
        chunk_matrix = matrix[start_row:end_row, :]

        # [T4] Preserve data structure in chunks
        chunk = FlexChunk.from_csr_matrix(
            matrix=chunk_matrix,
            start_row=start_row,
            end_row=end_row
        )

        # Save chunk to file
        chunk_path = os.path.join(chunks_dir, f"chunk_{i}.bin")
        save_chunk(chunk, chunk_path)
        chunk_paths.append(chunk_path)

        if verbose:
            print(f"Created chunk {i}: rows {start_row}-{end_row}, nnz: {chunk.nnz}, saved to {chunk_path}")

    # Drop numbered chunk files from a previous run that this run did not
    # overwrite; otherwise a later load would mix two different matrices.
    written_names = {os.path.basename(path) for path in chunk_paths}
    for name in os.listdir(chunks_dir):
        if name in written_names:
            continue
        if not (name.startswith("chunk_") and name.endswith(".bin")):
            continue
        if name[len("chunk_"):-len(".bin")].isdecimal():
            os.remove(os.path.join(chunks_dir, name))
            if verbose:
                print(f"Removed stale chunk file {name}")

    # Also save matrix dimensions for later use
    info_path = os.path.join(storage_dir, "matrix_info.npy")
    np.save(info_path, np.array([matrix.shape[0], matrix.shape[1]], dtype=np.int64))

    if verbose:
        print(f"Matrix chunks prepared and saved to {storage_dir}")
        print(f"Total chunks: {len(chunk_paths)}")
        print(f"Matrix shape: {matrix.shape}")

    return chunk_paths
def load_chunks(storage_dir: str, verbose: bool = False) -> List["FlexChunk"]:
    """
    Load precomputed chunks from storage directory.

    Ref: T4, T13

    Only files in ``<storage_dir>/chunks`` named ``chunk_<number>.bin`` are
    loaded, ordered by their numeric index. Other files in that directory
    are ignored.

    Args:
        storage_dir: Directory containing saved chunks
        verbose: Whether to print debug information

    Returns:
        List of loaded FlexChunk objects, ordered by chunk index

    Raises:
        ValueError: If the chunks directory is missing or holds no chunk files
    """
    chunks_dir = os.path.join(storage_dir, "chunks")
    if not os.path.isdir(chunks_dir):
        raise ValueError(f"Chunks directory {chunks_dir} does not exist")

    # Find all chunk files, keeping only names whose index part is a plain
    # number so that a stray file cannot break the numeric sort below.
    prefix, suffix = "chunk_", ".bin"
    indexed_files = []
    for name in os.listdir(chunks_dir):
        if not (name.startswith(prefix) and name.endswith(suffix)):
            continue
        index_text = name[len(prefix):-len(suffix)]
        if index_text.isdecimal():
            indexed_files.append((int(index_text), name))

    if not indexed_files:
        raise ValueError(f"No chunk files found in {chunks_dir}")

    # Numeric order, so chunk_10 comes after chunk_9 rather than after chunk_1
    indexed_files.sort()

    # [T4] Restore structural representation from storage
    chunks = [load_chunk(os.path.join(chunks_dir, name)) for _, name in indexed_files]

    if verbose:
        print(f"Loaded {len(chunks)} chunks from {storage_dir}")
        print(f"Matrix shape: ({chunks[-1].end_row}, {chunks[0].n_cols})")

    return chunks
def matrix_vector_multiply(chunks: List["FlexChunk"],
                           vector: np.ndarray,
                           verbose: bool = False) -> np.ndarray:
    """
    Multiply a sparse matrix with a vector using direct mode and precomputed chunks.

    Ref: T5, T10, T13

    Each chunk's ``process_with_vector`` result is written into rows
    ``start_row:end_row`` of the output. Rows of chunks with ``nnz == 0`` and
    rows not covered by any chunk stay zero. The output starts with the
    vector's dtype and is promoted as needed so that chunk results are
    stored without truncation (e.g. an integer vector multiplied with
    floating-point chunk results yields a floating-point output).

    Args:
        chunks: List of FlexChunk objects representing the matrix
        vector: Vector to multiply with
        verbose: Whether to print debug information

    Returns:
        Result vector from the multiplication; its length is the largest
        ``end_row`` among the chunks

    Raises:
        ValueError: If no chunks are given, the vector is a scalar, or the
            vector length differs from the first chunk's column count
    """
    start_time = time.time()

    if verbose:
        print("Starting matrix-vector multiplication (direct mode)")

    # Convert vector to numpy array if needed
    vector = np.asarray(vector)

    # Validate chunks
    if not chunks:
        raise ValueError("No chunks provided for multiplication")

    # Check vector dimensions (a 0-d array has no shape[0] to compare)
    if vector.ndim == 0:
        raise ValueError("Vector must be at least 1-dimensional, got a scalar")
    if vector.shape[0] != chunks[0].n_cols:
        raise ValueError(f"Vector length {vector.shape[0]} does not match matrix columns {chunks[0].n_cols}")

    # Calculate result size based on the end row of the last chunk
    result_size = max(chunk.end_row for chunk in chunks)

    # Initialize result vector
    result = np.zeros(result_size, dtype=vector.dtype)

    # [T13] Direct computation through optimized pathways
    for i, chunk in enumerate(chunks):
        if verbose:
            print(f"Processing chunk {i} with {chunk.nnz} non-zeros")

        # [T5] Skip processing for empty chunks
        if chunk.nnz == 0:
            continue

        # Multiply chunk with vector
        chunk_result = np.asarray(chunk.process_with_vector(vector))

        # Widen the output first if this chunk's values would not fit its
        # current dtype; assigning into a narrower array silently truncates.
        promoted = np.result_type(result.dtype, chunk_result.dtype)
        if promoted != result.dtype:
            result = result.astype(promoted)

        # [T10] Map results to output coordinates
        result[chunk.start_row:chunk.end_row] = chunk_result

    if verbose:
        elapsed = time.time() - start_time
        print(f"Direct multiplication completed in {elapsed:.4f}s")

    return result
def process_matrix_file(storage_dir: str,
                        vector: np.ndarray,
                        verbose: bool = False) -> np.ndarray:
    """
    Load the chunks saved under a storage directory and multiply them with a vector.

    Ref: T13

    Args:
        storage_dir: Directory containing saved chunks
        vector: Vector to multiply with
        verbose: Whether to print debug information (forwarded to both steps)

    Returns:
        Result vector from the multiplication
    """
    # [T13] Load the chunks and hand them straight to the direct multiplication
    return matrix_vector_multiply(
        load_chunks(storage_dir, verbose=verbose),
        vector,
        verbose=verbose,
    )