From a3ed58aa79e70a35471d59382efd89cc30912941 Mon Sep 17 00:00:00 2001 From: Guillaume Pellerin Date: Sat, 11 Apr 2009 00:03:06 +0000 Subject: [PATCH] simplify the solo station, better media object management, get 32 stations in deefuzz-mt ! :) --- deefuzz-deamon | 10 + deefuzz-mt.py | 405 +++++++++++++++++++++++++++++++++++++ deefuzz.py | 103 +++------- example/deefuzz_xml_gen.py | 21 ++ 4 files changed, 462 insertions(+), 77 deletions(-) create mode 100644 deefuzz-deamon create mode 100755 deefuzz-mt.py create mode 100755 example/deefuzz_xml_gen.py diff --git a/deefuzz-deamon b/deefuzz-deamon new file mode 100644 index 0000000..6d1c0cd --- /dev/null +++ b/deefuzz-deamon @@ -0,0 +1,10 @@ +#!/bin/bash + +conf_file=$1 +log_file=/tmp/deefuzz.log + +ulimit -c unlimited +while true; do + deefuzz $1 >> $log_file + sleep 3 +done diff --git a/deefuzz-mt.py b/deefuzz-mt.py new file mode 100755 index 0000000..c197fc0 --- /dev/null +++ b/deefuzz-mt.py @@ -0,0 +1,405 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright Guillaume Pellerin (2006-2009) + +# + +# This software is a computer program whose purpose is to stream audio +# and video data through icecast2 servers. + +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". + +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. + +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. + +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Author: Guillaume Pellerin + +import os +import sys +import time +import datetime +import string +import random +import Queue +import shout +import subprocess +from tools import * +from threading import Thread + +version = '0.3' +year = datetime.datetime.now().strftime("%Y") + + +def prog_info(): + desc = '\n deefuzz : easy and light streaming tool\n' + ver = ' version : %s \n\n' % (version) + info = """ Copyright (c) 2007-%s Guillaume Pellerin + All rights reserved. + + This software is licensed as described in the file COPYING, which + you should have received as part of this distribution. The terms + are also available at http://svn.parisson.org/d-fuzz/DeeFuzzLicense + + depends : python, python-xml, python-shout, libshout3, icecast2 + recommends : python-mutagen + provides : python-shout + + Usage : deefuzz $1 + where $1 is the path for a XML config file + ex: deefuzz example/myfuzz.xml + + see http://svn.parisson.org/deefuzz/ for more details + """ % (year) + text = desc + ver + info + return text + + +class DeeFuzzError: + """The DeeFuzz main error class""" + def __init__(self, message, command, subprocess): + self.message = message + self.command = str(command) + self.subprocess = subprocess + + def __str__(self): + if self.subprocess.stderr != None: + error = self.subprocess.stderr.read() + else: + error = '' + return "%s ; command: %s; error: %s" % (self.message, + self.command, + error) + + +class DeeFuzz: + """A DeeFuzz diffuser""" + + def __init__(self, conf_file): + self.conf_file = conf_file + self.conf = self.get_conf_dict() + self.buffer_size = 1024 + + def get_conf_dict(self): + confile = open(self.conf_file,'r') + conf_xml = confile.read() + confile.close() + dict = xmltodict(conf_xml,'utf-8') + return dict + + def get_station_names(self): + return self.conf['station']['name'] + + def play(self): + if isinstance(self.conf['deefuzz']['station'], dict): + # Fix wrong type data from xmltodict when one station (*) + nb_stations = 1 + else: + nb_stations = len(self.conf['deefuzz']['station']) + print 'Number of stations : ' + str(nb_stations) + + # Create a Queue + q = Queue.Queue(0) + + # Create a Producer + p = Producer(q) + p.start() + + print 'Buffer size per station = ' + str(self.buffer_size) + + s = [] + for i in range(0,nb_stations): + if isinstance(self.conf['deefuzz']['station'], dict): + station = self.conf['deefuzz']['station'] + else: + station = self.conf['deefuzz']['station'][i] + name = station['infos']['name'] + # Create a Station + s.append(Station(station, q, self.buffer_size)) + + for i in range(0,nb_stations): + # Start the Stations + s[i].start() + + +class Producer(Thread): + """a DeeFuzz Producer master thread""" + + def __init__(self, q): + Thread.__init__(self) + self.q = q + + def run(self): + i=0 + while 1: + #print "Producer produced one queue step: "+str(i) + self.q.put(i,1) + i+=1 + + +class Station(Thread): + """a DeeFuzz Station shouting slave thread""" + + def __init__(self, station, q, buffer_size): + Thread.__init__(self) + self.station = station + self.q = q + self.buffer_size = buffer_size + self.channel = shout.Shout() + self.id = 999999 + self.counter = 0 + self.rand_list = [] + self.command = 'cat ' + # Media + self.media_dir = self.station['media']['dir'] + self.channel.format = self.station['media']['format'] + self.mode_shuffle = int(self.station['media']['shuffle']) + self.bitrate = self.station['media']['bitrate'] + self.ogg_quality = self.station['media']['ogg_quality'] + self.samplerate = self.station['media']['samplerate'] + self.voices = self.station['media']['voices'] + # Infos + self.short_name = self.station['infos']['short_name'] + self.channel.name = self.station['infos']['name'] + self.channel.genre = self.station['infos']['genre'] + self.channel.description = self.station['infos']['description'] + self.channel.url = self.station['infos']['url'] + self.rss_dir = os.sep + 'tmp' + os.sep + 'rss' + self.rss_current_file = self.rss_dir + os.sep + self.short_name + '_current.xml' + self.rss_playlist_file = self.rss_dir + os.sep + self.short_name + '_playlist.xml' + self.media_url_dir = '/media/' + # Server + self.channel.protocol = 'http' # | 'xaudiocast' | 'icy' + self.channel.host = self.station['server']['host'] + self.channel.port = int(self.station['server']['port']) + self.channel.user = 'source' + self.channel.password = self.station['server']['sourcepassword'] + self.channel.mount = '/' + self.short_name + '.' + self.channel.format + self.channel.public = int(self.station['server']['public']) + self.channel.audio_info = { 'SHOUT_AI_BITRATE': self.bitrate, + 'SHOUT_AI_SAMPLERATE': self.samplerate, + 'SHOUT_AI_QUALITY': self.ogg_quality, + 'SHOUT_AI_CHANNELS': self.voices, + } + self.playlist = self.get_playlist() + #print self.playlist + self.lp = len(self.playlist) + self.channel.open() + print 'Opening ' + self.short_name + ' - ' + self.channel.name + \ + ' (' + str(self.lp) + ' tracks)...' + #time.sleep(0.1) + + def update_rss(self, media_list, rss_file): + rss_item_list = [] + if not os.path.exists(self.rss_dir): + os.makedirs(self.rss_dir) + + if len(media_list) == 1: + sub_title = '(currently playing)' + else: + sub_title = '(playlist)' + + for media in media_list: + media_link = self.channel.url + self.media_url_dir + media.file_name + media_description = '' + for key in media.metadata.keys(): + if media.metadata[key] != '': + media_description += '' % \ + (key.capitalize(), media.metadata[key]) + media_description += '
%s: %s
' + media_stats = os.stat(media.media) + media_date = time.localtime(media_stats[8]) + media_date = time.strftime("%a, %d %b %Y %H:%M:%S +0000", media_date) + + rss_item_list.append(PyRSS2Gen.RSSItem( + title = media.metadata['artist'] + ' : ' + media.metadata['title'], + link = media_link, + description = media_description, + enclosure = PyRSS2Gen.Enclosure(media_link, str(media.size), 'audio/mpeg'), + guid = PyRSS2Gen.Guid(media_link), + pubDate = media_date,) + ) + + rss = PyRSS2Gen.RSS2(title = self.channel.name + ' ' + sub_title, + link = self.channel.url, + description = self.channel.description, + lastBuildDate = datetime.datetime.now(), + items = rss_item_list,) + + f = open(rss_file, 'w') + rss.write_xml(f) + f.close() + + def get_playlist(self): + file_list = [] + for root, dirs, files in os.walk(self.media_dir): + for file in files: + s = file.split('.') + ext = s[len(s)-1] + if ext.lower() == self.channel.format and not '/.' in file: + file_list.append(root + os.sep + file) + return file_list + + def get_next_media_lin(self): + self.lp = len(self.playlist) + if self.id >= (self.lp - 1): + self.playlist = self.get_playlist() + self.id = 0 + self.lp = len(self.playlist) + self.update_rss(self.playlist_to_objs(), self.rss_playlist_file) + else: + self.id = self.id + 1 + return self.playlist[self.id] + + def get_next_media_rand(self): + self.lp = len(self.playlist) + if self.id >= (self.lp - 1): + self.playlist = self.get_playlist() + lp_new = len(self.playlist) + if lp_new != self.lp or self.counter == 0: + self.rand_list = range(0,lp_new) + random.shuffle(self.rand_list) + self.id = 0 + self.lp = len(self.playlist) + self.update_rss(self.playlist_to_objs(), self.rss_playlist_file) + else: + self.id = self.id + 1 + index = self.rand_list[self.id] + return self.playlist[index] + + def log_queue(self, it): + print 'Station ' + self.short_name + ' eated one queue step: '+str(it) + + def playlist_to_objs(self): + media_objs = [] + for media in self.playlist: + file_name, file_title, file_ext = self.get_file_info(media) + if file_ext.lower() == 'mp3': + media_objs.append(Mp3(media)) + elif file_ext.lower() == 'ogg': + media_objs.append(Ogg(media)) + return media_objs + + def get_file_info(self, media): + file_name = media.split(os.sep)[-1] + file_title = file_name.split('.')[-2] + file_ext = file_name.split('.')[-1] + return file_name, file_title, file_ext + + def core_process_stream(self, media): + """Read media and stream data through a generator. + Taken from Telemeta (see http://telemeta.org)""" + + command = self.command + '"' + media + '"' + __chunk = 0 + + try: + proc = subprocess.Popen(command, + shell = True, + bufsize = self.buffer_size, + stdin = subprocess.PIPE, + stdout = subprocess.PIPE, + close_fds = True) + except: + raise DeeFuzzError('Command failure:', command, proc) + + # Core processing + while True: + __chunk = proc.stdout.read(self.buffer_size) + status = proc.poll() + if status != None and status != 0: + raise DeeFuzzError('Command failure:', command, proc) + if len(__chunk) == 0: + break + yield __chunk + + def core_process_read(self, media): + """Read media and stream data through a generator. + Taken from Telemeta (see http://telemeta.org)""" + + __chunk = 0 + m = open(media, 'r') + # Core processing + while True: + __chunk = m.read(self.buffer_size) + if len(__chunk) == 0: + break + yield __chunk + m.close() + + + def run(self): + __chunk = 0 + + while True: + it = self.q.get(1) + if self.lp == 0: + break + if self.mode_shuffle == 1: + media = self.get_next_media_rand() + else: + media = self.get_next_media_lin() + self.counter += 1 + self.lp = len(self.playlist) + file_name, file_title, file_ext = self.get_file_info(media) + if file_ext.lower() == 'mp3': + media_obj = Mp3(media) + elif file_ext.lower() == 'ogg': + media_obj = Ogg(media) + + self.q.task_done() + #self.log_queue(it) + + if os.path.exists(media) and not os.sep+'.' in media: + it = self.q.get(1) + title = media_obj.metadata['title'] + self.channel.set_metadata({'song': str(title)}) + self.update_rss([media_obj], self.rss_current_file) + print 'DeeFuzzing this file on %s : id = %s, name = %s' % (self.short_name, self.id, file_name) + stream = self.core_process_read(media) + self.q.task_done() + #self.log_queue(it) + + for __chunk in stream: + it = self.q.get(1) + self.channel.send(__chunk) + self.channel.sync() + self.q.task_done() + #self.log_queue(it) + + self.channel.close() + + +def main(): + if len(sys.argv) == 2: + print "DeeFuzz v"+version + print "Using libshout version %s" % shout.version() + d = DeeFuzz(sys.argv[1]) + d.play() + else: + text = prog_info() + sys.exit(text) + +if __name__ == '__main__': + main() diff --git a/deefuzz.py b/deefuzz.py index ebd5001..002820f 100755 --- a/deefuzz.py +++ b/deefuzz.py @@ -46,7 +46,6 @@ import Queue import shout import subprocess from tools import * -from threading import Thread version = '0.3' year = datetime.datetime.now().strftime("%Y") @@ -99,7 +98,6 @@ class DeeFuzz: def __init__(self, conf_file): self.conf_file = conf_file self.conf = self.get_conf_dict() - #print self.conf def get_conf_dict(self): confile = open(self.conf_file,'r') @@ -111,7 +109,7 @@ class DeeFuzz: def get_station_names(self): return self.conf['station']['name'] - def play(self): + def run(self): if isinstance(self.conf['deefuzz']['station'], dict): # Fix wrong type data from xmltodict when one station (*) nb_stations = 1 @@ -119,54 +117,25 @@ class DeeFuzz: nb_stations = len(self.conf['deefuzz']['station']) print 'Number of stations : ' + str(nb_stations) - # Create a Queue - q = Queue.Queue(0) - - # Create a Producer - p = Producer(q) - p.start() + if nb_stations > 1: + print "You are trying to start multiple stations at the same time..." + print "Please deefuzz-mt.py for that !" # Define the buffer_size - buffer_size = 65536/nb_stations + buffer_size = 65536 print 'Buffer size per station = ' + str(buffer_size) - - s = [] - for i in range(0,nb_stations): - if isinstance(self.conf['deefuzz']['station'], dict): - station = self.conf['deefuzz']['station'] - else: - station = self.conf['deefuzz']['station'][i] - name = station['infos']['name'] - # Create a Station - s.append(Station(station, q, buffer_size)) - - for i in range(0,nb_stations): - # Start the Stations - s[i].start() - -class Producer(Thread): - """a DeeFuzz Producer master thread""" + # Start the station + station = self.conf['deefuzz']['station'] + s = Station(station, buffer_size) + s.run() - def __init__(self, q): - Thread.__init__(self) - self.q = q - def run(self): - i=0 - while 1: - #print "Producer produced one queue step: "+str(i) - self.q.put(i,1) - i+=1 - - -class Station(Thread): - """a DeeFuzz Station shouting slave thread""" +class Station: + """a DeeFuzz shouting station""" - def __init__(self, station, q, buffer_size): - Thread.__init__(self) + def __init__(self, station, buffer_size): self.station = station - self.q = q self.buffer_size = buffer_size self.channel = shout.Shout() self.id = 999999 @@ -242,7 +211,7 @@ class Station(Thread): guid = PyRSS2Gen.Guid(media_link), pubDate = media_date,) ) - + rss = PyRSS2Gen.RSS2(title = self.channel.name + ' ' + sub_title, link = self.channel.url, description = self.channel.description, @@ -269,7 +238,7 @@ class Station(Thread): self.playlist = self.get_playlist() self.id = 0 self.lp = len(self.playlist) - self.update_rss(self.playlist_to_objs(), self.rss_playlist_file) + self.update_rss(self.media_to_objs(self.playlist), self.rss_playlist_file) else: self.id = self.id + 1 return self.playlist[self.id] @@ -284,7 +253,7 @@ class Station(Thread): random.shuffle(self.rand_list) self.id = 0 self.lp = len(self.playlist) - self.update_rss(self.playlist_to_objs(), self.rss_playlist_file) + self.update_rss(self.media_to_objs(self.playlist), self.rss_playlist_file) else: self.id = self.id + 1 index = self.rand_list[self.id] @@ -293,9 +262,9 @@ class Station(Thread): def log_queue(self, it): print 'Station ' + self.short_name + ' eated one queue step: '+str(it) - def playlist_to_objs(self): + def media_to_objs(self, media_list): media_objs = [] - for media in self.playlist: + for media in media_list: file_name, file_title, file_ext = self.get_file_info(media) if file_ext.lower() == 'mp3': media_objs.append(Mp3(media)) @@ -314,7 +283,6 @@ class Station(Thread): Taken from Telemeta (see http://telemeta.org)""" command = self.command + '"' + media + '"' - __chunk = 0 try: proc = subprocess.Popen(command, @@ -335,14 +303,12 @@ class Station(Thread): if len(__chunk) == 0: break yield __chunk - + def core_process_read(self, media): """Read media and stream data through a generator. Taken from Telemeta (see http://telemeta.org)""" - __chunk = 0 m = open(media, 'r') - # Core processing while True: __chunk = m.read(self.buffer_size) if len(__chunk) == 0: @@ -350,12 +316,9 @@ class Station(Thread): yield __chunk m.close() - def run(self): - __chunk = 0 while True: - it = self.q.get(1) if self.lp == 0: break if self.mode_shuffle == 1: @@ -363,33 +326,21 @@ class Station(Thread): else: media = self.get_next_media_lin() self.counter += 1 - self.lp = len(self.playlist) - file_name, file_title, file_ext = self.get_file_info(media) - if file_ext.lower() == 'mp3': - media_obj = Mp3(media) - elif file_ext.lower() == 'ogg': - media_obj = Ogg(media) - - self.q.task_done() - #self.log_queue(it) if os.path.exists(media) and not os.sep+'.' in media: - it = self.q.get(1) - title = media_obj.metadata['title'] + self.current_media_obj = self.media_to_objs([media]) + title = self.current_media_obj[0].metadata['title'] self.channel.set_metadata({'song': str(title)}) - self.update_rss([media_obj], self.rss_current_file) + self.update_rss(self.current_media_obj, self.rss_current_file) + file_name, file_title, file_ext = self.get_file_info(media) print 'DeeFuzzing this file on %s : id = %s, name = %s' % (self.short_name, self.id, file_name) stream = self.core_process_read(media) - self.q.task_done() - #self.log_queue(it) - + for __chunk in stream: - it = self.q.get(1) self.channel.send(__chunk) self.channel.sync() - self.q.task_done() - #self.log_queue(it) - + stream.close() + self.channel.close() @@ -398,10 +349,8 @@ def main(): print "DeeFuzz v"+version print "Using libshout version %s" % shout.version() d = DeeFuzz(sys.argv[1]) - d.play() + d.run() else: - text = prog_info() - sys.exit(text) if __name__ == '__main__': main() diff --git a/example/deefuzz_xml_gen.py b/example/deefuzz_xml_gen.py new file mode 100755 index 0000000..e531d9a --- /dev/null +++ b/example/deefuzz_xml_gen.py @@ -0,0 +1,21 @@ +#!/usr/bin/python + +import sys +from string import Template + +nb_station = sys.argv[1] +deefuzz_from = sys.argv[2] +deefuzz_to = sys.argv[3] + +df = open(deefuzz_from, 'r') +dt = open(deefuzz_to, 'w') +t = Template(df.read()) + +dt.write('\n') +for i in range(0,int(nb_station)): + xml = t.substitute(station='MyDeeFuzz_'+str(i+1)) + dt.write(xml) +dt.write('\n') + +df.close() +dt.close() -- 2.39.5