Spaces:
Sleeping
Sleeping
File size: 16,909 Bytes
3af4ccb 27fe713 3af4ccb |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 375 376 377 378 379 380 381 382 383 384 385 386 387 388 389 390 391 392 393 394 395 396 397 398 399 400 401 402 403 404 405 406 407 408 409 410 411 412 413 414 415 416 417 418 419 420 421 422 423 424 425 426 427 428 429 430 431 432 433 434 435 436 437 438 439 440 441 442 443 444 445 446 447 448 449 450 451 452 453 454 455 456 457 458 459 460 461 462 463 464 465 466 |
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any, Tuple
import asyncio
import uuid
import json
from datetime import datetime
from enum import Enum
# ASGI application object; the REST and WebSocket handlers in this file
# register themselves on it through the @app.* decorators.
app = FastAPI(title="Multiplayer Checkers Backend")
# CORS middleware for frontend integration: only the two listed origins may
# call the API from a browser, with credentials, using any method or header.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "https://pachekers.netlify.app"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Game Models
class Player(int, Enum):
    """Identifies one of the two sides in a game.

    Members are also plain ints, so they compare equal to 1 and 2 and are
    serialised as numbers.
    """

    ONE = 1  # starts on rows 5-7 of the initial board
    TWO = 2  # starts on rows 0-2 of the initial board
class PieceType(str, Enum):
    """Rank of a piece; members are also plain strings."""

    REGULAR = "regular"  # default rank of a newly created piece
    KING = "king"  # rank assigned on promotion
class Piece(BaseModel):
    """A single checker occupying one board square."""

    # Side that owns the piece.
    player: Player
    # Rank of the piece; pieces are regular until promoted.
    type: PieceType = PieceType.REGULAR
class Move(BaseModel):
    """One piece movement expressed in board coordinates."""

    # Origin square as [row, col].
    from_pos: List[int] = []  # [row, col]
    # Destination square as [row, col].
    to: List[int] = []  # [row, col]
    # True when the move captures; GameManager.is_valid_move sets this when
    # it recognises a two-square jump over an opposing piece.
    isJump: bool = False
    # [row, col] of the captured piece, or None when nothing is captured.
    jumpedPieceCoords: Optional[List[int]] = None
class GameMove(BaseModel):
    """Request body of the REST move endpoint: who moves and how."""

    # Side submitting the move.
    player: Player
    # The movement being requested.
    move: Move
class GameState(BaseModel):
    """Complete state of one game; this is the payload sent to clients."""

    # Grid of squares indexed as board[row][col]; None marks an empty square.
    board: List[List[Optional[Piece]]]
    # Side whose turn it is.
    currentPlayer: Player
    # Most recently applied move, if any.
    lastMove: Optional[Move] = None
    # Set once the game is decided (capture of all pieces or forfeit).
    winner: Optional[Player] = None
    # Multi-jump bookkeeping exposed to clients; the handlers visible in this
    # file never change these two fields from their defaults.
    isMultiJumpPhase: bool = False
    multiJumpPieceCoords: Optional[List[int]] = None
    # Raised when a player leaves the match or drops the WebSocket.
    opponentDisconnected: bool = False
class MatchmakingRequest(BaseModel):
    """Optional request body accepted by the matchmaking "find" endpoint."""

    # Colour preference supplied by the client; the matchmaking handler in
    # this file accepts the field but does not read it.
    preferredPieceColor: Optional[str] = None
class MatchmakingCancel(BaseModel):
    """Request body of the matchmaking "cancel" endpoint."""

    # Identifier previously returned to the client while it was searching.
    searchId: str
class LeaveGameRequest(BaseModel):
    """Request body of the REST "leave game" endpoint."""

    # Side that is leaving; the other side is awarded the game.
    player: Player
# WebSocket Message Types
class WebSocketMessage(BaseModel):
    """Envelope shape for WebSocket traffic: a type tag plus optional data.

    NOTE(review): the handlers visible in this file build and parse plain
    dicts rather than instantiating this model - confirm whether it is used
    elsewhere.
    """

    # Message discriminator, e.g. "GAME_UPDATE" or "MAKE_MOVE".
    type: str
    # Message-specific data; absent for messages that carry none.
    payload: Optional[Dict[str, Any]] = None
