"""Extract images from RSS entries and publish posts to LinkedIn and Twitter."""

import os
import re
from io import BytesIO
from typing import Optional

import requests
import tweepy
from dotenv import load_dotenv

load_dotenv()

LINKEDIN_CLIENT_ID = os.getenv("LINKEDIN_CLIENT_ID")
LINKEDIN_CLIENT_SECRET = os.getenv("LINKEDIN_CLIENT_SECRET")
TWITTER_CLIENT_ID = os.getenv("TWITTER_CLIENT_ID")
TWITTER_CLIENT_SECRET = os.getenv("TWITTER_CLIENT_SECRET")

# Seconds to wait when fetching the (optional) image to attach.
_DOWNLOAD_TIMEOUT = 10
# Seconds to wait on LinkedIn API calls; without a timeout `requests` can
# block forever on a stalled connection.
_API_TIMEOUT = 30

_LINKEDIN_REGISTER_URL = 'https://api.linkedin.com/v2/assets?action=registerUpload'
_LINKEDIN_UGC_URL = 'https://api.linkedin.com/v2/ugcPosts'
_LINKEDIN_UPLOAD_MECHANISM = 'com.linkedin.digitalmedia.uploading.MediaUploadHttpRequest'

# First `src` attribute of an <img> tag embedded in HTML text.
_IMG_SRC_RE = re.compile(r'<img[^>]+src=["\'](.*?)["\']', re.I)


def _first_url(items, require_image_type):
    """Return the first non-empty URL found in a list of dict-like items.

    Each item is checked for a ``url`` key and then an ``href`` key. When
    *require_image_type* is true, items whose ``type`` does not start with
    ``image/`` are skipped. Items without a usable URL are skipped rather
    than ending the search.
    """
    for item in items or []:
        if not isinstance(item, dict):
            continue
        if require_image_type and not (item.get('type') or '').startswith('image/'):
            continue
        url = item.get('url') or item.get('href')
        if url:
            return url
    return None


def _field_text(value):
    """Flatten a description/summary/content field to a plain string."""
    if isinstance(value, list):
        # Only the first element is inspected; an empty list has no text.
        value = value[0] if value else ''
    if isinstance(value, dict):
        return str(value.get('value') or '')
    return str(value)


def extract_image_url(entry) -> Optional[str]:
    """Extract an image URL from an RSS feed entry.

    Sources are tried in this order and the first non-empty URL wins:
    ``enclosures`` (image types only), ``media_content`` (image types only),
    ``media_thumbnail``, ``image`` (dict or list of dicts with ``url``),
    ``itunes_image`` (``href``), and finally the first ``<img src=...>``
    found in ``description``, ``summary`` or ``content``.

    Args:
        entry: Dict-like feed entry.

    Returns:
        The image URL, or ``None`` when no source yields one.
    """
    url = (
        _first_url(entry.get('enclosures'), require_image_type=True)
        or _first_url(entry.get('media_content'), require_image_type=True)
        or _first_url(entry.get('media_thumbnail'), require_image_type=False)
    )
    if url:
        return url

    image = entry.get('image')
    if isinstance(image, dict):
        if image.get('url'):
            return image['url']
    elif isinstance(image, list):
        for candidate in image:
            if isinstance(candidate, dict) and candidate.get('url'):
                return candidate['url']

    itunes_image = entry.get('itunes_image')
    if isinstance(itunes_image, dict) and itunes_image.get('href'):
        return itunes_image['href']

    for field in ('description', 'summary', 'content'):
        if field not in entry:
            continue
        match = _IMG_SRC_RE.search(_field_text(entry[field]))
        if match:
            return match.group(1)
    return None


def _download_image(image_url):
    """Fetch *image_url* and return its bytes, or ``None`` on any failure.

    Network errors are reported and swallowed because the image is optional:
    callers fall back to a text-only post.
    """
    try:
        response = requests.get(image_url, timeout=_DOWNLOAD_TIMEOUT)
    except requests.RequestException as exc:
        print(f"Image download failed: {exc}")
        return None
    if response.status_code != 200:
        print(f"Image download failed: {response.status_code}")
        return None
    return response.content


def _upload_linkedin_image(image_url, access_token, linkedin_id, headers):
    """Download an image and upload it to LinkedIn.

    Returns:
        The asset identifier from the register-upload response, or ``None``
        if the download, registration or upload step fails.
    """
    image_content = _download_image(image_url)
    if image_content is None:
        return None

    register_body = {
        'registerUploadRequest': {
            'recipes': ['urn:li:digitalmediaRecipe:feedshare-image'],
            'owner': f'urn:li:person:{linkedin_id}',
            'serviceRelationships': [
                {'relationshipType': 'OWNER', 'identifier': 'urn:li:userGeneratedContent'}
            ],
        }
    }
    try:
        register_response = requests.post(
            _LINKEDIN_REGISTER_URL, headers=headers, json=register_body, timeout=_API_TIMEOUT
        )
    except requests.RequestException as exc:
        print(f"Upload registration failed: {exc}")
        return None
    if register_response.status_code != 200:
        print(f"Upload registration failed: {register_response.status_code}")
        return None

    try:
        upload_data = register_response.json()['value']
        upload_url = upload_data['uploadMechanism'][_LINKEDIN_UPLOAD_MECHANISM]['uploadUrl']
        asset = upload_data['asset']
    except (KeyError, TypeError, ValueError) as exc:
        # A 200 with an unexpected body must not abort the whole post.
        print(f"Upload registration failed: unexpected response ({exc!r})")
        return None

    try:
        upload_response = requests.put(
            upload_url,
            headers={'Authorization': f'Bearer {access_token}'},
            data=image_content,
            timeout=_API_TIMEOUT,
        )
    except requests.RequestException as exc:
        print(f"Image upload failed: {exc}")
        return None
    if upload_response.status_code != 201:
        print(f"Image upload failed: {upload_response.status_code}")
        return None
    return asset


