diff --git a/fHDHR/api/__init__.py b/fHDHR/api/__init__.py
index bdec8ec..cfcdb37 100644
--- a/fHDHR/api/__init__.py
+++ b/fHDHR/api/__init__.py
@@ -1,5 +1,6 @@
 from gevent.pywsgi import WSGIServer
-from flask import Flask, send_from_directory, request, abort, Response, stream_with_context
+from flask import (Flask, send_from_directory, request,
+                   abort, Response, stream_with_context, redirect)
 
 from . import hub
 
@@ -24,6 +25,32 @@ class HDHR_HTTP_Server():
         base_url = request.headers["host"]
         return fhdhrhub.get_origin_html(base_url)
 
+    @app.route('/cluster')
+    def cluster_html():
+        method = request.args.get('method', default=None, type=str)
+
+        if method == "scan":
+            fhdhrhub.m_search()
+
+        elif method == 'add':
+            fhdhrhub.cluster_add(request.args.get("location", default=None, type=str))
+        elif method == 'del':
+            fhdhrhub.cluster_del(request.args.get("location", default=None, type=str))
+
+        elif method == 'sync':
+            fhdhrhub.cluster_sync(request.args.get("location", default=None, type=str))
+
+        elif method == 'leave':
+            fhdhrhub.cluster_leave()
+        elif method == 'disconnect':
+            fhdhrhub.cluster_disconnect()
+
+        if method:
+            return redirect('/cluster')
+
+        base_url = request.headers["host"]
+        return fhdhrhub.get_cluster_html(base_url)
+
     @app.route('/style.css', methods=['GET'])
     def style_css():
         return send_from_directory(fhdhrhub.config.dict["filedir"]["www_dir"], 'style.css')
@@ -73,6 +100,14 @@ class HDHR_HTTP_Server():
                         response=station_list,
                         mimetype='application/json')
 
+    @app.route('/cluster.json', methods=['GET'])
+    def cluster_json():
+        base_url = request.headers["host"]
+        cluster_list = fhdhrhub.get_cluster_json(base_url)
+        return Response(status=200,
+                        response=cluster_list,
+                        mimetype='application/json')
+
     @app.route('/xmltv.xml', methods=['GET'])
     def xmltv_xml():
         base_url = request.headers["host"]
diff --git a/fHDHR/api/hub/__init__.py b/fHDHR/api/hub/__init__.py
index 0b25790..21ea240 100644
--- a/fHDHR/api/hub/__init__.py
+++ b/fHDHR/api/hub/__init__.py
@@ -38,6 +38,9 @@ class fHDHR_Hub():
     def get_debug_json(self, base_url):
         return self.files.debug.get_debug_json(base_url)
 
+    def get_cluster_json(self, base_url):
+        return self.files.cluster.get_cluster_json(base_url)
+
     def get_html_error(self, message):
         return self.pages.htmlerror.get_html_error(message)
 
@@ -73,3 +76,24 @@ class fHDHR_Hub():
 
     def get_origin_html(self, base_url):
         return self.pages.origin.get_origin_html(base_url)
+
+    def get_cluster_html(self, base_url):
+        return self.pages.cluster.get_cluster_html(base_url)
+
+    def m_search(self):
+        self.device.ssdp.m_search()
+
+    def cluster_add(self, location):
+        self.device.cluster.add(location)
+
+    def cluster_del(self, location):
+        self.device.cluster.remove(location)
+
+    def cluster_sync(self, location):
+        self.device.cluster.sync(location)
+
+    def cluster_leave(self):
+        self.device.cluster.leave()
+
+    def cluster_disconnect(self):
+        self.device.cluster.disconnect()
diff --git a/fHDHR/api/hub/device/__init__.py b/fHDHR/api/hub/device/__init__.py
index a01c8dd..c4a082f 100644
--- a/fHDHR/api/hub/device/__init__.py
+++ b/fHDHR/api/hub/device/__init__.py
@@ -3,6 +3,8 @@ from .tuners import Tuners
 from .watch import WatchStream
 from .images import imageHandler
 from .station_scan import Station_Scan
+from .ssdp import SSDPServer
+from .cluster import fHDHR_Cluster
 
 
 class fHDHR_Device():
@@ -21,3 +23,7 @@ class fHDHR_Device():
         self.images = imageHandler(settings, self.epg)
 
         self.station_scan = Station_Scan(settings, self.channels)
+
+        self.ssdp = SSDPServer(settings)
+
+        self.cluster = fHDHR_Cluster(settings, self.ssdp)
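[editor's note] The hunks above wire the new SSDPServer and fHDHR_Cluster objects into the device layer and expose them through the /cluster and /cluster.json routes. A minimal sketch of how those routes could be exercised once the patch is running follows; it is not part of the patch, and the addresses and ports below are hypothetical:

    import requests

    base = "http://127.0.0.1:5004"      # this fHDHR instance (hypothetical port)
    peer = "http://192.168.1.50:5004"   # a second instance found via SSDP (hypothetical)

    # Trigger an SSDP M-SEARCH so nearby instances land in the detect list.
    requests.get(base + "/cluster", params={"method": "scan"})

    # Join the peer; the server fetches its /discover.json and /cluster.json,
    # merges the maps, then push-syncs the result to every member.
    requests.get(base + "/cluster", params={"method": "add", "location": peer})

    # Inspect the shared cluster map served by the new /cluster.json route.
    print(requests.get(base + "/cluster.json").json())
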
diff --git a/fHDHR/api/hub/device/cluster.py b/fHDHR/api/hub/device/cluster.py
new file mode 100644
index 0000000..4cc3a54
--- /dev/null
+++ b/fHDHR/api/hub/device/cluster.py
@@ -0,0 +1,142 @@
+import os
+import json
+import urllib.parse
+import requests
+from collections import OrderedDict
+
+
+class fHDHR_Cluster():
+
+    def __init__(self, settings, ssdp):
+        self.config = settings
+        self.ssdp = ssdp
+        self.cluster_file = self.config.dict["main"]["cluster"]
+        self.friendlyname = self.config.dict["fhdhr"]["friendlyname"]
+        self.location = ('http://' + settings.dict["fhdhr"]["discovery_address"] + ':'
+                         + str(settings.dict["fhdhr"]["port"]))
+        self.location_url = urllib.parse.quote(self.location)
+        self.cluster = self.default_cluster()
+        self.load_cluster()
+        self.startup_sync()
+
+    def get_list(self):
+        return_dict = {}
+        for location in list(self.cluster.keys()):
+            if location != self.location:
+                return_dict[location] = {
+                    "Joined": True
+                }
+
+        detected_list = self.ssdp.detect_method.get()
+        for location in detected_list:
+            if location not in list(self.cluster.keys()):
+                return_dict[location] = {
+                    "Joined": False
+                }
+        return_dict = OrderedDict(sorted(return_dict.items()))
+        return return_dict
+
+    def default_cluster(self):
+        defdict = {}
+        defdict[self.location] = {
+            "base_url": self.location,
+            "name": self.friendlyname
+        }
+        return defdict
+
+    def load_cluster(self):
+        if os.path.isfile(self.cluster_file):
+            with open(self.cluster_file, 'r') as clusterfile:
+                self.cluster = json.load(clusterfile)
+            if self.location not in list(self.cluster.keys()):
+                self.cluster[self.location] = self.default_cluster()[self.location]
+        else:
+            self.cluster = self.default_cluster()
+
+    def startup_sync(self):
+        for location in list(self.cluster.keys()):
+            if location != self.location:
+                sync_url = location + "/cluster.json"
+                try:
+                    sync_open = requests.get(sync_url)
+                    retrieved_cluster = sync_open.json()
+                    if self.location not in list(retrieved_cluster.keys()):
+                        return self.leave()
+                except requests.exceptions.ConnectionError:
+                    print("Unreachable: " + location)
+
+    def save_cluster(self):
+        with open(self.cluster_file, 'w') as clusterfile:
+            clusterfile.write(json.dumps(self.cluster, indent=4))
+
+    def leave(self):
+        self.cluster = self.default_cluster()
+        self.save_cluster()
+
+    def disconnect(self):
+        for location in list(self.cluster.keys()):
+            if location != self.location:
+                sync_url = location + "/cluster?method=del&location=" + self.location
+                try:
+                    requests.get(sync_url)
+                except requests.exceptions.ConnectionError:
+                    print("Unreachable: " + location)
+        self.leave()
+
+    def sync(self, location):
+        sync_url = location + "/cluster.json"
+        try:
+            sync_open = requests.get(sync_url)
+            self.cluster = sync_open.json()
+            self.save_cluster()
+        except requests.exceptions.ConnectionError:
+            print("Unreachable: " + location)
+
+    def push_sync(self):
+        for location in list(self.cluster.keys()):
+            if location != self.location:
+                sync_url = location + "/cluster?method=sync&location=" + self.location_url
+                try:
+                    requests.get(sync_url)
+                except requests.exceptions.ConnectionError:
+                    print("Unreachable: " + location)
+
+    def add(self, location):
+        if location not in list(self.cluster.keys()):
+            self.cluster[location] = {"base_url": location}
+
+            location_info_url = location + "/discover.json"
+            try:
+                location_info_req = requests.get(location_info_url)
+            except requests.exceptions.ConnectionError:
+                print("Unreachable: " + location)
+                del self.cluster[location]
+                return
+
+            location_info = location_info_req.json()
+            self.cluster[location]["name"] = location_info["FriendlyName"]
+
+            cluster_info_url = location + "/cluster.json"
+            try:
+                cluster_info_req = requests.get(cluster_info_url)
+            except requests.exceptions.ConnectionError:
+                print("Unreachable: " + location)
+                del self.cluster[location]
+                return
+            cluster_info = cluster_info_req.json()
+            for cluster_key in list(cluster_info.keys()):
+                if cluster_key not in list(self.cluster.keys()):
+                    self.cluster[cluster_key] = cluster_info[cluster_key]
+
+            self.push_sync()
+            self.save_cluster()
+
+    def remove(self, location):
+        if location in list(self.cluster.keys()):
+            del self.cluster[location]
+            sync_url = location + "/cluster?method=leave"
+            try:
+                requests.get(sync_url)
+            except requests.exceptions.ConnectionError:
+                print("Unreachable: " + location)
+        self.push_sync()
+        self.save_cluster()
diff --git a/fHDHR/api/hub/device/ssdp.py b/fHDHR/api/hub/device/ssdp.py
new file mode 100644
index 0000000..a3f0cbc
--- /dev/null
+++ b/fHDHR/api/hub/device/ssdp.py
@@ -0,0 +1,207 @@
+# Adapted from https://github.com/MoshiBin/ssdpy and https://github.com/ZeWaren/python-upnp-ssdp-example
+import os
+import socket
+import struct
+import json
+from multiprocessing import Process
+
+from fHDHR import fHDHR_VERSION
+
+
+class fHDHR_Detect():
+
+    def __init__(self, settings):
+        self.config = settings
+        self.ssdp_detect_file = self.config.dict["main"]["ssdp_detect"]
+        self.detect_list = []
+
+    def set(self, location):
+        if location not in self.detect_list:
+            self.detect_list.append(location)
+            with open(self.ssdp_detect_file, 'w') as ssdpdetectfile:
+                ssdpdetectfile.write(json.dumps(self.detect_list, indent=4))
+
+    def get(self):
+        if os.path.isfile(self.ssdp_detect_file):
+            with open(self.ssdp_detect_file, 'r') as ssdpdetectfile:
+                return json.load(ssdpdetectfile)
+        else:
+            return []
+
+
+class SSDPServer():
+
+    def __init__(self, settings):
+        self.config = settings
+
+        self.detect_method = fHDHR_Detect(settings)
+
+        if settings.dict["fhdhr"]["discovery_address"]:
+
+            self.sock = None
+            self.proto = "ipv4"
+            self.port = 1900
+            self.iface = None
+            self.address = None
+            self.server = 'fHDHR/%s UPnP/1.0' % fHDHR_VERSION
+
+            allowed_protos = ("ipv4", "ipv6")
+            if self.proto not in allowed_protos:
+                raise ValueError("Invalid proto - expected one of {}".format(allowed_protos))
+
+            self.nt = 'urn:schemas-upnp-org:device:MediaServer:1'
+            self.usn = 'uuid:' + settings.dict["main"]["uuid"] + '::' + self.nt
+            self.location = ('http://' + settings.dict["fhdhr"]["discovery_address"] + ':'
+                             + str(settings.dict["fhdhr"]["port"]) + '/device.xml')
+            self.al = self.location
+            self.max_age = 1800
+            self._iface = None
+
+            if self.proto == "ipv4":
+                self._af_type = socket.AF_INET
+                self._broadcast_ip = "239.255.255.250"
+                self._address = (self._broadcast_ip, self.port)
+                self.bind_address = "0.0.0.0"
+            elif self.proto == "ipv6":
+                self._af_type = socket.AF_INET6
+                self._broadcast_ip = "ff02::c"
+                self._address = (self._broadcast_ip, self.port, 0, 0)
+                self.bind_address = "::"
+
+            self.broadcast_addy = "{}:{}".format(self._broadcast_ip, self.port)
+
+            self.sock = socket.socket(self._af_type, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
+            self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
+
+            # Bind to specific interface
+            if self.iface is not None:
+                self.sock.setsockopt(socket.SOL_SOCKET, getattr(socket, "SO_BINDTODEVICE", 25), self.iface)
+
+            # Subscribe to multicast address
+            if self.proto == "ipv4":
+                mreq = socket.inet_aton(self._broadcast_ip)
+                if self.address is not None:
+                    mreq += socket.inet_aton(self.address)
+                else:
+                    mreq += struct.pack(b"@I", socket.INADDR_ANY)
+                self.sock.setsockopt(
+                    socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq,
+                )
+                # Allow multicasts on loopback devices (necessary for testing)
+                self.sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_LOOP, 1)
+            elif self.proto == "ipv6":
+                # In IPv6 we use the interface index, not the address when subscribing to the group
+                mreq = socket.inet_pton(socket.AF_INET6, self._broadcast_ip)
+                if self.iface is not None:
+                    iface_index = socket.if_nametoindex(self.iface)
+                    # Send outgoing packets from the same interface
+                    self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_IF, iface_index)
+                    mreq += struct.pack(b"@I", iface_index)
+                else:
+                    mreq += socket.inet_pton(socket.AF_INET6, "::")
+                self.sock.setsockopt(
+                    socket.IPPROTO_IPV6, socket.IPV6_JOIN_GROUP, mreq,
+                )
+                self.sock.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_MULTICAST_LOOP, 1)
+            self.sock.bind((self.bind_address, self.port))
+
+            self.notify_payload = self.create_notify_payload()
+            self.msearch_payload = self.create_msearch_payload()
+
+            print("SSDP server Starting")
+
+            self.ssdpserve = Process(target=self.run)
+            self.ssdpserve.start()
+
+            self.m_search()
+
+    def on_recv(self, data, address):
+        # print("Received packet from {}: {}".format(address, data))
+
+        (host, port) = address
+
+        header, payload = data.decode().split('\r\n\r\n')[:2]
+
+        lines = header.split('\r\n')
+        cmd = lines[0].split(' ')
+        lines = map(lambda x: x.replace(': ', ':', 1), lines[1:])
+        lines = filter(lambda x: len(x) > 0, lines)
+
+        headers = [x.split(':', 1) for x in lines]
+        headers = dict(map(lambda x: (x[0].lower(), x[1]), headers))
+
+        if cmd[0] == 'M-SEARCH' and cmd[1] == '*':
+            # SSDP discovery
+            # print("Received qualifying M-SEARCH from {}".format(address))
+            # print("M-SEARCH data: {}".format(headers))
+            notify = self.notify_payload
+            # print("Created NOTIFY: {}".format(notify))
+            try:
+                self.sock.sendto(notify, address)
+            except OSError:  # as e:
+                # Most commonly: We received a multicast from an IP not in our subnet
+                # print("Unable to send NOTIFY to {}: {}".format(address, e))
+                pass
+        elif cmd[0] == 'NOTIFY' and cmd[1] == '*':
+            # SSDP presence
+            # print('NOTIFY *')
+            # print("NOTIFY data: {}".format(headers))
+            if headers["server"].startswith("fHDHR"):
+                if headers["location"] != self.location:
+                    self.detect_method.set(headers["location"].split("/device.xml")[0])
+        # else:
+        #     print('Unknown SSDP command %s %s' % (cmd[0], cmd[1]))
+
+    def m_search(self):
+        data = self.msearch_payload
+        self.sock.sendto(data, self._address)
+
+    def create_notify_payload(self):
+        if self.max_age is not None and not isinstance(self.max_age, int):
+            raise ValueError("max_age must by of type: int")
+        data = (
+            "NOTIFY * HTTP/1.1\r\n"
+            "HOST:{}\r\n"
+            "NT:{}\r\n"
+            "NTS:ssdp:alive\r\n"
+            "USN:{}\r\n"
+            "SERVER:{}\r\n"
+        ).format(
+            self._broadcast_ip,
+            self.nt,
+            self.usn,
+            self.server
+        )
+        if self.location is not None:
+            data += "LOCATION:{}\r\n".format(self.location)
+        if self.al is not None:
+            data += "AL:{}\r\n".format(self.al)
+        if self.max_age is not None:
+            data += "Cache-Control:max-age={}\r\n".format(self.max_age)
+        data += "\r\n"
+        return data.encode("utf-8")
+
+    def create_msearch_payload(self):
+        data = (
+            "M-SEARCH * HTTP/1.1\r\n"
+            "HOST:{}\r\n"
+            'MAN: "ssdp:discover"\r\n'
+            "ST:{}\r\n"
+            "MX:{}\r\n"
+        ).format(
+            self.broadcast_addy,
+            "ssdp:all",
+            1
+        )
+        data += "\r\n"
"\r\n" + return data.encode("utf-8") + + def run(self): + try: + while True: + data, address = self.sock.recvfrom(1024) + self.on_recv(data, address) + except KeyboardInterrupt: + self.sock.close() + except Exception: + self.sock.close() diff --git a/fHDHR/api/hub/files/__init__.py b/fHDHR/api/hub/files/__init__.py index 1c47fe1..0db0f7a 100644 --- a/fHDHR/api/hub/files/__init__.py +++ b/fHDHR/api/hub/files/__init__.py @@ -7,6 +7,7 @@ from .debug_json import Debug_JSON from .lineup_status_json import Lineup_Status_JSON from .xmltv_xml import xmlTV_XML from .m3u import channels_M3U +from .cluster_json import Cluster_JSON class fHDHR_Files(): @@ -26,3 +27,4 @@ class fHDHR_Files(): self.m3u = channels_M3U(settings, device) self.debug = Debug_JSON(settings, device) + self.cluster = Cluster_JSON(settings, device) diff --git a/fHDHR/api/hub/files/cluster_json.py b/fHDHR/api/hub/files/cluster_json.py new file mode 100644 index 0000000..e97b62a --- /dev/null +++ b/fHDHR/api/hub/files/cluster_json.py @@ -0,0 +1,13 @@ +import json + + +class Cluster_JSON(): + + def __init__(self, settings, device): + self.config = settings + self.device = device + + def get_cluster_json(self, base_url, force_update=False): + jsoncluster = self.device.cluster.cluster + cluster_json = json.dumps(jsoncluster, indent=4) + return cluster_json diff --git a/fHDHR/api/hub/pages/__init__.py b/fHDHR/api/hub/pages/__init__.py index a9c5e94..17161e6 100644 --- a/fHDHR/api/hub/pages/__init__.py +++ b/fHDHR/api/hub/pages/__init__.py @@ -4,35 +4,26 @@ from io import StringIO from .htmlerror import HTMLerror from .index_html import Index_HTML from .origin_html import Origin_HTML +from .cluster_html import Cluster_HTML from .diagnostics_html import Diagnostics_HTML from .version_html import Version_HTML from .channel_guide_html import Channel_Guide_HTML -class fHDHR_Pages(): +class fHDHR_Page_Elements(): def __init__(self, settings, device): self.config = settings self.device = device - self.page_elements = { - "top": self.pagetop(), - "end": self.pageend() - } - - self.htmlerror = HTMLerror(settings) - - self.index = Index_HTML(settings, self.device, self.page_elements) - self.origin = Origin_HTML(settings, self.device, self.page_elements) - self.diagnostics = Diagnostics_HTML(settings, self.device, self.page_elements) - self.version = Version_HTML(settings, self.device, self.page_elements) - self.channel_guide = Channel_Guide_HTML(settings, self.device, self.page_elements) + def get(self): + return {"top": self.pagetop(), "end": self.pageend()} def pagetop(self): friendlyname = self.config.dict["fhdhr"]["friendlyname"] servicename = str(self.config.dict["main"]["servicename"]) - return [ + upper_part = [ "", "", @@ -56,14 +47,32 @@ class fHDHR_Pages(): "" % ("/guide", "Guide"), "" % ("/version", "Version"), "" % ("/diagnostics", "Diagnostics"), + "" % ("/cluster", "Cluster"), "%s" % ("xmltv.xml", "xmltv"), "%s" % ("channels.m3u", "m3u"), - "
", - "" + "", "| Name | \n") + fakefile.write("Location | \n") + fakefile.write("Joined | \n") + fakefile.write("Options | \n") + fakefile.write("
| %s | \n" % (str(location_name))) + + fakefile.write("%s | \n" % (str(location))) + + fakefile.write("%s | \n" % (str(fhdhr_list[location]["Joined"]))) + + fakefile.write("\n")
+ fakefile.write(" \n")
+ location_url_query = urllib.parse.quote(location)
+ fakefile.write(
+ " \n" %
+ (location, "Visit"))
+ if not fhdhr_list[location]["Joined"]:
+ fakefile.write(
+ " \n" %
+ ("/cluster?method=add&location=" + location_url_query, "Add"))
+ else:
+ fakefile.write(
+ " \n" %
+ ("/cluster?method=del&location=" + location_url_query, "Remove"))
+ fakefile.write(" \n")
+ fakefile.write(" | \n")
+
+ fakefile.write(" %s | \n" % (guts[1])) fakefile.write(" \n") - for line in self.page_elements["end"]: + for line in page_elements["end"]: fakefile.write(line + "\n") return fakefile.getvalue() diff --git a/fHDHR/api/hub/pages/origin_html.py b/fHDHR/api/hub/pages/origin_html.py index d6e8021..904ea96 100644 --- a/fHDHR/api/hub/pages/origin_html.py +++ b/fHDHR/api/hub/pages/origin_html.py @@ -13,8 +13,9 @@ class Origin_HTML(): servicename = str(self.config.dict["main"]["servicename"]) fakefile = StringIO() + page_elements = self.page_elements.get() - for line in self.page_elements["top"]: + for line in page_elements["top"]: fakefile.write(line + "\n") fakefile.write("%s | \n" % (str(origin_status_dict[key]))) fakefile.write(" \n") - for line in self.page_elements["end"]: + for line in page_elements["end"]: fakefile.write(line + "\n") return fakefile.getvalue() diff --git a/fHDHR/api/hub/pages/version_html.py b/fHDHR/api/hub/pages/version_html.py index 1297ee6..f5e2e31 100644 --- a/fHDHR/api/hub/pages/version_html.py +++ b/fHDHR/api/hub/pages/version_html.py @@ -13,8 +13,9 @@ class Version_HTML(): def get_version_html(self, base_url, force_update=False): fakefile = StringIO() + page_elements = self.page_elements.get() - for line in self.page_elements["top"]: + for line in page_elements["top"]: fakefile.write(line + "\n") fakefile.write("
| %s | \n" % (str(fHDHR_VERSION))) fakefile.write(" \n") - for line in self.page_elements["end"]: + for line in page_elements["end"]: fakefile.write(line + "\n") return fakefile.getvalue() diff --git a/fHDHR/cli/run.py b/fHDHR/cli/run.py index 2c89856..4987991 100644 --- a/fHDHR/cli/run.py +++ b/fHDHR/cli/run.py @@ -10,7 +10,6 @@ import fHDHR.config import fHDHR.origin import fHDHR.api -import fHDHR.ssdpserver ERR_CODE = 1 ERR_CODE_NO_RESTART = 2 @@ -37,10 +36,6 @@ def get_configuration(args, script_dir): def run(settings, origin): - if settings.dict["fhdhr"]["discovery_address"]: - ssdpServer = Process(target=fHDHR.ssdpserver.ssdpServerProcess, args=(settings,)) - ssdpServer.start() - fhdhrweb = Process(target=fHDHR.api.interface_start, args=(settings, origin)) fhdhrweb.start() diff --git a/fHDHR/config/__init__.py b/fHDHR/config/__init__.py index bfc25b9..502cdb1 100644 --- a/fHDHR/config/__init__.py +++ b/fHDHR/config/__init__.py @@ -137,6 +137,8 @@ class Config(): cache_dir = self.dict["filedir"]["cache_dir"] self.dict["main"]["channel_numbers"] = pathlib.Path(cache_dir).joinpath("cnumbers.json") + self.dict["main"]["ssdp_detect"] = pathlib.Path(cache_dir).joinpath("ssdp_list.json") + self.dict["main"]["cluster"] = pathlib.Path(cache_dir).joinpath("cluster.json") for epg_method in self.dict["main"]["valid_epg_methods"]: if epg_method and epg_method != "None": diff --git a/fHDHR/ssdpserver/__init__.py b/fHDHR/ssdpserver/__init__.py deleted file mode 100644 index 728767b..0000000 --- a/fHDHR/ssdpserver/__init__.py +++ /dev/null @@ -1,242 +0,0 @@ -# Licensed under the MIT license -# http://opensource.org/licenses/mit-license.php - -# Copyright 2005, Tim Potter