# Global storage (in production, use Redis or database)
class GameManager:
    """In-memory registry of games, player sockets and matchmaking state.

    Everything is kept in plain dicts/lists on the instance, so state is lost
    when the process exits and is not shared between processes.
    """

    def __init__(self):
        """Start with no games, no connections and an empty matchmaking queue."""
        self.games: Dict[str, GameState] = {}  # roomId -> game state
        self.game_connections: Dict[str, Dict[Player, WebSocket]] = {}  # roomId -> player -> socket
        self.matchmaking_queue: List[str] = []  # waiting playerIds, oldest first
        self.search_sessions: Dict[str, str] = {}  # searchId -> playerId
        self.player_searches: Dict[str, str] = {}  # playerId -> searchId
        self.pending_matches: Dict[str, Dict[str, Any]] = {}  # searchId -> match details

    def create_initial_board(self) -> List[List[Optional[Piece]]]:
        """Create the initial 8x8 checkers board.

        Player.TWO occupies the dark squares of rows 0-2 and Player.ONE the
        dark squares of rows 5-7; every other square is ``None``.
        """
        board: List[List[Optional[Piece]]] = [[None for _ in range(8)] for _ in range(8)]
        for row in range(8):
            if row < 3:
                owner = Player.TWO  # top three rows
            elif row >= 5:
                owner = Player.ONE  # bottom three rows
            else:
                continue  # the two middle rows start empty
            for col in range(8):
                if (row + col) % 2 == 1:  # dark squares only
                    board[row][col] = Piece(player=owner)
        return board

    def create_game(self, room_id: str) -> GameState:
        """Create a fresh game for ``room_id`` and register it.

        Any existing game and connection table stored under the same room id
        is replaced. Player.ONE moves first.

        Returns:
            The newly created game state.
        """
        game_state = GameState(
            board=self.create_initial_board(),
            currentPlayer=Player.ONE,
        )
        self.games[room_id] = game_state
        self.game_connections[room_id] = {}
        return game_state

    def is_valid_move(self, game_state: GameState, move: Move, player: Player) -> bool:
        """Return True when ``move`` is an acceptable step or single jump.

        Simplified rules: the game must be undecided, it must be ``player``'s
        turn, both squares must be on the board, the origin must hold one of
        the player's pieces, and the destination must be an empty dark square
        reached diagonally. A one-square step by a regular piece must go
        forward (towards row 0 for Player.ONE, row 7 for Player.TWO). A
        two-square jump must pass over an opposing piece; jump direction is
        not restricted. Mandatory captures and multi-jumps are not enforced.

        Side effect: ``move.isJump`` and ``move.jumpedPieceCoords`` are
        overwritten with server-computed values on success, so ``apply_move``
        never acts on capture data supplied by the client.
        """
        # A decided game accepts no further moves.
        if game_state.winner is not None:
            return False
        if game_state.currentPlayer != player:
            return False

        # Malformed coordinates are an invalid move, not a server error.
        if len(move.from_pos) != 2 or len(move.to) != 2:
            return False
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        if not all(0 <= value < 8 for value in (from_row, from_col, to_row, to_col)):
            return False

        piece = game_state.board[from_row][from_col]
        if piece is None or piece.player != player:
            return False
        if game_state.board[to_row][to_col] is not None:
            return False
        # Only dark squares are playable.
        if (to_row + to_col) % 2 == 0:
            return False

        row_step = to_row - from_row
        distance = abs(row_step)
        if distance != abs(to_col - from_col):  # must be diagonal
            return False

        if distance == 1:
            if piece.type != PieceType.KING:
                # Promotion happens on row 0 for ONE and row 7 for TWO, so
                # those are the respective forward directions.
                forward = -1 if player == Player.ONE else 1
                if row_step != forward:
                    return False
            # Discard any client-supplied capture data on a plain step.
            move.isJump = False
            move.jumpedPieceCoords = None
            return True

        if distance == 2:
            mid_row = (from_row + to_row) // 2
            mid_col = (from_col + to_col) // 2
            mid_piece = game_state.board[mid_row][mid_col]
            if mid_piece is not None and mid_piece.player != player:
                move.isJump = True
                move.jumpedPieceCoords = [mid_row, mid_col]
                return True

        return False

    def apply_move(self, game_state: GameState, move: Move, player: Player):
        """Apply an already validated move to ``game_state`` in place.

        Relocates the piece, removes the jumped piece for a capture, promotes
        on reaching the far row, records the move, passes the turn to the
        other player and re-evaluates the winner. No validation is done here;
        callers are expected to call ``is_valid_move`` first.
        """
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        piece = game_state.board[from_row][from_col]
        game_state.board[from_row][from_col] = None
        game_state.board[to_row][to_col] = piece

        if move.isJump and move.jumpedPieceCoords:
            jump_row, jump_col = move.jumpedPieceCoords
            game_state.board[jump_row][jump_col] = None

        if piece is not None:
            reached_far_row = (player == Player.ONE and to_row == 0) or (
                player == Player.TWO and to_row == 7
            )
            if reached_far_row:
                piece.type = PieceType.KING

        game_state.lastMove = move
        game_state.currentPlayer = Player.TWO if player == Player.ONE else Player.ONE

        # Simplified end-of-game test: a side with no pieces left has lost.
        self.check_winner(game_state)

    def check_winner(self, game_state: GameState):
        """Set ``game_state.winner`` when one side has no pieces left.

        Leaves ``winner`` untouched while both sides still have pieces. A
        side that has pieces but no legal move is not detected.
        """
        sides_present = {
            piece.player
            for row in game_state.board
            for piece in row
            if piece is not None
        }
        if Player.ONE not in sides_present:
            game_state.winner = Player.TWO
        elif Player.TWO not in sides_present:
            game_state.winner = Player.ONE

    async def broadcast_game_update(self, room_id: str):
        """Send the current game state to every socket connected to the room.

        Does nothing when the room has no connection table or no game. A
        socket whose send fails is treated as dead and dropped from the table.
        """
        connections = self.game_connections.get(room_id)
        game_state = self.games.get(room_id)
        if connections is None or game_state is None:
            return

        # Serialise once; every recipient gets the same text frame.
        encoded = json.dumps({
            "type": "GAME_UPDATE",
            "payload": game_state.dict(),
        })

        # Iterate over a snapshot: other handlers may add or remove sockets
        # while this coroutine is suspended in ``await``.
        for player, websocket in list(connections.items()):
            try:
                await websocket.send_text(encoded)
            except Exception:
                # ``Exception`` rather than a bare except so that task
                # cancellation still propagates. Drop the socket only if it
                # has not been replaced by a newer connection meanwhile.
                if connections.get(player) is websocket:
                    del connections[player]
