# Chandima Prabhath
# patch
# 27fe713
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, List, Dict, Any, Tuple
import asyncio
import uuid
import json
from datetime import datetime
from enum import Enum
app = FastAPI(title="Multiplayer Checkers Backend")

# CORS configuration for the browser front-ends: a local dev server and the
# deployed site. Credentials are allowed; methods and headers are unrestricted.
_cors_options = {
    "allow_origins": ["http://localhost:5173", "https://pachekers.netlify.app"],
    "allow_credentials": True,
    "allow_methods": ["*"],
    "allow_headers": ["*"],
}
app.add_middleware(CORSMiddleware, **_cors_options)
# Game Models
class Player(int, Enum):
    """Identifies one of the two sides of a game; members are also ints."""

    # Starts on rows 5-7 of the initial board and is given the first turn.
    ONE = 1
    # Starts on rows 0-2 of the initial board.
    TWO = 2
class PieceType(str, Enum):
    """Rank of a piece; members are also strings."""

    # Rank every piece is created with.
    REGULAR = "regular"
    # Rank assigned when a piece reaches the far row (row 0 for Player.ONE,
    # row 7 for Player.TWO).
    KING = "king"
class Piece(BaseModel):
    """A single piece occupying a board square."""

    # Side that owns the piece.
    player: Player
    # Rank of the piece; new pieces are regular until promoted.
    type: PieceType = PieceType.REGULAR
class Move(BaseModel):
    """A request to move one piece from one square to another."""

    # Source square. Named from_pos because `from` is a Python keyword.
    from_pos: List[int] = []  # [row, col]
    # Destination square.
    to: List[int] = []  # [row, col]
    # True when the move captures a piece; GameManager.is_valid_move sets this
    # itself when it recognises a two-square jump.
    isJump: bool = False
    # [row, col] of the captured piece, filled in together with isJump.
    jumpedPieceCoords: Optional[List[int]] = None
class GameMove(BaseModel):
    """Request body of the REST move endpoint: who moves, and how."""

    # Side submitting the move.
    player: Player
    # The move itself.
    move: Move
class GameState(BaseModel):
    """Complete state of one game, as sent to clients in GAME_UPDATE messages."""

    # Board as a grid of rows; each square holds a Piece or None when empty.
    board: List[List[Optional[Piece]]]
    # Side whose turn it is.
    currentPlayer: Player
    # Most recently applied move, if any.
    lastMove: Optional[Move] = None
    # Winning side once the game is decided, otherwise None.
    winner: Optional[Player] = None
    # NOTE(review): the two multi-jump fields are never written by the code
    # visible in this file; presumably reserved for chained captures.
    isMultiJumpPhase: bool = False
    multiJumpPieceCoords: Optional[List[int]] = None
    # Set to True when a player leaves or their WebSocket disconnects.
    opponentDisconnected: bool = False
class MatchmakingRequest(BaseModel):
    """Optional request body of the matchmaking "find" endpoint."""

    # NOTE(review): not read by the matchmaking code visible in this file;
    # roles are assigned by arrival order.
    preferredPieceColor: Optional[str] = None
class MatchmakingCancel(BaseModel):
    """Request body of the matchmaking "cancel" endpoint."""

    # Search id previously returned by the "find" endpoint.
    searchId: str
class LeaveGameRequest(BaseModel):
    """Request body of the "leave game" endpoint."""

    # Side that is leaving; the other side is declared the winner.
    player: Player
# WebSocket Message Types
class WebSocketMessage(BaseModel):
    """Envelope shape of WebSocket messages: a type tag plus optional payload.

    NOTE(review): the WebSocket handler visible in this file parses raw JSON
    dicts and does not instantiate this model.
    """

    # Message discriminator, e.g. "GAME_UPDATE" or "MAKE_MOVE".
    type: str
    # Message-specific data, if any.
    payload: Optional[Dict[str, Any]] = None
