# Spaces: Sleeping  (web-page status banner captured with the file; not program code)
import boto3 | |
import botocore | |
import gradio as gr | |
import os | |
from dotenv import load_dotenv | |
import pandas as pd | |
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# AWS settings read from the environment; each falls back to '' when unset.
ACCOUNT_ID = os.getenv('AWS_ACCOUNT_ID', '')
AWS_ACCESS_KEY_ID = os.getenv('AWS_ACCESS_KEY_ID', '')
AWS_SECRET_ACCESS_KEY = os.getenv('AWS_SECRET_ACCESS_KEY', '')
AWS_DEFAULT_REGION = os.getenv('AWS_DEFAULT_REGION', '')
AWS_S3_BUCKET_NAME = os.getenv('AWS_S3_BUCKET_NAME', '')

# Anything other than ENV=PROD counts as development; the launch call at the
# bottom of the file only enables login when this is False.
is_dev = os.environ.get('ENV', 'DEV') != 'PROD'

# Module-level state written by the UI callbacks (`global job_no`,
# `global selected_file_index`).
job_no = ''
selected_file_index = 0

# Only forward settings that are actually configured. Passing '' for the
# region or for the credentials makes boto3 treat them as explicit values
# instead of falling back to its own default resolution.
_s3_client_kwargs = {'service_name': 's3'}
if AWS_DEFAULT_REGION:
    _s3_client_kwargs['region_name'] = AWS_DEFAULT_REGION
if AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY:
    _s3_client_kwargs['aws_access_key_id'] = AWS_ACCESS_KEY_ID
    _s3_client_kwargs['aws_secret_access_key'] = AWS_SECRET_ACCESS_KEY

s3_client = boto3.client(**_s3_client_kwargs)
def auth(username, password):
    """Check submitted credentials against the USERNAME / PASSWORD env vars.

    Args:
        username: User name submitted through the login form.
        password: Password submitted through the login form.

    Returns:
        True only when both expected values are configured (non-empty) and
        both submitted values match them; False otherwise.
    """
    import hmac  # local import: only this function needs it

    expected_user = os.environ.get('USERNAME')
    expected_password = os.environ.get('PASSWORD')

    # Fail closed: with nothing configured, a plain `==` comparison would
    # accept (None, None) because None == None.
    if not expected_user or not expected_password:
        return False
    if not isinstance(username, str) or not isinstance(password, str):
        return False

    # compare_digest only accepts ASCII str, so compare the UTF-8 bytes.
    # Both comparisons are always evaluated to avoid a timing difference.
    user_ok = hmac.compare_digest(
        username.encode('utf-8'), expected_user.encode('utf-8'))
    password_ok = hmac.compare_digest(
        password.encode('utf-8'), expected_password.encode('utf-8'))
    return user_ok and password_ok
def set_key(key, prefix=''):
    """Build an object key by joining an optional prefix and `key` with '/'.

    Args:
        key: Object name (e.g. a file name).
        prefix: Optional "folder" prefix. Empty or None means no prefix.
            Trailing slashes are ignored so that 'job1' and 'job1/' produce
            the same key instead of 'job1//name'.

    Returns:
        '<prefix>/<key>' when a prefix is given, otherwise `key` unchanged.
    """
    if not prefix:
        return key
    cleaned = prefix.rstrip('/')
    return f'{cleaned}/{key}' if cleaned else key
# Function to list files in S3 bucket
def list_object_in_bucket(bucket_name, prefix=''):
    """Return the keys of all objects in `bucket_name` that start with `prefix`.

    Args:
        bucket_name: Name of the S3 bucket to list.
        prefix: Key prefix to filter on; empty or None lists the whole bucket.

    Returns:
        A list of object keys (empty when nothing matches).
    """
    # A single list_objects_v2 call returns at most 1000 keys; the paginator
    # follows the continuation tokens so larger listings are not truncated.
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix or ''):
        keys.extend(obj['Key'] for obj in page.get('Contents', []))
    return keys
# Function to list images in a specific S3 bucket
def list_images_in_bucket(bucket_name):
    """Return the keys in `bucket_name` that end in .png, .jpg or .jpeg.

    The extension check is case-insensitive.

    Args:
        bucket_name: Name of the S3 bucket to list.

    Returns:
        A list of matching object keys (empty when there are none).
    """
    image_suffixes = ('.png', '.jpg', '.jpeg')
    # Paginate: one list_objects_v2 call returns at most 1000 keys.
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket_name):
        keys.extend(
            obj['Key']
            for obj in page.get('Contents', [])
            if obj['Key'].lower().endswith(image_suffixes)
        )
    return keys
