petrel-oss-python-sdk2
/
petrel-sdk
/petrel-oss-python-sdk
/petrel_client
/ceph
/librgw
/rgw_client.py
import logging | |
from petrel_client.ceph.ceph import Ceph | |
from petrel_client.common.io_profile import profile | |
from petrel_client.common.exception import ObjectNotFoundError | |
from petrel_client.ceph.librgw import rgw | |
LOG = logging.getLogger(__name__) | |
class RGWClient(Ceph): | |
kb = 1024 | |
mb = 1024 * kb | |
def __init__(self, cluster, conf, *args, **kwargs): | |
LOG.debug('init RGWClient(%s)', cluster) | |
super(RGWClient, self).__init__(cluster, conf, *args, **kwargs) | |
conn_args = { | |
'conf': conf['conf'], | |
'keyring': conf['keyring'], | |
'name': conf['name'], | |
'cluster': conf['cluster'], | |
} | |
uid = conf.get('uid', 'user_id') | |
self.bucket_fs = {} | |
self._init_librgw(uid, conf['access_key'], | |
conf['secret_key'], conn_args) | |
def _init_librgw(self, uid=None, key=None, secret=None, connection_kwargs=None): | |
try: | |
self.client = rgw.LibRGWFS(uid, key, secret, **connection_kwargs) | |
self.root_fs = self.client.mount() | |
LOG.debug("The connection bulid successfully.") | |
except Exception as e: | |
LOG.error("The input parameters is invalid. %s", e) | |
raise Exception("The input parameters is invalid.", e) | |
def get(self, cluster, bucket, key, file_size=4 * mb, **kwargs): | |
try: | |
# destination_base_uri = S3Uri(filename) | |
# bucket = destination_base_uri.bucket() | |
# key = destination_base_uri.object() | |
bucket_fs = self.bucket_fs.get(bucket) | |
if not bucket_fs: | |
bucket_fs = self.client.opendir(self.root_fs, bucket) | |
self.bucket_fs[bucket] = bucket_fs | |
file_fs = self.client.open(bucket_fs, key) | |
value = self.client.read(file_fs, 0, file_size) | |
self.client.close(file_fs) | |
self.client.close(bucket_fs) | |
# log.debug('filename is: {}'.format(key)) | |
# log.debug('value size is: {} kB'.format(len(value) / self.kb)) | |
return value | |
except rgw.ObjectNotFound as err: | |
raise ObjectNotFoundError(err) | |
def put(self, cluster, bucket, key, body, **kwargs): | |
# destination_base_uri = S3Uri(filename) | |
# bucket = destination_base_uri.bucket() | |
# key = destination_base_uri.object() | |
bucket_fs = self.client.opendir(self.root_fs, bucket) | |
try: | |
file_fs = self.client.create(bucket_fs, key) | |
except rgw.ObjectExists: | |
file_fs = self.client.open(bucket_fs, key) | |
self.client.write(file_fs, 0, body) | |
self.client.close(file_fs) | |
self.client.close(bucket_fs) | |
# log.debug('filename is: {}'.format(key)) | |
# log.debug('value size is: {} kB'.format(len(body) / self.kb)) | |
return True | |