# Single module-level GameManager instance used by the REST and WebSocket
# handlers defined below.
game_manager = GameManager()
# REST API Endpoints
@app.post("/api/matchmaking/find")
async def find_match(request: MatchmakingRequest = MatchmakingRequest()):
    """Pair the caller with the longest-waiting player, or queue the caller.

    When a player is already waiting, a game room is created. The waiting
    player is assigned Player.ONE and learns about the match through the
    polling endpoint; the caller is assigned Player.TWO and gets the match in
    this response. Otherwise the caller is queued and receives a search id
    to poll with.

    ``request.preferredPieceColor`` is accepted but not used for assignment.

    Returns:
        ``{"status": "matched", "roomId", "assignedPlayer"}`` or
        ``{"status": "searching", "searchId"}``.
    """
    # Take the oldest queue entry that still has a live search. Entries whose
    # search has vanished are discarded instead of matching the caller
    # against nobody.
    waiting_player_id = None
    waiting_search_id = None
    while game_manager.matchmaking_queue:
        candidate_id = game_manager.matchmaking_queue.pop(0)
        candidate_search_id = game_manager.player_searches.get(candidate_id)
        if candidate_search_id is not None:
            waiting_player_id = candidate_id
            waiting_search_id = candidate_search_id
            break

    if waiting_search_id is None:
        # Nobody to play against yet: register a search and wait in the queue.
        search_id = str(uuid.uuid4())
        player_id = str(uuid.uuid4())
        game_manager.search_sessions[search_id] = player_id
        game_manager.player_searches[player_id] = search_id
        game_manager.matchmaking_queue.append(player_id)
        return {
            "status": "searching",
            "searchId": search_id,
        }

    room_id = str(uuid.uuid4())
    game_manager.create_game(room_id)

    # Park the first player's result until they poll for it.
    game_manager.pending_matches[waiting_search_id] = {
        "roomId": room_id,
        "assignedPlayer": Player.ONE.value,
    }
    # The first player's search is over; tolerate entries already removed.
    game_manager.search_sessions.pop(waiting_search_id, None)
    game_manager.player_searches.pop(waiting_player_id, None)

    # The caller is matched immediately, so no search ids are registered.
    return {
        "status": "matched",
        "roomId": room_id,
        "assignedPlayer": Player.TWO.value,
    }