# Function to get image URLs
def get_image_urls(bucket_name, image_keys):
    """Return one presigned `get_object` URL per key, in the order given.

    Args:
        bucket_name: Bucket that holds the objects.
        image_keys: Iterable of object keys.

    Returns:
        A list of presigned URLs.
    """
    urls = []
    for object_key in image_keys:
        request_params = {'Bucket': bucket_name, 'Key': object_key}
        urls.append(
            s3_client.generate_presigned_url('get_object', Params=request_params)
        )
    return urls
# Function to upload files to S3
def upload_files(files, progress=gr.Progress()):
    """Upload the selected files to the bucket under the current job number.

    Each file is stored under `set_key(<file name>, job_no)`, where `job_no`
    is the module-level value. Upload errors are printed, not raised.

    Args:
        files: Files from the uploader; each item must expose a `.name` path.
            None or an empty list uploads nothing.
        progress: Gradio progress tracker used to show upload progress.

    Returns:
        Whatever `display_images(job_no)` returns.
    """
    # Nothing selected: skip the upload loop instead of iterating over None.
    if not files:
        return display_images(job_no)

    # Metadata values must be strings. When USERNAME is not set, sending None
    # would fail parameter validation for every file, so the entry is omitted.
    req_id = os.environ.get('USERNAME')
    metadata = {'x-request-id': req_id} if req_id else {}

    try:
        for file in progress.tqdm(files, desc="Uploading..."):
            file_name = os.path.basename(file.name)
            with open(file.name, 'rb') as f:
                print(f' - Uploading "{file_name}" to "{job_no}"...')
                key = set_key(file_name, job_no)
                s3_client.put_object(
                    Body=f,
                    Bucket=AWS_S3_BUCKET_NAME,
                    Key=key,
                    Metadata=metadata,
                )
        print('Done')
    except botocore.exceptions.ClientError as error:
        if error.response['Error']['Code'] == 'LimitExceededException':
            # No retry is attempted here, so the message must not claim one.
            print('API call limit exceeded; upload aborted.')
        else:
            print(error)
    except botocore.exceptions.ParamValidationError as error:
        print('The parameters you provided are incorrect: {}'.format(error))
    return display_images(job_no)
# Function to fetch and display images
def display_images(job_no):
    """List the objects stored under `job_no` for display in the UI.

    Args:
        job_no: Key prefix passed to `list_object_in_bucket`.

    Returns:
        A 2-tuple `(None, DataFrame)`; the DataFrame has a single "keys"
        column holding the object keys found under the prefix.
    """
    found_keys = list_object_in_bucket(AWS_S3_BUCKET_NAME, job_no)
    listing = pd.DataFrame({"keys": found_keys})
    return None, listing
# Function to delete a file from S3
def delete_image(file_key, search_fpath):
    """Delete one or more objects from the bucket, then re-list the prefix.

    Args:
        file_key: Key(s) to delete. Either a list of keys, or a string holding
            one key or several keys separated by commas (the UI tells users
            to separate multiple names with commas). Whitespace around each
            comma-separated name is ignored. None or '' deletes nothing.
        search_fpath: Key prefix used to list the remaining objects.

    Returns:
        A 2-tuple `(DataFrame, keys)`: a DataFrame with a "keys" column and
        the plain list of keys still present under `search_fpath`.

    NOTE(review): the first return value is a DataFrame, so it is only
    meaningful for a Dataframe output component - confirm the `.click()`
    wiring routes it to one.
    """
    from_list = isinstance(file_key, (list, tuple))
    if from_list:
        candidates = list(file_key)
    elif isinstance(file_key, str):
        candidates = [part.strip() for part in file_key.split(',')]
    else:
        candidates = []
    keys_to_delete = [k for k in candidates if k]

    for k in keys_to_delete:
        if from_list:
            print(f'Removing "{k}" from list...')
        else:
            print(f'Removing "{k}"...')
        # Best effort: a failure on one key is reported and the remaining
        # keys are still attempted.
        try:
            s3_client.delete_object(Bucket=AWS_S3_BUCKET_NAME, Key=k)
        except (botocore.exceptions.BotoCoreError,
                botocore.exceptions.ClientError) as e:
            print(f'Error delete_image: {str(e)}')

    image_keys = list_object_in_bucket(AWS_S3_BUCKET_NAME, search_fpath)
    return pd.DataFrame({"keys": image_keys}), image_keys
