Files
pythonProject/synology_api/filestation.py
Victor Alexandrovich Tsyrenschikov ab5d6bf686 EspHome
2025-11-21 00:03:34 +05:00

1244 lines
48 KiB
Python

from __future__ import annotations
from typing import Optional, Any
import os
import io
import time
from datetime import datetime
import requests
import tqdm
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
import sys
from urllib import parse
from . import base_api
class FileStation(base_api.BaseApi):
def __init__(self,
ip_address: str,
port: str,
username: str,
password: str,
secure: bool = False,
cert_verify: bool = False,
dsm_version: int = 7,
debug: bool = True,
otp_code: Optional[str] = None,
interactive_output: bool = True
) -> None:
super(FileStation, self).__init__(ip_address, port, username, password, secure, cert_verify,
dsm_version, debug, otp_code, 'FileStation')
self._dir_taskid: str = ''
self._dir_taskid_list: list[str] = []
self._md5_calc_taskid: str = ''
self._md5_calc_taskid_list: list[str] = []
self._search_taskid: str = ''
self._search_taskid_list: list[str] = []
self._copy_move_taskid: str = ''
self._copy_move_taskid_list: list[str] = []
self._delete_taskid: str = ''
self._delete_taskid_list: list[str] = []
self._extract_taskid: str = ''
self._extract_taskid_list: list[str] = []
self._compress_taskid: str = ''
self._compress_taskid_list: list[str] = []
self.session.get_api_list('FileStation')
self.file_station_list: Any = self.session.app_api_list
self.interactive_output: bool = interactive_output
def get_info(self) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Info'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'get'}
return self.request_data(api_name, api_path, req_param)
def get_list_share(self,
additional: Optional[str | list[str]] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
onlywritable: bool = False
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.List'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list_share'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
if val is not None:
req_param[str(key)] = val
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
def get_file_list(self,
folder_path: Optional[str] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
pattern: Optional[str] = None,
filetype: Optional[str] = None,
goto_path: Optional[str] = None,
additional: Optional[str | list[str]] = None) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.List'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
if val is not None:
req_param[str(key)] = val
if folder_path is None:
return 'Enter a valid folder_path'
if filetype is not None:
req_param['filetype'] = str(req_param['filetype']).lower()
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
def get_file_info(self,
path: Optional[str] = None,
additional: Optional[str | list[str]] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.List'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'getinfo'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = str(additional).replace("'", '"')
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
# TODO all working if specify extension check if correct [pattern, extension]
# it works if you put extension='...'
def search_start(self,
folder_path: Optional[str] = None,
recursive: Optional[bool] = None,
pattern: Optional[str] = None,
extension: Optional[str] = None,
filetype: Optional[str] = None,
size_from: Optional[int] = None,
size_to: Optional[int] = None,
mtime_from: Optional[str | int] = None,
mtime_to: Optional[str | int] = None,
crtime_from: Optional[str | int] = None,
crtime_to: Optional[str | int] = None,
atime_from: Optional[str | int] = None,
atime_to: Optional[str | int] = None,
owner: Optional[str] = None,
group: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Search'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start', 'folder_path': ''}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param'] and 'time' not in key:
if val is not None:
req_param[str(key)] = val
if 'time' in key:
if val is not None:
try:
date = time.strptime(val, "%Y-%m-%d %H:%M:%S")
timestamp = time.mktime(date)
req_param[key] = '"' + str(timestamp) + '"'
except ValueError:
try:
datetime.fromtimestamp(int(val)).strftime('%Y-%m-%d %H:%M:%S')
req_param[key] = '"' + val + '"'
except ValueError:
return 'Enter the correct Date Time format "YYY-MM-DD HH:MM:SS" or Unix timestamp'
if folder_path is None:
return 'Enter a valid folder_path'
else:
req_param['folder_path'] = '"' + folder_path + '"'
if filetype is not None:
req_param['filetype'] = '"' + filetype + '"'
response = self.request_data(api_name, api_path, req_param)
taskid = response['data']['taskid']
self._search_taskid = '"{}"'.format(taskid)
self._search_taskid_list.append('"' + response['data']['taskid'] + '"')
message = ('You can now check the status of request with '
'get_search_list() , your id is: ' + self._search_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": taskid}
return output
def get_search_list(self,
task_id: str,
filetype: Optional[str] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
offset: Optional[int] = None,
additional: Optional[str | list[str]] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Search'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list', 'taskid': ''}
if task_id is None:
return 'Enter a correct taskid, choose one of the following: ' + str(self._search_taskid_list)
else:
req_param['taskid'] = task_id
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional', 'task_id']:
if val is not None:
req_param[str(key)] = val
if filetype is not None:
req_param['filetype'] = str(filetype).lower()
if additional is None:
additional = ['size', 'owner', 'time']
if type(additional) is list:
additional = '","'.join(additional)
req_param['additional'] = '["' + additional + '"]'
return self.request_data(api_name, api_path, req_param)
def stop_search_task(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Search'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': self._search_taskid}
if taskid is None:
return 'Enter a valid taskid, choose between ' + str(self._search_taskid_list)
self._search_taskid_list.remove(taskid)
return self.request_data(api_name, api_path, req_param)
def stop_all_search_task(self) -> str:
api_name = 'SYNO.FileStation.Search'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': ''}
assert len(self._search_taskid_list), 'Task list is empty' + str(self._search_taskid_list)
for task_id in self._search_taskid_list:
req_param['taskid'] = task_id
self.request_data(api_name, api_path, req_param)
self._search_taskid_list = []
return 'All task are stopped'
def get_mount_point_list(self,
mount_type: Optional[str] = None,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
additional: Optional[str | list[str]] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.VirtualFolder'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
if mount_type is not None:
req_param['type'] = mount_type
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional', 'mount_type']:
if val is not None:
req_param[str(key)] = val
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
def get_favorite_list(self,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
status_filter: Optional[str] = None,
additional: Optional[str | list[str]] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
if val is not None:
req_param[str(key)] = val
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
def add_a_favorite(self,
path: str,
name: Optional[str] = None,
index: Optional[int] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'add'}
if path is None:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
return self.request_data(api_name, api_path, req_param)
def delete_a_favorite(self, path: Optional[str] = None) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'delete'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
return self.request_data(api_name, api_path, req_param)
def clear_broken_favorite(self) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'clear_broken'}
return self.request_data(api_name, api_path, req_param)
def edit_favorite_name(self, path: str, new_name: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'edit'}
if path is None:
return 'Enter a valid path'
else:
req_param['path'] = path
if new_name is None:
return 'Enter a valid new_name'
else:
req_param['new_name'] = new_name
return self.request_data(api_name, api_path, req_param)
def replace_all_favorite(self, path: str | list[str], name: str | list[str]):
api_name = 'SYNO.FileStation.Favorite'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'edit'}
if type(path) is list:
path = ','.join(path)
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
if type(name) is list:
name = ','.join(name)
req_param['name'] = name
elif name is not None:
req_param['name'] = name
else:
return 'Enter a valid name'
return self.request_data(api_name, api_path, req_param)
def start_dir_size_calc(self, path: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.DirSize'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
response_id = '"{}"'.format(taskid)
self._dir_taskid = response_id
self._dir_taskid_list.append(response_id)
message = ('You can now check the status of request '
'with get_dir_status() , your id is: '
+ response_id)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": taskid}
return output
def stop_dir_size_calc(self, taskid: str) -> str:
api_name = 'SYNO.FileStation.DirSize'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': taskid}
if taskid is None:
return 'Enter a valid taskid choose between: ' + str(self._dir_taskid_list)
else:
req_param['taskid'] = '"' + taskid + '"'
self.request_data(api_name, api_path, req_param)
self._dir_taskid_list.remove('"' + taskid + '"')
return 'The task has been stopped'
def get_dir_status(self, taskid: Optional[str] = None) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.DirSize'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status', 'taskid': taskid}
if taskid is None and self._dir_taskid != '':
return 'Choose a taskid from this list: ' + str(self._dir_taskid)
return self.request_data(api_name, api_path, req_param)
def start_md5_calc(self, file_path: str) -> str | dict[str, object]:
api_name = 'SYNO.FileStation.MD5'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start'}
if file_path is None:
return 'Enter a correct file_path'
else:
req_param['file_path'] = file_path
self._md5_calc_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
self._md5_calc_taskid_list.append(self._md5_calc_taskid)
message = ('You can now check the status of request with '
'get_md5_status() , your id is: ' + self._md5_calc_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": self._md5_calc_taskid}
return output
def get_md5_status(self, taskid: Optional[str] = None) -> str | dict[str, object]:
api_name = 'SYNO.FileStation.MD5'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None and self._md5_calc_taskid != '':
req_param['taskid'] = '"{taskid}"'.format(taskid=self._md5_calc_taskid)
elif taskid is not None:
req_param['taskid'] = '"{taskid}"'.format(taskid=taskid)
else:
return 'Did you run start_md5_calc() first? No task id found! ' + str(self._md5_calc_taskid)
return self.request_data(api_name, api_path, req_param)
def stop_md5_calc(self, taskid: str) -> str:
api_name = 'SYNO.FileStation.DirSize'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': taskid}
if taskid is None:
return 'Enter a valid taskid choose between: ' + str(self._md5_calc_taskid_list)
else:
req_param['taskid'] = '"' + taskid + '"'
self.request_data(api_name, api_path, req_param)
self._md5_calc_taskid_list.remove(taskid)
return 'The task has been stopped'
def check_permissions(self,
path: str,
filename: str,
overwrite: Optional[bool] = None,
create_only: Optional[bool] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.CheckPermission'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'write'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
if path is None:
return 'Enter a valid path'
if filename is None:
return 'Enter a valid name'
return self.request_data(api_name, api_path, req_param)
def upload_file(self,
dest_path: str,
file_path: str,
create_parents: bool = True,
overwrite: bool = True,
verify: bool = False,
progress_bar: bool = True
) -> str | tuple[int, dict[str, object]]:
api_name = 'SYNO.FileStation.Upload'
info = self.file_station_list[api_name]
api_path = info['path']
filename = os.path.basename(file_path)
session = requests.session()
with open(file_path, 'rb') as payload:
url = ('%s%s' % (self.base_url, api_path)) + '?api=%s&version=%s&method=upload&_sid=%s' % (
api_name, info['minVersion'], self._sid)
encoder = MultipartEncoder({
'path': dest_path,
'create_parents': str(create_parents).lower(),
'overwrite': str(overwrite).lower(),
'files': (filename, payload, 'application/octet-stream')
})
if progress_bar:
bar = tqdm.tqdm(desc='Upload Progress',
total=encoder.len,
dynamic_ncols=True,
unit='B',
unit_scale=True,
unit_divisor=1024
)
monitor = MultipartEncoderMonitor(encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
r = session.post(
url,
data=monitor,
verify=verify,
headers={"X-SYNO-TOKEN": self.session._syno_token, 'Content-Type': monitor.content_type}
)
else:
r = session.post(
url,
data=encoder,
verify=verify,
headers={"X-SYNO-TOKEN": self.session._syno_token, 'Content-Type': encoder.content_type}
)
session.close()
if r.status_code != 200 or not r.json()['success']:
return r.status_code, r.json()
return r.json()
def get_shared_link_info(self, link_id: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'getinfo'}
if link_id is None:
return 'Enter a valid id'
else:
req_param['id'] = link_id
return self.request_data(api_name, api_path, req_param)
def get_shared_link_list(self,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
force_clean: Optional[bool] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
return self.request_data(api_name, api_path, req_param)
def create_sharing_link(self,
path: str,
password: Optional[str] = None,
date_expired: Optional[str | int] = None,
date_available: Optional[str | int] = None,
expire_times: int = 0
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'create'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
if path is None:
return 'Enter a valid path'
return self.request_data(api_name, api_path, req_param)
def delete_shared_link(self, link_id: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'delete'}
if link_id is None:
return 'Enter a valid id'
else:
req_param['id'] = link_id
return self.request_data(api_name, api_path, req_param)
def clear_invalid_shared_link(self) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'clear_invalid'}
return self.request_data(api_name, api_path, req_param)
def edit_shared_link(self,
link_id: str,
password: Optional[str] = None,
date_expired: Optional[str | int] = None,
date_available: Optional[str | int] = None,
expire_times: int = 0
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Sharing'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'edit'}
if link_id is None:
return 'Enter a valid id'
else:
req_param['id'] = link_id
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
return self.request_data(api_name, api_path, req_param)
def create_folder(self,
folder_path: str | list[str],
name: str | list[str],
force_parent: Optional[bool] = None,
additional: Optional[str | list[str]] = None
) -> str | dict[str, object]:
api_name = 'SYNO.FileStation.CreateFolder'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'create'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'folder_path', 'additional', 'name']:
if val is not None:
req_param[str(key)] = val
if type(folder_path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in folder_path]
folder_path = new_path
folder_path = '[' + ','.join(folder_path) + ']'
req_param['folder_path'] = folder_path
elif folder_path is not None:
req_param['folder_path'] = folder_path
else:
return 'Enter a valid path'
if type(name) is list:
new_path = []
[new_path.append('"' + x + '"') for x in name]
name = new_path
name = '[' + ','.join(name) + ']'
req_param['name'] = name
elif name is not None:
req_param['name'] = '"' + name + '"'
else:
return 'Enter a valid path'
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
return self.request_data(api_name, api_path, req_param)
def rename_folder(self,
path: str | list[str],
name: str | list[str],
additional: Optional[str | list[str]] = None,
search_taskid: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Rename'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'rename'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid folder path (folder path only ex. "/home/Drive/Downloads")'
if type(name) is list:
new_path = []
[new_path.append('"' + x + '"') for x in name]
name = new_path
name = '[' + ','.join(name) + ']'
req_param['name'] = name
elif name is not None:
req_param['name'] = name
else:
return 'Enter a valid new folder name (new folder name only ex. "New Folder")'
if additional is None:
additional = ['real_path', 'size', 'owner', 'time']
if type(additional) is list:
additional = ','.join(additional)
req_param['additional'] = additional
if search_taskid is not None:
req_param['search_taskid'] = search_taskid
return self.request_data(api_name, api_path, req_param)
def start_copy_move(self,
path: str | list[str],
dest_folder_path: str | list[str],
overwrite: Optional[bool] = None,
remove_src: Optional[bool] = None,
accurate_progress: Optional[bool] = None,
search_taskid: Optional[str] = None
) -> str | dict[str, object]:
api_name = 'SYNO.FileStation.CopyMove'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
if type(dest_folder_path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in dest_folder_path]
dest_folder_path = new_path
dest_folder_path = '[' + ','.join(dest_folder_path) + ']'
req_param['name'] = dest_folder_path
elif dest_folder_path is not None:
req_param['dest_folder_path'] = dest_folder_path
else:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'additional',
'dest_folder_path', 'new_path']:
if val is not None:
req_param[str(key)] = val
self._copy_move_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
self._dir_taskid_list.append(self._copy_move_taskid)
message = ('You can now check the status of request with '
'get_copy_move_status() , your id is: '
+ self._copy_move_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": self._copy_move_taskid}
return output
def get_copy_move_status(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.CopyMove'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None:
return 'Enter a valid taskid choose between ' + str(self._copy_move_taskid_list)
else:
req_param['taskid'] = '"' + taskid + '"'
return self.request_data(api_name, api_path, req_param)
def stop_copy_move_task(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.CopyMove'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop'}
if taskid is None:
return 'Enter a valid taskid choose between ' + str(self._copy_move_taskid_list)
else:
req_param['taskid'] = taskid
self._copy_move_taskid_list.remove(taskid)
return self.request_data(api_name, api_path, req_param)
def start_delete_task(self,
path: str | list[str],
accurate_progress: Optional[bool] = None,
recursive: Optional[bool] = None,
search_taskid: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Delete'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'new_path']:
if val is not None:
req_param[str(key)] = val
self._delete_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
self._delete_taskid_list.append(self._delete_taskid)
message = ('You can now check the status of request with '
'get_delete_status() , task id is: '
+ self._delete_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": self._delete_taskid}
return output
def get_delete_status(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Delete'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None:
return 'Enter a valid taskid, choose between ' + str(self._delete_taskid_list)
else:
req_param['taskid'] = taskid
return self.request_data(api_name, api_path, req_param)
def stop_delete_task(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Delete'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop'}
if taskid is None:
return 'Enter a valid taskid, choose between ' + str(self._delete_taskid_list)
else:
req_param['taskid'] = taskid
self._delete_taskid_list.remove('"' + taskid + '"')
return self.request_data(api_name, api_path, req_param)
def delete_blocking_function(self,
path: str,
recursive: Optional[bool] = None,
search_taskid: Optional[str] = None) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Delete'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'delete'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'new_path']:
if val is not None:
req_param[str(key)] = val
'This function will stop your script until done! Do not interrupt '
return self.request_data(api_name, api_path, req_param)
def start_extract_task(self,
file_path: str,
dest_folder_path: str,
overwrite: Optional[bool] = None,
keep_dir: Optional[bool] = None,
create_subfolder: Optional[bool] = None,
codepage: Optional[str] = None,
password: Optional[str] = None,
item_id: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Extract'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start', 'file_path': file_path,
'dest_folder_path': dest_folder_path}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
if file_path is None:
return 'Enter a valid file_path'
if dest_folder_path is None:
return 'Enter a valid dest_folder_path'
self._extract_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
self._extract_taskid_list.append(self._extract_taskid)
message = ('You can now check the status of request with '
'get_extract_status() , your id is: '
+ self._extract_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": self._extract_taskid}
return output
def get_extract_status(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Extract'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None:
return 'Enter a valid taskid, choose between ' + str(self._extract_taskid_list)
else:
req_param['taskid'] = taskid
return self.request_data(api_name, api_path, req_param)
def stop_extract_task(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Extract'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop'}
if taskid is None:
return 'Enter a valid taskid, choose between ' + str(self._extract_taskid_list)
else:
req_param['taskid'] = taskid
self._extract_taskid_list.remove(taskid)
return self.request_data(api_name, api_path, req_param)
def get_file_list_of_archive(self,
file_path: str,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
codepage: Optional[str] = None,
password: Optional[str] = None,
item_id: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Extract'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
if file_path is None:
return 'Enter a valid file_path'
return self.request_data(api_name, api_path, req_param)
def start_file_compression(self,
path: str | list[str],
dest_file_path: str,
level: Optional[int] = None,
mode: Optional[str] = None,
compress_format: Optional[str] = None,
password: Optional[str] = None
) -> dict[str, object] | str | tuple[str]:
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'start'}
if type(path) is list:
new_path = []
[new_path.append('"' + x + '"') for x in path]
path = new_path
path = '[' + ','.join(path) + ']'
req_param['path'] = path
elif path is not None:
req_param['path'] = path
else:
return 'Enter a valid path'
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'compress_format', '_password', '_api_path',
'req_param', 'path', 'new_path']:
if val is not None:
req_param[str(key)] = val
if dest_file_path is None:
return 'Enter a valid dest_file_path'
if compress_format is not None:
req_param['format'] = compress_format
if password is not None:
req_param['_password'] = password
self._compress_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
message = ('You can now check the status of request with '
'get_compress_status() , your id is: '
+ self._compress_taskid)
if self.interactive_output:
output = message
else:
output = {"message": message, "taskid": self._compress_taskid}
return output
def get_compress_status(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'status'}
if taskid is None:
return 'Enter a valid taskid'
else:
req_param['taskid'] = taskid
return self.request_data(api_name, api_path, req_param)
def stop_compress_task(self, taskid: str) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.Compress'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'stop'}
if taskid is None:
return 'Enter a valid taskid'
else:
req_param['taskid'] = taskid
return self.request_data(api_name, api_path, req_param)
def get_list_of_all_background_task(self,
offset: Optional[int] = None,
limit: Optional[int] = None,
sort_by: Optional[str] = None,
sort_direction: Optional[str] = None,
api_filter: Optional[str] = None
) -> dict[str, object] | str:
api_name = 'SYNO.FileStation.BackgroundTask'
info = self.file_station_list[api_name]
api_path = info['path']
req_param = {'version': info['maxVersion'], 'method': 'list'}
for key, val in locals().items():
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
if val is not None:
req_param[str(key)] = val
if type(api_filter) is list:
new_path = []
[new_path.append('"' + x + '"') for x in api_filter]
api_filter = new_path
api_filter = '[' + ','.join(api_filter) + ']'
req_param['api_filter'] = api_filter
return self.request_data(api_name, api_path, req_param)
def get_file(self,
path: str,
mode: str,
dest_path: str = ".",
chunk_size: int = 8192,
verify: bool = False
) -> Optional[str]:
api_name = 'SYNO.FileStation.Download'
info = self.file_station_list[api_name]
api_path = info['path']
if path is None:
return 'Enter a valid path'
session = requests.session()
url = ('%s%s' % (self.base_url, api_path)) + '?api=%s&version=%s&method=download&path=%s&mode=%s&_sid=%s' % (
api_name, info['maxVersion'], parse.quote_plus(path), mode, self._sid)
if mode is None:
return 'Enter a valid mode (open / download)'
if mode == r'open':
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
r.raise_for_status()
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
sys.stdout.buffer.write(chunk)
if mode == r'download':
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
r.raise_for_status()
if not os.path.isdir(dest_path):
os.makedirs(dest_path)
with open(dest_path + "/" + os.path.basename(path), 'wb') as f:
for chunk in r.iter_content(chunk_size=chunk_size):
if chunk: # filter out keep-alive new chunks
f.write(chunk)
if mode == r'serve':
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
r.raise_for_status()
return io.BytesIO(r.content)
# TODO SYNO.FileStation.Thumb to be done