1
0
mirror of https://github.com/fHDHR/fHDHR_NextPVR.git synced 2025-12-06 08:06:59 -05:00

Add tvtv EPG method

This commit is contained in:
deathbybandaid 2020-12-09 16:00:14 -05:00
parent 5ff0cdf93c
commit 6786f10812
3 changed files with 167 additions and 0 deletions

View File

@ -1,2 +1,3 @@
# pylama:ignore=W0401,W0611
from .zap2it import *
from .tvtv import *

152
alternative_epg/tvtv.py Normal file
View File

@ -0,0 +1,152 @@
import datetime
from fHDHR.exceptions import EPGSetupError
class tvtvEPG():
def __init__(self, fhdhr, channels):
self.fhdhr = fhdhr
self.channels = channels
@property
def postalcode(self):
if self.fhdhr.config.dict["tvtv"]["postalcode"]:
return self.fhdhr.config.dict["tvtv"]["postalcode"]
try:
postalcode_url = 'http://ipinfo.io/json'
postalcode_req = self.fhdhr.web.session.get(postalcode_url)
data = postalcode_req.json()
postalcode = data["postal"]
except Exception as e:
raise EPGSetupError("Unable to automatically optain postalcode: " + str(e))
postalcode = None
return postalcode
@property
def lineup_id(self):
lineup_id_url = "https://www.tvtv.us/tvm/t/tv/v4/lineups?postalCode=%s" % self.postalcode
if self.fhdhr.config.dict["tvtv"]["lineuptype"]:
lineup_id_url += "&lineupType=%s" % self.fhdhr.config.dict["tvtv"]["lineuptype"]
lineup_id_req = self.fhdhr.web.session.get(lineup_id_url)
data = lineup_id_req.json()
lineup_id = data[0]["lineupID"]
return lineup_id
def update_epg(self):
programguide = {}
# Make a date range to pull
todaydate = datetime.date.today()
dates_to_pull = []
for x in range(-1, 6):
datesdict = {
"start": todaydate + datetime.timedelta(days=x),
"stop": todaydate + datetime.timedelta(days=x+1)
}
dates_to_pull.append(datesdict)
self.remove_stale_cache(todaydate)
cached_items = self.get_cached(dates_to_pull)
for result in cached_items:
for chan_item in result:
channel_number = "%s.%s" % (chan_item["channel"]['channelNumber'], chan_item["channel"]['subChannelNumber'])
if str(channel_number) not in list(programguide.keys()):
programguide[channel_number] = {
"callsign": chan_item["channel"]["callsign"],
"name": chan_item["channel"]["name"],
"number": channel_number,
"id": str(chan_item["channel"]["stationID"]),
"thumbnail": "https://cdn.tvpassport.com/image/station/100x100/%s" % chan_item["channel"]["logoFilename"],
"listing": [],
}
for listing in chan_item["listings"]:
timestamp = self.tvtv_timestamps(listing["listDateTime"], listing["duration"])
clean_prog_dict = {
"time_start": timestamp['time_start'],
"time_end": timestamp['time_end'],
"duration_minutes": listing["duration"],
"thumbnail": "https://cdn.tvpassport.com/image/show/480x720/%s" % listing["artwork"]["poster"],
"title": listing["showName"],
"sub-title": listing["episodeTitle"],
"description": listing["description"],
"rating": listing["rating"],
"episodetitle": listing["episodeTitle"],
"releaseyear": listing["year"],
"genres": [],
"seasonnumber": None,
"episodenumber": None,
"isnew": listing["new"],
"id": listing["listingID"],
}
if not any(d['id'] == clean_prog_dict['id'] for d in programguide[channel_number]["listing"]):
programguide[channel_number]["listing"].append(clean_prog_dict)
return programguide
def tvtv_timestamps(self, starttime, duration):
start_time = datetime.datetime.strptime(starttime, '%Y-%m-%d %H:%M:%S')
end_time = start_time + datetime.timedelta(minutes=duration)
start_time = start_time.strftime('%Y%m%d%H%M%S +0000')
end_time = end_time.strftime('%Y%m%d%H%M%S +0000')
timestamp = {
"time_start": start_time,
"time_end": end_time
}
return timestamp
def get_cached(self, dates_to_pull):
for datesdict in dates_to_pull:
starttime = str(datesdict["start"]) + "T00%3A00%3A00.000Z"
stoptime = str(datesdict["stop"]) + "T00%3A00%3A00.000Z"
url = "https://www.tvtv.us/tvm/t/tv/v4/lineups/%s/listings/grid?start=%s&end=%s" % (self.lineup_id, starttime, stoptime)
self.get_cached_item(str(datesdict["start"]), url)
cache_list = self.fhdhr.db.get_cacheitem_value("cache_list", "offline_cache", "tvtv") or []
return [self.fhdhr.db.get_cacheitem_value(x, "offline_cache", "tvtv") for x in cache_list]
def get_cached_item(self, cache_key, url):
cacheitem = self.fhdhr.db.get_cacheitem_value(cache_key, "offline_cache", "tvtv")
if cacheitem:
self.fhdhr.logger.info('FROM CACHE: ' + str(cache_key))
return cacheitem
else:
self.fhdhr.logger.info('Fetching: ' + url)
try:
resp = self.fhdhr.web.session.get(url)
except self.fhdhr.web.exceptions.HTTPError:
self.fhdhr.logger.info('Got an error! Ignoring it.')
return
result = resp.json()
self.fhdhr.db.set_cacheitem_value(cache_key, "offline_cache", result, "tvtv")
cache_list = self.fhdhr.db.get_cacheitem_value("cache_list", "offline_cache", "tvtv") or []
cache_list.append(cache_key)
self.fhdhr.db.set_cacheitem_value("cache_list", "offline_cache", cache_list, "tvtv")
def remove_stale_cache(self, todaydate):
cache_list = self.fhdhr.db.get_cacheitem_value("cache_list", "offline_cache", "tvtv") or []
cache_to_kill = []
for cacheitem in cache_list:
cachedate = datetime.datetime.strptime(str(cacheitem), "%Y-%m-%d")
todaysdate = datetime.datetime.strptime(str(todaydate), "%Y-%m-%d")
if cachedate < todaysdate:
cache_to_kill.append(cacheitem)
self.fhdhr.db.delete_cacheitem_value(cacheitem, "offline_cache", "tvtv")
self.fhdhr.logger.info('Removing stale cache: ' + str(cacheitem))
self.fhdhr.db.set_cacheitem_value("cache_list", "offline_cache", [x for x in cache_list if x not in cache_to_kill], "tvtv")
def clear_cache(self):
cache_list = self.fhdhr.db.get_cacheitem_value("cache_list", "offline_cache", "tvtv") or []
for cacheitem in cache_list:
self.fhdhr.db.delete_cacheitem_value(cacheitem, "offline_cache", "tvtv")
self.fhdhr.logger.info('Removing cache: ' + str(cacheitem))
self.fhdhr.db.delete_cacheitem_value("cache_list", "offline_cache", "tvtv")

View File

@ -0,0 +1,14 @@
{
"tvtv":{
"postalcode":{
"value": "none",
"config_file": true,
"config_web": false
},
"lineuptype":{
"value": "none",
"config_file": true,
"config_web": false
}
}
}