def update_dropdown_image(search_fpath, evt: gr.SelectData):
    """Turn a selected gallery item into the object key it refers to.

    Args:
        search_fpath: Prefix that is prepended to the selected file name.
        evt: Selection event; `evt.value['image']['orig_name']` is read as
            the file name of the selected item.

    Returns:
        The key produced by `set_key(<orig_name>, search_fpath)`.
    """
    selected_name = evt.value['image']['orig_name']
    return set_key(selected_name, search_fpath)
# --------------------------------------------------------------------------- | |
# Function to list every file (not only images) in a specific S3 bucket
def list_files_in_bucket(bucket_name):
    """Return the keys of all objects in `bucket_name`.

    Args:
        bucket_name: Name of the S3 bucket to list.

    Returns:
        A list of object keys (empty for an empty bucket).
    """
    # Paginate: one list_objects_v2 call returns at most 1000 keys.
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=bucket_name):
        keys.extend(obj['Key'] for obj in page.get('Contents', []))
    return keys
# Function to get file URLs
def get_files_urls(bucket_name, file_keys):
    """Return one presigned `get_object` URL per key, in iteration order.

    Args:
        bucket_name: Bucket that holds the objects.
        file_keys: Iterable of object keys.

    Returns:
        A list of presigned URLs.
    """
    presigned = []
    for object_key in file_keys:
        request_params = {'Bucket': bucket_name, 'Key': object_key}
        presigned.append(
            s3_client.generate_presigned_url('get_object', Params=request_params)
        )
    return presigned
def onchange_job_no(v):
    """React to an edit of the job-number textbox.

    Stores the new text in the module-level `job_no` and rebuilds the three
    controls that depend on it.

    Args:
        v: Current text of the job-number textbox.

    Returns:
        A 3-tuple `(confirm button, file uploader, upload button)`. The
        confirm button is interactive only when `v` is non-empty; the
        uploader and the upload button are always returned non-interactive.
    """
    global job_no
    job_no = v

    has_text = len(v) > 0
    # Upload controls stay disabled after every edit, whether or not the
    # text is empty.
    confirm_button = gr.Button("ยืนยัน job no.", interactive=has_text)
    uploader = gr.File(label="Upload Images", file_count="multiple", height=400, interactive=False)
    upload_button = gr.Button("Upload Files", interactive=False)
    return confirm_button, uploader, upload_button
def onclick_confirm_job_no():
    """Confirm the current job number and unlock the upload controls.

    Returns:
        A 4-tuple: the confirm button (disabled), the file uploader
        (enabled), the upload button (enabled), and the current module-level
        `job_no` value.
    """
    confirm_button = gr.Button("ยืนยัน job no.", interactive=False)
    uploader = gr.File(label="Upload Images", file_count="multiple", height=400, interactive=True)
    upload_button = gr.Button("Upload Files", interactive=True)
    # `job_no` is only read here, so the module-level value is used as-is.
    return confirm_button, uploader, upload_button, job_no
def onselect_file(files, evt: gr.SelectData):
    """Return a presigned URL for the row selected in the files table.

    Also records the selected row number in the module-level
    `selected_file_index`.

    Args:
        files: Table contents; indexed with `.iloc`, so presumably a pandas
            DataFrame - verify against the component configuration.
        evt: Selection event; `evt.index[0]` is used as the row number.

    Returns:
        The first URL produced by `get_files_urls` for the selected row.
    """
    global selected_file_index
    row_number = evt.index[0]
    selected_file_index = row_number

    selected_row = files.iloc[row_number]
    urls = get_files_urls(AWS_S3_BUCKET_NAME, selected_row)
    return urls[0]
def onclick_search_by_key(prefix):
    """List the objects under `prefix` and build presigned URLs for them.

    Args:
        prefix: Key prefix to search for.

    Returns:
        A 2-tuple `(DataFrame, urls)`: a DataFrame with a "keys" column, and
        one presigned URL per listed key.
    """
    matching_keys = list_object_in_bucket(AWS_S3_BUCKET_NAME, prefix)
    urls = get_image_urls(AWS_S3_BUCKET_NAME, matching_keys)
    table = pd.DataFrame({"keys": matching_keys})
    return table, urls
