import base64 import numpy as np import cv2 import json import pickle import itertools import requests from io import StringIO from io import BytesIO import re, os from PIL import Image from datetime import datetime import random import torch import socket r_b64_prefix = r'^data:image/.+;base64,' from PIL import ImageOps import oss2 class OSS2: def __init__(self, bucket): self.bucket = bucket def get_download(self, object_name, download_name=None, minute=5): """ 获取文件下载地址 :param object_name: OSS 路径 :param download_name: 下载重命名 :param minute: 请求超时时间 :return: 授权URL """ params = None if download_name: params = {"response-content-disposition": 'attachment;filename="{0}"'.format(quote(download_name, 'utf-8'))} url = self.bucket.sign_url('GET', object_name, minute * 60, params=params) return url def _pil_image_push_ossdir(bucket, image_pil, object_key, quality=85, format='jpeg'): import io print_with_datetime('upload object_key {}'.format(object_key)) oss_path = None for _ in range(5): try: img_byte_arr = io.BytesIO() image_pil.save(img_byte_arr, format=format, quality=quality) # 根据实际图片格式选择保存格式 img_byte_arr.seek(0) # 回到开始位置 bucket.put_object(object_key, img_byte_arr) oss_path = object_key return oss_path except Exception as e: print_with_datetime('error, upload oss failed, {}'.format(str(e))) return oss_path def resize_keep_hw_rate(image, tar_res): w, h = image.size if w > h: tar_w = tar_res tar_h = int(tar_w * h / w) elif h > w: tar_h = tar_res tar_w = int(tar_h * w / h) else: tar_w = tar_res tar_h = tar_res image = image.resize((tar_w, tar_h), Image.LANCZOS) return image def decode_img_from_url(url): import requests from io import BytesIO for _ in range(5): try: content = requests.get(url, stream=True, timeout=10).content img = Image.open(BytesIO(content)) img = ImageOps.exif_transpose(img) break except Exception as e: print_with_datetime(f"url decode error {url} {str(e)}") img = None return img def make_sub_file_folder(file_path): subdir = file_path.split(os.path.basename(file_path))[0] os.makedirs(subdir, exist_ok=True) def get_random_file_name(): timing_cur = datetime.now().timestamp() * 1000000 timing_cur = str(int(timing_cur)) + f'-{random.randint(0,99999999)}' return timing_cur def print_with_datetime(string): print(f'{datetime.now().strftime("%Y-%m-%d %H:%M:%S")}: {string}', flush=True) def b64_pil(base64Data, mode = 'RGB'): try: base64Data = re.sub(r_b64_prefix, '', base64Data) imgData = base64.b64decode(base64Data) image = Image.open(BytesIO(imgData)) except: image = None return image def pil_b64(pil_img, fmt='jpeg', quality=75): output_buffer = BytesIO() pil_img.save(output_buffer, format=fmt, quality=quality) byte_data = output_buffer.getvalue() base64_str = base64.b64encode(byte_data).decode('utf-8') if fmt == 'jpeg': fmt_out = 'jpg' else: fmt_out = fmt return f'data:image/{fmt_out};base64,' + base64_str def get_ip(): hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) return ip_address