def _publish_linkedin_share(post, linkedin_id, headers, asset, label):
    """Create a LinkedIn UGC post and record the outcome in ``post['status']``.

    When *asset* is not ``None`` the share is an IMAGE share referencing it;
    otherwise it is text-only. *label* prefixes the printed result line.
    """
    share_content = {
        'shareCommentary': {'text': post['text']},
        'shareMediaCategory': 'NONE',
    }
    if asset is not None:
        share_content['shareMediaCategory'] = 'IMAGE'
        share_content['media'] = [{'status': 'READY', 'media': asset}]
    post_body = {
        'author': f'urn:li:person:{linkedin_id}',
        'lifecycleState': 'PUBLISHED',
        'specificContent': {'com.linkedin.ugc.ShareContent': share_content},
        'visibility': {'com.linkedin.ugc.MemberNetworkVisibility': 'PUBLIC'},
    }
    try:
        response = requests.post(
            _LINKEDIN_UGC_URL, headers=headers, json=post_body, timeout=_API_TIMEOUT
        )
    except requests.RequestException as exc:
        post['status'] = 'failed'
        print(f"{label} error: {exc}")
        return
    post['status'] = 'posted' if response.status_code == 201 else 'failed'
    print(f"{label}: {response.status_code} - {response.text}")


def post_to_linkedin(post: dict) -> None:
    """Post content to LinkedIn with optional image.

    Does nothing unless ``post['status']`` is ``'pending'`` or ``'posting'``.
    If ``post['image_url']`` is set, the image is downloaded and uploaded
    and an image share is attempted; on any failure along that path a
    text-only share is attempted instead.

    Args:
        post: Mapping with ``status``, ``access_token``, ``linkedin_id`` and
            ``text``, and optionally ``image_url``. ``status`` is updated in
            place to ``'posted'`` or ``'failed'``.
    """
    if post['status'] not in ('pending', 'posting'):
        return

    # The access token is deliberately never printed: it is a bearer credential.
    access_token = post['access_token']
    linkedin_id = post['linkedin_id']
    image_url = post.get('image_url')
    headers = {
        'Authorization': f'Bearer {access_token}',
        'Content-Type': 'application/json',
    }

    if not image_url:
        _publish_linkedin_share(post, linkedin_id, headers, None, "LinkedIn post attempt")
        return

    asset = _upload_linkedin_image(image_url, access_token, linkedin_id, headers)
    if asset is not None:
        _publish_linkedin_share(post, linkedin_id, headers, asset, "LinkedIn post attempt")
    if post['status'] != 'posted':
        _publish_linkedin_share(post, linkedin_id, headers, None, "LinkedIn text-only post")


def _tweet_with_image(client, post, image_bytes):
    """Upload *image_bytes* and tweet with it; return ``True`` on success.

    On failure nothing is written to ``post['status']`` so the caller can
    fall back to a text-only tweet.
    """
    try:
        api = tweepy.API(tweepy.OAuth1UserHandler(
            TWITTER_CLIENT_ID,
            TWITTER_CLIENT_SECRET,
            post['access_token'],
            post['access_token_secret'],
        ))
        media = api.media_upload(filename='image', file=BytesIO(image_bytes))
        client.create_tweet(text=post['text'], media_ids=[media.media_id])
    except tweepy.TweepyException as e:
        print(f"Twitter image post error: {e}")
        return False
    except Exception as e:
        # Broad on purpose: the image is optional, so any failure while
        # preparing or uploading it should degrade to a text-only tweet.
        print(f"Media upload error: {e}")
        return False
    post['status'] = 'posted'
    print("Twitter post with image successful")
    return True


def _tweet_text(client, post, label):
    """Send a text-only tweet and record the outcome in ``post['status']``."""
    try:
        client.create_tweet(text=post['text'])
    except tweepy.TweepyException as e:
        post['status'] = 'failed'
        print(f"{label} error: {e}")
    else:
        post['status'] = 'posted'
        print(f"{label} post successful")


def post_to_twitter(post: dict) -> None:
    """Post content to Twitter with optional image.

    Does nothing unless ``post['status']`` is ``'pending'`` or ``'posting'``.
    If ``post['image_url']`` is set, a tweet with the image is attempted;
    if the download, upload or tweet fails, a text-only tweet is sent.

    Args:
        post: Mapping with ``status``, ``access_token``,
            ``access_token_secret`` and ``text``, and optionally
            ``image_url``. ``status`` is updated in place to ``'posted'``
            or ``'failed'``.
    """
    if post['status'] not in ('pending', 'posting'):
        return

    # Credentials are deliberately never printed.
    client = tweepy.Client(
        consumer_key=TWITTER_CLIENT_ID,
        consumer_secret=TWITTER_CLIENT_SECRET,
        access_token=post['access_token'],
        access_token_secret=post['access_token_secret'],
    )

    image_url = post.get('image_url')
    if not image_url:
        _tweet_text(client, post, "Twitter")
        return

    image_bytes = _download_image(image_url)
    if image_bytes is not None and _tweet_with_image(client, post, image_bytes):
        return
    _tweet_text(client, post, "Twitter text-only")