# Global storage (in production, use Redis or database)
class GameManager:
    """In-memory registry of games, player sockets and matchmaking state.

    Everything is held in plain dicts and lists on the instance, so state is
    lost when the process exits.
    """

    def __init__(self):
        # room_id -> game state
        self.games: Dict[str, GameState] = {}
        # room_id -> {player -> websocket}
        self.game_connections: Dict[str, Dict[Player, WebSocket]] = {}
        # Player ids waiting for an opponent, oldest first.
        self.matchmaking_queue: List[str] = []
        self.search_sessions: Dict[str, str] = {}  # searchId -> playerId
        self.player_searches: Dict[str, str] = {}  # playerId -> searchId
        self.pending_matches: Dict[str, Dict[str, Any]] = {}  # searchId -> match details

    def create_initial_board(self) -> List[List[Optional[Piece]]]:
        """Create the initial 8x8 checkers board.

        Player.TWO occupies the dark squares of rows 0-2 (top) and Player.ONE
        the dark squares of rows 5-7 (bottom); every other square is None.
        """
        board = [[None for _ in range(8)] for _ in range(8)]
        for row in range(8):
            if row < 3:
                owner = Player.TWO
            elif row >= 5:
                owner = Player.ONE
            else:
                continue  # the two middle rows start empty
            for col in range(8):
                if (row + col) % 2 == 1:  # Dark squares only
                    board[row][col] = Piece(player=owner)
        return board

    def create_game(self, room_id: str) -> GameState:
        """Create a new game for ``room_id`` and reset its connection table.

        Returns the freshly created state; Player.ONE moves first.
        """
        game_state = GameState(
            board=self.create_initial_board(),
            currentPlayer=Player.ONE,
        )
        self.games[room_id] = game_state
        self.game_connections[room_id] = {}
        return game_state

    def is_valid_move(self, game_state: GameState, move: Move, player: Player) -> bool:
        """Return True when ``move`` is acceptable for ``player`` right now.

        The validation is deliberately simple: turn order, bounds, ownership,
        empty dark destination, and a diagonal step of one square or a jump of
        two squares over an opposing piece. Forced captures and chained jumps
        are not enforced.

        Side effect: ``move.isJump`` and ``move.jumpedPieceCoords`` are
        overwritten with server-derived values, so ``apply_move`` never acts
        on capture data supplied by the client.
        """
        # Coordinates come from the client; a wrong length must be a rejected
        # move rather than an unpacking error.
        if len(move.from_pos) != 2 or len(move.to) != 2:
            return False
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        # A decided game accepts no further moves.
        if game_state.winner is not None:
            return False
        # Check if it's the player's turn
        if game_state.currentPlayer != player:
            return False
        # Check bounds
        if not all(0 <= coord < 8 for coord in (from_row, from_col, to_row, to_col)):
            return False
        # The moved piece must exist and belong to the player
        piece = game_state.board[from_row][from_col]
        if not piece or piece.player != player:
            return False
        # Destination must be empty
        if game_state.board[to_row][to_col] is not None:
            return False
        # Destination must be a dark square (valid checkers squares)
        if (to_row + to_col) % 2 == 0:
            return False

        row_diff = abs(to_row - from_row)
        col_diff = abs(to_col - from_col)
        if row_diff != col_diff:  # Must be diagonal
            return False

        if row_diff == 1:  # Regular move
            # Regular pieces only step towards their promotion row: row 0 for
            # Player.ONE, row 7 for Player.TWO. Kings may step either way.
            if piece.type != PieceType.KING:
                forward = -1 if player == Player.ONE else 1
                if to_row - from_row != forward:
                    return False
            # Discard any capture claimed by the client for a plain step;
            # otherwise apply_move would remove an arbitrary square.
            move.isJump = False
            move.jumpedPieceCoords = None
            return True

        if row_diff == 2:  # Jump move
            # There must be an opponent piece on the square being jumped.
            mid_row = (from_row + to_row) // 2
            mid_col = (from_col + to_col) // 2
            mid_piece = game_state.board[mid_row][mid_col]
            if mid_piece and mid_piece.player != player:
                move.isJump = True
                move.jumpedPieceCoords = [mid_row, mid_col]
                return True
        return False

    def apply_move(self, game_state: GameState, move: Move, player: Player):
        """Apply an already validated move to ``game_state`` in place.

        Moves the piece, removes a jumped piece, promotes on the far row,
        records the move, hands the turn to the other player and updates the
        winner. Callers are expected to run ``is_valid_move`` first.
        """
        from_row, from_col = move.from_pos
        to_row, to_col = move.to

        # Move the piece
        piece = game_state.board[from_row][from_col]
        game_state.board[from_row][from_col] = None
        game_state.board[to_row][to_col] = piece

        # Handle jump
        if move.isJump and move.jumpedPieceCoords:
            jump_row, jump_col = move.jumpedPieceCoords
            game_state.board[jump_row][jump_col] = None

        # Check for king promotion
        if piece:
            if (player == Player.ONE and to_row == 0) or (player == Player.TWO and to_row == 7):
                piece.type = PieceType.KING

        # Update game state
        game_state.lastMove = move
        game_state.currentPlayer = Player.TWO if player == Player.ONE else Player.ONE

        # Check for winner (simplified - just check if opponent has no pieces)
        self.check_winner(game_state)

    def check_winner(self, game_state: GameState):
        """Set ``game_state.winner`` when one side has no pieces left.

        Only piece counts are considered; a side that still has pieces but no
        legal move is not detected.
        """
        player_one_pieces = 0
        player_two_pieces = 0
        for row in game_state.board:
            for piece in row:
                if not piece:
                    continue
                if piece.player == Player.ONE:
                    player_one_pieces += 1
                else:
                    player_two_pieces += 1

        if player_one_pieces == 0:
            game_state.winner = Player.TWO
        elif player_two_pieces == 0:
            game_state.winner = Player.ONE

    async def broadcast_game_update(self, room_id: str):
        """Send the current game state to every socket connected to the room.

        Does nothing when the room has no connection table or no game.
        Sockets that fail to accept the message are dropped from the table.
        """
        connections = self.game_connections.get(room_id)
        game_state = self.games.get(room_id)
        if connections is None or not game_state:
            return

        encoded = json.dumps({
            "type": "GAME_UPDATE",
            "payload": game_state.dict()
        })

        # Iterate over a snapshot: each await yields to the event loop, where
        # other handlers may add or remove connections for this room.
        for player, websocket in list(connections.items()):
            try:
                await websocket.send_text(encoded)
            except Exception:
                # Catch Exception rather than everything so task cancellation
                # still propagates. Only drop the entry if it still refers to
                # the socket that failed (the player may have reconnected).
                if connections.get(player) is websocket:
                    del connections[player]
