diff --git a/coscmd/cos_client.py b/coscmd/cos_client.py index 712396f..996c60c 100644 --- a/coscmd/cos_client.py +++ b/coscmd/cos_client.py @@ -24,6 +24,8 @@ from wsgiref.handlers import format_date_time import qcloud_cos +from coscmd.cos_rename import CosMoveConfig + if sys.version > '3': from coscmd.cos_global import Version from coscmd.cos_auth import CosS3Auth @@ -1466,7 +1468,7 @@ def single_download(self, cos_path, local_path, _http_headers='{}', **kwargs): http_headers = yaml.safe_load(http_headers) http_headers = mapped(http_headers) except Exception as e: - logger.warn("Http_haeder parse error.") + logger.warn("Http_header parse error.") logger.warn(to_unicode(e)) return -1 try: @@ -1571,6 +1573,48 @@ def download_file(self, cos_path, local_path, _http_headers='{}', **kwargs): except Exception as e: logger.warn(to_unicode(e)) + def move_file(self, source_path, dst_path, _http_headers='{}', **kwargs): + move_source = {} + try: + _source_path = source_path.split('/') + source_tmp_path = _source_path[0].split('.') + source_key = '/'.join(_source_path[1:]) + move_source['Bucket'] = source_tmp_path[0] + if len(source_tmp_path) == 5 and source_tmp_path[1] == 'cos': + source_region = source_tmp_path[2] + elif len(source_tmp_path) == 4: + source_region = source_tmp_path[1] + else: + raise Exception("Parse Region Error") + move_source['Key'] = source_key + move_source['RawPath'] = move_source['Bucket'] + ".cos." + \ + source_region + \ + ".myqcloud.com/" + move_source['Key'] + logger.info(u"move log move_source={move_source}".format(move_source=move_source)) + source_path = '/' + '/'.join(_source_path[1:]) + url = self._conf.uri(quote(to_printable_str(dst_path)) + "?rename") + try: + _http_headers = yaml.safe_load(_http_headers) + except Exception as e: + logger.warn("Http_haeder parse error.") + logger.warn(to_unicode(e)) + return -1 + logger.info(u"Move cos://{source_bucket}/{source_path} => cos://{dst_bucket}/{dst_path}".format( + source_bucket=source_tmp_path[0], + source_path=source_path, + dst_bucket=source_tmp_path[0], + dst_path=dst_path + )) + if not CosMoveConfig.move_object(self, source_path, url, _http_headers): + return 0 + else: + return -1 + except Exception as e: + logger.warn(to_unicode(e)) + logger.warn(u"MoveSource is invalid: {movesource}".format( + movesource=source_path)) + return -1 + def restore_folder(self, cos_path, **kwargs): self._inner_threadpool = SimpleThreadPool(self._conf._max_thread) _success_num = 0 @@ -1759,11 +1803,16 @@ def get_object_acl(self, cos_path): return False return False - def create_bucket(self): + def create_bucket(self, ofs): url = self._conf.uri(path='') self._have_finished = 0 try: - rt = self._session.put(url=url, auth=CosS3Auth(self._conf)) + data = ''' + + {ofs} + +'''.format(ofs=ofs) + rt = self._session.put(url=url, auth=CosS3Auth(self._conf), data=data) logger.debug(u"put resp, status code: {code}, headers: {headers}, text: {text}".format( code=rt.status_code, headers=rt.headers, @@ -1775,6 +1824,7 @@ def create_bucket(self): else: logger.warn(response_info(rt)) return False + self._session.get() except Exception as e: logger.warn(str(e)) return False @@ -2059,3 +2109,4 @@ def op_int(self): if __name__ == "__main__": pass + diff --git a/coscmd/cos_cmd.py b/coscmd/cos_cmd.py index d09486a..9625433 100644 --- a/coscmd/cos_cmd.py +++ b/coscmd/cos_cmd.py @@ -380,6 +380,8 @@ def move(args): client = CosS3Client(conf) Interface = client.op_int() _, args.cos_path = concat_path(args.source_path, args.cos_path) + logger.info(u"====move log source_path={source_path}, cos_path={cos_path}".format( + source_path=args.source_path, cos_path=args.cos_path)) while args.cos_path.startswith('/'): args.cos_path = args.cos_path[1:] if not isinstance(args.source_path, text_type): @@ -397,20 +399,32 @@ def move(args): kwargs['delete'] = False kwargs['move'] = True if args.recursive: - _, args.cos_path = concat_path(args.source_path, args.cos_path) - if args.cos_path.endswith('/') is False: - args.cos_path += '/' if args.cos_path.startswith('/'): args.cos_path = args.cos_path[1:] - if not Interface.copy_folder(args.source_path, args.cos_path, args.headers, **kwargs): - return 0 + if args.ofs: + #融合桶 头部添加 + if not Interface.move_file(args.source_path, args.cos_path, args.headers, **kwargs): + return 0 + else: + return 1 else: - return 1 + if args.cos_path.endswith('/') is False: + args.cos_path += '/' + if not Interface.copy_folder(args.source_path, args.cos_path, args.headers, **kwargs): + return 0 + else: + return 1 else: - if not Interface.copy_file(args.source_path, args.cos_path, args.headers, **kwargs): - return 0 + if args.ofs: + if not Interface.move_file(args.source_path, args.cos_path, args.headers, **kwargs): + return 0 + else: + return 1 else: - return -1 + if not Interface.copy_file(args.source_path, args.cos_path, args.headers, **kwargs): + return 0 + else: + return -1 except Exception as e: logger.warn(e) return -2 @@ -588,8 +602,12 @@ def create_bucket(args): try: conf = load_conf() client = CosS3Client(conf) + if args.ofs: + ofs = "OFS" + else: + ofs = "" Interface = client.op_int() - if Interface.create_bucket(): + if Interface.create_bucket(ofs): return 0 else: logger.warn("Create bucket fail") @@ -725,6 +743,7 @@ def command_thread(): desc = """an easy-to-use but powerful command-line tool. try \'coscmd -h\' to get more informations. try \'coscmd sub-command -h\' to learn all command usage, likes \'coscmd upload -h\'""" + #ArgumentParser命令行解析的主要入口点,add_argument()方法为解析器填充可选参数和位置参数的动作 parser = ArgumentParser(description=desc) parser.add_argument('-d', '--debug', help="Debug mode", action="store_true", default=False) parser.add_argument('-s', '--silence', help="Silence mode", action="store_true", default=False) @@ -735,13 +754,16 @@ def command_thread(): parser.add_argument('--log_size', help='specify max log size in MB (default 1MB)', type=int, default=128) parser.add_argument('--log_backup_count', help='specify log backup num', type=int, default=1) + #add_subparsers()方法去创建子命令 sub_parser = parser.add_subparsers() + #sub_parser.add_parser()添加子命令 parser_config = sub_parser.add_parser("config", help="Config your information at first") parser_config.add_argument('-a', '--secret_id', help='Specify your secret id', type=str, required=True) parser_config.add_argument('-s', '--secret_key', help='Specify your secret key', type=str, required=True) parser_config.add_argument('-t', '--token', help='Set x-cos-security-token header', type=str, default="") parser_config.add_argument('-b', '--bucket', help='Specify your bucket', type=str, required=True) + #add_mutually_exclusive_gruop()方法也接受一个required参数,表示在互斥组中至少有一个参数是需要的 group = parser_config.add_mutually_exclusive_group(required=True) group.add_argument('-r', '--region', help='Specify your region', type=str) group.add_argument('-e', '--endpoint', help='Specify COS endpoint', type=str) @@ -820,6 +842,7 @@ def command_thread(): parser_move.add_argument('-H', '--headers', help="Specify HTTP headers", type=str, default='{}') parser_move.add_argument('-d', '--directive', help="if Overwrite headers", type=str, choices=['Copy', 'Replaced'], default="Copy") parser_move.add_argument('-r', '--recursive', help="Copy files recursively", action="store_true", default=False) + parser_move.add_argument('-o', "--ofs", help="Move bucket files", action="store_true", default=False) parser_move.add_argument('--include', help='Specify filter rules, separated by commas; Example: *.txt,*.docx,*.ppt', type=str, default="*") parser_move.add_argument('--ignore', help='Specify ignored rules, separated by commas; Example: *.txt,*.docx,*.ppt', type=str, default="") parser_move.set_defaults(func=Op.move) @@ -855,6 +878,7 @@ def command_thread(): parser_signurl.set_defaults(func=Op.signurl) parser_create_bucket = sub_parser.add_parser("createbucket", help='Create bucket') + parser_create_bucket.add_argument('-o', "--ofs", help="create ofs bucket", action="store_true", default=False) parser_create_bucket.set_defaults(func=Op.create_bucket) parser_delete_bucket = sub_parser.add_parser("deletebucket", help='Delete bucket') @@ -970,3 +994,5 @@ def _main(): _main() global res sys.exit(res) + + diff --git a/coscmd/cos_rename.py b/coscmd/cos_rename.py new file mode 100644 index 0000000..a9a95be --- /dev/null +++ b/coscmd/cos_rename.py @@ -0,0 +1,138 @@ +# -*- coding=utf-8 +import json +import threading + +from qcloud_cos import CosServiceError, CosClientError, CosS3Client +from qcloud_cos.cos_comm import check_object_content_length, get_content_md5, client_can_retry, format_values +from requests import Request, Timeout, __version__ + +from .cos_auth import CosS3Auth +from .cos_comm import * + +# python 3.10报错"module 'collections' has no attribute 'Iterable'",这里先规避 +if sys.version_info.major >= 3 and sys.version_info.minor >= 10: + import collections.abc + collections.Iterable = collections.abc.Iterable + +logger = logging.getLogger(__name__) + + +class CosMoveConfig(object): + __built_in_sessions = None # 内置的静态连接池,多个Client间共享使用 + + def __init__(self, conf, retry=1, session=None): + """初始化client对象 + + :param conf(CosConfig): 用户的配置. + :param retry(int): 失败重试的次数. + :param session(object): http session. + """ + self._conf = conf + self._retry = retry # 重试的次数,分片上传时可适当增大 + + if not CosMoveConfig.__built_in_sessions: + with threading.Lock(): + if not CosMoveConfig.__built_in_sessions: # 加锁后double check + CosMoveConfig.__built_in_sessions = self.generate_built_in_connection_pool( + self._conf._pool_connections, self._conf._pool_maxsize) + + if session is None: + self._session = CosMoveConfig.__built_in_sessions + else: + self._session = session + + def set_built_in_connection_pool_max_size(self, PoolConnections, PoolMaxSize): + """设置SDK内置的连接池的连接大小,并且重新绑定到client中""" + if not CosS3Client.__built_in_sessions: + return + + if CosS3Client.__built_in_sessions.get_adapter('http://')._pool_connections == PoolConnections \ + and CosS3Client.__built_in_sessions.get_adapter('http://')._pool_maxsize == PoolMaxSize: + return + + # 判断之前是否绑定到内置连接池 + rebound = False + if self._session and self._session is CosS3Client.__built_in_sessions: + rebound = True + + # 重新生成内置连接池 + CosS3Client.__built_in_sessions.close() + CosS3Client.__built_in_sessions = self.generate_built_in_connection_pool(PoolConnections, PoolMaxSize) + + # 重新绑定到内置连接池 + if rebound: + self._session = CosS3Client.__built_in_sessions + logger.info("rebound built-in connection pool success. maxsize=%d,%d" % (PoolConnections, PoolMaxSize)) + + def generate_built_in_connection_pool(self, PoolConnections, PoolMaxSize): + """生成SDK内置的连接池,此连接池是client间共用的""" + built_in_sessions = requests.session() + built_in_sessions.mount('http://', requests.adapters.HTTPAdapter(pool_connections=PoolConnections, pool_maxsize=PoolMaxSize)) + built_in_sessions.mount('https://', requests.adapters.HTTPAdapter(pool_connections=PoolConnections, pool_maxsize=PoolMaxSize)) + logger.info("generate built-in connection pool success. maxsize=%d,%d" % (PoolConnections, PoolMaxSize)) + return built_in_sessions + + def get_conf(self): + """获取配置""" + return self._conf + + def get_auth(self, Method, Bucket, Key, Expired=300, Headers={}, Params={}, SignHost=None): + """获取签名 + + :param Method(string): http method,如'PUT','GET'. + :param Bucket(string): 存储桶名称. + :param Key(string): 请求COS的路径. + :param Expired(int): 签名有效时间,单位为s. + :param headers(dict): 签名中的http headers. + :param params(dict): 签名中的http params. + :param SignHost(bool): 是否将host算入签名. + :return (string): 计算出的V5签名. + + .. code-block:: python + + config = CosConfig(Region=region, SecretId=secret_id, SecretKey=secret_key, Token=token) # 获取配置对象 + client = CosS3Client(config) + # 获取上传请求的签名 + auth_string = client.get_auth( + Method='PUT', + Bucket='bucket', + Key='test.txt', + Expired=600, + Headers={'header1': 'value1'}, + Params={'param1': 'value1'} + ) + print (auth_string) + """ + + # python中默认参数只会初始化一次,这里重新生成可变对象实例避免多线程访问问题 + if not Headers: + Headers = dict() + if not Params: + Params = dict() + + url = self._conf.uri(bucket=Bucket, path=Key) + r = Request(Method, url, headers=Headers, params=Params) + auth = CosS3Auth(self._conf, Key, Params, Expired, SignHost) + return auth(r).headers['Authorization'] + + def move_object(self, source_path, url, _http_headers='{}', EnableMD5=False): + http_headers = _http_headers + logger.info("put object, url=:{url} ,headers=:{headers}".format( + url=url, + headers=http_headers)) + if EnableMD5: + md5_str = get_content_md5(url) + if md5_str: + http_headers['Content-MD5'] = md5_str + http_headers['x-cos-rename-source'] = source_path + logger.info("put object, url=:{url} ,headers=:{headers}".format( + url=url, + headers=http_headers)) + rt = self._session.put(url=url, auth=CosS3Auth(self._conf), headers=http_headers, timeout=self._timeout) + if rt.status_code == 200: + return 0 + else: + raise Exception(response_info(rt)) + +if __name__ == "__main__": + pass \ No newline at end of file diff --git a/coscmd/test01.py b/coscmd/test01.py new file mode 100644 index 0000000..e69de29