diff --git a/fHDHR/device/tuners/stream/__init__.py b/fHDHR/device/tuners/stream/__init__.py index b7a01f7..e768011 100644 --- a/fHDHR/device/tuners/stream/__init__.py +++ b/fHDHR/device/tuners/stream/__init__.py @@ -1,6 +1,7 @@ from .direct_stream import Direct_Stream +from .direct_m3u8_stream import Direct_M3U8_Stream from .ffmpeg_stream import FFMPEG_Stream from .vlc_stream import VLC_Stream @@ -15,8 +16,12 @@ class Stream(): self.method = FFMPEG_Stream(fhdhr, stream_args, tuner) if stream_args["method"] == "vlc": self.method = VLC_Stream(fhdhr, stream_args, tuner) - elif stream_args["method"] == "direct": + elif (stream_args["method"] == "direct" and + not self.stream_args["true_content_type"].startswith(tuple(["application/", "text/"]))): self.method = Direct_Stream(fhdhr, stream_args, tuner) + elif (stream_args["method"] == "direct" and + self.stream_args["true_content_type"].startswith(tuple(["application/", "text/"]))): + self.method = Direct_M3U8_Stream(fhdhr, stream_args, tuner) def get(self): return self.method.get() diff --git a/fHDHR/device/tuners/stream/direct_m3u8_stream.py b/fHDHR/device/tuners/stream/direct_m3u8_stream.py new file mode 100644 index 0000000..aace1f1 --- /dev/null +++ b/fHDHR/device/tuners/stream/direct_m3u8_stream.py @@ -0,0 +1,102 @@ +import sys +import time +import m3u8 + +from Crypto.Cipher import AES + +# from fHDHR.exceptions import TunerError + + +class Direct_M3U8_Stream(): + + def __init__(self, fhdhr, stream_args, tuner): + self.fhdhr = fhdhr + self.stream_args = stream_args + self.tuner = tuner + + self.chunksize = int(self.fhdhr.config.dict["direct_stream"]['chunksize']) + + def get(self): + + if not self.stream_args["duration"] == 0: + self.stream_args["time_end"] = self.stream_args["duration"] + time.time() + + self.fhdhr.logger.info("Detected stream URL is m3u8: %s" % self.stream_args["true_content_type"]) + + channelUri = self.stream_args["channelUri"] + while True: + + self.fhdhr.logger.info("Opening m3u8 for reading %s" % channelUri) + videoUrlM3u = m3u8.load(channelUri) + if len(videoUrlM3u.playlists): + self.fhdhr.logger.info("%s m3u8 varients found" % len(videoUrlM3u.playlists)) + channelUri = videoUrlM3u.playlists[0].absolute_uri + else: + break + + def generate(): + + try: + + played_chunk_urls = [] + + while self.tuner.tuner_lock.locked(): + + playlist = m3u8.load(channelUri) + segments = playlist.segments + + if len(played_chunk_urls): + newsegments = 0 + for segment in segments: + if segment.absolute_uri not in played_chunk_urls: + newsegments += 1 + self.fhdhr.logger.info("Refreshing m3u8, Loaded %s new segments." % str(newsegments)) + else: + self.fhdhr.logger.info("Loaded %s segments." % str(len(segments))) + + if playlist.keys != [None]: + keys = [{"url": key.absolute_uri, "method": key.method, "iv": key.iv} for key in playlist.keys if key] + else: + keys = [None for i in range(0, len(segments))] + + for segment, key in zip(segments, keys): + chunkurl = segment.absolute_uri + + if chunkurl and chunkurl not in played_chunk_urls: + played_chunk_urls.append(chunkurl) + + if (not self.stream_args["duration"] == 0 and + not time.time() < self.stream_args["time_end"]): + self.fhdhr.logger.info("Requested Duration Expired.") + self.tuner.close() + + chunk = self.fhdhr.web.session.get(chunkurl).content + if not chunk: + break + # raise TunerError("807 - No Video Data") + if key: + if key["url"]: + keyfile = self.fhdhr.web.session.get(key["url"]).content + cryptor = AES.new(keyfile, AES.MODE_CBC, keyfile) + self.fhdhr.logger.info("Decrypting Chunk #%s with key: %s" % (len(played_chunk_urls), key["url"])) + chunk = cryptor.decrypt(chunk) + + chunk_size = int(sys.getsizeof(chunk)) + self.fhdhr.logger.info("Passing Through Chunk #%s with size %s: %s" % (len(played_chunk_urls), chunk_size, chunkurl)) + yield chunk + self.tuner.add_downloaded_size(chunk_size) + + if playlist.target_duration: + time.sleep(int(playlist.target_duration)) + + self.fhdhr.logger.info("Connection Closed: Tuner Lock Removed") + + except GeneratorExit: + self.fhdhr.logger.info("Connection Closed.") + except Exception as e: + self.fhdhr.logger.info("Connection Closed: " + str(e)) + finally: + self.tuner.close() + # raise TunerError("806 - Tune Failed") + + return generate() diff --git a/fHDHR/device/tuners/stream/direct_stream.py b/fHDHR/device/tuners/stream/direct_stream.py index 286b4aa..ea8c22b 100644 --- a/fHDHR/device/tuners/stream/direct_stream.py +++ b/fHDHR/device/tuners/stream/direct_stream.py @@ -1,8 +1,5 @@ import sys import time -import m3u8 - -from Crypto.Cipher import AES # from fHDHR.exceptions import TunerError @@ -21,125 +18,46 @@ class Direct_Stream(): if not self.stream_args["duration"] == 0: self.stream_args["time_end"] = self.stream_args["duration"] + time.time() - if not self.stream_args["true_content_type"].startswith(tuple(["application/", "text/"])): + self.fhdhr.logger.info("Direct Stream of %s URL: %s" % (self.stream_args["true_content_type"], self.stream_args["channelUri"])) - self.fhdhr.logger.info("Direct Stream of %s URL: %s" % (self.stream_args["true_content_type"], self.stream_args["channelUri"])) + req = self.fhdhr.web.session.get(self.stream_args["channelUri"], stream=True) - req = self.fhdhr.web.session.get(self.stream_args["channelUri"], stream=True) + def generate(): - def generate(): + try: - try: + chunk_counter = 1 - chunk_counter = 1 + while self.tuner.tuner_lock.locked(): - while self.tuner.tuner_lock.locked(): + for chunk in req.iter_content(chunk_size=self.chunksize): - for chunk in req.iter_content(chunk_size=self.chunksize): + if (not self.stream_args["duration"] == 0 and + not time.time() < self.stream_args["time_end"]): + req.close() + self.fhdhr.logger.info("Requested Duration Expired.") + self.tuner.close() - if (not self.stream_args["duration"] == 0 and - not time.time() < self.stream_args["time_end"]): - req.close() - self.fhdhr.logger.info("Requested Duration Expired.") - self.tuner.close() + if not chunk: + break + # raise TunerError("807 - No Video Data") - if not chunk: - break - # raise TunerError("807 - No Video Data") + chunk_size = int(sys.getsizeof(chunk)) + self.fhdhr.logger.info("Passing Through Chunk #%s with size %s" % (chunk_counter, chunk_size)) + yield chunk + self.tuner.add_downloaded_size(chunk_size) - self.fhdhr.logger.info("Passing Through Chunk #%s with size %s" % (chunk_counter, self.chunksize)) - yield chunk - chunk_size = int(sys.getsizeof(chunk)) - self.tuner.add_downloaded_size(chunk_size) + chunk_counter += 1 - chunk_counter += 1 + self.fhdhr.logger.info("Connection Closed: Tuner Lock Removed") - self.fhdhr.logger.info("Connection Closed: Tuner Lock Removed") - - except GeneratorExit: - self.fhdhr.logger.info("Connection Closed.") - except Exception as e: - self.fhdhr.logger.info("Connection Closed: " + str(e)) - finally: - req.close() - self.tuner.close() - # raise TunerError("806 - Tune Failed") - - else: - - self.fhdhr.logger.info("Detected stream URL is m3u8: %s" % self.stream_args["true_content_type"]) - - channelUri = self.stream_args["channelUri"] - while True: - - videoUrlM3u = m3u8.load(channelUri) - if len(videoUrlM3u.playlists): - channelUri = videoUrlM3u.playlists[0].absolute_uri - else: - break - - def generate(): - - try: - - played_chunk_urls = [] - - while self.tuner.tuner_lock.locked(): - - playlist = m3u8.load(channelUri) - segments = playlist.segments - - if len(played_chunk_urls): - newsegments = 0 - for segment in segments: - if segment.absolute_uri not in played_chunk_urls: - newsegments += 1 - self.fhdhr.logger.info("Refreshing m3u8, Loaded %s new segments." % str(newsegments)) - else: - self.fhdhr.logger.info("Loaded %s segments." % str(len(segments))) - - if playlist.keys != [None]: - keys = [{"url": key.absolute_uri, "method": key.method, "iv": key.iv} for key in playlist.keys if key] - else: - keys = [None for i in range(0, len(segments))] - - for segment, key in zip(segments, keys): - chunkurl = segment.absolute_uri - - if chunkurl and chunkurl not in played_chunk_urls: - played_chunk_urls.append(chunkurl) - - if (not self.stream_args["duration"] == 0 and - not time.time() < self.stream_args["time_end"]): - self.fhdhr.logger.info("Requested Duration Expired.") - self.tuner.close() - - chunk = self.fhdhr.web.session.get(chunkurl).content - if not chunk: - break - # raise TunerError("807 - No Video Data") - if key: - if key["url"]: - keyfile = self.fhdhr.web.session.get(key["url"]).content - cryptor = AES.new(keyfile, AES.MODE_CBC, keyfile) - chunk = cryptor.decrypt(chunk) - - self.fhdhr.logger.info("Passing Through Chunk: %s" % chunkurl) - yield chunk - chunk_size = int(sys.getsizeof(chunk)) - self.tuner.add_downloaded_size(chunk_size) - - if playlist.target_duration: - time.sleep(int(playlist.target_duration)) - - self.fhdhr.logger.info("Connection Closed: Tuner Lock Removed") - - except GeneratorExit: - self.fhdhr.logger.info("Connection Closed.") - except Exception as e: - self.fhdhr.logger.info("Connection Closed: " + str(e)) - finally: - self.tuner.close() - # raise TunerError("806 - Tune Failed") + except GeneratorExit: + self.fhdhr.logger.info("Connection Closed.") + except Exception as e: + self.fhdhr.logger.info("Connection Closed: " + str(e)) + finally: + req.close() + self.tuner.close() + # raise TunerError("806 - Tune Failed") return generate() diff --git a/fHDHR/device/tuners/tuner.py b/fHDHR/device/tuners/tuner.py index 37f6409..a431012 100644 --- a/fHDHR/device/tuners/tuner.py +++ b/fHDHR/device/tuners/tuner.py @@ -53,12 +53,19 @@ class Tuner(): return stream.get() def set_status(self, stream_args): - self.status = { - "status": "Active", - "method": stream_args["method"], - "accessed": stream_args["accessed"], - "channel": stream_args["channel"], - "proxied_url": stream_args["channelUri"], - "time_start": datetime.datetime.utcnow(), - "downloaded": 0 - } + if self.status["status"] != "Active": + self.status = { + "status": "Active", + "clients": [], + "clients_id": [], + "method": stream_args["method"], + "accessed": [stream_args["accessed"]], + "channel": stream_args["channel"], + "proxied_url": stream_args["channelUri"], + "time_start": datetime.datetime.utcnow(), + "downloaded": 0 + } + if stream_args["client"] not in self.status["clients"]: + self.status["clients"].append(stream_args["client"]) + if stream_args["client_id"] not in self.status["clients_id"]: + self.status["clients_id"].append(stream_args["client_id"]) diff --git a/fHDHR/http/api/watch.py b/fHDHR/http/api/watch.py index 56825c3..a26ef68 100644 --- a/fHDHR/http/api/watch.py +++ b/fHDHR/http/api/watch.py @@ -1,5 +1,6 @@ from flask import Response, request, redirect, abort, stream_with_context import urllib.parse +import uuid from fHDHR.exceptions import TunerError @@ -63,7 +64,8 @@ class Watch(): "duration": duration, "transcode": transcode, "accessed": accessed_url, - "client": client_address + "client": client_address, + "client_id": str(client_address) + "_" + str(uuid.uuid4()) } try: