"""Multiplayer checkers backend.

Exposes REST endpoints for matchmaking and move submission plus a WebSocket
endpoint that pushes game-state updates to the players of a room. All state
is held in process memory by a single ``GameManager`` instance.
"""

import asyncio
import json
import logging
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from fastapi import BackgroundTasks, FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel

logger = logging.getLogger(__name__)

app = FastAPI(title="Multiplayer Checkers Backend")

# CORS middleware for frontend integration
app.add_middleware(
    CORSMiddleware,
    allow_origins=["http://localhost:5173", "https://pachekers.netlify.app"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Game Models
class Player(int, Enum):
    """The two sides of a game; the integer value is what goes over the wire."""

    ONE = 1
    TWO = 2


class PieceType(str, Enum):
    """Rank of a piece on the board."""

    REGULAR = "regular"
    KING = "king"


class Piece(BaseModel):
    """A single piece: its owner and its rank."""

    player: Player
    type: PieceType = PieceType.REGULAR


class Move(BaseModel):
    """A move request.

    ``isJump`` and ``jumpedPieceCoords`` are overwritten by
    ``GameManager.is_valid_move`` from the board geometry, so values supplied
    by a client are never trusted.
    """

    from_pos: List[int] = []  # [row, col]
    to: List[int] = []  # [row, col]
    isJump: bool = False
    jumpedPieceCoords: Optional[List[int]] = None


class GameMove(BaseModel):
    """Body of the REST move endpoint: who moves, and where."""

    player: Player
    move: Move


class GameState(BaseModel):
    """Full state of one game as broadcast to clients."""

    board: List[List[Optional[Piece]]]
    currentPlayer: Player
    lastMove: Optional[Move] = None
    winner: Optional[Player] = None
    # NOTE: the multi-jump fields are part of the payload but nothing in this
    # module sets them; the turn always passes after a single move.
    isMultiJumpPhase: bool = False
    multiJumpPieceCoords: Optional[List[int]] = None
    opponentDisconnected: bool = False


class MatchmakingRequest(BaseModel):
    """Body of the matchmaking request."""

    preferredPieceColor: Optional[str] = None


class MatchmakingCancel(BaseModel):
    """Body of the matchmaking cancel request."""

    searchId: str


class LeaveGameRequest(BaseModel):
    """Body of the leave-game request."""

    player: Player


# WebSocket Message Types
class WebSocketMessage(BaseModel):
    """Envelope shape of WebSocket messages: a type tag plus optional payload."""

    type: str
    payload: Optional[Dict[str, Any]] = None


# Global storage (in production, use Redis or database)
class GameManager:
    """In-memory registry of games, room connections and matchmaking state.

    NOTE: entries in ``games`` and ``game_connections`` are never evicted by
    this module, so memory grows with the number of games created.
    """

    def __init__(self):
        self.games: Dict[str, GameState] = {}
        self.game_connections: Dict[str, Dict[Player, WebSocket]] = {}
        self.matchmaking_queue: List[str] = []
        self.search_sessions: Dict[str, str] = {}  # searchId -> playerId
        self.player_searches: Dict[str, str] = {}  # playerId -> searchId
        self.pending_matches: Dict[str, Dict[str, Any]] = {}  # searchId -> match details

    def create_initial_board(self) -> List[List[Optional[Piece]]]:
        """Create the initial 8x8 checkers board.

        Player.TWO occupies the dark squares of rows 0-2 (top) and Player.ONE
        the dark squares of rows 5-7 (bottom). A square is dark when
        ``(row + col)`` is odd.
        """
        board: List[List[Optional[Piece]]] = [[None] * 8 for _ in range(8)]
        for row in range(8):
            if row < 3:
                owner = Player.TWO
            elif row >= 5:
                owner = Player.ONE
            else:
                continue  # the two middle rows start empty
            for col in range(8):
                if (row + col) % 2 == 1:
                    board[row][col] = Piece(player=owner)
        return board

    def create_game(self, room_id: str) -> GameState:
        """Create a new game with initial state and register it under ``room_id``.

        Also resets the room's connection map. Player.ONE moves first.
        """
        game_state = GameState(
            board=self.create_initial_board(),
            currentPlayer=Player.ONE,
        )
        self.games[room_id] = game_state
        self.game_connections[room_id] = {}
        return game_state

    def is_valid_move(self, game_state: GameState, move: Move, player: Player) -> bool:
        """Basic move validation (simplified for this implementation).

        Accepts a one-step diagonal move onto an empty dark square, or a
        two-step diagonal jump over an opposing piece. Rejects everything
        once the game has a winner. Direction of travel and mandatory
        captures are NOT enforced.

        Side effect: ``move.isJump`` and ``move.jumpedPieceCoords`` are
        rewritten to match the board geometry, so that ``apply_move`` never
        acts on client-supplied capture data.

        Returns:
            True when the move is acceptable, False otherwise.
        """
        # A finished game accepts no further moves.
        if game_state.winner is not None:
            return False

        # Coordinates must be [row, col] pairs; the model defaults to [] so a
        # request may arrive without them.
        if len(move.from_pos) != 2 or len(move.to) != 2:
            return False
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        # Check if it's the player's turn
        if game_state.currentPlayer != player:
            return False

        # Check bounds
        if not all(0 <= coord < 8 for coord in (from_row, from_col, to_row, to_col)):
            return False

        # The origin must hold one of the player's own pieces
        piece = game_state.board[from_row][from_col]
        if piece is None or piece.player != player:
            return False

        # Destination must be empty
        if game_state.board[to_row][to_col] is not None:
            return False

        # Destination must be a dark square (valid checkers squares)
        if (to_row + to_col) % 2 == 0:
            return False

        # Must be diagonal
        row_diff = abs(to_row - from_row)
        col_diff = abs(to_col - from_col)
        if row_diff != col_diff:
            return False

        if row_diff == 1:
            # Regular move: discard any capture data the client sent,
            # otherwise apply_move would remove an arbitrary piece.
            move.isJump = False
            move.jumpedPieceCoords = None
            return True

        if row_diff == 2:
            # Jump move: the square in between must hold an opposing piece
            mid_row = (from_row + to_row) // 2
            mid_col = (from_col + to_col) // 2
            mid_piece = game_state.board[mid_row][mid_col]
            if mid_piece is not None and mid_piece.player != player:
                move.isJump = True
                move.jumpedPieceCoords = [mid_row, mid_col]
                return True

        return False

    def apply_move(self, game_state: GameState, move: Move, player: Player):
        """Apply a move to the game state.

        Expects the move to have passed ``is_valid_move`` first. Moves the
        piece, removes a jumped piece, promotes to king on the far row,
        records the move, passes the turn and re-evaluates the winner.
        """
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        # Move the piece
        piece = game_state.board[from_row][from_col]
        game_state.board[from_row][from_col] = None
        game_state.board[to_row][to_col] = piece

        # Handle jump
        if move.isJump and move.jumpedPieceCoords:
            jump_row, jump_col = move.jumpedPieceCoords
            game_state.board[jump_row][jump_col] = None

        # King promotion: Player.ONE promotes on row 0, Player.TWO on row 7
        if piece:
            promotion_row = 0 if player == Player.ONE else 7
            if to_row == promotion_row:
                piece.type = PieceType.KING

        # Update game state
        game_state.lastMove = move
        game_state.currentPlayer = Player.TWO if player == Player.ONE else Player.ONE

        # Check for winner (simplified - just check if opponent has no pieces)
        self.check_winner(game_state)

    def check_winner(self, game_state: GameState):
        """Set ``game_state.winner`` when one side has no pieces left."""
        remaining = {Player.ONE: 0, Player.TWO: 0}
        for row in game_state.board:
            for piece in row:
                if piece is not None:
                    remaining[piece.player] += 1

        if remaining[Player.ONE] == 0:
            game_state.winner = Player.TWO
        elif remaining[Player.TWO] == 0:
            game_state.winner = Player.ONE

    async def broadcast_game_update(self, room_id: str):
        """Broadcast game state to all connected players of ``room_id``.

        Does nothing when the room has no connection map or no game. Sockets
        that fail to send are dropped from the room.
        """
        connections = self.game_connections.get(room_id)
        game_state = self.games.get(room_id)
        if connections is None or game_state is None:
            return

        # Serialize once; every recipient gets the same text.
        text = json.dumps({
            "type": "GAME_UPDATE",
            "payload": game_state.dict(),
        })

        failed = []
        # Iterate over a snapshot: other tasks may add or remove connections
        # while this coroutine is suspended in send_text.
        for player, websocket in list(connections.items()):
            try:
                await websocket.send_text(text)
            except Exception:
                logger.debug(
                    "Dropping connection of player %s in room %s",
                    player, room_id, exc_info=True,
                )
                failed.append((player, websocket))

        # Clean up disconnected players, but never a newer socket that was
        # registered for the same player in the meantime.
        for player, websocket in failed:
            if connections.get(player) is websocket:
                del connections[player]


game_manager = GameManager()


def _forfeit(game_state: GameState, leaving_player: Player) -> None:
    """Record that ``leaving_player`` left; the opponent wins if undecided."""
    if game_state.winner is None:
        game_state.winner = Player.TWO if leaving_player == Player.ONE else Player.ONE
    game_state.opponentDisconnected = True


async def _send_error(websocket: WebSocket, message: str) -> None:
    """Send an ERROR message carrying ``message`` to one client."""
    await websocket.send_text(json.dumps({
        "type": "ERROR",
        "payload": {"message": message},
    }))


async def _handle_move_message(
    websocket: WebSocket, room_id: str, player: Player, message: Dict[str, Any]
) -> None:
    """Parse, validate and apply a MAKE_MOVE message from ``player``."""
    try:
        move_data = message["payload"]["move"]
        move = Move(
            from_pos=move_data["from_pos"],
            to=move_data["to"],
            isJump=move_data.get("isJump", False),
            jumpedPieceCoords=move_data.get("jumpedPieceCoords"),
        )
    except (KeyError, TypeError, AttributeError, ValueError) as exc:
        # ValueError also covers pydantic's validation error.
        await _send_error(websocket, f"Move processing error: {str(exc)}")
        return

    game_state = game_manager.games.get(room_id)
    if game_state is None:
        return

    if not game_manager.is_valid_move(game_state, move, player):
        await _send_error(websocket, "Invalid move")
        return

    game_manager.apply_move(game_state, move, player)
    await game_manager.broadcast_game_update(room_id)


# REST API Endpoints
@app.post("/api/matchmaking/find")
async def find_match(request: MatchmakingRequest = MatchmakingRequest()):
    """Find a match for multiplayer game.

    If another player is waiting, a room is created, the waiting player gets
    Player.ONE (delivered through the polling endpoint) and the caller gets
    Player.TWO immediately. Otherwise the caller is queued and receives a
    ``searchId`` to poll with. ``request.preferredPieceColor`` is currently
    not used.
    """
    while game_manager.matchmaking_queue:
        waiting_player_id = game_manager.matchmaking_queue.pop(0)
        waiting_search_id = game_manager.player_searches.pop(waiting_player_id, None)
        if waiting_search_id is None:
            # Stale queue entry without a search session: skip it rather
            # than creating a game nobody can be told about.
            continue
        game_manager.search_sessions.pop(waiting_search_id, None)

        room_id = str(uuid.uuid4())
        game_manager.create_game(room_id)

        # Store match details for the first player to retrieve via polling
        game_manager.pending_matches[waiting_search_id] = {
            "roomId": room_id,
            "assignedPlayer": Player.ONE.value,
        }

        # Return match details immediately to the second player
        return {
            "status": "matched",
            "roomId": room_id,
            "assignedPlayer": Player.TWO.value,
        }

    # Nobody is waiting: register the caller and add it to the queue
    search_id = str(uuid.uuid4())
    player_id = str(uuid.uuid4())
    game_manager.search_sessions[search_id] = player_id
    game_manager.player_searches[player_id] = search_id
    game_manager.matchmaking_queue.append(player_id)
    return {
        "status": "searching",
        "searchId": search_id,
    }


@app.get("/api/matchmaking/check/{search_id}")
async def check_match_status(search_id: str):
    """Check if a match has been found for a specific search ID.

    Match details are handed out once and then forgotten.
    """
    match_details = game_manager.pending_matches.pop(search_id, None)
    if match_details is not None:
        return {
            "status": "matched",
            "roomId": match_details["roomId"],
            "assignedPlayer": match_details["assignedPlayer"],
        }

    # Check if search is still valid/pending
    if search_id in game_manager.search_sessions:
        return {"status": "searching"}
    return {"status": "not_found"}


@app.post("/api/matchmaking/cancel")
async def cancel_matchmaking(request: MatchmakingCancel):
    """Cancel matchmaking search.

    Raises:
        HTTPException: 404 when the search ID is not an active search.
    """
    player_id = game_manager.search_sessions.pop(request.searchId, None)
    if player_id is None:
        raise HTTPException(status_code=404, detail="Search not found")

    # Remove from queue if present
    if player_id in game_manager.matchmaking_queue:
        game_manager.matchmaking_queue.remove(player_id)
    game_manager.player_searches.pop(player_id, None)

    return {"status": "cancelled"}


@app.post("/api/game/{room_id}/move")
async def make_move(room_id: str, move_request: GameMove):
    """Submit a player move.

    Raises:
        HTTPException: 404 for an unknown room, 400 for a rejected move.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    if not game_manager.is_valid_move(game_state, move_request.move, move_request.player):
        raise HTTPException(status_code=400, detail="Invalid move")

    # Apply the move
    game_manager.apply_move(game_state, move_request.move, move_request.player)

    # Broadcast update to connected players
    await game_manager.broadcast_game_update(room_id)

    return {"status": "success"}


@app.get("/api/game/{room_id}/state")
async def get_game_state(room_id: str, player: int):
    """Get current game state (polling method - WebSockets recommended).

    The ``player`` query parameter is required by the signature but does not
    affect the response.

    Raises:
        HTTPException: 404 for an unknown room.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game not found")
    return game_state.dict()


@app.post("/api/game/{room_id}/leave")
async def leave_game(room_id: str, request: LeaveGameRequest):
    """Leave the game.

    The opponent becomes the winner unless the game was already decided; the
    new state is broadcast and the remaining player gets OPPONENT_LEFT.

    Raises:
        HTTPException: 404 for an unknown room.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    _forfeit(game_state, request.player)

    # Broadcast update
    await game_manager.broadcast_game_update(room_id)

    # Notify remaining player. Snapshot the items: the map may change while
    # this coroutine is suspended in send_text.
    connections = game_manager.game_connections.get(room_id, {})
    for player, websocket in list(connections.items()):
        if player == request.player:
            continue
        try:
            await websocket.send_text(json.dumps({
                "type": "OPPONENT_LEFT"
            }))
        except Exception:
            # Best effort: the remaining player may already be gone.
            logger.debug(
                "Could not notify player %s in room %s",
                player, room_id, exc_info=True,
            )

    return {"status": "opponent_won_by_forfeit"}


# WebSocket Endpoints
@app.websocket("/ws/game/{room_id}/{player_id}")
async def websocket_game_endpoint(websocket: WebSocket, room_id: str, player_id: int):
    """WebSocket connection for real-time game updates.

    Handles MAKE_MOVE and LEAVE_MATCH messages. The connection is closed with
    code 1008 when ``player_id`` is not a valid player or the room does not
    exist. Malformed messages are answered with an ERROR message instead of
    terminating the connection.
    """
    await websocket.accept()

    try:
        player = Player(player_id)
    except ValueError:
        await websocket.close(code=1008)
        return

    game_state = game_manager.games.get(room_id)
    if game_state is None:
        # Rooms are created by matchmaking; do not register sockets for
        # rooms that do not exist.
        await websocket.close(code=1008)
        return

    # Add to connections
    connections = game_manager.game_connections.setdefault(room_id, {})
    connections[player] = websocket

    disconnected = False
    superseded = False
    try:
        # Send initial game state
        await websocket.send_text(json.dumps({
            "type": "GAME_UPDATE",
            "payload": game_state.dict(),
        }))

        while True:
            # Receive message from client
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
            except json.JSONDecodeError:
                await _send_error(websocket, "Malformed JSON message")
                continue

            message_type = message.get("type") if isinstance(message, dict) else None

            if message_type == "MAKE_MOVE":
                await _handle_move_message(websocket, room_id, player, message)
            elif message_type == "LEAVE_MATCH":
                current_state = game_manager.games.get(room_id)
                if current_state is not None:
                    _forfeit(current_state, player)
                    await game_manager.broadcast_game_update(room_id)
                break
    except WebSocketDisconnect:
        disconnected = True
    finally:
        # Deregister this socket on every exit path, but leave a newer
        # socket registered for the same player untouched.
        current = connections.get(player)
        superseded = current is not None and current is not websocket
        if current is websocket:
            del connections[player]

    # Notify opponent of disconnect, unless this player already reconnected
    # on another socket.
    if disconnected and not superseded:
        current_state = game_manager.games.get(room_id)
        if current_state is not None:
            current_state.opponentDisconnected = True
            await game_manager.broadcast_game_update(room_id)


# Health check endpoint
@app.get("/health")
async def health_check():
    """Report liveness together with the current server time."""
    return {"status": "healthy", "timestamp": datetime.now().isoformat()}


# Root endpoint
@app.get("/")
async def root():
    """Return the API name and version."""
    return {"message": "Multiplayer Checkers Backend API", "version": "1.0.0"}


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)