# Create a Gradio interface
# Layout: an "Upload" tab (job number + file uploader) and a "Preview" tab
# (search box and file table on the left; single image / gallery on the right).
with gr.Blocks() as demo:
    gr.Markdown("# **Data collector**")
    with gr.Tabs():
        with gr.Tab('Upload'):
            with gr.Column():
                gr.Markdown("""## **1. กำหนด job no.**
                เพื่อใช้ในการอ้างอิงหรือเชื่อมโยงกลุ่มของข้อมูลที่ทำการ upload ขึ้นมา
                """)
                # Named `job_no_input` so the module-level `job_no` string
                # that the callbacks read and write is not rebound to a
                # component object.
                job_no_input = gr.Textbox('', label='Job no.')
                set_job_no_btn = gr.Button("ยืนยัน job no.", interactive=False)
            with gr.Column():
                gr.Markdown("""## **2. Upload files**
                1. โยนไฟล์ (drag/drop) ไปยัง ***ไฟล์ list ทาง panel ทางด้านซ้าย*** ได้มากกว่า 1 ไฟล์
                2. กด **Upload Files**
                """)
                # Both upload controls start disabled; the callbacks enable
                # them once a job number has been confirmed.
                file_uploader = gr.File(label="Upload Images", file_count="multiple", height=400, interactive=False)
                upload_btn = gr.Button("Upload Files", interactive=False)
        with gr.Tab('Preview'):
            with gr.Row():
                with gr.Column(scale=2):
                    search_fpath = gr.Textbox('', label='Job no.')
                    search_btn = gr.Button("Search")
                    files_in_bucket = gr.Dataframe(label='Files', height=600)
                with gr.Column(scale=4):
                    with gr.Tabs():
                        with gr.Tab('Image'):
                            image = gr.Image()
                        with gr.Tab('Gallery'):
                            gr.Markdown("""
                            กดที่ **Refresh Gallery** เพื่อดูข้อมูลของ job no. นั้นๆ
                            """)
                            img_gallery = gr.Gallery(label="Images from S3", columns=9, height=600, interactive=True)
                            gr.Markdown("""### ***Delete images***
                            กรณีต้องการลบไฟล์ ท่านสามารถเลือกภาพที่สนใจ แล้วกด **Delete Selected Files** แต่ถ้าต้องการลบมากกว่า 1 file ให้ใส่ชื่อไฟล์ คั่นด้วย comma (,)
                            """)
                            img_selected_image = gr.Dropdown(choices=[], label="Select Image to Delete", allow_custom_value=True)
                            img_delete_btn = gr.Button("Delete Selected Files")

    # --- Event wiring (must be registered inside the Blocks context) ---
    # Editing the job number re-locks the upload controls until it is confirmed.
    job_no_input.change(fn=onchange_job_no, inputs=[job_no_input], outputs=[set_job_no_btn, file_uploader, upload_btn])
    # Confirming unlocks the upload controls and copies the job number into
    # the Preview search box.
    set_job_no_btn.click(fn=onclick_confirm_job_no, inputs=[], outputs=[set_job_no_btn, file_uploader, upload_btn, search_fpath])
    # After an upload the uploader is cleared and the file table refreshed.
    upload_btn.click(fn=upload_files, inputs=[file_uploader], outputs=[file_uploader, files_in_bucket])
    # delete_image returns (DataFrame of remaining keys, list of keys). The
    # DataFrame belongs in the Files table, not in the Gallery, which cannot
    # render a DataFrame.
    img_delete_btn.click(fn=delete_image, inputs=[img_selected_image, search_fpath], outputs=[files_in_bucket, img_selected_image])
    # Selecting a gallery item fills the delete dropdown with its key.
    img_gallery.select(fn=update_dropdown_image, inputs=[search_fpath], outputs=img_selected_image)
    search_btn.click(fn=onclick_search_by_key, inputs=[search_fpath], outputs=[files_in_bucket, img_gallery])
    # Selecting a row in the file table shows that file in the Image tab.
    files_in_bucket.select(fn=onselect_file, inputs=[files_in_bucket], outputs=[image])
# Launch the Gradio app
# Login (via `auth`) is only enforced outside development mode.
launch_auth = None if is_dev else auth
demo.launch(auth=launch_auth)