Spaces:
Sleeping
Sleeping
import asyncio
import json
import logging
import uuid
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional, Tuple

from fastapi import BackgroundTasks, FastAPI, HTTPException, WebSocket, WebSocketDisconnect
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
app = FastAPI(title="Multiplayer Checkers Backend")

# CORS: browser requests are accepted only from the two frontend origins
# listed here, with credentials and every method/header allowed.
app.add_middleware(
    CORSMiddleware,
    allow_origins=[
        "http://localhost:5173",
        "https://pachekers.netlify.app",
    ],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
# Game Models | |
class Player(int, Enum):
    """Identifier of one of the two sides in a game.

    Members are also plain integers, so they compare equal to 1 and 2.
    """

    ONE = 1
    TWO = 2
class PieceType(str, Enum):
    """Rank of a piece; members are also plain strings."""

    REGULAR = "regular"
    KING = "king"
class Piece(BaseModel):
    """One checker on the board."""

    # Side that owns the piece.
    player: Player
    # Rank of the piece; a piece is regular unless stated otherwise.
    type: PieceType = PieceType.REGULAR
class Move(BaseModel):
    """A single step of one piece from a source square to a target square."""

    # Source square as [row, col]; note the default is an empty list.
    from_pos: List[int] = []
    # Target square as [row, col]; note the default is an empty list.
    to: List[int] = []
    # True when the move jumps over a piece.
    isJump: bool = False
    # [row, col] of the jumped piece, if any.
    jumpedPieceCoords: Optional[List[int]] = None
class GameMove(BaseModel):
    """Request body pairing a move with the player who submits it."""

    # Side claiming to make the move.
    player: Player
    # The move itself.
    move: Move
class GameState(BaseModel):
    """Complete state of one game, as sent to clients in GAME_UPDATE messages."""

    # Grid of squares indexed as board[row][col]; None marks an empty square.
    board: List[List[Optional[Piece]]]
    # Side whose turn it is.
    currentPlayer: Player
    # Most recently applied move, if any.
    lastMove: Optional[Move] = None
    # Set once the game is decided; None while it is still running.
    winner: Optional[Player] = None
    # Multi-jump bookkeeping fields.
    # NOTE(review): nothing in this file writes these two — confirm whether
    # only the client uses them.
    isMultiJumpPhase: bool = False
    multiJumpPieceCoords: Optional[List[int]] = None
    # Set to True when a player leaves the game or its socket disconnects.
    opponentDisconnected: bool = False
class MatchmakingRequest(BaseModel):
    """Optional body of a find-match request."""

    # Colour the caller would like to play.
    # NOTE(review): find_match in this file never reads this field.
    preferredPieceColor: Optional[str] = None
class MatchmakingCancel(BaseModel):
    """Body of a cancel-matchmaking request."""

    # Identifier of the search to cancel.
    searchId: str
class LeaveGameRequest(BaseModel):
    """Body of a leave-game request."""

    # Side that is leaving; the other side is awarded the game.
    player: Player
# WebSocket Message Types | |
class WebSocketMessage(BaseModel):
    """Envelope shape of a WebSocket message: a type tag plus optional payload.

    NOTE(review): the WebSocket handler in this file builds and parses plain
    dicts and does not instantiate this model — confirm whether it is used
    elsewhere.
    """

    # Message discriminator, e.g. "GAME_UPDATE".
    type: str
    # Message-specific data, if any.
    payload: Optional[Dict[str, Any]] = None
# Global storage (in production, use Redis or database) | |
class GameManager:
    """In-memory registry of games, live WebSocket connections and matchmaking state.

    Everything is kept in plain dicts/lists on the instance (the surrounding
    file notes that production should use Redis or a database instead).
    """

    def __init__(self):
        # roomId -> game state
        self.games: Dict[str, GameState] = {}
        # roomId -> {player -> that player's WebSocket}
        self.game_connections: Dict[str, Dict[Player, WebSocket]] = {}
        # playerIds waiting for an opponent, oldest first
        self.matchmaking_queue: List[str] = []
        self.search_sessions: Dict[str, str] = {}  # searchId -> playerId
        self.player_searches: Dict[str, str] = {}  # playerId -> searchId
        self.pending_matches: Dict[str, Dict[str, Any]] = {}  # searchId -> match details

    def create_initial_board(self) -> List[List[Optional[Piece]]]:
        """Return a fresh 8x8 board with both sides' pieces on their dark squares.

        Player.TWO occupies rows 0-2 (top), Player.ONE rows 5-7 (bottom); a
        square is "dark" when ``(row + col)`` is odd.
        """
        board: List[List[Optional[Piece]]] = [[None] * 8 for _ in range(8)]
        for row in range(8):
            if row < 3:
                owner = Player.TWO
            elif row >= 5:
                owner = Player.ONE
            else:
                continue  # the two middle rows start empty
            for col in range(8):
                if (row + col) % 2 == 1:
                    board[row][col] = Piece(player=owner)
        return board

    def create_game(self, room_id: str) -> GameState:
        """Create, register and return a new game for ``room_id``.

        Player.ONE moves first. Any existing connection map for the room is
        replaced by an empty one.
        """
        game_state = GameState(
            board=self.create_initial_board(),
            currentPlayer=Player.ONE,
        )
        self.games[room_id] = game_state
        self.game_connections[room_id] = {}
        return game_state

    def is_valid_move(self, game_state: GameState, move: Move, player: Player) -> bool:
        """Return True if ``move`` is acceptable for ``player`` in ``game_state``.

        This is a simplified validation. It checks that the game is still
        running, that it is the player's turn, that both squares are on the
        board, that the source holds one of the player's pieces, that the
        target is an empty dark square, and that the step is a one-square
        diagonal or a two-square diagonal over an opposing piece.

        Side effect: ``move.isJump`` and ``move.jumpedPieceCoords`` are
        overwritten with server-computed values whenever the move is accepted,
        so client-supplied capture data is never trusted by ``apply_move``.

        NOTE(review): movement direction for regular pieces, mandatory
        captures and multi-jump sequences are not enforced here.
        """
        # A decided game accepts no further moves.
        if game_state.winner is not None:
            return False

        if game_state.currentPlayer != player:
            return False

        # Coordinates originate from the client; anything that is not a
        # [row, col] pair is rejected instead of raising.
        try:
            from_row, from_col = move.from_pos
            to_row, to_col = move.to
        except (TypeError, ValueError):
            return False

        coords = (from_row, from_col, to_row, to_col)
        if not all(isinstance(value, int) and 0 <= value < 8 for value in coords):
            return False

        piece = game_state.board[from_row][from_col]
        if piece is None or piece.player != player:
            return False

        if game_state.board[to_row][to_col] is not None:
            return False

        # Only dark squares are playable.
        if (to_row + to_col) % 2 == 0:
            return False

        row_diff = abs(to_row - from_row)
        col_diff = abs(to_col - from_col)
        if row_diff != col_diff:  # must be diagonal
            return False

        if row_diff == 1:
            # Plain step: discard any capture data the client attached, since
            # apply_move removes whatever jumpedPieceCoords points at.
            move.isJump = False
            move.jumpedPieceCoords = None
            return True

        if row_diff == 2:
            mid_row = (from_row + to_row) // 2
            mid_col = (from_col + to_col) // 2
            mid_piece = game_state.board[mid_row][mid_col]
            if mid_piece is not None and mid_piece.player != player:
                move.isJump = True
                move.jumpedPieceCoords = [mid_row, mid_col]
                return True

        return False

    def apply_move(self, game_state: GameState, move: Move, player: Player):
        """Apply an already validated ``move`` to ``game_state`` in place.

        Moves the piece, removes the jumped piece for a jump, promotes the
        piece to king on the far row, records the move as ``lastMove``, hands
        the turn to the other player and updates ``winner``.

        Callers are expected to have called ``is_valid_move`` first; no
        validation is repeated here.
        """
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        piece = game_state.board[from_row][from_col]
        game_state.board[from_row][from_col] = None
        game_state.board[to_row][to_col] = piece

        if move.isJump and move.jumpedPieceCoords:
            jump_row, jump_col = move.jumpedPieceCoords
            game_state.board[jump_row][jump_col] = None

        # Player.ONE promotes on row 0, Player.TWO on row 7.
        if piece:
            reached_far_row = (
                (player == Player.ONE and to_row == 0)
                or (player == Player.TWO and to_row == 7)
            )
            if reached_far_row:
                piece.type = PieceType.KING

        game_state.lastMove = move
        game_state.currentPlayer = Player.TWO if player == Player.ONE else Player.ONE

        self.check_winner(game_state)

    def check_winner(self, game_state: GameState):
        """Set ``game_state.winner`` when one side has no pieces left.

        Simplified rule: only piece counts are considered, not whether a side
        still has a legal move. If Player.ONE has no pieces (including on an
        empty board) Player.TWO wins; otherwise if Player.TWO has none,
        Player.ONE wins.
        """
        has_one = False
        has_two = False
        for row in game_state.board:
            for piece in row:
                if not piece:
                    continue
                if piece.player == Player.ONE:
                    has_one = True
                else:
                    has_two = True

        if not has_one:
            game_state.winner = Player.TWO
        elif not has_two:
            game_state.winner = Player.ONE

    async def broadcast_game_update(self, room_id: str):
        """Send the room's current state as a GAME_UPDATE to every connected player.

        Does nothing when the room has no connection map or no game. A socket
        whose send fails is logged and dropped from the room's connections;
        the failure is not propagated.
        """
        connections = self.game_connections.get(room_id)
        game_state = self.games.get(room_id)
        if connections is None or game_state is None:
            return

        text = json.dumps({
            "type": "GAME_UPDATE",
            "payload": game_state.dict(),
        })

        # Iterate a snapshot: other handlers may add or remove sockets while
        # we are suspended in a send, and mutating a dict during iteration
        # raises RuntimeError.
        for player, websocket in list(connections.items()):
            try:
                await websocket.send_text(text)
            except Exception:
                # `Exception` (not a bare except) so task cancellation still
                # propagates.
                logging.getLogger(__name__).warning(
                    "Dropping connection of player %s in room %s after failed send",
                    player, room_id, exc_info=True,
                )
                # Only drop the socket that failed; the player may have
                # reconnected with a new one in the meantime.
                if connections.get(player) is websocket:
                    del connections[player]
# Single module-level manager shared by the REST and WebSocket handlers.
game_manager = GameManager()
# REST API Endpoints | |
async def find_match(request: MatchmakingRequest = MatchmakingRequest()):
    """Pair the caller with the longest-waiting player, or queue the caller.

    If a player with an active search is waiting, a new room is created. The
    waiting player is assigned Player.ONE, and their match details are stored
    under their search id for retrieval via polling. The caller is assigned
    Player.TWO and gets the match immediately.

    Otherwise the caller is queued and receives a ``searchId`` to poll with.

    ``request`` is accepted but not currently used.

    Returns:
        ``{"status": "matched", "roomId": ..., "assignedPlayer": 2}`` or
        ``{"status": "searching", "searchId": ...}``.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    queue = game_manager.matchmaking_queue

    while queue:
        waiting_player_id = queue.pop(0)  # oldest entry first
        waiting_search_id = game_manager.player_searches.pop(waiting_player_id, None)
        if waiting_search_id is None:
            # Stale queue entry with no active search: nobody would ever poll
            # for this match, so skip it instead of creating an orphaned room.
            continue
        game_manager.search_sessions.pop(waiting_search_id, None)

        room_id = str(uuid.uuid4())
        game_manager.create_game(room_id)

        # The waiting player picks this up through status polling.
        game_manager.pending_matches[waiting_search_id] = {
            "roomId": room_id,
            "assignedPlayer": Player.ONE.value,
        }
        return {
            "status": "matched",
            "roomId": room_id,
            "assignedPlayer": Player.TWO.value,
        }

    # Nobody to play against yet: register the search and wait in the queue.
    search_id = str(uuid.uuid4())
    player_id = str(uuid.uuid4())
    game_manager.search_sessions[search_id] = player_id
    game_manager.player_searches[player_id] = search_id
    queue.append(player_id)
    return {
        "status": "searching",
        "searchId": search_id,
    }
async def check_match_status(search_id: str):
    """Report whether the search identified by ``search_id`` has been matched.

    Match details are handed out exactly once: they are removed from the
    pending store as they are returned.

    Returns:
        ``{"status": "matched", "roomId": ..., "assignedPlayer": ...}`` when a
        match is pending, ``{"status": "searching"}`` while the search is
        still registered, and ``{"status": "not_found"}`` otherwise.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    details = game_manager.pending_matches.pop(search_id, None)
    if details is not None:
        return {
            "status": "matched",
            "roomId": details["roomId"],
            "assignedPlayer": details["assignedPlayer"],
        }

    still_searching = search_id in game_manager.search_sessions
    return {"status": "searching" if still_searching else "not_found"}
async def cancel_matchmaking(request: MatchmakingCancel):
    """Cancel a pending matchmaking search.

    Removes the searching player from the queue and drops both search
    lookups.

    Raises:
        HTTPException: 404 when ``request.searchId`` is not a registered search.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    sessions = game_manager.search_sessions
    search_id = request.searchId

    # Guard clause: unknown searches are reported before any state is touched.
    if search_id not in sessions:
        raise HTTPException(status_code=404, detail="Search not found")

    player_id = sessions[search_id]

    waiting = game_manager.matchmaking_queue
    if player_id in waiting:
        waiting.remove(player_id)

    del sessions[search_id]
    game_manager.player_searches.pop(player_id, None)

    return {"status": "cancelled"}
async def make_move(room_id: str, move_request: GameMove):
    """Validate and apply a player's move, then broadcast the new state.

    Raises:
        HTTPException: 404 when the room does not exist; 400 when the game is
            already decided, when the move's coordinates are malformed, or
            when the move is rejected by validation.

    NOTE(review): the acting player is taken from the request body without
    any authentication visible here. No route decorator is visible on this
    handler in this file — confirm how it is registered with ``app``.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    # A decided game must not be mutated any further.
    if game_state.winner is not None:
        raise HTTPException(status_code=400, detail="Game is already over")

    move = move_request.move
    player = move_request.player
    try:
        accepted = game_manager.is_valid_move(game_state, move, player)
    except (TypeError, ValueError):
        # Coordinates that are not [row, col] pairs (the model defaults them
        # to []) would otherwise surface as an unhandled server error.
        accepted = False
    if not accepted:
        raise HTTPException(status_code=400, detail="Invalid move")

    game_manager.apply_move(game_state, move, player)
    await game_manager.broadcast_game_update(room_id)
    return {"status": "success"}
async def get_game_state(room_id: str, player: int):
    """Return the current state of a room as a plain dict (polling alternative to WebSockets).

    ``player`` is accepted but does not influence the result.

    Raises:
        HTTPException: 404 when the room does not exist.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    games = game_manager.games
    if room_id in games:
        return games[room_id].dict()
    raise HTTPException(status_code=404, detail="Game not found")
async def leave_game(room_id: str, request: LeaveGameRequest):
    """Forfeit the game on behalf of ``request.player`` and notify the room.

    The opponent is recorded as winner unless the game was already decided,
    ``opponentDisconnected`` is set, the updated state is broadcast, and every
    other connected player additionally receives an ``OPPONENT_LEFT`` message.
    Failed notifications are logged and otherwise ignored (best effort).

    Raises:
        HTTPException: 404 when the room does not exist.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    # Leaving after the game has ended must not rewrite the result.
    if game_state.winner is None:
        game_state.winner = Player.TWO if request.player == Player.ONE else Player.ONE
    game_state.opponentDisconnected = True

    await game_manager.broadcast_game_update(room_id)

    notice = json.dumps({
        "type": "OPPONENT_LEFT"
    })
    connections = game_manager.game_connections.get(room_id, {})
    # Snapshot the items: the dict may change while a send is awaited.
    for player, websocket in list(connections.items()):
        if player == request.player:
            continue
        try:
            await websocket.send_text(notice)
        except Exception:
            # Best effort only, but leave a trace rather than failing silently.
            logging.getLogger(__name__).warning(
                "Could not notify player %s in room %s that the opponent left",
                player, room_id, exc_info=True,
            )

    return {"status": "opponent_won_by_forfeit"}
# WebSocket Endpoints | |
async def websocket_game_endpoint(websocket: WebSocket, room_id: str, player_id: int):
    """Serve one player's real-time connection to a game room.

    After accepting, the socket is registered for ``(room_id, player)`` and
    the current state (if the room has a game) is sent as ``GAME_UPDATE``.
    Incoming JSON messages are then dispatched on their ``"type"``:

    * ``"MAKE_MOVE"`` — ``payload.move`` carries ``from_pos``, ``to`` and
      optionally ``isJump`` / ``jumpedPieceCoords``. A valid move is applied
      and broadcast; otherwise an ``ERROR`` message is returned to the sender.
    * ``"LEAVE_MATCH"`` — the opponent is recorded as winner (unless the game
      was already decided), the state is broadcast and the handler ends.

    Other message types are ignored. When the client disconnects, the room is
    flagged with ``opponentDisconnected`` and the state is broadcast.

    The socket's registration is removed whenever the handler ends, whatever
    the reason.

    NOTE(review): ``player_id`` is supplied by the client with no
    authentication visible here. No route decorator is visible on this
    handler in this file — confirm how it is registered with ``app``.
    """
    await websocket.accept()

    try:
        player = Player(player_id)
    except ValueError:
        # Not one of the two sides: close with 1008 (policy violation)
        # instead of letting the handler crash.
        await websocket.close(code=1008)
        return

    async def send_error(text: str) -> None:
        """Send an ERROR message carrying ``text`` to this client."""
        await websocket.send_text(json.dumps({
            "type": "ERROR",
            "payload": {"message": text}
        }))

    game_manager.game_connections.setdefault(room_id, {})[player] = websocket

    client_disconnected = False
    try:
        game_state = game_manager.games.get(room_id)
        if game_state is not None:
            await websocket.send_text(json.dumps({
                "type": "GAME_UPDATE",
                "payload": game_state.dict()
            }))

        while True:
            data = await websocket.receive_text()

            # A malformed frame is answered with an error; it must not tear
            # down the connection.
            try:
                message = json.loads(data)
                message_type = message["type"]
            except (ValueError, KeyError, TypeError):
                await send_error("Malformed message")
                continue

            if message_type == "MAKE_MOVE":
                try:
                    move_data = message["payload"]["move"]
                    move = Move(
                        from_pos=move_data["from_pos"],
                        to=move_data["to"],
                        isJump=move_data.get("isJump", False),
                        jumpedPieceCoords=move_data.get("jumpedPieceCoords")
                    )
                    game_state = game_manager.games.get(room_id)
                    if game_state is not None:
                        if game_state.winner is not None:
                            await send_error("Game is already over")
                        elif game_manager.is_valid_move(game_state, move, player):
                            game_manager.apply_move(game_state, move, player)
                            await game_manager.broadcast_game_update(room_id)
                        else:
                            await send_error("Invalid move")
                except WebSocketDisconnect:
                    # Must reach the outer handler; replying to a closed
                    # socket would only raise again.
                    raise
                except Exception as e:
                    await send_error(f"Move processing error: {str(e)}")

            elif message_type == "LEAVE_MATCH":
                game_state = game_manager.games.get(room_id)
                if game_state is not None:
                    # Do not rewrite the result of a game that is already over.
                    if game_state.winner is None:
                        game_state.winner = Player.TWO if player == Player.ONE else Player.ONE
                    game_state.opponentDisconnected = True
                    await game_manager.broadcast_game_update(room_id)
                break

    except WebSocketDisconnect:
        client_disconnected = True
    finally:
        # Unregister this socket. A different socket under the same player
        # means the player reconnected; that registration is left alone.
        room_connections = game_manager.game_connections.get(room_id, {})
        registered = room_connections.get(player)
        if registered is websocket:
            del room_connections[player]
        replaced_by_new_socket = registered is not None and registered is not websocket

        if client_disconnected and not replaced_by_new_socket:
            game_state = game_manager.games.get(room_id)
            if game_state is not None:
                game_state.opponentDisconnected = True
                await game_manager.broadcast_game_update(room_id)
# Health check endpoint | |
async def health_check():
    """Report that the service is up, with the server's current naive local time.

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    timestamp = datetime.now().isoformat()
    return dict(status="healthy", timestamp=timestamp)
# Root endpoint | |
async def root():
    """Return a static description of the API (name and version).

    NOTE(review): no route decorator is visible on this handler in this
    file — confirm how it is registered with ``app``.
    """
    info = {
        "message": "Multiplayer Checkers Backend API",
        "version": "1.0.0",
    }
    return info
if __name__ == "__main__":
    # Executed as a script: serve the app on port 8000 on all interfaces.
    # uvicorn is imported lazily so importing this module does not need it.
    import uvicorn

    uvicorn.run(
        app,
        host="0.0.0.0",
        port=8000,
    )