@app.get("/api/matchmaking/check/{search_id}")
async def check_match_status(search_id: str):
    """Report whether the search identified by ``search_id`` has been matched.

    A stored match is handed out exactly once: it is removed as it is
    returned. Without a stored match the answer is "searching" while the
    search is still registered and "not_found" otherwise.
    """
    # One-time fetch: take the match details out of the table, if present.
    match_details = game_manager.pending_matches.pop(search_id, None)
    if match_details is not None:
        return {
            "status": "matched",
            "roomId": match_details["roomId"],
            "assignedPlayer": match_details["assignedPlayer"],
        }

    still_searching = search_id in game_manager.search_sessions
    return {"status": "searching" if still_searching else "not_found"}
@app.post("/api/matchmaking/cancel")
async def cancel_matchmaking(request: MatchmakingCancel):
    """Withdraw a pending search from matchmaking.

    Raises:
        HTTPException: 404 when the search id is not registered.
    """
    search_id = request.searchId
    if search_id not in game_manager.search_sessions:
        raise HTTPException(status_code=404, detail="Search not found")

    # Forget the search and learn which queued player it belonged to.
    player_id = game_manager.search_sessions.pop(search_id)

    # The player may not be in the queue any more.
    if player_id in game_manager.matchmaking_queue:
        game_manager.matchmaking_queue.remove(player_id)

    game_manager.player_searches.pop(player_id, None)
    return {"status": "cancelled"}
@app.post("/api/game/{room_id}/move")
async def make_move(room_id: str, move_request: GameMove):
    """Validate and apply a move submitted over REST, then notify the room.

    Raises:
        HTTPException: 404 when the room does not exist, 400 when the game
            manager rejects the move.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    requested_move = move_request.move
    mover = move_request.player

    move_is_legal = game_manager.is_valid_move(game_state, requested_move, mover)
    if not move_is_legal:
        raise HTTPException(status_code=400, detail="Invalid move")

    game_manager.apply_move(game_state, requested_move, mover)

    # Push the new state to every socket connected to this room.
    await game_manager.broadcast_game_update(room_id)
    return {"status": "success"}
@app.get("/api/game/{room_id}/state")
async def get_game_state(room_id: str, player: int):
    """Return the current state of a room for clients that poll.

    ``player`` is a required query parameter but does not influence the
    response; both sides receive the same state.

    Raises:
        HTTPException: 404 when the room does not exist.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game not found")
    return game_state.dict()
