1244 lines
48 KiB
Python
1244 lines
48 KiB
Python
from __future__ import annotations
|
|
from typing import Optional, Any
|
|
import os
|
|
import io
|
|
import time
|
|
from datetime import datetime
|
|
|
|
import requests
|
|
import tqdm
|
|
from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
|
|
import sys
|
|
from urllib import parse
|
|
|
|
from . import base_api
|
|
|
|
|
|
class FileStation(base_api.BaseApi):
|
|
|
|
def __init__(self,
|
|
ip_address: str,
|
|
port: str,
|
|
username: str,
|
|
password: str,
|
|
secure: bool = False,
|
|
cert_verify: bool = False,
|
|
dsm_version: int = 7,
|
|
debug: bool = True,
|
|
otp_code: Optional[str] = None,
|
|
interactive_output: bool = True
|
|
) -> None:
|
|
|
|
super(FileStation, self).__init__(ip_address, port, username, password, secure, cert_verify,
|
|
dsm_version, debug, otp_code, 'FileStation')
|
|
|
|
self._dir_taskid: str = ''
|
|
self._dir_taskid_list: list[str] = []
|
|
self._md5_calc_taskid: str = ''
|
|
self._md5_calc_taskid_list: list[str] = []
|
|
self._search_taskid: str = ''
|
|
self._search_taskid_list: list[str] = []
|
|
self._copy_move_taskid: str = ''
|
|
self._copy_move_taskid_list: list[str] = []
|
|
self._delete_taskid: str = ''
|
|
self._delete_taskid_list: list[str] = []
|
|
self._extract_taskid: str = ''
|
|
self._extract_taskid_list: list[str] = []
|
|
self._compress_taskid: str = ''
|
|
self._compress_taskid_list: list[str] = []
|
|
|
|
self.session.get_api_list('FileStation')
|
|
|
|
self.file_station_list: Any = self.session.app_api_list
|
|
|
|
self.interactive_output: bool = interactive_output
|
|
|
|
def get_info(self) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Info'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'get'}
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_list_share(self,
|
|
additional: Optional[str | list[str]] = None,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
onlywritable: bool = False
|
|
) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.List'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list_share'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_file_list(self,
|
|
folder_path: Optional[str] = None,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
pattern: Optional[str] = None,
|
|
filetype: Optional[str] = None,
|
|
goto_path: Optional[str] = None,
|
|
additional: Optional[str | list[str]] = None) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.List'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if folder_path is None:
|
|
return 'Enter a valid folder_path'
|
|
|
|
if filetype is not None:
|
|
req_param['filetype'] = str(req_param['filetype']).lower()
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_file_info(self,
|
|
path: Optional[str] = None,
|
|
additional: Optional[str | list[str]] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.List'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'getinfo'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = str(additional).replace("'", '"')
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
# TODO all working if specify extension check if correct [pattern, extension]
|
|
# it works if you put extension='...'
|
|
|
|
def search_start(self,
|
|
folder_path: Optional[str] = None,
|
|
recursive: Optional[bool] = None,
|
|
pattern: Optional[str] = None,
|
|
extension: Optional[str] = None,
|
|
filetype: Optional[str] = None,
|
|
size_from: Optional[int] = None,
|
|
size_to: Optional[int] = None,
|
|
mtime_from: Optional[str | int] = None,
|
|
mtime_to: Optional[str | int] = None,
|
|
crtime_from: Optional[str | int] = None,
|
|
crtime_to: Optional[str | int] = None,
|
|
atime_from: Optional[str | int] = None,
|
|
atime_to: Optional[str | int] = None,
|
|
owner: Optional[str] = None,
|
|
group: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.Search'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start', 'folder_path': ''}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param'] and 'time' not in key:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
if 'time' in key:
|
|
if val is not None:
|
|
try:
|
|
date = time.strptime(val, "%Y-%m-%d %H:%M:%S")
|
|
timestamp = time.mktime(date)
|
|
req_param[key] = '"' + str(timestamp) + '"'
|
|
except ValueError:
|
|
try:
|
|
datetime.fromtimestamp(int(val)).strftime('%Y-%m-%d %H:%M:%S')
|
|
req_param[key] = '"' + val + '"'
|
|
except ValueError:
|
|
return 'Enter the correct Date Time format "YYY-MM-DD HH:MM:SS" or Unix timestamp'
|
|
|
|
if folder_path is None:
|
|
return 'Enter a valid folder_path'
|
|
else:
|
|
req_param['folder_path'] = '"' + folder_path + '"'
|
|
|
|
if filetype is not None:
|
|
req_param['filetype'] = '"' + filetype + '"'
|
|
|
|
response = self.request_data(api_name, api_path, req_param)
|
|
|
|
taskid = response['data']['taskid']
|
|
self._search_taskid = '"{}"'.format(taskid)
|
|
self._search_taskid_list.append('"' + response['data']['taskid'] + '"')
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_search_list() , your id is: ' + self._search_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": taskid}
|
|
|
|
return output
|
|
|
|
def get_search_list(self,
|
|
task_id: str,
|
|
filetype: Optional[str] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
offset: Optional[int] = None,
|
|
additional: Optional[str | list[str]] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Search'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list', 'taskid': ''}
|
|
|
|
if task_id is None:
|
|
return 'Enter a correct taskid, choose one of the following: ' + str(self._search_taskid_list)
|
|
else:
|
|
req_param['taskid'] = task_id
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional', 'task_id']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if filetype is not None:
|
|
req_param['filetype'] = str(filetype).lower()
|
|
|
|
if additional is None:
|
|
additional = ['size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = '","'.join(additional)
|
|
|
|
req_param['additional'] = '["' + additional + '"]'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_search_task(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Search'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': self._search_taskid}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid, choose between ' + str(self._search_taskid_list)
|
|
|
|
self._search_taskid_list.remove(taskid)
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_all_search_task(self) -> str:
|
|
api_name = 'SYNO.FileStation.Search'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': ''}
|
|
|
|
assert len(self._search_taskid_list), 'Task list is empty' + str(self._search_taskid_list)
|
|
|
|
for task_id in self._search_taskid_list:
|
|
req_param['taskid'] = task_id
|
|
self.request_data(api_name, api_path, req_param)
|
|
|
|
self._search_taskid_list = []
|
|
|
|
return 'All task are stopped'
|
|
|
|
def get_mount_point_list(self,
|
|
mount_type: Optional[str] = None,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
additional: Optional[str | list[str]] = None
|
|
) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.VirtualFolder'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
if mount_type is not None:
|
|
req_param['type'] = mount_type
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional', 'mount_type']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_favorite_list(self,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
status_filter: Optional[str] = None,
|
|
additional: Optional[str | list[str]] = None
|
|
) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'additional']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def add_a_favorite(self,
|
|
path: str,
|
|
name: Optional[str] = None,
|
|
index: Optional[int] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'add'}
|
|
|
|
if path is None:
|
|
return 'Enter a valid path'
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def delete_a_favorite(self, path: Optional[str] = None) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'delete'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def clear_broken_favorite(self) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'clear_broken'}
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def edit_favorite_name(self, path: str, new_name: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'edit'}
|
|
|
|
if path is None:
|
|
return 'Enter a valid path'
|
|
else:
|
|
req_param['path'] = path
|
|
|
|
if new_name is None:
|
|
return 'Enter a valid new_name'
|
|
else:
|
|
req_param['new_name'] = new_name
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def replace_all_favorite(self, path: str | list[str], name: str | list[str]):
|
|
api_name = 'SYNO.FileStation.Favorite'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'edit'}
|
|
|
|
if type(path) is list:
|
|
path = ','.join(path)
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
if type(name) is list:
|
|
name = ','.join(name)
|
|
req_param['name'] = name
|
|
elif name is not None:
|
|
req_param['name'] = name
|
|
else:
|
|
return 'Enter a valid name'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_dir_size_calc(self, path: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.DirSize'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
|
|
response_id = '"{}"'.format(taskid)
|
|
self._dir_taskid = response_id
|
|
self._dir_taskid_list.append(response_id)
|
|
|
|
message = ('You can now check the status of request '
|
|
'with get_dir_status() , your id is: '
|
|
+ response_id)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": taskid}
|
|
|
|
return output
|
|
|
|
def stop_dir_size_calc(self, taskid: str) -> str:
|
|
api_name = 'SYNO.FileStation.DirSize'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': taskid}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid choose between: ' + str(self._dir_taskid_list)
|
|
else:
|
|
req_param['taskid'] = '"' + taskid + '"'
|
|
|
|
self.request_data(api_name, api_path, req_param)
|
|
self._dir_taskid_list.remove('"' + taskid + '"')
|
|
|
|
return 'The task has been stopped'
|
|
|
|
def get_dir_status(self, taskid: Optional[str] = None) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.DirSize'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status', 'taskid': taskid}
|
|
|
|
if taskid is None and self._dir_taskid != '':
|
|
return 'Choose a taskid from this list: ' + str(self._dir_taskid)
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_md5_calc(self, file_path: str) -> str | dict[str, object]:
|
|
api_name = 'SYNO.FileStation.MD5'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start'}
|
|
|
|
if file_path is None:
|
|
return 'Enter a correct file_path'
|
|
else:
|
|
req_param['file_path'] = file_path
|
|
|
|
self._md5_calc_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
self._md5_calc_taskid_list.append(self._md5_calc_taskid)
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_md5_status() , your id is: ' + self._md5_calc_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": self._md5_calc_taskid}
|
|
|
|
return output
|
|
|
|
def get_md5_status(self, taskid: Optional[str] = None) -> str | dict[str, object]:
|
|
api_name = 'SYNO.FileStation.MD5'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status'}
|
|
|
|
if taskid is None and self._md5_calc_taskid != '':
|
|
req_param['taskid'] = '"{taskid}"'.format(taskid=self._md5_calc_taskid)
|
|
elif taskid is not None:
|
|
req_param['taskid'] = '"{taskid}"'.format(taskid=taskid)
|
|
else:
|
|
return 'Did you run start_md5_calc() first? No task id found! ' + str(self._md5_calc_taskid)
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_md5_calc(self, taskid: str) -> str:
|
|
api_name = 'SYNO.FileStation.DirSize'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop', 'taskid': taskid}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid choose between: ' + str(self._md5_calc_taskid_list)
|
|
else:
|
|
req_param['taskid'] = '"' + taskid + '"'
|
|
|
|
self.request_data(api_name, api_path, req_param)
|
|
self._md5_calc_taskid_list.remove(taskid)
|
|
|
|
return 'The task has been stopped'
|
|
|
|
def check_permissions(self,
|
|
path: str,
|
|
filename: str,
|
|
overwrite: Optional[bool] = None,
|
|
create_only: Optional[bool] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.CheckPermission'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'write'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if path is None:
|
|
return 'Enter a valid path'
|
|
|
|
if filename is None:
|
|
return 'Enter a valid name'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def upload_file(self,
|
|
dest_path: str,
|
|
file_path: str,
|
|
create_parents: bool = True,
|
|
overwrite: bool = True,
|
|
verify: bool = False,
|
|
progress_bar: bool = True
|
|
) -> str | tuple[int, dict[str, object]]:
|
|
api_name = 'SYNO.FileStation.Upload'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
filename = os.path.basename(file_path)
|
|
|
|
session = requests.session()
|
|
|
|
with open(file_path, 'rb') as payload:
|
|
url = ('%s%s' % (self.base_url, api_path)) + '?api=%s&version=%s&method=upload&_sid=%s' % (
|
|
api_name, info['minVersion'], self._sid)
|
|
|
|
encoder = MultipartEncoder({
|
|
'path': dest_path,
|
|
'create_parents': str(create_parents).lower(),
|
|
'overwrite': str(overwrite).lower(),
|
|
'files': (filename, payload, 'application/octet-stream')
|
|
})
|
|
|
|
if progress_bar:
|
|
bar = tqdm.tqdm(desc='Upload Progress',
|
|
total=encoder.len,
|
|
dynamic_ncols=True,
|
|
unit='B',
|
|
unit_scale=True,
|
|
unit_divisor=1024
|
|
)
|
|
|
|
monitor = MultipartEncoderMonitor(encoder, lambda monitor: bar.update(monitor.bytes_read - bar.n))
|
|
|
|
r = session.post(
|
|
url,
|
|
data=monitor,
|
|
verify=verify,
|
|
headers={"X-SYNO-TOKEN": self.session._syno_token, 'Content-Type': monitor.content_type}
|
|
)
|
|
|
|
else:
|
|
r = session.post(
|
|
url,
|
|
data=encoder,
|
|
verify=verify,
|
|
headers={"X-SYNO-TOKEN": self.session._syno_token, 'Content-Type': encoder.content_type}
|
|
)
|
|
|
|
session.close()
|
|
if r.status_code != 200 or not r.json()['success']:
|
|
return r.status_code, r.json()
|
|
|
|
return r.json()
|
|
|
|
def get_shared_link_info(self, link_id: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'getinfo'}
|
|
|
|
if link_id is None:
|
|
return 'Enter a valid id'
|
|
else:
|
|
req_param['id'] = link_id
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_shared_link_list(self,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
force_clean: Optional[bool] = None
|
|
) -> dict[str, object] | str:
|
|
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def create_sharing_link(self,
|
|
path: str,
|
|
password: Optional[str] = None,
|
|
date_expired: Optional[str | int] = None,
|
|
date_available: Optional[str | int] = None,
|
|
expire_times: int = 0
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'create'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if path is None:
|
|
return 'Enter a valid path'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def delete_shared_link(self, link_id: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'delete'}
|
|
|
|
if link_id is None:
|
|
return 'Enter a valid id'
|
|
else:
|
|
req_param['id'] = link_id
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def clear_invalid_shared_link(self) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'clear_invalid'}
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def edit_shared_link(self,
|
|
link_id: str,
|
|
password: Optional[str] = None,
|
|
date_expired: Optional[str | int] = None,
|
|
date_available: Optional[str | int] = None,
|
|
expire_times: int = 0
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Sharing'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'edit'}
|
|
|
|
if link_id is None:
|
|
return 'Enter a valid id'
|
|
else:
|
|
req_param['id'] = link_id
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def create_folder(self,
|
|
folder_path: str | list[str],
|
|
name: str | list[str],
|
|
force_parent: Optional[bool] = None,
|
|
additional: Optional[str | list[str]] = None
|
|
) -> str | dict[str, object]:
|
|
api_name = 'SYNO.FileStation.CreateFolder'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'create'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'folder_path', 'additional', 'name']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if type(folder_path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in folder_path]
|
|
folder_path = new_path
|
|
folder_path = '[' + ','.join(folder_path) + ']'
|
|
req_param['folder_path'] = folder_path
|
|
elif folder_path is not None:
|
|
req_param['folder_path'] = folder_path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
if type(name) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in name]
|
|
name = new_path
|
|
name = '[' + ','.join(name) + ']'
|
|
req_param['name'] = name
|
|
elif name is not None:
|
|
req_param['name'] = '"' + name + '"'
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def rename_folder(self,
|
|
path: str | list[str],
|
|
name: str | list[str],
|
|
additional: Optional[str | list[str]] = None,
|
|
search_taskid: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Rename'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'rename'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid folder path (folder path only ex. "/home/Drive/Downloads")'
|
|
|
|
if type(name) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in name]
|
|
name = new_path
|
|
name = '[' + ','.join(name) + ']'
|
|
req_param['name'] = name
|
|
elif name is not None:
|
|
req_param['name'] = name
|
|
else:
|
|
return 'Enter a valid new folder name (new folder name only ex. "New Folder")'
|
|
|
|
if additional is None:
|
|
additional = ['real_path', 'size', 'owner', 'time']
|
|
|
|
if type(additional) is list:
|
|
additional = ','.join(additional)
|
|
|
|
req_param['additional'] = additional
|
|
|
|
if search_taskid is not None:
|
|
req_param['search_taskid'] = search_taskid
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_copy_move(self,
|
|
path: str | list[str],
|
|
dest_folder_path: str | list[str],
|
|
overwrite: Optional[bool] = None,
|
|
remove_src: Optional[bool] = None,
|
|
accurate_progress: Optional[bool] = None,
|
|
search_taskid: Optional[str] = None
|
|
) -> str | dict[str, object]:
|
|
api_name = 'SYNO.FileStation.CopyMove'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
if type(dest_folder_path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in dest_folder_path]
|
|
dest_folder_path = new_path
|
|
dest_folder_path = '[' + ','.join(dest_folder_path) + ']'
|
|
req_param['name'] = dest_folder_path
|
|
elif dest_folder_path is not None:
|
|
req_param['dest_folder_path'] = dest_folder_path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'additional',
|
|
'dest_folder_path', 'new_path']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
self._copy_move_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
self._dir_taskid_list.append(self._copy_move_taskid)
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_copy_move_status() , your id is: '
|
|
+ self._copy_move_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": self._copy_move_taskid}
|
|
|
|
return output
|
|
|
|
def get_copy_move_status(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.CopyMove'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid choose between ' + str(self._copy_move_taskid_list)
|
|
else:
|
|
req_param['taskid'] = '"' + taskid + '"'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_copy_move_task(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.CopyMove'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid choose between ' + str(self._copy_move_taskid_list)
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
self._copy_move_taskid_list.remove(taskid)
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_delete_task(self,
|
|
path: str | list[str],
|
|
accurate_progress: Optional[bool] = None,
|
|
recursive: Optional[bool] = None,
|
|
search_taskid: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Delete'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'new_path']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
self._delete_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
self._delete_taskid_list.append(self._delete_taskid)
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_delete_status() , task id is: '
|
|
+ self._delete_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": self._delete_taskid}
|
|
|
|
return output
|
|
|
|
def get_delete_status(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Delete'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid, choose between ' + str(self._delete_taskid_list)
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_delete_task(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Delete'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid, choose between ' + str(self._delete_taskid_list)
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
self._delete_taskid_list.remove('"' + taskid + '"')
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def delete_blocking_function(self,
|
|
path: str,
|
|
recursive: Optional[bool] = None,
|
|
search_taskid: Optional[str] = None) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Delete'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'delete'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param', 'path', 'new_path']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
'This function will stop your script until done! Do not interrupt '
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_extract_task(self,
|
|
file_path: str,
|
|
dest_folder_path: str,
|
|
overwrite: Optional[bool] = None,
|
|
keep_dir: Optional[bool] = None,
|
|
create_subfolder: Optional[bool] = None,
|
|
codepage: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
item_id: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Extract'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start', 'file_path': file_path,
|
|
'dest_folder_path': dest_folder_path}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if file_path is None:
|
|
return 'Enter a valid file_path'
|
|
|
|
if dest_folder_path is None:
|
|
return 'Enter a valid dest_folder_path'
|
|
|
|
self._extract_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
self._extract_taskid_list.append(self._extract_taskid)
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_extract_status() , your id is: '
|
|
+ self._extract_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": self._extract_taskid}
|
|
|
|
return output
|
|
|
|
def get_extract_status(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Extract'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid, choose between ' + str(self._extract_taskid_list)
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_extract_task(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Extract'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid, choose between ' + str(self._extract_taskid_list)
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
self._extract_taskid_list.remove(taskid)
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_file_list_of_archive(self,
|
|
file_path: str,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
codepage: Optional[str] = None,
|
|
password: Optional[str] = None,
|
|
item_id: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Extract'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if file_path is None:
|
|
return 'Enter a valid file_path'
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def start_file_compression(self,
|
|
path: str | list[str],
|
|
dest_file_path: str,
|
|
level: Optional[int] = None,
|
|
mode: Optional[str] = None,
|
|
compress_format: Optional[str] = None,
|
|
password: Optional[str] = None
|
|
) -> dict[str, object] | str | tuple[str]:
|
|
api_name = 'SYNO.FileStation.Compress'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'start'}
|
|
|
|
if type(path) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in path]
|
|
path = new_path
|
|
path = '[' + ','.join(path) + ']'
|
|
req_param['path'] = path
|
|
elif path is not None:
|
|
req_param['path'] = path
|
|
else:
|
|
return 'Enter a valid path'
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'compress_format', '_password', '_api_path',
|
|
'req_param', 'path', 'new_path']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if dest_file_path is None:
|
|
return 'Enter a valid dest_file_path'
|
|
|
|
if compress_format is not None:
|
|
req_param['format'] = compress_format
|
|
|
|
if password is not None:
|
|
req_param['_password'] = password
|
|
|
|
self._compress_taskid = self.request_data(api_name, api_path, req_param)['data']['taskid']
|
|
|
|
message = ('You can now check the status of request with '
|
|
'get_compress_status() , your id is: '
|
|
+ self._compress_taskid)
|
|
if self.interactive_output:
|
|
output = message
|
|
else:
|
|
output = {"message": message, "taskid": self._compress_taskid}
|
|
|
|
return output
|
|
|
|
def get_compress_status(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Compress'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'status'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid'
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def stop_compress_task(self, taskid: str) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.Compress'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'stop'}
|
|
|
|
if taskid is None:
|
|
return 'Enter a valid taskid'
|
|
else:
|
|
req_param['taskid'] = taskid
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_list_of_all_background_task(self,
|
|
offset: Optional[int] = None,
|
|
limit: Optional[int] = None,
|
|
sort_by: Optional[str] = None,
|
|
sort_direction: Optional[str] = None,
|
|
api_filter: Optional[str] = None
|
|
) -> dict[str, object] | str:
|
|
api_name = 'SYNO.FileStation.BackgroundTask'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
req_param = {'version': info['maxVersion'], 'method': 'list'}
|
|
|
|
for key, val in locals().items():
|
|
if key not in ['self', 'api_name', 'info', 'api_path', 'req_param']:
|
|
if val is not None:
|
|
req_param[str(key)] = val
|
|
|
|
if type(api_filter) is list:
|
|
new_path = []
|
|
[new_path.append('"' + x + '"') for x in api_filter]
|
|
api_filter = new_path
|
|
api_filter = '[' + ','.join(api_filter) + ']'
|
|
req_param['api_filter'] = api_filter
|
|
|
|
return self.request_data(api_name, api_path, req_param)
|
|
|
|
def get_file(self,
|
|
path: str,
|
|
mode: str,
|
|
dest_path: str = ".",
|
|
chunk_size: int = 8192,
|
|
verify: bool = False
|
|
) -> Optional[str]:
|
|
|
|
api_name = 'SYNO.FileStation.Download'
|
|
info = self.file_station_list[api_name]
|
|
api_path = info['path']
|
|
|
|
if path is None:
|
|
return 'Enter a valid path'
|
|
|
|
session = requests.session()
|
|
|
|
url = ('%s%s' % (self.base_url, api_path)) + '?api=%s&version=%s&method=download&path=%s&mode=%s&_sid=%s' % (
|
|
api_name, info['maxVersion'], parse.quote_plus(path), mode, self._sid)
|
|
|
|
if mode is None:
|
|
return 'Enter a valid mode (open / download)'
|
|
|
|
if mode == r'open':
|
|
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
|
|
r.raise_for_status()
|
|
for chunk in r.iter_content(chunk_size=chunk_size):
|
|
if chunk: # filter out keep-alive new chunks
|
|
sys.stdout.buffer.write(chunk)
|
|
|
|
if mode == r'download':
|
|
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
|
|
r.raise_for_status()
|
|
if not os.path.isdir(dest_path):
|
|
os.makedirs(dest_path)
|
|
with open(dest_path + "/" + os.path.basename(path), 'wb') as f:
|
|
for chunk in r.iter_content(chunk_size=chunk_size):
|
|
if chunk: # filter out keep-alive new chunks
|
|
f.write(chunk)
|
|
|
|
if mode == r'serve':
|
|
with session.get(url, stream=True, verify=verify, headers={"X-SYNO-TOKEN": self.session._syno_token}) as r:
|
|
r.raise_for_status()
|
|
return io.BytesIO(r.content)
|
|
|
|
# TODO SYNO.FileStation.Thumb to be done
|