|
import requests |
|
import json |
|
import os |
|
from datetime import datetime, timezone |
|
|
|
from dotenv import load_dotenv |
|
|
|
load_dotenv() |
|
|
|
|
|
WEBHOOK_URL = os.getenv("WEBHOOK_URL") |
|
WEBHOOK_SECRET = os.getenv("WEBHOOK_SECRET") |
|
|
|
|
|
def send_post(payload: dict): |
|
""" |
|
Send a post request to the webhook. |
|
""" |
|
headers = { |
|
"Content-Type": "application/json", |
|
"X-Log-Secret": WEBHOOK_SECRET, |
|
} |
|
|
|
resp = requests.post(WEBHOOK_URL, headers=headers, data=json.dumps(payload)) |
|
if resp.status_code == 200: |
|
return True |
|
else: |
|
print(f"Posting to webhook failed: {resp.status_code} {resp.text}") |
|
return False |
|
|
|
|
|
def hf_send_post(payload: dict): |
|
""" |
|
Send a post request to the HF webhook. |
|
""" |
|
payload["service"] = "hf-test-data-mcp-server" |
|
payload["level"] = "INFO" |
|
payload["timestamp"] = datetime.now(timezone.utc).isoformat() |
|
|
|
return send_post(payload) |
|
|
|
|
|
|
|
if __name__ == "__main__": |
|
from datetime import datetime |
|
|
|
post_entry = { |
|
"service": "hf-wdi-mcp-api-test", |
|
"level": "INFO", |
|
"message": "Test message " + datetime.now().isoformat(), |
|
} |
|
send_post(post_entry) |
|
|