@app.post("/api/game/{room_id}/leave")
async def leave_game(room_id: str, request: LeaveGameRequest):
    """Record that ``request.player`` left the game and tell the opponent.

    If the game is still undecided, the opponent is declared the winner by
    forfeit. A result that was already decided is kept as it is. All
    connected sockets receive the updated state, and the opponent's socket
    additionally receives an ``OPPONENT_LEFT`` message.

    Raises:
        HTTPException: 404 when the room does not exist.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    # Leaving after the game is over must not flip the recorded winner.
    if game_state.winner is None:
        game_state.winner = Player.TWO if request.player == Player.ONE else Player.ONE
    game_state.opponentDisconnected = True

    await game_manager.broadcast_game_update(room_id)

    # Snapshot the recipients: the connection table can change while this
    # coroutine is suspended in ``await``.
    room_connections = game_manager.game_connections.get(room_id, {})
    opponent_sockets = [
        websocket
        for player, websocket in list(room_connections.items())
        if player != request.player
    ]
    notice = json.dumps({
        "type": "OPPONENT_LEFT"
    })
    for websocket in opponent_sockets:
        try:
            await websocket.send_text(notice)
        except Exception:
            # Best-effort notification: a dead socket is not an error for the
            # player who is leaving. ``Exception`` (not a bare except) lets
            # task cancellation propagate.
            continue

    return {"status": "opponent_won_by_forfeit"}
# WebSocket Endpoints
@app.websocket("/ws/game/{room_id}/{player_id}")
async def websocket_game_endpoint(websocket: WebSocket, room_id: str, player_id: int):
    """Serve one player's real-time connection to a game room.

    After accepting the socket the handler registers it for the room, sends
    the current game state if the room has a game, and then processes client
    messages until the client leaves or disconnects:

    * ``MAKE_MOVE``: validate and apply the move, then broadcast the new
      state, or answer with an ``ERROR`` message.
    * ``LEAVE_MATCH``: award an undecided game to the opponent, broadcast,
      and end the handler.
    * any other type is ignored.

    A ``player_id`` that is not a valid ``Player`` value closes the socket
    with code 1008. On every exit path the socket is removed from the room's
    connection table, unless the player has already reconnected with a newer
    socket.
    """
    await websocket.accept()

    try:
        player = Player(player_id)
    except ValueError:
        # Unknown side: refuse the connection (1008 = policy violation)
        # instead of failing with an unhandled exception.
        await websocket.close(code=1008)
        return

    game_manager.game_connections.setdefault(room_id, {})[player] = websocket

    def release_connection() -> None:
        """Forget this socket unless a newer one replaced it for the player."""
        room_connections = game_manager.game_connections.get(room_id, {})
        if room_connections.get(player) is websocket:
            del room_connections[player]

    async def send_error(text: str) -> None:
        """Send an ERROR message carrying ``text`` to this client."""
        await websocket.send_text(json.dumps({
            "type": "ERROR",
            "payload": {"message": text}
        }))

    try:
        # Initial snapshot so the client can render the board straight away.
        if room_id in game_manager.games:
            await websocket.send_text(json.dumps({
                "type": "GAME_UPDATE",
                "payload": game_manager.games[room_id].dict()
            }))

        while True:
            data = await websocket.receive_text()

            # A malformed frame gets an error reply; it must not kill the
            # connection.
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await send_error("Malformed JSON message")
                continue
            if not isinstance(message, dict):
                await send_error("Message must be a JSON object")
                continue

            message_type = message.get("type")

            if message_type == "MAKE_MOVE":
                try:
                    move_data = message["payload"]["move"]
                    move = Move(
                        from_pos=move_data["from_pos"],
                        to=move_data["to"],
                        isJump=move_data.get("isJump", False),
                        jumpedPieceCoords=move_data.get("jumpedPieceCoords")
                    )
                    game_state = game_manager.games.get(room_id)
                    if game_state is None:
                        await send_error("Game room not found")
                    elif game_manager.is_valid_move(game_state, move, player):
                        game_manager.apply_move(game_state, move, player)
                        await game_manager.broadcast_game_update(room_id)
                    else:
                        await send_error("Invalid move")
                except WebSocketDisconnect:
                    # Let the disconnect handler below deal with it; do not
                    # try to send an error to a closed socket.
                    raise
                except Exception as e:
                    await send_error(f"Move processing error: {str(e)}")

            elif message_type == "LEAVE_MATCH":
                game_state = game_manager.games.get(room_id)
                if game_state is not None:
                    # Do not overwrite a result that is already decided.
                    if game_state.winner is None:
                        game_state.winner = Player.TWO if player == Player.ONE else Player.ONE
                    game_state.opponentDisconnected = True
                    await game_manager.broadcast_game_update(room_id)
                break

    except WebSocketDisconnect:
        # Drop our own socket first so the broadcast only targets the
        # opponent, then report the disconnect.
        release_connection()
        game_state = game_manager.games.get(room_id)
        if game_state is not None:
            game_state.opponentDisconnected = True
            await game_manager.broadcast_game_update(room_id)
    finally:
        # Covers LEAVE_MATCH and unexpected errors as well; a no-op when the
        # socket was already released above.
        release_connection()
# Health check endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: report "healthy" with the server's local time."""
    timestamp = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": timestamp}
# Root endpoint
@app.get("/")
async def root():
    """Identify the service by name and version."""
    service_info = {
        "message": "Multiplayer Checkers Backend API",
        "version": "1.0.0",
    }
    return service_info
# Script entry point: serve the app on all interfaces, port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when the module is run directly.
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)