# Single shared manager instance used by every endpoint below.
game_manager = GameManager()
# REST API Endpoints
@app.post("/api/matchmaking/find")
async def find_match(request: MatchmakingRequest = MatchmakingRequest()):
    """Find a match for a multiplayer game.

    If another player is already waiting, a room is created, the waiting
    player's match details are stored for pickup via the "check" endpoint
    (they become Player.ONE) and the caller is returned
    ``{"status": "matched", "roomId", "assignedPlayer"}`` as Player.TWO.
    Otherwise the caller is queued and returned
    ``{"status": "searching", "searchId"}``.
    """
    queue = game_manager.matchmaking_queue

    # Take the oldest waiting player that still has a live search session.
    # Entries without one are discarded: pairing the caller with them would
    # create a room whose first player can never learn about it.
    opponent_id = None
    opponent_search_id = None
    while queue:
        candidate_id = queue.pop(0)
        candidate_search_id = game_manager.player_searches.get(candidate_id)
        if candidate_search_id:
            opponent_id = candidate_id
            opponent_search_id = candidate_search_id
            break

    if opponent_search_id is None:
        # Nobody to play against yet: register a search session and wait.
        search_id = str(uuid.uuid4())
        player_id = str(uuid.uuid4())
        game_manager.search_sessions[search_id] = player_id
        game_manager.player_searches[player_id] = search_id
        queue.append(player_id)
        return {
            "status": "searching",
            "searchId": search_id
        }

    # Match found: the waiting player is Player.ONE, the caller Player.TWO.
    room_id = str(uuid.uuid4())
    game_manager.create_game(room_id)

    # The waiting player retrieves these details by polling with their search id.
    game_manager.pending_matches[opponent_search_id] = {
        "roomId": room_id,
        "assignedPlayer": Player.ONE.value
    }
    # Their search is over; the caller never needed a session of their own.
    game_manager.search_sessions.pop(opponent_search_id, None)
    game_manager.player_searches.pop(opponent_id, None)

    return {
        "status": "matched",
        "roomId": room_id,
        "assignedPlayer": Player.TWO.value
    }
