self.recorder = recorder
def read_callback(self, size):
+ chunk = None
+
try:
chunk = self.relay.read(size)
except:
return False
return True
-
def icecastloop_metadata(self):
try:
self.update_twitter_current()
from threading import Thread
-class HTTPStreamer(Thread):
+class HTTPStreamer(Thread):
protocol = 'http'
host = str
port = str
def __init__(self):
Thread.__init__(self)
import pycurl
+
self.curl = pycurl.Curl()
def set_callback(self, read_callback):
def open(self):
import pycurl
- self.uri = self.protocol + '://' + self.host + ':' + str(self.port) + \
- self.mount + '?' + 'password=' + self.password
+
+ self.uri = self.protocol + '://' + self.host + ':' + str(self.port)
+ self.uri += self.mount + '?' + 'password=' + self.password
self.curl.setopt(pycurl.URL, self.uri)
self.curl.setopt(pycurl.NOSIGNAL, 1)
self.curl.setopt(pycurl.UPLOAD, 1)
# Could make this the base class; will need to add 'publish'
class WriteXmlMixin:
- def write_xml(self, outfile, encoding = "iso-8859-1"):
+ def __init__(self):
+ pass
+
+ def write_xml(self, outfile, encoding="iso-8859-1"):
from xml.sax import saxutils
+
handler = saxutils.XMLGenerator(outfile, encoding)
handler.startDocument()
self.publish(handler)
handler.endDocument()
- def to_xml(self, encoding = "iso-8859-1"):
+ def to_xml(self, encoding="iso-8859-1"):
try:
import cStringIO as StringIO
except ImportError:
return f.getvalue()
-def _element(handler, name, obj, d = {}):
+def _element(handler, name, obj, d={}):
if isinstance(obj, basestring) or obj is None:
# special-case handling to make the API easier
# to use for the common case.
# It better know how to emit the correct XML.
obj.publish(handler)
+
def _opt_element(handler, name, obj):
if obj is None:
return
Input date must be in GMT.
"""
# Looks like:
- # Sat, 07 Sep 2002 00:00:01 GMT
+ # Sat, 07 Sep 2002 00:00:01 GMT
# Can't use strftime because that's locale dependent
#
# Isn't there a standard way to do this for Python? The
# rfc822 and email.Utils modules assume a timestamp. The
# following is based on the rfc822 module.
return "%s, %02d %s %04d %02d:%02d:%02d GMT" % (
- ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()],
- dt.day,
- ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
- "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month-1],
- dt.year, dt.hour, dt.minute, dt.second)
+ ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"][dt.weekday()],
+ dt.day,
+ ["Jan", "Feb", "Mar", "Apr", "May", "Jun",
+ "Jul", "Aug", "Sep", "Oct", "Nov", "Dec"][dt.month - 1],
+ dt.year, dt.hour, dt.minute, dt.second)
+
-
##
# A couple simple wrapper objects for the fields which
# take a simple value other than a string.
to text for XML.)
"""
element_attrs = {}
+
def __init__(self, name, val):
self.name = name
self.val = val
+
def publish(self, handler):
handler.startElement(self.name, self.element_attrs)
handler.characters(str(self.val))
handler.endElement(self.name)
+
class DateElement:
"""implements the 'publish' API for a datetime.datetime
Converts the datetime to RFC 2822 timestamp (4-digit year).
"""
+
def __init__(self, name, dt):
self.name = name
self.dt = dt
+
def publish(self, handler):
_element(handler, self.name, _format_date(self.dt))
+
+
####
class Category:
"""Publish a category element"""
- def __init__(self, category, domain = None):
+
+ def __init__(self, category, domain=None):
self.category = category
self.domain = domain
+
def publish(self, handler):
d = {}
if self.domain is not None:
d["domain"] = self.domain
_element(handler, "category", self.category, d)
+
class Cloud:
"""Publish a cloud"""
+
def __init__(self, domain, port, path,
registerProcedure, protocol):
self.domain = domain
self.path = path
self.registerProcedure = registerProcedure
self.protocol = protocol
+
def publish(self, handler):
_element(handler, "cloud", None, {
"domain": self.domain,
"registerProcedure": self.registerProcedure,
"protocol": self.protocol})
+
class Image:
"""Publish a channel Image"""
element_attrs = {}
+
def __init__(self, url, title, link,
- width = None, height = None, description = None):
+ width=None, height=None, description=None):
self.url = url
self.title = title
self.link = link
self.width = width
self.height = height
self.description = description
-
+
def publish(self, handler):
handler.startElement("image", self.element_attrs)
if isinstance(width, int):
width = IntElement("width", width)
_opt_element(handler, "width", width)
-
+
height = self.height
if isinstance(height, int):
height = IntElement("height", height)
handler.endElement("image")
+
class Guid:
"""Publish a guid
Defaults to being a permalink, which is the assumption if it's
omitted. Hence strings are always permalinks.
"""
- def __init__(self, guid, isPermaLink = 1):
+
+ def __init__(self, guid, isPermaLink=1):
self.guid = guid
self.isPermaLink = isPermaLink
+
def publish(self, handler):
d = {}
if self.isPermaLink:
d["isPermaLink"] = "false"
_element(handler, "guid", self.guid, d)
+
class TextInput:
"""Publish a textInput
Apparently this is rarely used.
"""
element_attrs = {}
+
def __init__(self, title, description, name, link):
self.title = title
self.description = description
_element(handler, "name", self.name)
_element(handler, "link", self.link)
handler.endElement("textInput")
-
+
class Enclosure:
"""Publish an enclosure"""
+
def __init__(self, url, length, type):
self.url = url
self.length = length
self.type = type
+
def publish(self, handler):
_element(handler, "enclosure", None,
{"url": self.url,
"length": str(self.length),
"type": self.type,
- })
+ })
+
class Source:
"""Publish the item's original source, used by aggregators"""
+
def __init__(self, name, url):
self.name = name
self.url = url
+
def publish(self, handler):
_element(handler, "source", self.name, {"url": self.url})
+
class SkipHours:
"""Publish the skipHours
This takes a list of hours, as integers.
"""
element_attrs = {}
+
def __init__(self, hours):
self.hours = hours
+
def publish(self, handler):
if self.hours:
handler.startElement("skipHours", self.element_attrs)
_element(handler, "hour", str(hour))
handler.endElement("skipHours")
+
class SkipDays:
"""Publish the skipDays
This takes a list of days as strings.
"""
element_attrs = {}
+
def __init__(self, days):
self.days = days
+
def publish(self, handler):
if self.days:
handler.startElement("skipDays", self.element_attrs)
_element(handler, "day", day)
handler.endElement("skipDays")
+
class RSS2(WriteXmlMixin):
"""The main RSS class.
Stores the channel attributes, with the "category" elements under
".categories" and the RSS items under ".items".
"""
-
+
rss_attrs = {"version": "2.0"}
element_attrs = {}
+
def __init__(self,
title,
link,
description,
- language = None,
- copyright = None,
- managingEditor = None,
- webMaster = None,
- pubDate = None, # a datetime, *in* *GMT*
- lastBuildDate = None, # a datetime
-
- categories = None, # list of strings or Category
- generator = _generator_name,
- docs = "http://blogs.law.harvard.edu/tech/rss",
- cloud = None, # a Cloud
- ttl = None, # integer number of minutes
-
- image = None, # an Image
- rating = None, # a string; I don't know how it's used
- textInput = None, # a TextInput
- skipHours = None, # a SkipHours with a list of integers
- skipDays = None, # a SkipDays with a list of strings
-
- items = None, # list of RSSItems
- ):
+ language=None,
+ copyright=None,
+ managingEditor=None,
+ webMaster=None,
+ pubDate=None, # a datetime, *in* *GMT*
+ lastBuildDate=None, # a datetime
+
+ categories=None, # list of strings or Category
+ generator=_generator_name,
+ docs="http://blogs.law.harvard.edu/tech/rss",
+ cloud=None, # a Cloud
+ ttl=None, # integer number of minutes
+
+ image=None, # an Image
+ rating=None, # a string; I don't know how it's used
+ textInput=None, # a TextInput
+ skipHours=None, # a SkipHours with a list of integers
+ skipDays=None, # a SkipDays with a list of strings
+
+ items=None, # list of RSSItems
+ ):
self.title = title
self.link = link
self.description = description
self.webMaster = webMaster
self.pubDate = pubDate
self.lastBuildDate = lastBuildDate
-
+
if categories is None:
categories = []
self.categories = categories
_element(handler, "description", self.description)
self.publish_extensions(handler)
-
+
_opt_element(handler, "language", self.language)
_opt_element(handler, "copyright", self.copyright)
_opt_element(handler, "managingEditor", self.managingEditor)
# output after the three required fields.
pass
-
-
+
class RSSItem(WriteXmlMixin):
"""Publish an RSS Item"""
element_attrs = {}
+
def __init__(self,
- title = None, # string
- link = None, # url as string
- description = None, # string
- author = None, # email address as string
- categories = None, # list of string or Category
- comments = None, # url as string
- enclosure = None, # an Enclosure
- guid = None, # a unique string
- pubDate = None, # a datetime
- source = None, # a Source
- ):
-
+ title=None, # string
+ link=None, # url as string
+ description=None, # string
+ author=None, # email address as string
+ categories=None, # list of string or Category
+ comments=None, # url as string
+ enclosure=None, # an Enclosure
+ guid=None, # a unique string
+ pubDate=None, # a datetime
+ source=None, # a Source
+ ):
+
if title is None and description is None:
raise TypeError(
"must define at least one of 'title' or 'description'")
if isinstance(category, basestring):
category = Category(category)
category.publish(handler)
-
+
_opt_element(handler, "comments", self.comments)
if self.enclosure is not None:
self.enclosure.publish(handler)
if self.source is not None:
self.source.publish(handler)
-
+
handler.endElement("item")
def publish_extensions(self, handler):
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
-# http://www.apache.org/licenses/LICENSE-2.0
+# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# limitations under the License.
-import os
import sys
# parse_qsl moved to urlparse module in v2.6
try:
- from urlparse import parse_qsl
+ from urlparse import parse_qsl
except:
- from cgi import parse_qsl
+ from cgi import parse_qsl
import oauth2 as oauth
REQUEST_TOKEN_URL = 'https://api.twitter.com/oauth/request_token'
-ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token'
+ACCESS_TOKEN_URL = 'https://api.twitter.com/oauth/access_token'
AUTHORIZATION_URL = 'https://api.twitter.com/oauth/authorize'
-SIGNIN_URL = 'https://api.twitter.com/oauth/authenticate'
+SIGNIN_URL = 'https://api.twitter.com/oauth/authenticate'
-consumer_key = 'ozs9cPS2ci6eYQzzMSTb4g'
+consumer_key = 'ozs9cPS2ci6eYQzzMSTb4g'
consumer_secret = '1kNEffHgGSXO2gMNTr8HRum5s2ofx3VQnJyfd0es'
if consumer_key is None or consumer_secret is None:
- print 'You need to edit this script and provide values for the'
- print 'consumer_key and also consumer_secret.'
- print ''
- print 'The values you need come from Twitter - you need to register'
- print 'as a developer your "application". This is needed only until'
- print 'Twitter finishes the idea they have of a way to allow open-source'
- print 'based libraries to have a token that can be used to generate a'
- print 'one-time use key that will allow the library to make the request'
- print 'on your behalf.'
- print ''
- sys.exit(1)
+ print 'You need to edit this script and provide values for the'
+ print 'consumer_key and also consumer_secret.'
+ print ''
+ print 'The values you need come from Twitter - you need to register'
+ print 'as a developer your "application". This is needed only until'
+ print 'Twitter finishes the idea they have of a way to allow open-source'
+ print 'based libraries to have a token that can be used to generate a'
+ print 'one-time use key that will allow the library to make the request'
+ print 'on your behalf.'
+ print ''
+ sys.exit(1)
signature_method_hmac_sha1 = oauth.SignatureMethod_HMAC_SHA1()
-oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
-oauth_client = oauth.Client(oauth_consumer)
+oauth_consumer = oauth.Consumer(key=consumer_key, secret=consumer_secret)
+oauth_client = oauth.Client(oauth_consumer)
print 'Requesting temp token from Twitter'
resp, content = oauth_client.request(REQUEST_TOKEN_URL, 'GET')
if resp['status'] != '200':
- print 'Invalid respond from Twitter requesting temp token: %s' % resp['status']
+ print 'Invalid respond from Twitter requesting temp token: %s' % resp['status']
else:
- request_token = dict(parse_qsl(content))
-
- print ''
- print 'Please visit this Twitter page and retrieve the pincode to be used'
- print 'in the next step to obtaining an Authentication Token:'
- print ''
- print '%s?oauth_token=%s' % (AUTHORIZATION_URL, request_token['oauth_token'])
- print ''
-
- pincode = raw_input('Pincode? ')
-
- token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
- token.set_verifier(pincode)
-
- print ''
- print 'Generating and signing request for an access token'
- print ''
-
- oauth_client = oauth.Client(oauth_consumer, token)
- resp, content = oauth_client.request(ACCESS_TOKEN_URL, method='POST', body='oauth_verifier=%s' % pincode)
- access_token = dict(parse_qsl(content))
-
- if resp['status'] != '200':
- print 'The request for a Token did not succeed: %s' % resp['status']
- print access_token
- else:
- print 'Your Twitter Access Token key: %s' % access_token['oauth_token']
- print ' Access Token secret: %s' % access_token['oauth_token_secret']
+ request_token = dict(parse_qsl(content))
+
print ''
+ print 'Please visit this Twitter page and retrieve the pincode to be used'
+ print 'in the next step to obtaining an Authentication Token:'
+ print ''
+ print '%s?oauth_token=%s' % (AUTHORIZATION_URL, request_token['oauth_token'])
+ print ''
+
+ pincode = raw_input('Pincode? ')
+
+ token = oauth.Token(request_token['oauth_token'], request_token['oauth_token_secret'])
+ token.set_verifier(pincode)
+
+ print ''
+ print 'Generating and signing request for an access token'
+ print ''
+
+ oauth_client = oauth.Client(oauth_consumer, token)
+ resp, content = oauth_client.request(ACCESS_TOKEN_URL, method='POST', body='oauth_verifier=%s' % pincode)
+ access_token = dict(parse_qsl(content))
+
+ if resp['status'] != '200':
+ print 'The request for a Token did not succeed: %s' % resp['status']
+ print access_token
+ else:
+ print 'Your Twitter Access Token key: %s' % access_token['oauth_token']
+ print ' Access Token secret: %s' % access_token['oauth_token_secret']
+ print ''
def write_error(self, message):
self.logger.error(message)
+
class QueueLogger(Thread):
"""A queue-based logging object"""
-
+
def __init__(self, file, q):
Thread.__init__(self)
self.logger = Logger(file)
self.q = q
-
+
def run(self):
while True:
try:
if not isinstance(msg, dict):
self.logger.write_error(str(msg))
else:
- if not 'msg' in msg.keys():
+ if 'msg' not in msg.keys():
continue
-
+
if 'level' in msg.keys():
if msg['level'] == 'info':
self.logger.write_info(msg['msg'])
else:
self.logger.write_error(msg['msg'])
else:
- self.logger.write_error(msg['msg'])
+ self.logger.write_error(msg['msg'])
except:
pass
-
\ No newline at end of file
from mutagen import id3
from utils import *
-EasyID3.valid_keys["comment"]="COMM::'XXX'"
-EasyID3.valid_keys["copyright"]="TCOP::'XXX'"
-EasyID3.valid_keys["country"]="TXXX:COUNTRY:'XXX'"
-EasyID3.RegisterTXXXKey("country","COUNTRY")
+EasyID3.valid_keys["comment"] = "COMM::'XXX'"
+EasyID3.valid_keys["copyright"] = "TCOP::'XXX'"
+EasyID3.valid_keys["country"] = "TXXX:COUNTRY:'XXX'"
+EasyID3.RegisterTXXXKey("country", "COUNTRY")
+
class Mp3:
"""A MP3 file object"""
self.bitrate_default = '192'
self.cache_dir = os.sep + 'tmp'
self.keys2id3 = {'title': 'TIT2',
- 'artist': 'TPE1',
- 'album': 'TALB',
- 'date': 'TDRC',
- 'comment': 'COMM',
- 'country': 'COUNTRY',
- 'genre': 'TCON',
- 'copyright': 'TCOP',
- }
+ 'artist': 'TPE1',
+ 'album': 'TALB',
+ 'date': 'TDRC',
+ 'comment': 'COMM',
+ 'country': 'COUNTRY',
+ 'genre': 'TCON',
+ 'copyright': 'TCOP',
+ }
self.mp3 = MP3(self.media, ID3=EasyID3)
self.info = self.mp3.info
self.bitrate = int(str(self.info.bitrate)[:-3])
- self.length = datetime.timedelta(0,self.info.length)
+ self.length = datetime.timedelta(0, self.info.length)
try:
self.metadata = self.get_file_metadata()
except:
self.metadata = {'title': '',
- 'artist': '',
- 'album': '',
- 'date': '',
- 'comment': '',
- 'country': '',
- 'genre': '',
- 'copyright': '',
- }
+ 'artist': '',
+ 'album': '',
+ 'date': '',
+ 'comment': '',
+ 'country': '',
+ 'genre': '',
+ 'copyright': '',
+ }
self.description = self.get_description()
self.mime_type = self.get_mime_type()
self.file_ext = self.media_info[2]
self.extension = self.get_file_extension()
self.size = os.path.getsize(media)
- #self.args = self.get_args()
+ # self.args = self.get_args()
def get_format(self):
return 'MP3'
self.mp3.tags['TIT2'] = id3.TIT2(encoding=2, text=u'text')
self.mp3.save()
- #media_id3 = id3.ID3(self.media)
- #for tag in self.metadata.keys():
- #if tag in self.dub2id3_dict.keys():
- #frame_text = self.dub2id3_dict[tag]
- #value = self.metadata[tag]
- #frame = mutagen.id3.Frames[frame_text](3,value)
- #try:
- #media_id3.add(frame)
- #except:
- #raise IOError('ExporterError: cannot tag "'+tag+'"')
-
- #try:
- #media_id3.save()
- #except:
- #raise IOError('ExporterError: cannot write tags')
+ # media_id3 = id3.ID3(self.media)
+ # for tag in self.metadata.keys():
+ # if tag in self.dub2id3_dict.keys():
+ # frame_text = self.dub2id3_dict[tag]
+ # value = self.metadata[tag]
+ # frame = mutagen.id3.Frames[frame_text](3,value)
+ # try:
+ # media_id3.add(frame)
+ # except:
+ # raise IOError('ExporterError: cannot tag "'+tag+'"')
+
+ # try:
+ # media_id3.save()
+ # except:
+ # raise IOError('ExporterError: cannot write tags')
media = id3.ID3(self.media)
media.add(id3.TIT2(encoding=3, text=self.metadata['title'].decode('utf8')))
self.bitrate_default = '192'
self.cache_dir = os.sep + 'tmp'
self.keys2ogg = {'title': 'title',
- 'artist': 'artist',
- 'album': 'album',
- 'date': 'date',
- 'comment': 'comment',
- 'genre': 'genre',
- 'copyright': 'copyright',
- }
+ 'artist': 'artist',
+ 'album': 'album',
+ 'date': 'date',
+ 'comment': 'comment',
+ 'genre': 'genre',
+ 'copyright': 'copyright',
+ }
self.info = self.ogg.info
self.bitrate = int(str(self.info.bitrate)[:-3])
- self.length = datetime.timedelta(0,self.info.length)
+ self.length = datetime.timedelta(0, self.info.length)
self.metadata = self.get_file_metadata()
self.description = self.get_description()
self.mime_type = self.get_mime_type()
self.file_ext = self.media_info[2]
self.extension = self.get_file_extension()
self.size = os.path.getsize(media)
- #self.args = self.get_args()
+ # self.args = self.get_args()
def get_format(self):
return 'OGG'
def get_file_info(self):
try:
- file_out1, file_out2 = os.popen4('ogginfo "'+self.dest+'"')
+ file_out1, file_out2 = os.popen4('ogginfo "' + self.dest + '"')
info = []
for line in file_out2.readlines():
info.append(clean_word(line[:-1]))
except:
raise IOError('ExporterError: file does not exist.')
- def set_cache_dir(self,path):
- self.cache_dir = path
+ def set_cache_dir(self, path):
+ self.cache_dir = path
def get_file_metadata(self):
metadata = {}
def decode(self):
try:
- os.system('oggdec -o "'+self.cache_dir+os.sep+self.item_id+
- '.wav" "'+self.source+'"')
- return self.cache_dir+os.sep+self.item_id+'.wav'
+ os.system('oggdec -o "' + self.cache_dir + os.sep + self.item_id +
+ '.wav" "' + self.source + '"')
+ return self.cache_dir + os.sep + self.item_id + '.wav'
except:
raise IOError('ExporterError: decoder is not compatible.')
def write_tags(self):
- #self.ogg.add_tags()
+ # self.ogg.add_tags()
for tag in self.metadata.keys():
self.ogg[tag] = str(self.metadata[tag])
self.ogg.save()
- def get_args(self,options=None):
+ def get_args(self, options=None):
"""Get process options and return arguments for the encoder"""
args = []
if not options is None:
if not ('verbose' in self.options and self.options['verbose'] != '0'):
args.append('-Q ')
if 'ogg_bitrate' in self.options:
- args.append('-b '+self.options['ogg_bitrate'])
+ args.append('-b ' + self.options['ogg_bitrate'])
elif 'ogg_quality' in self.options:
- args.append('-q '+self.options['ogg_quality'])
+ args.append('-q ' + self.options['ogg_quality'])
else:
- args.append('-b '+self.bitrate_default)
+ args.append('-b ' + self.bitrate_default)
else:
- args.append('-Q -b '+self.bitrate_default)
+ args.append('-Q -b ' + self.bitrate_default)
for tag in self.metadata.keys():
value = clean_word(self.metadata[tag])
class OSCController(Thread):
-
def __init__(self, port):
Thread.__init__(self)
import liblo
+
self.port = port
try:
self.server = liblo.Server(self.port)
from threading import Thread
-class HTTPStreamer(Thread):
+class HTTPStreamer(Thread):
protocol = 'http'
host = str
port = str
def __init__(self):
Thread.__init__(self)
import pycurl
+
self.curl = pycurl.Curl()
def set_callback(self, read_callback):
def open(self):
import pycurl
- self.uri = self.protocol + '://' + self.host + ':' + str(self.port) + \
- self.mount + '?' + 'password=' + self.password
+
+ self.uri = self.protocol + '://' + self.host + ':' + str(self.port)
+ self.uri += self.mount + '?' + 'password=' + self.password
self.curl.setopt(pycurl.URL, self.uri)
self.curl.setopt(pycurl.UPLOAD, 1)
self.curl.setopt(pycurl.READFUNCTION, self.read_callback)
class Twitter:
-
def __init__(self, key, secret):
import twitter
+
self.consumer_key = DEEFUZZER_CONSUMER_KEY
self.consumer_secret = DEEFUZZER_CONSUMER_SECRET
self.access_token_key = key
from itertools import chain
from deefuzzer.tools import *
-mimetypes.add_type('application/x-yaml','.yaml')
+mimetypes.add_type('application/x-yaml', '.yaml')
-def clean_word(word) :
+
+def clean_word(word):
""" Return the word without excessive blank spaces, underscores and
characters causing problem to exporters"""
- word = re.sub("^[^\w]+","",word) #trim the beginning
- word = re.sub("[^\w]+$","",word) #trim the end
- word = re.sub("_+","_",word) #squeeze continuous _ to one _
- word = re.sub("^[^\w]+","",word) #trim the beginning _
- #word = string.replace(word,' ','_')
- #word = string.capitalize(word)
+ word = re.sub("^[^\w]+", "", word) # trim the beginning
+ word = re.sub("[^\w]+$", "", word) # trim the end
+ word = re.sub("_+", "_", word) # squeeze continuous _ to one _
+ word = re.sub("^[^\w]+", "", word) # trim the beginning _
+ # word = string.replace(word,' ','_')
+ # word = string.capitalize(word)
dict = '&[];"*:,'
for letter in dict:
- word = string.replace(word,letter,'_')
+ word = string.replace(word, letter, '_')
return word
+
def get_file_info(media):
file_name = media.split(os.sep)[-1]
file_title = file_name.split('.')[:-1]
file_ext = file_name.split('.')[-1]
return file_name, file_title, file_ext
+
def is_absolute_path(path):
return os.sep == path[0]
+
def merge_defaults(setting, default):
combined = {}
for key in set(chain(setting, default)):
combined[key] = default[key]
return combined
+
def replace_all(option, repl):
if isinstance(option, list):
r = []
elif isinstance(option, str):
r = option
for key in repl.keys():
- r = r.replace('[' + key + ']', repl[key])
+ r = r.replace('[' + key + ']', repl[key])
return r
return option
+
def get_conf_dict(file):
mime_type = mimetypes.guess_type(file)[0]
# Do the type check first, so we don't load huge files that won't be used
if 'xml' in mime_type:
- confile = open(file,'r')
+ confile = open(file, 'r')
data = confile.read()
confile.close()
- return xmltodict(data,'utf-8')
+ return xmltodict(data, 'utf-8')
elif 'yaml' in mime_type:
import yaml
+
def custom_str_constructor(loader, node):
return loader.construct_scalar(node).encode('utf-8')
+
yaml.add_constructor(u'tag:yaml.org,2002:str', custom_str_constructor)
- confile = open(file,'r')
+ confile = open(file, 'r')
data = confile.read()
confile.close()
return yaml.load(data)
elif 'json' in mime_type:
import json
- confile = open(file,'r')
+
+ confile = open(file, 'r')
data = confile.read()
confile.close()
return json.loads(data)
return False
+
def folder_contains_music(folder):
files = os.listdir(folder)
for file in files:
-
-import os, sys, yaml
+import os
+import sys
+import yaml
from deefuzzer.core import DeeFuzzer
+
path = sys.argv[-1]
d = DeeFuzzer(path)
name, ext = os.path.splitext(path)
# containing real tags opposed to just text.
for childnode in dom.childNodes:
if childnode.nodeName != "#text" and \
- childnode.nodeName != "#cdata-section":
+ childnode.nodeName != "#cdata-section":
return True
return False
+
def indexchilds(dom, enc):
childsdict = dict()
for childnode in dom.childNodes:
childsdict[name] = v
return childsdict
+
def xmltodict(data, enc=None):
dom = xml.dom.minidom.parseString(data.strip())
return indexchilds(dom, enc)