logger = None
m3u = None
rss = None
- station = []
- stations = []
+ station_settings = []
+ station_instances = {}
+ watchfolder = {}
+ logqueue = Queue.Queue()
def __init__(self, conf_file):
Thread.__init__(self)
log_dir = os.sep.join(log_file.split(os.sep)[:-1])
if not os.path.exists(log_dir) and log_dir:
os.makedirs(log_dir)
- self.logger = Logger(log_file)
+ self.logger = QueueLogger(log_file, self.logqueue)
+ self.logger.start()
for key in self.conf['deefuzzer'].keys():
if key == 'm3u':
elif key == 'stationfolder':
# Create stations automagically from a folder structure
if isinstance(self.conf['deefuzzer'][key], dict):
- self.create_stations_fromfolder(self.conf['deefuzzer'][key])
+ self.watchfolder = self.conf['deefuzzer'][key]
else:
setattr(self, key, self.conf['deefuzzer'][key])
# Set the deefuzzer logger
- self.logger.write_info('Starting DeeFuzzer')
- self.logger.write_info('Using libshout version %s' % shout.version())
- self.logger.write_info('Number of stations : ' + str(len(self.station)))
+ self._info('Starting DeeFuzzer')
+ self._info('Using libshout version %s' % shout.version())
+ self._info('Number of stations : ' + str(len(self.station_settings)))
+ def _log(self, level, msg):
+ try:
+ obj = {}
+ obj['msg'] = 'Core: ' + str(msg)
+ obj['level'] = level
+ self.logqueue.put(obj)
+ except:
+ pass
+
+ def _info(self, msg):
+ self._log('info', msg)
+
+ def _err(self, msg):
+ self._log('err', msg)
+
def set_m3u_playlist(self):
m3u_dir = os.sep.join(self.m3u.split(os.sep)[:-1])
if not os.path.exists(m3u_dir) and m3u_dir:
os.makedirs(m3u_dir)
m3u = open(self.m3u, 'w')
m3u.write('#EXTM3U\n')
- for s in self.stations:
+ for k in self.station_instances.keys():
+ s = self.station_instances[k]
m3u.write('#EXTINF:%s,%s - %s\n' % ('-1',s.short_name, s.channel.name))
m3u.write('http://' + s.channel.host + ':' + str(s.channel.port) + s.channel.mount + '\n')
m3u.close()
- self.logger.write_info('Writing M3U file to : ' + self.m3u)
+ self._info('Writing M3U file to : ' + self.m3u)
- def create_stations_fromfolder(self, options):
+ def create_stations_fromfolder(self):
"""Scan a folder for subfolders containing media, and make stations from them all."""
+ options = self.watchfolder
if not 'folder' in options.keys():
# We have no folder specified. Bail.
return
# The specified path is not a folder. Bail.
return
- self.logger.write_info('Scanning folder ' + folder + ' for stations')
+ # This makes the log file a lot more verbose. Commented out since we report on new stations anyway.
+ # self._info('Scanning folder ' + folder + ' for stations')
files = os.listdir(folder)
for file in files:
if folder_contains_music(filepath):
self.create_station(filepath, options)
+ def station_exists(self, name):
+ try:
+ for s in self.station_settings:
+ if not 'infos' in s.keys():
+ continue
+ if not 'short_name' in s['infos'].keys():
+ continue
+ if s['infos']['short_name'] == name:
+ return True
+ return False
+ except:
+ pass
+ return True
+
def create_station(self, folder, options):
"""Create a station definition for a folder given the specified options."""
- self.logger.write_info('Creating station for folder ' + folder)
s = {}
path, name = os.path.split(folder)
+ if self.station_exists(name):
+ return
+ self._info('Creating station for folder ' + folder)
d = dict(path=folder,name=name)
for i in options.keys():
if not 'folder' in i:
# Whatever we have, it's not either a file or folder. Bail.
return
- self.logger.write_info('Loading station config files in ' + folder)
+ self._info('Loading station config files in ' + folder)
files = os.listdir(folder)
for file in files:
filepath = os.path.join(folder, file)
def load_station_config(self, file):
"""Load station configuration(s) from a config file."""
- self.logger.write_info('Loading station config file ' + file)
+ self._info('Loading station config file ' + file)
stationdef = get_conf_dict(file)
if isinstance(stationdef, dict):
if 'station' in stationdef.keys():
try:
# We should probably test to see if we're putting the same station in multiple times
# Same in this case probably means the same media folder, server, and mountpoint
- self.station.append(this_station)
+ self.station_settings.append(this_station)
except Exception:
return
def run(self):
q = Queue.Queue(1)
-
- ns = len(self.station)
- for i in range(0, ns):
- try:
- station = self.station[i]
-
- # Apply station defaults if they exist
- if 'stationdefaults' in self.conf['deefuzzer']:
- if isinstance(self.conf['deefuzzer']['stationdefaults'], dict):
- station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults'])
- self.stations.append(Station(station, q, self.logger, self.m3u))
- except Exception:
- name = str(i)
- if 'info' in station.keys():
- if 'short_name' in station['infos']:
- name = station['infos']['short_name']
- self.logger.write_error('Error starting station ' + name)
- continue
-
- if self.m3u:
- self.set_m3u_playlist()
-
+ ns = -1
p = Producer(q)
p.start()
-
- ns = len(self.stations)
- # Start the Stations
- for i in range(0, ns):
- try:
- self.stations[i].start()
- except Exception:
- continue
+ started = False
+ # Keep the Stations running
+ while True:
+ self.create_stations_fromfolder()
+ ns_new = len(self.station_settings)
+ if(ns_new > ns):
+ for i in range(ns+1, ns_new):
+ try:
+ station = self.station_settings[i]
+
+ # Apply station defaults if they exist
+ if 'stationdefaults' in self.conf['deefuzzer']:
+ if isinstance(self.conf['deefuzzer']['stationdefaults'], dict):
+ station = merge_defaults(station, self.conf['deefuzzer']['stationdefaults'])
+
+ name = 'Station ' + str(i)
+ if 'info' in station.keys():
+ if 'short_name' in station['infos']:
+ name = station['infos']['short_name']
+ y = 1
+ while name in self.station_instances.keys():
+ y = y + 1
+ name = station['infos']['short_name'] + " " + str(y)
+
+ self.station_instances[name] = Station(station, q, self.logqueue, self.m3u)
+ except Exception:
+ self._err('Error starting station ' + name)
+ continue
+ ns = ns_new
+
+ if self.m3u:
+ self.set_m3u_playlist()
+
+ for i in self.station_instances.keys():
+ try:
+ if not self.station_instances[i].isAlive():
+ self.station_instances[i].start()
+ msg = 'Started '
+ if started:
+ msg = 'Restarted '
+ self._info(msg + 'station ' + i)
+ except:
+ pass
+
+ started = False
+ time.sleep(5)
+ # end main loop
class Producer(Thread):
- """a DeeFuzzer Producer master thread"""
+ """a DeeFuzzer Producer master thread. Used for locking/blocking"""
def __init__(self, q):
Thread.__init__(self)
feeds_playlist = 1
feeds_showfilepath = 0
feeds_showfilename = 0
+ short_name = ''
- def __init__(self, station, q, logger, m3u):
+ def __init__(self, station, q, logqueue, m3u):
Thread.__init__(self)
self.station = station
self.q = q
- self.logger = logger
+ self.logqueue = logqueue
self.m3u = m3u
if 'station_dir' in self.station:
self.feeds_playlist_file = self.base_name + '_playlist'
# Logging
- self._info('Opening ' + self.short_name + ' - ' + self.channel.name + \
- ' (' + str(self.lp) + ' tracks)...')
+ self._info('Opening ' + self.short_name + ' - ' + self.channel.name)
self.metadata_relative_dir = 'metadata'
self.metadata_url = self.channel.url + '/rss/' + self.metadata_relative_dir
if self.record_mode:
self.record_callback('/record', [1])
+ def _log(self, level, msg):
+ try:
+ obj = {}
+ obj['msg'] = 'Station ' + str(self.channel_url) + ': ' + str(msg)
+ obj['level'] = str(level)
+ self.logqueue.put(obj)
+ except:
+ pass
+
def _info(self, msg):
- self.logger.write_info('Station ' + self.channel_url + ': ' + str(msg))
+ self._log('info', msg)
def _err(self, msg):
- self.logger.write_error('Station ' + self.channel_url + ': ' + str(msg))
-
+ self._log('err', msg)
+
def run_callback(self, path, value):
value = value[0]
self.run_mode = value
file_list.append(root + os.sep + file)
file_list.sort()
else:
- f = open(self.m3u_playlist_file, 'r')
- for path in f.readlines():
- if '#' != path[0]:
- file_list.append(path[:-1])
+ self.q.get(1)
+ try:
+ f = open(self.m3u_playlist_file, 'r')
+ try:
+ for path in f.readlines():
+ if '#' != path[0]:
+ file_list.append(path[:-1])
+ except:
+ f.close()
+ except:
+ pass
+ self.q.task_done()
return file_list
def get_jingles(self):
if self.shuffle_mode:
random.shuffle(self.playlist)
- self._info('generating new playlist (' + str(self.lp) + ' tracks)')
+ self._info('Generating new playlist (' + str(self.lp) + ' tracks)')
if self.feeds_playlist:
self.update_feeds(self.media_to_objs(self.playlist), self.feeds_playlist_file, '(playlist)')
else:
mess = 'No media in media_dir !'
self._err(mess)
- sys.exit(mess)
+ self.run_mode = 0
def media_to_objs(self, media_list):
media_objs = []
for media in media_list:
file_name, file_title, file_ext = get_file_info(media)
- if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg':
- try:
- file_meta = Mp3(media)
- except:
- continue
- elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg':
- try:
- file_meta = Ogg(media)
- except:
- continue
- elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm':
- try:
- file_meta = WebM(media)
- except:
- continue
- if self.feeds_showfilename:
- file_meta.metadata['filename'] = file_name.decode( "utf-8" ) #decode needed for some weird filenames
- if self.feeds_showfilepath:
- file_meta.metadata['filepath'] = media.decode( "utf-8" ) #decode needed for some weird filenames
+ self.q.get(1)
+ try:
+ if file_ext.lower() == 'mp3' or mimetypes.guess_type(media)[0] == 'audio/mpeg':
+ try:
+ file_meta = Mp3(media)
+ except:
+ continue
+ elif file_ext.lower() == 'ogg' or mimetypes.guess_type(media)[0] == 'audio/ogg':
+ try:
+ file_meta = Ogg(media)
+ except:
+ continue
+ elif file_ext.lower() == 'webm' or mimetypes.guess_type(media)[0] == 'video/webm':
+ try:
+ file_meta = WebM(media)
+ except:
+ continue
+ if self.feeds_showfilename:
+ file_meta.metadata['filename'] = file_name.decode( "utf-8" ) #decode needed for some weird filenames
+ if self.feeds_showfilepath:
+ file_meta.metadata['filepath'] = media.decode( "utf-8" ) #decode needed for some weird filenames
+ except:
+ pass
+ self.q.task_done()
media_objs.append(file_meta)
return media_objs
)
json_data.append(json_item)
- rss = RSS2(title = channel_subtitle,
- link = self.channel.url,
- description = self.channel.description.decode('utf-8'),
- lastBuildDate = date_now,
+ rss = RSS2(title = channel_subtitle, \
+ link = self.channel.url, \
+ description = self.channel.description.decode('utf-8'), \
+ lastBuildDate = date_now, \
items = rss_item_list,)
-
- if self.feeds_rss:
- f = open(rss_file + '.xml', 'w')
- rss.write_xml(f, 'utf-8')
- f.close()
-
- if self.feeds_json:
- f = open(rss_file + '.json', 'w')
- f.write(json.dumps(json_data, separators=(',',':')))
- f.close()
+ self.q.get(1)
+ try:
+ if self.feeds_rss:
+ f = open(rss_file + '.xml', 'w')
+ rss.write_xml(f, 'utf-8')
+ f.close()
+
+ if self.feeds_json:
+ f = open(rss_file + '.json', 'w')
+ f.write(json.dumps(json_data, separators=(',',':')))
+ f.close()
+ except:
+ pass
+ self.q.task_done()
def update_twitter(self, message):
try:
% (self.id, self.current_media_obj[0].file_name))
self.player.set_media(self.media)
- if self.player_mode:
- self.stream = self.player.file_read_slow()
- else:
- self.stream = self.player.file_read_fast()
+ self.q.get(1)
+ try:
+ if self.player_mode:
+ self.stream = self.player.file_read_slow()
+ else:
+ self.stream = self.player.file_read_fast()
+ except:
+ pass
+ self.q.task_done()
def set_webm_read_mode(self):
self.channel.set_callback(FileReader(self.media).read_callback)
while not self.server_ping:
try:
- self.q.get(1)
server = urllib.urlopen(self.server_url)
self.server_ping = True
- text = 'channel available'
- self._info(text)
- self.q.task_done()
+ self._info('Channel available.')
except:
time.sleep(1)
if log:
- text = 'could not connect the channel'
- self._err(text)
+ self._err('Could not connect the channel. Waiting for channel to become available.')
log = False
- self.q.task_done()
def icecastloop_nextmedia(self):
- self.q.get(1)
- self.next_media = 0
- self.media = self.get_next_media()
- self.counter += 1
- if self.relay_mode:
- self.set_relay_mode()
- elif os.path.exists(self.media) and not os.sep+'.' in self.media:
- if self.lp == 0:
- self._err('has no media to stream !')
- return False
- self.set_read_mode()
- self.q.task_done()
-
- return True
+ try:
+ self.next_media = 0
+ self.media = self.get_next_media()
+ self.counter += 1
+ if self.relay_mode:
+ self.set_relay_mode()
+ elif os.path.exists(self.media) and not os.sep+'.' in self.media:
+ if self.lp == 0:
+ self._err('has no media to stream !')
+ return False
+ self.set_read_mode()
+
+ return True
+ except Exception, e:
+ self_err('icecastloop_nextmedia: Error: ' + str(e))
+ return False
def icecastloop_metadata(self):
- self.q.get(1)
- if (not (self.jingles_mode and (self.counter % 2)) or \
- self.relay_mode) and self.twitter_mode:
- self.update_twitter_current()
- self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'})
- self.q.task_done()
- return True
+ try:
+ if (not (self.jingles_mode and (self.counter % 2)) or \
+ self.relay_mode) and self.twitter_mode:
+ self.update_twitter_current()
+ self.channel.set_metadata({'song': self.song, 'charset': 'utf-8'})
+ return True
+ except Exception, e:
+ self_err('icecastloop_metadata: Error: ' + str(e))
+ return False
-
def run(self):
- self.q.get(1)
self.ping_server()
- self.q.task_done()
if self.type == 'stream-m':
- self.q.get(1)
if self.relay_mode:
self.set_relay_mode()
else:
self.set_webm_read_mode()
self.channel_open()
self.channel.start()
- self.q.task_done()
if self.type == 'icecast':
- self.q.get(1)
self.channel_open()
self._info('channel connected')
- self.q.task_done()
while self.run_mode:
- try:
- if not self.icecastloop_nextmedia():
- break
- if not self.icecastloop_metadata():
+ if not self.icecastloop_nextmedia():
+ self._info('Something wrong happened in icecastloop_nextmedia. Ending.')
+ break
+
+ self.icecastloop_metadata()
+
+ # TEST MODE: Jump thru only the first chunk of each file
+ # first = True
+ for self.chunk in self.stream:
+ # if first:
+ # first = False
+ # else:
+ # break
+
+ if self.next_media or not self.run_mode:
break
-
- for self.chunk in self.stream:
+
+ if self.record_mode:
try:
- if self.next_media or not self.run_mode:
- break
+ # Record the chunk
+ self.recorder.write(self.chunk)
+ except:
+ self._err('could not write the buffer to the file')
+ continue
+ try:
+ # Send the chunk to the stream
+ self.channel.send(self.chunk)
+ self.channel.sync()
+ except:
+ self._err('could not send the buffer')
+
+ try:
+ self.channel.close()
+ self._info('channel closed')
+ except:
+ self._err('could not close the channel')
+ self.q.task_done()
+ continue
+
+ try:
+ self.ping_server()
+ self.channel_open()
+ self.channel.set_metadata({'song': self.song, 'charset': 'utf8',})
+ self._info('channel restarted')
+ except:
+ self._err('could not restart the channel')
if self.record_mode:
- try:
- self.q.get(1)
- self.recorder.write(self.chunk)
- self.q.task_done()
- except:
- self._err('could not write the buffer to the file')
- self.q.task_done()
- continue
-
+ self.recorder.close()
+ return
+
+ try:
+ self.channel.send(self.chunk)
+ self.channel.sync()
+ except:
+ self._err('could not send data after restarting the channel')
try:
- self.q.get(1)
- self.channel.send(self.chunk)
- self.channel.sync()
- self.q.task_done()
+ self.channel.close()
except:
- self._err('could not send the buffer')
- self.q.task_done()
+ self._err('could not close the channel')
- try:
- self.q.get(1)
- self.channel.close()
- self._info('channel closed')
- self.q.task_done()
- except:
- self._err('could not close the channel')
- self.q.task_done()
- continue
-
- try:
- self.ping_server()
- self.q.get(1)
- self.channel_open()
- self.channel.set_metadata({'song': self.song, 'charset': 'utf8',})
- self._info('channel restarted')
- self.q.task_done()
- except:
- self._err('could not restart the channel')
- self.q.task_done()
- continue
- continue
-
- except: # send chunk loop exception
- continue
- # send chunk loop end
-
- except: # while run_mode exception
- continue
+ if self.record_mode:
+ self.recorder.close()
+ return
+
+ # send chunk loop end
# while run_mode loop end
+ self._info("Play mode ended. Stopping stream.")
+
if self.record_mode:
self.recorder.close()