@app.get("/api/matchmaking/check/{search_id}")
async def check_match_status(search_id: str):
    """Report whether the search identified by ``search_id`` has been matched.

    Returns the room and assigned player once (the stored details are removed
    on retrieval), ``{"status": "searching"}`` while the search is still
    registered, and ``{"status": "not_found"}`` otherwise.
    """
    # Popping makes the match details a one-time fetch.
    details = game_manager.pending_matches.pop(search_id, None)
    if details is not None:
        return {
            "status": "matched",
            "roomId": details["roomId"],
            "assignedPlayer": details["assignedPlayer"]
        }

    still_pending = search_id in game_manager.search_sessions
    return {"status": "searching" if still_pending else "not_found"}
@app.post("/api/matchmaking/cancel")
async def cancel_matchmaking(request: MatchmakingCancel):
    """Cancel a pending matchmaking search.

    Raises a 404 HTTPException when the search id is not registered.
    """
    sessions = game_manager.search_sessions
    search_id = request.searchId
    if search_id not in sessions:
        raise HTTPException(status_code=404, detail="Search not found")

    # Forget the session and its reverse mapping.
    player_id = sessions.pop(search_id)
    game_manager.player_searches.pop(player_id, None)

    # Take the player out of the waiting queue if they are still in it.
    waiting = game_manager.matchmaking_queue
    if player_id in waiting:
        waiting.remove(player_id)

    return {"status": "cancelled"}
@app.post("/api/game/{room_id}/move")
async def make_move(room_id: str, move_request: GameMove):
    """Validate and apply a move submitted over REST, then notify players.

    Raises a 404 HTTPException for an unknown room and a 400 HTTPException
    when the move is rejected by the game manager.
    """
    if room_id not in game_manager.games:
        raise HTTPException(status_code=404, detail="Game room not found")

    state = game_manager.games[room_id]
    mover = move_request.player
    requested = move_request.move

    accepted = game_manager.is_valid_move(state, requested, mover)
    if not accepted:
        raise HTTPException(status_code=400, detail="Invalid move")

    game_manager.apply_move(state, requested, mover)
    # Push the new state to every connected socket in the room.
    await game_manager.broadcast_game_update(room_id)
    return {"status": "success"}
@app.get("/api/game/{room_id}/state")
async def get_game_state(room_id: str, player: int):
    """Return the current state of a game as a dict (polling alternative to WebSockets).

    ``player`` is a required query parameter but is not used to build the
    response. Raises a 404 HTTPException for an unknown room.
    """
    try:
        snapshot = game_manager.games[room_id]
    except KeyError:
        raise HTTPException(status_code=404, detail="Game not found") from None
    return snapshot.dict()
@app.post("/api/game/{room_id}/leave")
async def leave_game(room_id: str, request: LeaveGameRequest):
    """Leave the game, forfeiting it to the opponent.

    Marks the opponent as winner (unless the game was already decided), flags
    the disconnect, broadcasts the new state and sends an ``OPPONENT_LEFT``
    notice to the remaining player. Raises a 404 HTTPException for an unknown
    room.
    """
    game_state = game_manager.games.get(room_id)
    if game_state is None:
        raise HTTPException(status_code=404, detail="Game room not found")

    # A forfeit only decides a game that is still open; leaving after the
    # game ended must not rewrite its result.
    if game_state.winner is None:
        game_state.winner = Player.TWO if request.player == Player.ONE else Player.ONE
    game_state.opponentDisconnected = True

    # Broadcast update
    await game_manager.broadcast_game_update(room_id)

    # Notify remaining player. Iterate over a snapshot because each await
    # yields to the event loop, where the connection table may change.
    notice = json.dumps({
        "type": "OPPONENT_LEFT"
    })
    connections = game_manager.game_connections.get(room_id, {})
    for player, websocket in list(connections.items()):
        if player == request.player:
            continue
        try:
            await websocket.send_text(notice)
        except Exception:
            # Best effort: a socket that cannot be written to is simply
            # skipped. Exception (not a bare except) keeps task cancellation
            # propagating.
            continue

    return {"status": "opponent_won_by_forfeit"}
