mirror of
https://github.com/fHDHR/fHDHR_NextPVR.git
synced 2025-12-06 18:36:58 -05:00
Merge pull request #17 from deathbybandaid/dev
Cleanup of New Web server
This commit is contained in:
commit
edb5afd50e
@ -1,6 +1,6 @@
|
||||
import time
|
||||
|
||||
from fHDHR.epghandler import epgtypes, xmltv
|
||||
from fHDHR.epghandler import epgtypes
|
||||
|
||||
|
||||
class EPGhandler():
|
||||
@ -13,11 +13,6 @@ class EPGhandler():
|
||||
self.sleeptime = self.config.dict[self.epg_method]["epg_update_frequency"]
|
||||
|
||||
self.epgtypes = epgtypes.EPGTypes(settings, origserv)
|
||||
self.xmltv = xmltv.xmlTV(settings)
|
||||
|
||||
def get_xmltv(self, base_url):
|
||||
epgdict = self.epgtypes.get_epg()
|
||||
return self.xmltv.create_xmltv(base_url, epgdict)
|
||||
|
||||
def get_thumbnail(self, itemtype, itemid):
|
||||
return self.epgtypes.get_thumbnail(itemtype, itemid)
|
||||
|
||||
@ -142,12 +142,11 @@ class ZapEPG():
|
||||
time.sleep(int(delay))
|
||||
return result
|
||||
|
||||
def remove_stale_cache(self, todaydate):
|
||||
def remove_stale_cache(self, zap_time):
|
||||
for p in self.web_cache_dir.glob('*'):
|
||||
try:
|
||||
cachedate = datetime.datetime.strptime(str(p.name), "%Y-%m-%d")
|
||||
todaysdate = datetime.datetime.strptime(str(todaydate), "%Y-%m-%d")
|
||||
if cachedate >= todaysdate:
|
||||
t = int(p.name)
|
||||
if t >= zap_time:
|
||||
continue
|
||||
except Exception as e:
|
||||
print(e)
|
||||
|
||||
@ -1,110 +0,0 @@
|
||||
import xml.etree.ElementTree
|
||||
from io import BytesIO
|
||||
|
||||
|
||||
class xmlTV():
|
||||
"""Methods to create xmltv.xml"""
|
||||
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
self.epg_method = self.config.dict["fhdhr"]["epg_method"]
|
||||
|
||||
def sub_el(self, parent, name, text=None, **kwargs):
|
||||
el = xml.etree.ElementTree.SubElement(parent, name, **kwargs)
|
||||
if text:
|
||||
el.text = text
|
||||
return el
|
||||
|
||||
def xmltv_headers(self):
|
||||
"""This method creates the XML headers for our xmltv"""
|
||||
xmltvgen = xml.etree.ElementTree.Element('tv')
|
||||
xmltvgen.set('source-info-url', self.config.dict["fhdhr"]["friendlyname"])
|
||||
xmltvgen.set('source-info-name', self.config.dict["main"]["servicename"])
|
||||
xmltvgen.set('generator-info-name', 'fHDHR')
|
||||
xmltvgen.set('generator-info-url', 'fHDHR/' + self.config.dict["main"]["reponame"])
|
||||
return xmltvgen
|
||||
|
||||
def xmltv_file(self, xmltvgen):
|
||||
"""This method is used to close out the xml file"""
|
||||
xmltvfile = BytesIO()
|
||||
xmltvfile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
xmltvfile.write(xml.etree.ElementTree.tostring(xmltvgen, encoding='UTF-8'))
|
||||
return xmltvfile.getvalue()
|
||||
|
||||
def xmltv_empty(self):
|
||||
"""This method is called when creation of a full xmltv is not possible"""
|
||||
return self.xmltv_file(self.xmltv_headers())
|
||||
|
||||
def create_xmltv(self, base_url, epgdict):
|
||||
if not epgdict:
|
||||
return self.xmltv_empty()
|
||||
|
||||
out = self.xmltv_headers()
|
||||
|
||||
for c in list(epgdict.keys()):
|
||||
|
||||
c_out = self.sub_el(out, 'channel', id=str(epgdict[c]['number']))
|
||||
self.sub_el(c_out, 'display-name',
|
||||
text='%s %s' % (epgdict[c]['number'], epgdict[c]['callsign']))
|
||||
self.sub_el(c_out, 'display-name',
|
||||
text='%s %s %s' % (epgdict[c]['number'], epgdict[c]['callsign'], str(epgdict[c]['id'])))
|
||||
self.sub_el(c_out, 'display-name', text=epgdict[c]['number'])
|
||||
self.sub_el(c_out, 'display-name',
|
||||
text='%s %s fcc' % (epgdict[c]['number'], epgdict[c]['callsign']))
|
||||
self.sub_el(c_out, 'display-name', text=epgdict[c]['callsign'])
|
||||
self.sub_el(c_out, 'display-name', text=epgdict[c]['callsign'])
|
||||
self.sub_el(c_out, 'display-name', text=epgdict[c]['name'])
|
||||
|
||||
if epgdict[c]["thumbnail"] is not None:
|
||||
self.sub_el(c_out, 'icon', src=("http://" + str(base_url) + "/images?source=epg&type=channel&id=" + epgdict[c]['id']))
|
||||
else:
|
||||
self.sub_el(c_out, 'icon', src=("http://" + str(base_url) + "/images?source=generate&message=" + epgdict[c]['number']))
|
||||
|
||||
for channelnum in list(epgdict.keys()):
|
||||
|
||||
channel_listing = epgdict[channelnum]['listing']
|
||||
|
||||
for program in channel_listing:
|
||||
|
||||
prog_out = self.sub_el(out, 'programme',
|
||||
start=program['time_start'],
|
||||
stop=program['time_end'],
|
||||
channel=str(channelnum))
|
||||
|
||||
self.sub_el(prog_out, 'title', lang='en', text=program['title'])
|
||||
|
||||
self.sub_el(prog_out, 'desc', lang='en', text=program['description'])
|
||||
|
||||
self.sub_el(prog_out, 'sub-title', lang='en', text='Movie: ' + program['sub-title'])
|
||||
|
||||
self.sub_el(prog_out, 'length', units='minutes', text=str(int(program['duration_minutes'])))
|
||||
|
||||
for f in program['genres']:
|
||||
self.sub_el(prog_out, 'category', lang='en', text=f)
|
||||
self.sub_el(prog_out, 'genre', lang='en', text=f)
|
||||
|
||||
if program['seasonnumber'] and program['episodenumber']:
|
||||
s_ = int(str(program['seasonnumber']), 10)
|
||||
e_ = int(str(program['episodenumber']), 10)
|
||||
self.sub_el(prog_out, 'episode-num', system='dd_progid',
|
||||
text=str(program['id']))
|
||||
self.sub_el(prog_out, 'episode-num', system='common',
|
||||
text='S%02dE%02d' % (s_, e_))
|
||||
self.sub_el(prog_out, 'episode-num', system='xmltv_ns',
|
||||
text='%d.%d.' % (int(s_)-1, int(e_)-1))
|
||||
self.sub_el(prog_out, 'episode-num', system='SxxExx">S',
|
||||
text='S%02dE%02d' % (s_, e_))
|
||||
|
||||
if program["thumbnail"]:
|
||||
self.sub_el(prog_out, 'icon', src=("http://" + str(base_url) + "/images?source=epg&type=content&id=" + program['id']))
|
||||
else:
|
||||
self.sub_el(prog_out, 'icon', src=("http://" + str(base_url) + "/images?source=generate&message=" + program['title'].replace(" ", "")))
|
||||
|
||||
if program['rating']:
|
||||
rating_out = self.sub_el(prog_out, 'rating', system="MPAA")
|
||||
self.sub_el(rating_out, 'value', text=program['rating'])
|
||||
|
||||
if program['isnew']:
|
||||
self.sub_el(prog_out, 'new')
|
||||
|
||||
return self.xmltv_file(out)
|
||||
@ -1,4 +1,12 @@
|
||||
|
||||
class TunerError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
def __str__(self):
|
||||
return 'LoginError: %s' % self.value
|
||||
|
||||
|
||||
class LoginError(Exception):
|
||||
def __init__(self, value):
|
||||
self.value = value
|
||||
|
||||
@ -1,203 +1,69 @@
|
||||
from gevent.pywsgi import WSGIServer
|
||||
from flask import (Flask, send_from_directory, request, Response,
|
||||
abort, stream_with_context)
|
||||
from io import BytesIO
|
||||
import xml.etree.ElementTree as ET
|
||||
import json
|
||||
import requests
|
||||
import subprocess
|
||||
import threading
|
||||
import PIL.Image
|
||||
import PIL.ImageDraw
|
||||
import PIL.ImageFont
|
||||
|
||||
|
||||
def sub_el(parent, name, text=None, **kwargs):
|
||||
el = ET.SubElement(parent, name, **kwargs)
|
||||
if text:
|
||||
el.text = text
|
||||
return el
|
||||
|
||||
|
||||
def getSize(txt, font):
|
||||
testImg = PIL.Image.new('RGB', (1, 1))
|
||||
testDraw = PIL.ImageDraw.Draw(testImg)
|
||||
return testDraw.textsize(txt, font)
|
||||
from . import fHDHRdevice
|
||||
from fHDHR.fHDHRerrors import TunerError
|
||||
|
||||
|
||||
class HDHR_Hub():
|
||||
config = None
|
||||
origserv = None
|
||||
epghandling = None
|
||||
station_scan = False
|
||||
station_list = []
|
||||
http = None
|
||||
|
||||
def __init__(self):
|
||||
self.tuner_lock = threading.Lock()
|
||||
self.tuners = 0
|
||||
pass
|
||||
|
||||
def hubprep(self, settings, origserv, epghandling):
|
||||
self.config = settings
|
||||
|
||||
self.devicexml = fHDHRdevice.Device_XML(settings)
|
||||
self.discoverjson = fHDHRdevice.Discover_JSON(settings)
|
||||
self.lineupxml = fHDHRdevice.Lineup_XML(settings, origserv)
|
||||
self.lineupjson = fHDHRdevice.Lineup_JSON(settings, origserv)
|
||||
self.lineupstatusjson = fHDHRdevice.Lineup_Status_JSON(settings, origserv)
|
||||
self.images = fHDHRdevice.imageHandler(settings, epghandling)
|
||||
self.tuners = fHDHRdevice.Tuners(settings)
|
||||
self.station_scan = fHDHRdevice.Station_Scan(settings, origserv)
|
||||
self.xmltv = fHDHRdevice.xmlTV_XML(settings, epghandling)
|
||||
self.htmlerror = fHDHRdevice.HTMLerror(settings)
|
||||
|
||||
self.debug = fHDHRdevice.Debug_JSON(settings, origserv, epghandling)
|
||||
|
||||
def hubprep(self, config, origserv, epghandling):
|
||||
self.config = config
|
||||
self.max_tuners = int(self.config.dict["fhdhr"]["tuner_count"])
|
||||
self.station_scan = False
|
||||
self.origserv = origserv
|
||||
self.epghandling = epghandling
|
||||
|
||||
def tuner_usage(self, number):
|
||||
self.tuner_lock.acquire()
|
||||
self.tuners += number
|
||||
if self.tuners < 0:
|
||||
self.tuners = 0
|
||||
elif self.tuners > self.max_tuners:
|
||||
self.tuners = self.max_tuners
|
||||
self.tuner_lock.release()
|
||||
def tuner_grab(self):
|
||||
self.tuners.tuner_grab()
|
||||
|
||||
def get_tuner(self):
|
||||
if self.tuners <= self.max_tuners:
|
||||
return True
|
||||
return False
|
||||
def tuner_close(self):
|
||||
self.tuners.tuner_close()
|
||||
|
||||
def get_xmltv(self, base_url):
|
||||
return self.epghandling.get_xmltv(base_url)
|
||||
return self.xmltv.get_xmltv_xml(base_url)
|
||||
|
||||
def generate_image(self, messagetype, message):
|
||||
if messagetype == "channel":
|
||||
width = 360
|
||||
height = 270
|
||||
fontsize = 72
|
||||
elif messagetype == "content":
|
||||
width = 1080
|
||||
height = 1440
|
||||
fontsize = 100
|
||||
|
||||
colorBackground = "#228822"
|
||||
colorText = "#717D7E"
|
||||
colorOutline = "#717D7E"
|
||||
fontname = str(self.config.dict["filedir"]["font"])
|
||||
|
||||
font = PIL.ImageFont.truetype(fontname, fontsize)
|
||||
text_width, text_height = getSize(message, font)
|
||||
img = PIL.Image.new('RGBA', (width+4, height+4), colorBackground)
|
||||
d = PIL.ImageDraw.Draw(img)
|
||||
d.text(((width-text_width)/2, (height-text_height)/2), message, fill=colorText, font=font)
|
||||
d.rectangle((0, 0, width+3, height+3), outline=colorOutline)
|
||||
|
||||
s = BytesIO()
|
||||
img.save(s, 'png')
|
||||
return s.getvalue()
|
||||
|
||||
def get_image(self, req_args):
|
||||
|
||||
imageUri = self.epghandling.get_thumbnail(req_args["type"], req_args["id"])
|
||||
if not imageUri:
|
||||
return self.generate_image(req_args["type"], req_args["id"])
|
||||
|
||||
try:
|
||||
req = requests.get(imageUri)
|
||||
return req.content
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return self.generate_image(req_args["type"], req_args["id"])
|
||||
|
||||
def get_image_type(self, image_data):
|
||||
header_byte = image_data[0:3].hex().lower()
|
||||
if header_byte == '474946':
|
||||
return "image/gif"
|
||||
elif header_byte == '89504e':
|
||||
return "image/png"
|
||||
elif header_byte == 'ffd8ff':
|
||||
return "image/jpeg"
|
||||
else:
|
||||
return "image/jpeg"
|
||||
|
||||
def get_xmldiscover(self, base_url):
|
||||
out = ET.Element('root')
|
||||
out.set('xmlns', "urn:schemas-upnp-org:device-1-0")
|
||||
|
||||
sub_el(out, 'URLBase', "http://" + base_url)
|
||||
|
||||
specVersion_out = sub_el(out, 'specVersion')
|
||||
sub_el(specVersion_out, 'major', "1")
|
||||
sub_el(specVersion_out, 'minor', "0")
|
||||
|
||||
device_out = sub_el(out, 'device')
|
||||
sub_el(device_out, 'deviceType', "urn:schemas-upnp-org:device:MediaServer:1")
|
||||
sub_el(device_out, 'friendlyName', self.config.dict["fhdhr"]["friendlyname"])
|
||||
sub_el(device_out, 'manufacturer', self.config.dict["dev"]["reporting_manufacturer"])
|
||||
sub_el(device_out, 'modelName', self.config.dict["dev"]["reporting_model"])
|
||||
sub_el(device_out, 'modelNumber', self.config.dict["dev"]["reporting_model"])
|
||||
sub_el(device_out, 'serialNumber')
|
||||
sub_el(device_out, 'UDN', "uuid:" + self.config.dict["main"]["uuid"])
|
||||
|
||||
fakefile = BytesIO()
|
||||
fakefile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
fakefile.write(ET.tostring(out, encoding='UTF-8'))
|
||||
return fakefile.getvalue()
|
||||
def get_device_xml(self, base_url):
|
||||
return self.devicexml.get_device_xml(base_url)
|
||||
|
||||
def get_discover_json(self, base_url):
|
||||
jsondiscover = {
|
||||
"FriendlyName": self.config.dict["fhdhr"]["friendlyname"],
|
||||
"Manufacturer": "Borondust",
|
||||
"ModelNumber": self.config.dict["dev"]["reporting_model"],
|
||||
"FirmwareName": self.config.dict["dev"]["reporting_firmware_name"],
|
||||
"TunerCount": self.config.dict["fhdhr"]["tuner_count"],
|
||||
"FirmwareVersion": self.config.dict["dev"]["reporting_firmware_ver"],
|
||||
"DeviceID": self.config.dict["main"]["uuid"],
|
||||
"DeviceAuth": "fHDHR",
|
||||
"BaseURL": "http://" + base_url,
|
||||
"LineupURL": "http://" + base_url + "/lineup.json"
|
||||
}
|
||||
return jsondiscover
|
||||
return self.discoverjson.get_discover_json(base_url)
|
||||
|
||||
def get_lineup_status(self):
|
||||
if self.station_scan:
|
||||
channel_count = self.origserv.get_station_total()
|
||||
jsonlineup = {
|
||||
"ScanInProgress": "true",
|
||||
"Progress": 99,
|
||||
"Found": channel_count
|
||||
}
|
||||
else:
|
||||
jsonlineup = {
|
||||
"ScanInProgress": "false",
|
||||
"ScanPossible": "true",
|
||||
"Source": self.config.dict["dev"]["reporting_tuner_type"],
|
||||
"SourceList": [self.config.dict["dev"]["reporting_tuner_type"]],
|
||||
}
|
||||
return jsonlineup
|
||||
def get_lineup_status_json(self):
|
||||
return self.lineupstatusjson.get_lineup_json(self.station_scan.scanning())
|
||||
|
||||
def get_lineup_xml(self, base_url):
|
||||
out = ET.Element('Lineup')
|
||||
station_list = self.origserv.get_station_list(base_url)
|
||||
for station_item in station_list:
|
||||
program_out = sub_el(out, 'Program')
|
||||
sub_el(program_out, 'GuideNumber', station_item['GuideNumber'])
|
||||
sub_el(program_out, 'GuideName', station_item['GuideName'])
|
||||
sub_el(program_out, 'URL', station_item['URL'])
|
||||
return self.lineupxml.get_lineup_xml(base_url)
|
||||
|
||||
fakefile = BytesIO()
|
||||
fakefile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
fakefile.write(ET.tostring(out, encoding='UTF-8'))
|
||||
return fakefile.getvalue()
|
||||
def get_lineup_json(self, base_url):
|
||||
return self.lineupjson.get_lineup_json(base_url)
|
||||
|
||||
def get_debug(self, base_url):
|
||||
debugjson = {
|
||||
"base_url": base_url,
|
||||
}
|
||||
return debugjson
|
||||
def get_debug_json(self, base_url):
|
||||
return self.debug.get_debug_json(base_url, self.tuners.tuners)
|
||||
|
||||
def get_html_error(self, message):
|
||||
htmlerror = """<html>
|
||||
<head></head>
|
||||
<body>
|
||||
<h2>{}</h2>
|
||||
</body>
|
||||
</html>"""
|
||||
return htmlerror.format(message)
|
||||
return self.htmlerror.get_html_error(message)
|
||||
|
||||
def station_scan_change(self, enablement):
|
||||
self.station_scan = enablement
|
||||
def post_lineup_scan_start(self):
|
||||
self.station_scan.scan()
|
||||
|
||||
|
||||
hdhr = HDHR_Hub()
|
||||
@ -219,24 +85,24 @@ class HDHR_HTTP_Server():
|
||||
@app.route('/device.xml', methods=['GET'])
|
||||
def device_xml():
|
||||
base_url = request.headers["host"]
|
||||
devicexml = hdhr.get_xmldiscover(base_url)
|
||||
device_xml = hdhr.get_device_xml(base_url)
|
||||
return Response(status=200,
|
||||
response=devicexml,
|
||||
response=device_xml,
|
||||
mimetype='application/xml')
|
||||
|
||||
@app.route('/discover.json', methods=['GET'])
|
||||
def discover_json():
|
||||
base_url = request.headers["host"]
|
||||
jsondiscover = hdhr.get_discover_json(base_url)
|
||||
discover_json = hdhr.get_discover_json(base_url)
|
||||
return Response(status=200,
|
||||
response=json.dumps(jsondiscover, indent=4),
|
||||
response=discover_json,
|
||||
mimetype='application/json')
|
||||
|
||||
@app.route('/lineup_status.json', methods=['GET'])
|
||||
def lineup_status_json():
|
||||
linup_status_json = hdhr.get_lineup_status()
|
||||
linup_status_json = hdhr.get_lineup_status_json()
|
||||
return Response(status=200,
|
||||
response=json.dumps(linup_status_json, indent=4),
|
||||
response=linup_status_json,
|
||||
mimetype='application/json')
|
||||
|
||||
@app.route('/lineup.xml', methods=['GET'])
|
||||
@ -250,9 +116,9 @@ class HDHR_HTTP_Server():
|
||||
@app.route('/lineup.json', methods=['GET'])
|
||||
def lineup_json():
|
||||
base_url = request.headers["host"]
|
||||
station_list = hdhr.origserv.get_station_list(base_url)
|
||||
station_list = hdhr.get_lineup_json(base_url)
|
||||
return Response(status=200,
|
||||
response=json.dumps(station_list, indent=4),
|
||||
response=station_list,
|
||||
mimetype='application/json')
|
||||
|
||||
@app.route('/xmltv.xml', methods=['GET'])
|
||||
@ -266,16 +132,16 @@ class HDHR_HTTP_Server():
|
||||
@app.route('/debug.json', methods=['GET'])
|
||||
def debug_json():
|
||||
base_url = request.headers["host"]
|
||||
debugreport = hdhr.get_debug(base_url)
|
||||
debugreport = hdhr.get_debug_json(base_url)
|
||||
return Response(status=200,
|
||||
response=json.dumps(debugreport, indent=4),
|
||||
response=debugreport,
|
||||
mimetype='application/json')
|
||||
|
||||
@app.route('/images', methods=['GET'])
|
||||
def images():
|
||||
|
||||
if 'source' not in list(request.args.keys()):
|
||||
image = hdhr.generate_image("content", "Unknown Request")
|
||||
image = hdhr.images.generate_image("content", "Unknown Request")
|
||||
else:
|
||||
|
||||
itemtype = 'content'
|
||||
@ -289,19 +155,19 @@ class HDHR_HTTP_Server():
|
||||
"type": request.args["type"],
|
||||
"id": request.args["id"],
|
||||
}
|
||||
image = hdhr.get_image(req_dict)
|
||||
image = hdhr.images.get_image(req_dict)
|
||||
else:
|
||||
itemmessage = "Unknown Request"
|
||||
image = hdhr.generate_image(itemtype, itemmessage)
|
||||
image = hdhr.images.generate_image(itemtype, itemmessage)
|
||||
elif request.args['source'] == 'generate':
|
||||
itemmessage = "Unknown Request"
|
||||
if 'message' in list(request.args.keys()):
|
||||
itemmessage = request.args["message"]
|
||||
image = hdhr.generate_image(itemtype, itemmessage)
|
||||
image = hdhr.images.generate_image(itemtype, itemmessage)
|
||||
else:
|
||||
itemmessage = "Unknown Request"
|
||||
image = hdhr.generate_image(itemtype, itemmessage)
|
||||
return Response(image, content_type=hdhr.get_image_type(image), direct_passthrough=True)
|
||||
image = hdhr.images.generate_image(itemtype, itemmessage)
|
||||
return Response(image, content_type=hdhr.images.get_image_type(image), direct_passthrough=True)
|
||||
|
||||
@app.route('/watch', methods=['GET'])
|
||||
def watch():
|
||||
@ -311,14 +177,14 @@ class HDHR_HTTP_Server():
|
||||
method = str(request.args["method"])
|
||||
channel_id = str(request.args["channel"])
|
||||
|
||||
tuner = hdhr.get_tuner()
|
||||
if not tuner:
|
||||
try:
|
||||
hdhr.tuner_grab()
|
||||
except TunerError:
|
||||
print("A " + method + " stream request for channel " +
|
||||
str(channel_id) + " was rejected do to a lack of available tuners.")
|
||||
abort(503)
|
||||
|
||||
print("Attempting a " + method + " stream request for channel " + str(channel_id))
|
||||
hdhr.tuner_usage(1)
|
||||
|
||||
channelUri = hdhr.origserv.get_channel_stream(channel_id)
|
||||
# print("Proxy URL determined as " + str(channelUri))
|
||||
@ -335,7 +201,7 @@ class HDHR_HTTP_Server():
|
||||
except GeneratorExit:
|
||||
req.close()
|
||||
print("Connection Closed.")
|
||||
hdhr.tuner_usage(-1)
|
||||
hdhr.tuner_close()
|
||||
|
||||
return Response(generate(), content_type=req.headers['content-type'], direct_passthrough=True)
|
||||
|
||||
@ -366,12 +232,11 @@ class HDHR_HTTP_Server():
|
||||
ffmpeg_proc.terminate()
|
||||
ffmpeg_proc.communicate()
|
||||
print("Connection Closed: " + str(e))
|
||||
hdhr.tuner_usage(-1)
|
||||
except GeneratorExit:
|
||||
ffmpeg_proc.terminate()
|
||||
ffmpeg_proc.communicate()
|
||||
print("Connection Closed.")
|
||||
hdhr.tuner_usage(-1)
|
||||
hdhr.tuner_close()
|
||||
|
||||
return Response(stream_with_context(generate()), mimetype="audio/mpeg")
|
||||
|
||||
@ -379,25 +244,20 @@ class HDHR_HTTP_Server():
|
||||
def lineup_post():
|
||||
if 'scan' in list(request.args.keys()):
|
||||
if request.args['scan'] == 'start':
|
||||
hdhr.station_scan_change(True)
|
||||
hdhr.station_list = []
|
||||
hdhr.station_scan_change(False)
|
||||
hdhr.post_lineup_scan_start()
|
||||
return Response(status=200, mimetype='text/html')
|
||||
|
||||
elif request.args['scan'] == 'abort':
|
||||
return Response(status=200, mimetype='text/html')
|
||||
|
||||
else:
|
||||
print("Unknown scan command " + request.args['scan'])
|
||||
currenthtmlerror = hdhr.get_html_error("501 - " + request.args['scan'] + " is not a valid scan command")
|
||||
return Response(status=200, response=currenthtmlerror, mimetype='text/html')
|
||||
|
||||
else:
|
||||
currenthtmlerror = hdhr.get_html_error("501 - not a valid command")
|
||||
return Response(status=200, response=currenthtmlerror, mimetype='text/html')
|
||||
|
||||
def __init__(self, config):
|
||||
self.config = config
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
|
||||
def run(self):
|
||||
self.http = WSGIServer((
|
||||
@ -410,8 +270,8 @@ class HDHR_HTTP_Server():
|
||||
self.http.stop()
|
||||
|
||||
|
||||
def interface_start(config, origserv, epghandling):
|
||||
def interface_start(settings, origserv, epghandling):
|
||||
print("Starting fHDHR Web Interface")
|
||||
hdhr.hubprep(config, origserv, epghandling)
|
||||
fakhdhrserver = HDHR_HTTP_Server(config)
|
||||
hdhr.hubprep(settings, origserv, epghandling)
|
||||
fakhdhrserver = HDHR_HTTP_Server(settings)
|
||||
fakhdhrserver.run()
|
||||
|
||||
14
fHDHR/fHDHRweb/fHDHRdevice/__init__.py
Normal file
14
fHDHR/fHDHRweb/fHDHRdevice/__init__.py
Normal file
@ -0,0 +1,14 @@
|
||||
# pylama:ignore=W0611
|
||||
from .tuners import Tuners
|
||||
from .images import imageHandler
|
||||
from .station_scan import Station_Scan
|
||||
|
||||
from .discover_json import Discover_JSON
|
||||
from .device_xml import Device_XML
|
||||
from .lineup_xml import Lineup_XML
|
||||
from .lineup_json import Lineup_JSON
|
||||
from .debug_json import Debug_JSON
|
||||
from .lineup_status_json import Lineup_Status_JSON
|
||||
from .xmltv_xml import xmlTV_XML
|
||||
|
||||
from .htmlerror import HTMLerror
|
||||
14
fHDHR/fHDHRweb/fHDHRdevice/debug_json.py
Normal file
14
fHDHR/fHDHRweb/fHDHRdevice/debug_json.py
Normal file
@ -0,0 +1,14 @@
|
||||
import json
|
||||
|
||||
|
||||
class Debug_JSON():
|
||||
|
||||
def __init__(self, settings, origserv, epghandling):
|
||||
self.config = settings
|
||||
|
||||
def get_debug_json(self, base_url, tuners):
|
||||
debugjson = {
|
||||
"base_url": base_url,
|
||||
"available tuners": tuners
|
||||
}
|
||||
return json.dumps(debugjson, indent=4)
|
||||
38
fHDHR/fHDHRweb/fHDHRdevice/device_xml.py
Normal file
38
fHDHR/fHDHRweb/fHDHRdevice/device_xml.py
Normal file
@ -0,0 +1,38 @@
|
||||
import xml.etree.ElementTree
|
||||
from io import BytesIO
|
||||
|
||||
from fHDHR.tools import sub_el
|
||||
|
||||
|
||||
class Device_XML():
|
||||
device_xml = None
|
||||
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
|
||||
def get_device_xml(self, base_url, force_update=False):
|
||||
if not self.device_xml or force_update:
|
||||
out = xml.etree.ElementTree.Element('root')
|
||||
out.set('xmlns', "urn:schemas-upnp-org:device-1-0")
|
||||
|
||||
sub_el(out, 'URLBase', "http://" + base_url)
|
||||
|
||||
specVersion_out = sub_el(out, 'specVersion')
|
||||
sub_el(specVersion_out, 'major', "1")
|
||||
sub_el(specVersion_out, 'minor', "0")
|
||||
|
||||
device_out = sub_el(out, 'device')
|
||||
sub_el(device_out, 'deviceType', "urn:schemas-upnp-org:device:MediaServer:1")
|
||||
sub_el(device_out, 'friendlyName', self.config.dict["fhdhr"]["friendlyname"])
|
||||
sub_el(device_out, 'manufacturer', self.config.dict["dev"]["reporting_manufacturer"])
|
||||
sub_el(device_out, 'modelName', self.config.dict["dev"]["reporting_model"])
|
||||
sub_el(device_out, 'modelNumber', self.config.dict["dev"]["reporting_model"])
|
||||
sub_el(device_out, 'serialNumber')
|
||||
sub_el(device_out, 'UDN', "uuid:" + self.config.dict["main"]["uuid"])
|
||||
|
||||
fakefile = BytesIO()
|
||||
fakefile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
fakefile.write(xml.etree.ElementTree.tostring(out, encoding='UTF-8'))
|
||||
self.device_xml = fakefile.getvalue()
|
||||
|
||||
return self.device_xml
|
||||
26
fHDHR/fHDHRweb/fHDHRdevice/discover_json.py
Normal file
26
fHDHR/fHDHRweb/fHDHRdevice/discover_json.py
Normal file
@ -0,0 +1,26 @@
|
||||
import json
|
||||
|
||||
|
||||
class Discover_JSON():
|
||||
discover_json = None
|
||||
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
|
||||
def get_discover_json(self, base_url, force_update=False):
|
||||
if not self.discover_json or force_update:
|
||||
jsondiscover = {
|
||||
"FriendlyName": self.config.dict["fhdhr"]["friendlyname"],
|
||||
"Manufacturer": self.config.dict["dev"]["reporting_manufacturer"],
|
||||
"ModelNumber": self.config.dict["dev"]["reporting_model"],
|
||||
"FirmwareName": self.config.dict["dev"]["reporting_firmware_name"],
|
||||
"TunerCount": self.config.dict["fhdhr"]["tuner_count"],
|
||||
"FirmwareVersion": self.config.dict["dev"]["reporting_firmware_ver"],
|
||||
"DeviceID": self.config.dict["main"]["uuid"],
|
||||
"DeviceAuth": "fHDHR",
|
||||
"BaseURL": "http://" + base_url,
|
||||
"LineupURL": "http://" + base_url + "/lineup.json"
|
||||
}
|
||||
self.discover_json = json.dumps(jsondiscover, indent=4)
|
||||
|
||||
return self.discover_json
|
||||
13
fHDHR/fHDHRweb/fHDHRdevice/htmlerror.py
Normal file
13
fHDHR/fHDHRweb/fHDHRdevice/htmlerror.py
Normal file
@ -0,0 +1,13 @@
|
||||
|
||||
class HTMLerror():
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
|
||||
def get_html_error(self, message):
|
||||
htmlerror = """<html>
|
||||
<head></head>
|
||||
<body>
|
||||
<h2>{}</h2>
|
||||
</body>
|
||||
</html>"""
|
||||
return htmlerror.format(message)
|
||||
67
fHDHR/fHDHRweb/fHDHRdevice/images.py
Normal file
67
fHDHR/fHDHRweb/fHDHRdevice/images.py
Normal file
@ -0,0 +1,67 @@
|
||||
from io import BytesIO
|
||||
import requests
|
||||
import PIL.Image
|
||||
import PIL.ImageDraw
|
||||
import PIL.ImageFont
|
||||
|
||||
|
||||
class imageHandler():
|
||||
|
||||
def __init__(self, settings, epghandling):
|
||||
self.config = settings
|
||||
self.epghandling = epghandling
|
||||
|
||||
def getSize(self, txt, font):
|
||||
testImg = PIL.Image.new('RGB', (1, 1))
|
||||
testDraw = PIL.ImageDraw.Draw(testImg)
|
||||
return testDraw.textsize(txt, font)
|
||||
|
||||
def generate_image(self, messagetype, message):
|
||||
if messagetype == "channel":
|
||||
width = 360
|
||||
height = 270
|
||||
fontsize = 72
|
||||
elif messagetype == "content":
|
||||
width = 1080
|
||||
height = 1440
|
||||
fontsize = 100
|
||||
|
||||
colorBackground = "#228822"
|
||||
colorText = "#717D7E"
|
||||
colorOutline = "#717D7E"
|
||||
fontname = str(self.config.dict["filedir"]["font"])
|
||||
|
||||
font = PIL.ImageFont.truetype(fontname, fontsize)
|
||||
text_width, text_height = self.getSize(message, font)
|
||||
img = PIL.Image.new('RGBA', (width+4, height+4), colorBackground)
|
||||
d = PIL.ImageDraw.Draw(img)
|
||||
d.text(((width-text_width)/2, (height-text_height)/2), message, fill=colorText, font=font)
|
||||
d.rectangle((0, 0, width+3, height+3), outline=colorOutline)
|
||||
|
||||
s = BytesIO()
|
||||
img.save(s, 'png')
|
||||
return s.getvalue()
|
||||
|
||||
def get_image(self, req_args):
|
||||
|
||||
imageUri = self.epghandling.get_thumbnail(req_args["type"], req_args["id"])
|
||||
if not imageUri:
|
||||
return self.generate_image(req_args["type"], req_args["id"])
|
||||
|
||||
try:
|
||||
req = requests.get(imageUri)
|
||||
return req.content
|
||||
except Exception as e:
|
||||
print(e)
|
||||
return self.generate_image(req_args["type"], req_args["id"])
|
||||
|
||||
def get_image_type(self, image_data):
|
||||
header_byte = image_data[0:3].hex().lower()
|
||||
if header_byte == '474946':
|
||||
return "image/gif"
|
||||
elif header_byte == '89504e':
|
||||
return "image/png"
|
||||
elif header_byte == 'ffd8ff':
|
||||
return "image/jpeg"
|
||||
else:
|
||||
return "image/jpeg"
|
||||
16
fHDHR/fHDHRweb/fHDHRdevice/lineup_json.py
Normal file
16
fHDHR/fHDHRweb/fHDHRdevice/lineup_json.py
Normal file
@ -0,0 +1,16 @@
|
||||
import json
|
||||
|
||||
|
||||
class Lineup_JSON():
|
||||
lineup_json = None
|
||||
|
||||
def __init__(self, settings, origserv):
|
||||
self.config = settings
|
||||
self.origserv = origserv
|
||||
|
||||
def get_lineup_json(self, base_url, force_update=False):
|
||||
if not self.lineup_json or force_update:
|
||||
jsonlineup = self.origserv.get_station_list(base_url)
|
||||
self.lineup_json = json.dumps(jsonlineup, indent=4)
|
||||
|
||||
return self.lineup_json
|
||||
35
fHDHR/fHDHRweb/fHDHRdevice/lineup_status_json.py
Normal file
35
fHDHR/fHDHRweb/fHDHRdevice/lineup_status_json.py
Normal file
@ -0,0 +1,35 @@
|
||||
import json
|
||||
|
||||
|
||||
class Lineup_Status_JSON():
|
||||
|
||||
def __init__(self, settings, origserv):
|
||||
self.config = settings
|
||||
self.origserv = origserv
|
||||
|
||||
def get_lineup_json(self, station_scanning):
|
||||
if station_scanning:
|
||||
jsonlineup = self.scan_in_progress()
|
||||
elif not self.origserv.get_station_total():
|
||||
jsonlineup = self.scan_in_progress()
|
||||
else:
|
||||
jsonlineup = self.not_scanning()
|
||||
return json.dumps(jsonlineup, indent=4)
|
||||
|
||||
def scan_in_progress(self):
|
||||
channel_count = self.origserv.get_station_total()
|
||||
jsonlineup = {
|
||||
"ScanInProgress": "true",
|
||||
"Progress": 99,
|
||||
"Found": channel_count
|
||||
}
|
||||
return jsonlineup
|
||||
|
||||
def not_scanning(self):
|
||||
jsonlineup = {
|
||||
"ScanInProgress": "false",
|
||||
"ScanPossible": "true",
|
||||
"Source": self.config.dict["dev"]["reporting_tuner_type"],
|
||||
"SourceList": [self.config.dict["dev"]["reporting_tuner_type"]],
|
||||
}
|
||||
return jsonlineup
|
||||
29
fHDHR/fHDHRweb/fHDHRdevice/lineup_xml.py
Normal file
29
fHDHR/fHDHRweb/fHDHRdevice/lineup_xml.py
Normal file
@ -0,0 +1,29 @@
|
||||
import xml.etree.ElementTree
|
||||
from io import BytesIO
|
||||
|
||||
from fHDHR.tools import sub_el
|
||||
|
||||
|
||||
class Lineup_XML():
|
||||
device_xml = None
|
||||
|
||||
def __init__(self, settings, origserv):
|
||||
self.config = settings
|
||||
self.origserv = origserv
|
||||
|
||||
def get_lineup_xml(self, base_url, force_update=False):
|
||||
if not self.device_xml or force_update:
|
||||
out = xml.etree.ElementTree.Element('Lineup')
|
||||
station_list = self.origserv.get_station_list(base_url)
|
||||
for station_item in station_list:
|
||||
program_out = sub_el(out, 'Program')
|
||||
sub_el(program_out, 'GuideNumber', station_item['GuideNumber'])
|
||||
sub_el(program_out, 'GuideName', station_item['GuideName'])
|
||||
sub_el(program_out, 'URL', station_item['URL'])
|
||||
|
||||
fakefile = BytesIO()
|
||||
fakefile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
fakefile.write(xml.etree.ElementTree.tostring(out, encoding='UTF-8'))
|
||||
self.device_xml = fakefile.getvalue()
|
||||
|
||||
return self.device_xml
|
||||
27
fHDHR/fHDHRweb/fHDHRdevice/station_scan.py
Normal file
27
fHDHR/fHDHRweb/fHDHRdevice/station_scan.py
Normal file
@ -0,0 +1,27 @@
|
||||
from multiprocessing import Process
|
||||
|
||||
|
||||
class Station_Scan():
|
||||
|
||||
def __init__(self, settings, origserv):
|
||||
self.config = settings
|
||||
self.origserv = origserv
|
||||
self.chanscan = Process(target=self.runscan)
|
||||
|
||||
def scan(self):
|
||||
print("Channel Scan Requested by Client.")
|
||||
try:
|
||||
self.chanscan.start()
|
||||
except AssertionError:
|
||||
print("Channel Scan Already In Progress!")
|
||||
|
||||
def runscan(self):
|
||||
self.origserv.get_channels(forceupdate=True)
|
||||
print("Requested Channel Scan Complete.")
|
||||
|
||||
def scanning(self):
|
||||
try:
|
||||
self.chanscan.join(timeout=0)
|
||||
return self.chanscan.is_alive()
|
||||
except AssertionError:
|
||||
return False
|
||||
26
fHDHR/fHDHRweb/fHDHRdevice/tuners.py
Normal file
26
fHDHR/fHDHRweb/fHDHRdevice/tuners.py
Normal file
@ -0,0 +1,26 @@
|
||||
import threading
|
||||
|
||||
from fHDHR.fHDHRerrors import TunerError
|
||||
|
||||
|
||||
class Tuners():
|
||||
|
||||
def __init__(self, settings):
|
||||
self.config = settings
|
||||
|
||||
self.max_tuners = int(self.config.dict["fhdhr"]["tuner_count"])
|
||||
self.tuners = self.max_tuners
|
||||
self.tuner_lock = threading.Lock()
|
||||
|
||||
def tuner_grab(self):
|
||||
self.tuner_lock.acquire()
|
||||
if self.tuners == 0:
|
||||
self.tuner_lock.release()
|
||||
raise TunerError("No Available Tuners.")
|
||||
self.tuners -= 1
|
||||
self.tuner_lock.release()
|
||||
|
||||
def tuner_close(self):
|
||||
self.tuner_lock.acquire()
|
||||
self.tuners += 1
|
||||
self.tuner_lock.release()
|
||||
130
fHDHR/fHDHRweb/fHDHRdevice/xmltv_xml.py
Normal file
130
fHDHR/fHDHRweb/fHDHRdevice/xmltv_xml.py
Normal file
@ -0,0 +1,130 @@
|
||||
import xml.etree.ElementTree
|
||||
from io import BytesIO
|
||||
import time
|
||||
|
||||
from fHDHR.tools import sub_el
|
||||
|
||||
|
||||
class xmlTV_XML():
|
||||
"""Methods to create xmltv.xml"""
|
||||
xmltv_xml = None
|
||||
|
||||
def __init__(self, settings, epghandling):
|
||||
self.config = settings
|
||||
self.epghandling = epghandling
|
||||
self.updated_at = None
|
||||
self.epg_method = self.config.dict["fhdhr"]["epg_method"]
|
||||
self.epg_sleeptime = self.config.dict[self.epg_method]["epg_update_frequency"]
|
||||
|
||||
def get_xmltv_xml(self, base_url, force_update=False):
|
||||
nowtime = time.time()
|
||||
update_xmltv = False
|
||||
|
||||
if not self.xmltv_xml or force_update:
|
||||
update_xmltv = True
|
||||
elif not self.updated_at:
|
||||
update_xmltv = True
|
||||
elif nowtime >= (self.updated_at + self.epg_sleeptime):
|
||||
update_xmltv = True
|
||||
|
||||
if update_xmltv:
|
||||
print("Updating xmltv cache.")
|
||||
epgdict = self.epghandling.epgtypes.get_epg()
|
||||
self.xmltv_xml = self.create_xmltv(base_url, epgdict)
|
||||
self.updated_at = nowtime
|
||||
|
||||
return self.xmltv_xml
|
||||
|
||||
def xmltv_headers(self):
|
||||
"""This method creates the XML headers for our xmltv"""
|
||||
xmltvgen = xml.etree.ElementTree.Element('tv')
|
||||
xmltvgen.set('source-info-url', self.config.dict["fhdhr"]["friendlyname"])
|
||||
xmltvgen.set('source-info-name', self.config.dict["main"]["servicename"])
|
||||
xmltvgen.set('generator-info-name', 'fHDHR')
|
||||
xmltvgen.set('generator-info-url', 'fHDHR/' + self.config.dict["main"]["reponame"])
|
||||
return xmltvgen
|
||||
|
||||
def xmltv_file(self, xmltvgen):
|
||||
"""This method is used to close out the xml file"""
|
||||
xmltvfile = BytesIO()
|
||||
xmltvfile.write(b'<?xml version="1.0" encoding="UTF-8"?>\n')
|
||||
xmltvfile.write(xml.etree.ElementTree.tostring(xmltvgen, encoding='UTF-8'))
|
||||
return xmltvfile.getvalue()
|
||||
|
||||
def xmltv_empty(self):
|
||||
"""This method is called when creation of a full xmltv is not possible"""
|
||||
return self.xmltv_file(self.xmltv_headers())
|
||||
|
||||
def create_xmltv(self, base_url, epgdict):
|
||||
if not epgdict:
|
||||
return self.xmltv_empty()
|
||||
|
||||
out = self.xmltv_headers()
|
||||
|
||||
for c in list(epgdict.keys()):
|
||||
|
||||
c_out = sub_el(out, 'channel', id=str(epgdict[c]['number']))
|
||||
sub_el(c_out, 'display-name',
|
||||
text='%s %s' % (epgdict[c]['number'], epgdict[c]['callsign']))
|
||||
sub_el(c_out, 'display-name',
|
||||
text='%s %s %s' % (epgdict[c]['number'], epgdict[c]['callsign'], str(epgdict[c]['id'])))
|
||||
sub_el(c_out, 'display-name', text=epgdict[c]['number'])
|
||||
sub_el(c_out, 'display-name',
|
||||
text='%s %s fcc' % (epgdict[c]['number'], epgdict[c]['callsign']))
|
||||
sub_el(c_out, 'display-name', text=epgdict[c]['callsign'])
|
||||
sub_el(c_out, 'display-name', text=epgdict[c]['callsign'])
|
||||
sub_el(c_out, 'display-name', text=epgdict[c]['name'])
|
||||
|
||||
if epgdict[c]["thumbnail"] is not None:
|
||||
sub_el(c_out, 'icon', src=("http://" + str(base_url) + "/images?source=epg&type=channel&id=" + epgdict[c]['id']))
|
||||
else:
|
||||
sub_el(c_out, 'icon', src=("http://" + str(base_url) + "/images?source=generate&message=" + epgdict[c]['number']))
|
||||
|
||||
for channelnum in list(epgdict.keys()):
|
||||
|
||||
channel_listing = epgdict[channelnum]['listing']
|
||||
|
||||
for program in channel_listing:
|
||||
|
||||
prog_out = sub_el(out, 'programme',
|
||||
start=program['time_start'],
|
||||
stop=program['time_end'],
|
||||
channel=str(channelnum))
|
||||
|
||||
sub_el(prog_out, 'title', lang='en', text=program['title'])
|
||||
|
||||
sub_el(prog_out, 'desc', lang='en', text=program['description'])
|
||||
|
||||
sub_el(prog_out, 'sub-title', lang='en', text='Movie: ' + program['sub-title'])
|
||||
|
||||
sub_el(prog_out, 'length', units='minutes', text=str(int(program['duration_minutes'])))
|
||||
|
||||
for f in program['genres']:
|
||||
sub_el(prog_out, 'category', lang='en', text=f)
|
||||
sub_el(prog_out, 'genre', lang='en', text=f)
|
||||
|
||||
if program['seasonnumber'] and program['episodenumber']:
|
||||
s_ = int(str(program['seasonnumber']), 10)
|
||||
e_ = int(str(program['episodenumber']), 10)
|
||||
sub_el(prog_out, 'episode-num', system='dd_progid',
|
||||
text=str(program['id']))
|
||||
sub_el(prog_out, 'episode-num', system='common',
|
||||
text='S%02dE%02d' % (s_, e_))
|
||||
sub_el(prog_out, 'episode-num', system='xmltv_ns',
|
||||
text='%d.%d.' % (int(s_)-1, int(e_)-1))
|
||||
sub_el(prog_out, 'episode-num', system='SxxExx">S',
|
||||
text='S%02dE%02d' % (s_, e_))
|
||||
|
||||
if program["thumbnail"]:
|
||||
sub_el(prog_out, 'icon', src=("http://" + str(base_url) + "/images?source=epg&type=content&id=" + program['id']))
|
||||
else:
|
||||
sub_el(prog_out, 'icon', src=("http://" + str(base_url) + "/images?source=generate&message=" + program['title'].replace(" ", "")))
|
||||
|
||||
if program['rating']:
|
||||
rating_out = sub_el(prog_out, 'rating', system="MPAA")
|
||||
sub_el(rating_out, 'value', text=program['rating'])
|
||||
|
||||
if program['isnew']:
|
||||
sub_el(prog_out, 'new')
|
||||
|
||||
return self.xmltv_file(out)
|
||||
@ -26,13 +26,15 @@ class OriginService():
|
||||
for chankey in list(chan.keys()):
|
||||
self.channels["list"][chan["number"]][chankey] = chan[chankey]
|
||||
|
||||
def get_channels(self):
|
||||
def get_channels(self, forceupdate=False):
|
||||
|
||||
updatelist = False
|
||||
if not self.channels["list_updated"]:
|
||||
updatelist = True
|
||||
elif hours_between_datetime(self.channels["list_updated"], datetime.datetime.now()) > 12:
|
||||
updatelist = True
|
||||
elif forceupdate:
|
||||
updatelist = True
|
||||
|
||||
if updatelist:
|
||||
chanlist = self.serviceorigin.get_channels()
|
||||
|
||||
@ -2,6 +2,7 @@ import os
|
||||
import sys
|
||||
import ast
|
||||
import requests
|
||||
import xml.etree.ElementTree
|
||||
|
||||
UNARY_OPS = (ast.UAdd, ast.USub)
|
||||
BINARY_OPS = (ast.Add, ast.Sub, ast.Mult, ast.Div, ast.Mod)
|
||||
@ -13,6 +14,13 @@ def clean_exit():
|
||||
os._exit(0)
|
||||
|
||||
|
||||
def sub_el(parent, name, text=None, **kwargs):
|
||||
el = xml.etree.ElementTree.SubElement(parent, name, **kwargs)
|
||||
if text:
|
||||
el.text = text
|
||||
return el
|
||||
|
||||
|
||||
def xmldictmaker(inputdict, req_items, list_items=[], str_items=[]):
|
||||
xml_dict = {}
|
||||
|
||||
|
||||
Loading…
Reference in New Issue
Block a user