qccamapi / apis /s3.py
patharanor's picture
fix: request body does not match the expected schema
e76ee44
raw
history blame
2.08 kB
import os
import boto3
from dotenv import load_dotenv
# Load variables from a local .env file (if one exists) into the process
# environment at import time, so the AWS_* settings read with os.getenv in
# S3.__init__ can be supplied through that file.
load_dotenv()
class S3:
    """Thin convenience wrapper around a boto3 S3 client.

    The client is configured from the ``AWS_ACCESS_KEY_ID``,
    ``AWS_SECRET_ACCESS_KEY`` and ``AWS_DEFAULT_REGION`` environment
    variables. Any of them that is unset or empty is left for boto3 to
    resolve through its own default configuration chain.
    """

    # Lower-case file extensions that list_images_in_bucket treats as images.
    IMAGE_EXTENSIONS = ('.png', '.jpg', '.jpeg')

    def __init__(self) -> None:
        """Read the AWS settings from the environment and build the client."""
        self.aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID', '')
        self.aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY', '')
        self.aws_default_region = os.getenv('AWS_DEFAULT_REGION', '')
        # Pass None instead of '' for missing values: an empty string counts
        # as an explicit (and invalid) credential/region and would stop boto3
        # from falling back to its default chain (shared config, IAM role...).
        self.client = boto3.client(
            service_name='s3',
            region_name=self.aws_default_region or None,
            aws_access_key_id=self.aws_access_key_id or None,
            aws_secret_access_key=self.aws_secret_access_key or None,
        )

    def _iter_keys(self, bucket_name):
        """Yield every object key in ``bucket_name``, following pagination.

        A single ``list_objects_v2`` call returns at most 1,000 keys, so the
        paginator is used to walk all result pages.
        """
        paginator = self.client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket_name):
            # 'Contents' is absent from the response when nothing matches.
            for obj in page.get('Contents', []):
                yield obj['Key']

    def list_images_in_bucket(self, bucket_name: str) -> list:
        """Return the keys in ``bucket_name`` that end with an image extension.

        The match is case-insensitive and uses ``IMAGE_EXTENSIONS``.
        """
        return [
            key
            for key in self.list_files_in_bucket(bucket_name)
            if key.lower().endswith(self.IMAGE_EXTENSIONS)
        ]

    def get_image_urls(self, bucket_name: str, image_keys, expires_in: int = 3600) -> list:
        """Return presigned GET URLs for ``image_keys``.

        Identical to :meth:`get_files_urls`; kept as a separate name for
        callers that deal with images.
        """
        return self.get_files_urls(bucket_name, image_keys, expires_in)

    def upload_file(self, bucket_name: str, file, name: str):
        """Upload the file-like object ``file`` to ``bucket_name`` under key ``name``.

        Returns whatever ``upload_fileobj`` returns.
        """
        return self.client.upload_fileobj(file, bucket_name, name)

    def delete_image(self, bucket_name: str, file_key) -> None:
        """Delete one object, or each object in a collection of keys.

        Args:
            bucket_name: Bucket holding the object(s).
            file_key: A single key, or a list/tuple/set of keys. Each key in
                a collection is deleted with its own request and announced
                on stdout first.
        """
        if isinstance(file_key, (list, tuple, set)):
            for k in file_key:
                print(f'Removing "{k}"...')
                self.client.delete_object(Bucket=bucket_name, Key=k)
        else:
            self.client.delete_object(Bucket=bucket_name, Key=file_key)

    def list_files_in_bucket(self, bucket_name: str) -> list:
        """Return the keys of all objects in ``bucket_name``."""
        return list(self._iter_keys(bucket_name))

    def get_files_urls(self, bucket_name: str, file_keys, expires_in: int = 3600) -> list:
        """Return presigned GET URLs for ``file_keys``, in the same order.

        Args:
            bucket_name: Bucket holding the objects.
            file_keys: Iterable of object keys.
            expires_in: Lifetime of each URL in seconds. The default of 3600
                matches boto3's own default, so existing callers see no change.
        """
        return [
            self.client.generate_presigned_url(
                'get_object',
                Params={'Bucket': bucket_name, 'Key': key},
                ExpiresIn=expires_in,
            )
            for key in file_keys
        ]