# WebSocket Endpoints
@app.websocket("/ws/game/{room_id}/{player_id}")
async def websocket_game_endpoint(websocket: WebSocket, room_id: str, player_id: int):
    """WebSocket connection for real-time game updates.

    After accepting, the socket is registered for ``(room_id, player)`` and
    receives the current state if the room has a game. Incoming messages:

    * ``MAKE_MOVE`` - validate and apply ``payload.move``, then broadcast;
      failures are answered with an ``ERROR`` message.
    * ``LEAVE_MATCH`` - forfeit to the opponent, broadcast, and end the loop.

    Other message types are ignored. The socket's registration is removed
    whenever the handler ends, and a client disconnect is broadcast to the
    room as ``opponentDisconnected``.
    """
    await websocket.accept()

    try:
        player = Player(player_id)
    except ValueError:
        # Not one of the two players: refuse with close code 1008
        # (policy violation) instead of failing with an unhandled error.
        await websocket.close(code=1008)
        return

    # Add to connections
    game_manager.game_connections.setdefault(room_id, {})[player] = websocket

    async def send_error(text: str) -> None:
        """Send an ERROR message carrying ``text`` to this client."""
        await websocket.send_text(json.dumps({
            "type": "ERROR",
            "payload": {"message": text}
        }))

    client_disconnected = False
    try:
        # Send initial game state
        game_state = game_manager.games.get(room_id)
        if game_state is not None:
            await websocket.send_text(json.dumps({
                "type": "GAME_UPDATE",
                "payload": game_state.dict()
            }))

        while True:
            # Receive message from client
            data = await websocket.receive_text()
            try:
                message = json.loads(data)
                message_type = message["type"]
            except (ValueError, KeyError, TypeError):
                # Invalid JSON, a non-object value, or no "type" field: tell
                # the client and keep the connection alive.
                await send_error("Malformed message")
                continue

            if message_type == "MAKE_MOVE":
                try:
                    move_data = message["payload"]["move"]
                    move = Move(
                        from_pos=move_data["from_pos"],
                        to=move_data["to"],
                        isJump=move_data.get("isJump", False),
                        jumpedPieceCoords=move_data.get("jumpedPieceCoords")
                    )
                    game_state = game_manager.games.get(room_id)
                    if game_state is not None:
                        if game_manager.is_valid_move(game_state, move, player):
                            game_manager.apply_move(game_state, move, player)
                            await game_manager.broadcast_game_update(room_id)
                        else:
                            await send_error("Invalid move")
                except WebSocketDisconnect:
                    # A vanished client is handled by the outer handler, not
                    # reported back to a socket that is already gone.
                    raise
                except Exception as e:
                    await send_error(f"Move processing error: {str(e)}")
            elif message_type == "LEAVE_MATCH":
                game_state = game_manager.games.get(room_id)
                if game_state is not None:
                    # Forfeit only decides a game that is still open.
                    if game_state.winner is None:
                        game_state.winner = Player.TWO if player == Player.ONE else Player.ONE
                    game_state.opponentDisconnected = True
                    await game_manager.broadcast_game_update(room_id)
                break
    except WebSocketDisconnect:
        client_disconnected = True
    finally:
        # Always unregister this socket, whichever way the handler ends, but
        # never remove a newer socket registered for the same player.
        room_connections = game_manager.game_connections.get(room_id)
        if room_connections is not None and room_connections.get(player) is websocket:
            del room_connections[player]

    if client_disconnected:
        # Notify opponent of disconnect
        game_state = game_manager.games.get(room_id)
        if game_state is not None:
            game_state.opponentDisconnected = True
            await game_manager.broadcast_game_update(room_id)
# Health check endpoint
@app.get("/health")
async def health_check():
    """Liveness probe: report a healthy status with the server's local time."""
    stamp = datetime.now().isoformat()
    return {"status": "healthy", "timestamp": stamp}
# Root endpoint
@app.get("/")
async def root():
    """Return a short banner naming the API and its version."""
    banner = {
        "message": "Multiplayer Checkers Backend API",
        "version": "1.0.0",
    }
    return banner
# When executed as a script, serve the app with uvicorn on all interfaces,
# port 8000.
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when running this file directly.
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)