以下为根据个人需求,参考OpenList让DeepSeek编写的文件上传python脚本
简要说明
包含的功能
- 创建文件夹
- 分片上传文件
- 分享文件夹
- 获取分享链接
- cookie存储在quark.cookie.txt文件中即可
基本变量说明:
- PARENT_DIR_ID:创建文件夹的父文件夹id,如果在根目录下创建文件夹,输入0
- FOLDER_NAME:要创建的文件夹名
- FILE_PATH:要上传的文件
- 上传频繁会被拉黑
脚本代码
import base64
import hashlib
import io
import json
import logging
import mimetypes
import os
import time
import requests
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Optional, Callable
# 配置日志
log = logging.getLogger(__name__)
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
try:
from colorama import init, Fore, Style
init(autoreset=True)
COLOR_SUPPORT = True
except ImportError:
COLOR_SUPPORT = False
print("未安装colorama库,将显示无颜色输出。安装命令: pip install colorama")
class QuarkOrUC:
def __init__(self, cookie: str, parent_dir_id: str):
"""
初始化Quark网盘客户端
:param cookie: 完整的认证Cookie字符串
:param parent_dir_id: 默认父目录ID
"""
self.cookie = cookie
self.parent_dir_id = parent_dir_id
# 配置信息
self.config = {
"api": "https://drive.quark.cn/1/clouddrive",
"referer": "https://pan.quark.cn",
"origin": "https://pan.quark.cn",
"pr": "ucpro",
"ua": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/137.0.0.0 Safari/537.36"
}
def colored_print(self, message: str, color: str = "white") -> None:
"""带颜色打印消息"""
if not COLOR_SUPPORT:
print(message)
return
colors = {
"red": Fore.RED,
"green": Fore.GREEN,
"yellow": Fore.YELLOW,
"blue": Fore.BLUE,
"cyan": Fore.CYAN,
"white": Fore.WHITE,
}
print(colors.get(color, Fore.WHITE) + message + Style.RESET_ALL)
def request(self, pathname: str, method: str, data: Optional[Dict] = None,
query_params: Optional[Dict] = None) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
发送API请求
:param pathname: API路径
:param method: HTTP方法 (GET/POST)
:param data: POST请求的数据
:param query_params: 额外的查询参数
:return: (响应数据, 错误对象)
"""
url = self.config['api'] + pathname
headers = {
"Cookie": self.cookie,
"Accept": "application/json, text/plain, */*",
"Referer": self.config['referer'],
"User-Agent": self.config['ua'],
"Origin": self.config['origin'],
"Priority": "u=1, i"
}
# 基础查询参数
params = {"pr": self.config['pr'], "fr": "pc"}
# 添加额外的查询参数
if query_params:
params.update(query_params)
try:
if method.upper() == "GET":
response = requests.get(url, headers=headers, params=params)
elif method.upper() == "POST":
headers["Content-Type"] = "application/json"
response = requests.post(url, headers=headers, params=params, json=data)
else:
raise ValueError(f"不支持的HTTP方法: {method}")
response.raise_for_status()
json_resp = response.json()
# 更新cookie
if '__puus' in response.cookies:
self.cookie = f"{self.cookie}; __puus={response.cookies['__puus']}"
# 检查错误响应
if json_resp.get('status', 200) >= 400 or json_resp.get('code', 0) != 0:
error_msg = json_resp.get('message', '未知错误')
log.error(f"API错误: {error_msg}")
return None, Exception(error_msg)
return json_resp, None
except requests.exceptions.RequestException as e:
log.error(f"请求失败: {str(e)}")
return None, e
except Exception as e:
log.error(f"处理响应时出错: {str(e)}")
return None, e
def create_folder(self, folder_name: str) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
创建文件夹
:param folder_name: 文件夹名称
:return: (响应数据, 错误对象)
"""
data = {
"pdir_fid": self.parent_dir_id,
"file_name": folder_name,
"dir_path": "",
"dir_init_lock": False
}
return self.request("/file", "POST", data)
def create_share(self, fid_list: List[str], title: str,
expired_type: int = 1, url_type: int = 1) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
创建分享任务
:param fid_list: 要分享的文件/文件夹ID列表
:param title: 分享标题
:param expired_type: 过期类型 (1: 永久)
:param url_type: URL类型 (1: 标准链接)
:return: (响应数据, 错误对象)
"""
data = {
"fid_list": fid_list,
"title": title,
"url_type": url_type,
"expired_type": expired_type
}
return self.request("/share", "POST", data)
def check_share_task(self, task_id: str) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
检查分享任务状态
:param task_id: 分享任务ID
:return: (响应数据, 错误对象)
"""
query_params = {
"task_id": task_id,
"retry_index": 0
}
return self.request("/task", "GET", query_params=query_params)
def get_share_info(self, share_id: str) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
获取分享链接信息
:param share_id: 分享ID
:return: (响应数据, 错误对象)
"""
data = {"share_id": share_id}
return self.request("/share/password", "POST", data)
def up_pre(self, file_name: str, mimetype: str, size: int, parent_id: str) -> Tuple[Optional[Dict], Optional[Exception]]:
"""
预上传请求
:param file_name: 文件名
:param mimetype: MIME类型
:param size: 文件大小
:param parent_id: 父目录ID
:return: (预上传响应, 错误对象)
"""
now = int(time.time() * 1000)
data = {
"ccp_hash_update": True,
"dir_name": '',
"file_name": file_name,
"format_type": mimetype,
"l_created_at": now,
"l_updated_at": now,
"pdir_fid": parent_id,
"size": size
}
return self.request("/file/upload/pre", "POST", data)
def up_hash(self, md5_hash: str, sha1_hash: str, task_id: str) -> Tuple[bool, Optional[Exception]]:
"""
提交文件哈希验证
:param md5_hash: MD5哈希值
:param sha1_hash: SHA1哈希值
:param task_id: 上传任务ID
:return: (是否已完成上传, 错误对象)
"""
data = {
"md5": md5_hash,
"sha1": sha1_hash,
"task_id": task_id
}
resp, err = self.request("/file/update/hash", "POST", data)
if err:
return False, err
return (resp.get('data', {}).get('finish', False), None)
def up_part(self, pre: Dict, mime_type: str, part_number: int, chunk_data: bytes) -> Tuple[Optional[str], Optional[Exception]]:
"""
上传文件分片
:param pre: 预上传响应数据
:param mime_type: MIME类型
:param part_number: 分片编号
:param chunk_data: 分片数据
:return: (ETag, 错误对象)
"""
# 使用 timezone-aware datetime
now = datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
auth_meta = f"PUT\n\n{mime_type}\n{now}\nx-oss-date:{now}\nx-oss-user-agent:aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit\n/{pre['data']['bucket']}/{pre['data']['obj_key']}?partNumber={part_number}&uploadId={pre['data']['upload_id']}"
auth_data = {
"auth_info": pre['data']['auth_info'],
"auth_meta": auth_meta,
"task_id": pre['data']['task_id']
}
# 获取上传授权
auth_resp, err = self.request("/file/upload/auth", "POST", auth_data)
if err:
return None, err
# 执行分块上传
upload_url = f"https://{pre['data']['bucket']}.{pre['data']['upload_url'][7:]}/{pre['data']['obj_key']}"
headers = {
"Authorization": auth_resp['data']['auth_key'],
"Content-Type": mime_type,
"Referer": "https://pan.quark.cn/",
"x-oss-date": now,
"x-oss-user-agent": "aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit"
}
params = {
"partNumber": str(part_number),
"uploadId": pre['data']['upload_id']
}
try:
response = requests.put(
upload_url,
headers=headers,
params=params,
data=chunk_data,
timeout=30
)
response.raise_for_status()
return response.headers.get('ETag'), None
except requests.exceptions.RequestException as e:
log.error(f"分片上传失败: {str(e)}")
return None, e
def up_commit(self, pre: Dict, etags: List[str]) -> Optional[Exception]:
"""
提交分片上传完成
:param pre: 预上传响应数据
:param etags: 所有分片的ETag列表
:return: 错误对象
"""
# 使用 timezone-aware datetime
now = datetime.now(timezone.utc).strftime('%a, %d %b %Y %H:%M:%S GMT')
# 构建XML body
xml_parts = []
for i, etag in enumerate(etags, 1):
xml_parts.append(f"<Part><PartNumber>{i}</PartNumber><ETag>{etag}</ETag></Part>")
xml_body = f"""<?xml version="1.0" encoding="UTF-8"?>
<CompleteMultipartUpload>
{"".join(xml_parts)}
</CompleteMultipartUpload>"""
# 计算Content-MD5
md5 = hashlib.md5()
md5.update(xml_body.encode('utf-8'))
content_md5 = base64.b64encode(md5.digest()).decode('utf-8')
# 处理回调数据
callback_json = json.dumps(pre['data']['callback'])
callback_b64 = base64.b64encode(callback_json.encode('utf-8')).decode('utf-8')
# 构建授权元数据
auth_meta = f"POST\n{content_md5}\napplication/xml\n{now}\nx-oss-callback:{callback_b64}\nx-oss-date:{now}\nx-oss-user-agent:aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit\n/{pre['data']['bucket']}/{pre['data']['obj_key']}?uploadId={pre['data']['upload_id']}"
auth_data = {
"auth_info": pre['data']['auth_info'],
"auth_meta": auth_meta,
"task_id": pre['data']['task_id']
}
# 获取提交授权
auth_resp, err = self.request("/file/upload/auth", "POST", auth_data)
if err:
return err
# 执行提交请求
upload_url = f"https://{pre['data']['bucket']}.{pre['data']['upload_url'][7:]}/{pre['data']['obj_key']}"
headers = {
"Authorization": auth_resp['data']['auth_key'],
"Content-MD5": content_md5,
"Content-Type": "application/xml",
"Referer": "https://pan.quark.cn/",
"x-oss-callback": callback_b64,
"x-oss-date": now,
"x-oss-user-agent": "aliyun-sdk-js/6.6.1 Chrome 98.0.4758.80 on Windows 10 64-bit"
}
params = {"uploadId": pre['data']['upload_id']}
try:
response = requests.post(
upload_url,
headers=headers,
params=params,
data=xml_body,
timeout=30
)
response.raise_for_status()
return None
except requests.exceptions.RequestException as e:
log.error(f"提交上传失败: {str(e)}")
return e
def up_finish(self, pre: Dict) -> Optional[Exception]:
"""
完成上传流程
:param pre: 预上传响应数据
:return: 错误对象
"""
data = {
"obj_key": pre['data']['obj_key'],
"task_id": pre['data']['task_id']
}
_, err = self.request("/file/upload/finish", "POST", data)
time.sleep(1) # 等待服务器处理
return err
def upload_file(self, file_name: str, file_data: bytes,
parent_dir_id: Optional[str] = None,
progress_callback: Optional[Callable] = None) -> Tuple[Optional[str], Optional[Exception]]:
"""
完整的文件上传流程
:param file_name: 文件名
:param file_data: 文件二进制数据
:param parent_dir_id: 父目录ID(默认为初始化时设置的目录)
:param progress_callback: 进度回调函数
:return: (文件ID, 错误对象) - 成功时返回文件ID,失败时返回错误
"""
# 使用默认父目录ID(如果未提供)
if parent_dir_id is None:
parent_dir_id = self.parent_dir_id
# 自动检测MIME类型
mime_type, _ = mimetypes.guess_type(file_name)
if not mime_type:
mime_type = "application/octet-stream"
file_size = len(file_data)
file_stream = io.BytesIO(file_data)
# 步骤1: 计算文件哈希
md5_hash = hashlib.md5()
sha1_hash = hashlib.sha1()
file_stream.seek(0)
# 计算哈希
while True:
chunk = file_stream.read(8192)
if not chunk:
break
md5_hash.update(chunk)
sha1_hash.update(chunk)
md5_hex = md5_hash.hexdigest()
sha1_hex = sha1_hash.hexdigest()
# 重置文件流指针
file_stream.seek(0)
# 步骤2: 预上传请求
pre_resp, err = self.up_pre(file_name, mime_type, file_size, parent_dir_id)
if err:
return None, f"预上传失败: {str(err)}"
# 步骤3: 提交哈希验证
finished, err = self.up_hash(md5_hex, sha1_hex, pre_resp['data']['task_id'])
if err:
return None, f"哈希验证失败: {str(err)}"
if finished:
self.colored_print("服务器已有相同文件,跳过上传", "yellow")
return None, "服务器已有相同文件"
# 步骤4: 分块上传
part_size = pre_resp['metadata']['part_size']
etags = []
part_number = 1
while True:
chunk = file_stream.read(part_size)
if not chunk:
break
etag, err = self.up_part(
pre_resp,
mime_type,
part_number,
chunk
)
if err:
return None, f"分片{part_number}上传失败: {str(err)}"
etags.append(etag)
part_number += 1
# 更新进度
if progress_callback:
progress = min(100, int(file_stream.tell() / file_size * 100))
progress_callback(progress)
# 步骤5: 提交上传
err = self.up_commit(pre_resp, etags)
if err:
return None, f"提交上传失败: {str(err)}"
# 步骤6: 完成上传
err = self.up_finish(pre_resp)
if err:
return None, f"完成上传失败: {str(err)}"
# 返回上传文件的ID
file_id = pre_resp.get('data', {}).get('fid')
if not file_id:
return None, "未能获取上传文件ID"
return file_id, None
def load_cookie_from_file(file_path: str) -> str:
"""从文件加载Cookie"""
try:
with open(file_path, "r") as f:
return f.read().strip()
except Exception as e:
raise Exception(f"加载Cookie文件失败: {str(e)}")
if __name__ == "__main__":
# 配置信息
COOKIE_FILE = "cookie.quark.txt"
PARENT_DIR_ID = "0"
FOLDER_NAME = "新建文件夹名"
FILE_PATH = "ce5.zip"
# 1. 加载Cookie
try:
cookie = load_cookie_from_file(COOKIE_FILE)
if not cookie:
raise Exception("Cookie文件为空")
except Exception as e:
print(f"❌ 错误: {str(e)}")
exit(1)
# 创建客户端实例
client = QuarkOrUC(cookie=cookie, parent_dir_id=PARENT_DIR_ID)
# 2. 创建文件夹
client.colored_print("\n🟡 正在创建文件夹...", "yellow")
create_resp, create_err = client.create_folder(FOLDER_NAME)
if create_err:
client.colored_print(f"❌ 未创建文件夹失败: {str(create_err)}", "red")
exit(1)
# 获取新创建的文件夹ID
new_folder_id = create_resp.get('data', {}).get('fid')
if not new_folder_id:
client.colored_print("❌ 未能获取新文件夹ID", "red")
exit(1)
client.colored_print(f"✅ 文件夹创建成功! ID: {new_folder_id}", "green")
# 3. 上传文件到新创建的文件夹
try:
if not os.path.exists(FILE_PATH):
raise Exception(f"文件不存在: {FILE_PATH}")
# 获取文件大小(以字节为单位)
file_size_bytes = os.path.getsize(FILE_PATH)
file_size_mb = file_size_bytes / (1024 * 1024)
with open(FILE_PATH, "rb") as f:
file_data = f.read()
# 确保读取的文件大小与实际文件大小一致
if len(file_data) != file_size_bytes:
raise Exception(f"文件读取不完整: 预期 {file_size_bytes} 字节, 实际读取 {len(file_data)} 字节")
# 进度回调函数
def progress_callback(percent):
print(f"\r🟢 上传进度: {percent}%", end="", flush=True)
file_name = os.path.basename(FILE_PATH)
client.colored_print(f"\n🟡 开始上传文件: {file_name} ({file_size_mb:.2f} MB)", "yellow")
file_id, upload_err = client.upload_file(
file_name=file_name,
file_data=file_data,
parent_dir_id=new_folder_id,
progress_callback=progress_callback
)
if upload_err:
client.colored_print(f"\n❌ 文件上传失败: {str(upload_err)}", "red")
exit(1)
else:
client.colored_print(f"\n✅ 文件上传成功! 文件ID: {file_id}", "green")
# 4. 创建分享任务
client.colored_print("\n🟡 正在创建分享任务...", "yellow")
share_resp, share_err = client.create_share(
fid_list=[new_folder_id],
title=FOLDER_NAME
)
if share_err:
client.colored_print(f"❌ 创建分享任务失败: {str(share_err)}", "red")
exit(1)
# 获取分享任务ID
task_id = share_resp.get('data', {}).get('task_id')
if not task_id:
client.colored_print("❌ 未能获取分享任务ID", "red")
exit(1)
client.colored_print(f"✅ 分享任务创建成功! 任务ID: {task_id}", "green")
# 5. 轮询分享任务状态
interval = 5 # 每次间隔5秒
client.colored_print("\n🟡 正在监测分享状态...", "yellow")
while True:
print("\r进行中...", end="", flush=True)
# 检查分享任务状态
task_resp, task_err = client.check_share_task(task_id)
if task_err:
client.colored_print(f"\n❌ 检查分享任务状态失败: {str(task_err)}", "red")
exit(1)
task_data = task_resp.get('data', {})
task_status = task_data.get('status')
# 状态2表示分享成功
if task_status == 2:
share_id = task_data.get('share_id')
if not share_id:
client.colored_print("\n❌ 分享成功但未获取到share_id", "red")
exit(1)
# 获取分享链接信息
share_info_resp, share_info_err = client.get_share_info(share_id)
if share_info_err:
client.colored_print(f"\n❌ 获取分享链接失败: {str(share_info_err)}", "red")
exit(1)
share_info_data = share_info_resp.get('data', {})
share_url = share_info_data.get('share_url', '')
passcode = share_info_data.get('passcode', '') # 注意:实际键名是pwd_id
# 如果获取到分享链接,退出循环
if share_url:
print("\n", end="") # 清除"进行中..."提示
break
# 检查任务是否失败
elif task_status in [3, 4]: # 3: 失败, 4: 取消
client.colored_print(f"\n❌ 分享任务失败: 状态码 {task_status}", "red")
exit(1)
# 等待下一次检查
time.sleep(interval)
# 6. 显示分享结果
client.colored_print("\n✅ 分享创建成功!", "green")
client.colored_print(f"📁 分享标题: {FOLDER_NAME}", "cyan")
client.colored_print(f"🔗 分享链接: {share_url}", "cyan")
if passcode:
client.colored_print(f"🔑 提取密码: {passcode}", "cyan")
except Exception as e:
client.colored_print(f"❌ 操作失败: {str(e)}", "red")
exit(1)
有没有UC网盘的上传文件脚本😁
UC网盘把api改成https://pc-api.uc.cn/1/clouddrive,referer和origin改成https://drive.uc.cn,pr改成UCBrowser应该就行了,两个是一家的