190 lines
6.3 KiB
Python
190 lines
6.3 KiB
Python
import asyncio
|
|
import logging
|
|
import secrets
|
|
import time
|
|
from contextlib import suppress
|
|
from urllib.parse import urljoin, urlparse
|
|
|
|
import jwt
|
|
from aiohttp import ClientError, ClientSession, ClientTimeout, hdrs, web
|
|
from homeassistant.components.http import HomeAssistantView
|
|
from homeassistant.components.media_player import async_process_play_media_url
|
|
from homeassistant.core import HomeAssistant
|
|
from homeassistant.helpers import network
|
|
from homeassistant.helpers.aiohttp_client import async_get_clientsession
|
|
|
|
_LOGGER = logging.getLogger(__name__)
|
|
|
|
MIME_TYPES = {
|
|
"flac": "audio/x-flac",
|
|
"aac": "audio/aac",
|
|
"he.aac": "audio/aac",
|
|
"mp3": "audio/mpeg",
|
|
# https://www.rfc-editor.org/rfc/rfc4337.txt
|
|
"flac.mp4": "audio/mp4",
|
|
"aac.mp4": "audio/mp4",
|
|
"he.aac.mp4": "audio/mp4",
|
|
# application/vnd.apple.mpegurl
|
|
"m3u8": "application/x-mpegURL",
|
|
"ts": "video/MP2T",
|
|
"gif": "image/gif",
|
|
"mp4": "video/mp4",
|
|
}
|
|
|
|
|
|
def get_ext(url: str) -> str:
|
|
return urlparse(url).path.split(".")[-1]
|
|
|
|
|
|
def get_url(url: str, ext: str = None, expires: int = 3600) -> str:
|
|
assert StreamView.hass_url
|
|
assert url.startswith(("http://", "https://", "/")), url
|
|
|
|
ext = ext.replace("-", ".") if ext else get_ext(url)
|
|
assert ext in MIME_TYPES, ext
|
|
|
|
# using token for security reason
|
|
payload: dict[str, str | int] = {"url": url}
|
|
if expires:
|
|
payload["exp"] = int(time.time()) + expires
|
|
token = jwt.encode(payload, StreamView.key, "HS256")
|
|
return f"{StreamView.hass_url}/api/yandex_station/{token}.{ext}"
|
|
|
|
|
|
async def get_hls(session: ClientSession, url: str) -> str:
|
|
async with session.get(url) as r:
|
|
lines = (await r.text()).splitlines()
|
|
for i, item in enumerate(lines):
|
|
item = item.strip()
|
|
if not item or item.startswith("#"):
|
|
continue
|
|
# should use r.url, not url, because redirects
|
|
item = urljoin(str(r.url), item)
|
|
lines[i] = get_url(item)
|
|
return "\n".join(lines)
|
|
|
|
|
|
def copy_headers(headers: dict, names: tuple) -> dict:
|
|
return {k: v for k in names if (v := headers.get(k))}
|
|
|
|
|
|
CONTENT_TYPES = {
|
|
"audio/aac": "aac",
|
|
"audio/mpeg": "mp3",
|
|
"audio/x-flac": "flac",
|
|
"application/vnd.apple.mpegurl": "m3u8",
|
|
"application/x-mpegURL": "m3u8",
|
|
}
|
|
|
|
REQUEST_HEADERS = (hdrs.RANGE,)
|
|
RESPONSE_HEADERS = (hdrs.ACCEPT_RANGES, hdrs.CONTENT_LENGTH, hdrs.CONTENT_RANGE)
|
|
STREAM_TIMEOUT = ClientTimeout(sock_connect=10, sock_read=10)
|
|
|
|
|
|
async def get_content_type(session: ClientSession, url: str) -> str | None:
|
|
try:
|
|
async with session.head(url) as r:
|
|
if r.content_type.startswith("text/html"):
|
|
# fix Icecast bug - return text/html on HEAD
|
|
# https://github.com/AlexxIT/YandexStation/issues/696
|
|
async with session.get(url) as r2:
|
|
return CONTENT_TYPES.get(r2.content_type)
|
|
return CONTENT_TYPES.get(r.content_type)
|
|
except Exception as e:
|
|
_LOGGER.debug(f"Can't get content type: {repr(e)}")
|
|
return None
|
|
|
|
|
|
class StreamView(HomeAssistantView):
|
|
requires_auth = False
|
|
|
|
url = "/api/yandex_station/{token:[\\w-]+.[\\w-]+.[\\w-]+}.{ext}"
|
|
name = "api:yandex_station"
|
|
|
|
hass: HomeAssistant = None
|
|
hass_url: str = None
|
|
key: str = None
|
|
|
|
def __init__(self, hass: HomeAssistant):
|
|
self.session = async_get_clientsession(hass)
|
|
|
|
StreamView.hass = hass
|
|
StreamView.key = secrets.token_hex()
|
|
|
|
try:
|
|
StreamView.hass_url = network.get_url(hass, allow_external=False)
|
|
_LOGGER.debug(f"Локальный адрес Home Assistant: {StreamView.hass_url}")
|
|
except Exception as e:
|
|
_LOGGER.warning(f"Ошибка получения локального адреса Home Assistant: {e}")
|
|
|
|
def get_url(self, url: str) -> str:
|
|
if url[0] != "/":
|
|
return url
|
|
return async_process_play_media_url(self.hass, url)
|
|
|
|
async def head(self, request: web.Request, token: str, ext: str):
|
|
try:
|
|
data = jwt.decode(token, StreamView.key, "HS256")
|
|
except jwt.InvalidTokenError:
|
|
return web.HTTPNotFound()
|
|
|
|
_LOGGER.debug(f"Stream.{ext} HEAD {data}")
|
|
|
|
url = self.get_url(data["url"])
|
|
|
|
headers = copy_headers(request.headers, REQUEST_HEADERS)
|
|
async with self.session.head(url, headers=headers) as r:
|
|
headers = copy_headers(r.headers, RESPONSE_HEADERS)
|
|
headers[hdrs.CONTENT_TYPE] = MIME_TYPES[ext]
|
|
return web.Response(status=r.status, headers=headers)
|
|
|
|
async def get(self, request: web.Request, token: str, ext: str):
|
|
try:
|
|
data = jwt.decode(token, StreamView.key, "HS256")
|
|
except jwt.InvalidTokenError:
|
|
return web.HTTPNotFound()
|
|
|
|
_LOGGER.debug(f"Stream.{ext} GET {data}")
|
|
|
|
url = self.get_url(data["url"])
|
|
|
|
try:
|
|
if ext == "m3u8":
|
|
body = await get_hls(self.session, url)
|
|
return web.Response(
|
|
body=body,
|
|
headers={
|
|
hdrs.ACCESS_CONTROL_ALLOW_HEADERS: "*",
|
|
hdrs.ACCESS_CONTROL_ALLOW_ORIGIN: "*",
|
|
hdrs.CONTENT_TYPE: MIME_TYPES[ext],
|
|
},
|
|
)
|
|
|
|
headers = copy_headers(request.headers, REQUEST_HEADERS)
|
|
async with self.session.get(
|
|
url, headers=headers, timeout=STREAM_TIMEOUT
|
|
) as r:
|
|
headers = copy_headers(r.headers, RESPONSE_HEADERS)
|
|
headers[hdrs.CONTENT_TYPE] = MIME_TYPES[ext]
|
|
|
|
if ext == "ts":
|
|
headers[hdrs.ACCESS_CONTROL_ALLOW_HEADERS] = "*"
|
|
headers[hdrs.ACCESS_CONTROL_ALLOW_ORIGIN] = "*"
|
|
|
|
response = web.StreamResponse(status=r.status, headers=headers)
|
|
response.force_close()
|
|
|
|
await response.prepare(request)
|
|
|
|
try:
|
|
while data := await r.content.readany():
|
|
await response.write(data)
|
|
except ClientError as e:
|
|
_LOGGER.debug(f"Streaming client error: {repr(e)}")
|
|
except TimeoutError as e:
|
|
_LOGGER.debug(f"Streaming timeout: {repr(e)}")
|
|
|
|
return response
|
|
except:
|
|
pass
|