From da0035214d63e5263281fc9d72380b72c5360ee2 Mon Sep 17 00:00:00 2001 From: Guillaume Pellerin Date: Tue, 22 Dec 2009 10:57:33 +0000 Subject: [PATCH] get the relay working, strong class rearrangement, cleanup queue management --- deefuzzer.py | 216 ++++++++++----------------------------- tools/__init__.py | 6 +- tools/osc.py | 61 +++++++++++ tools/osc_no_relay.py | 2 +- tools/osc_relay_start.py | 14 +++ tools/osc_relay_stop.py | 14 +++ tools/player.py | 101 ++++++++++++++++++ tools/relay.py | 75 ++++++++++++++ tools/twitter.py | 52 ++++++++++ 9 files changed, 378 insertions(+), 163 deletions(-) create mode 100644 tools/osc.py create mode 100644 tools/osc_relay_start.py create mode 100644 tools/osc_relay_stop.py create mode 100644 tools/player.py create mode 100644 tools/relay.py create mode 100644 tools/twitter.py diff --git a/deefuzzer.py b/deefuzzer.py index 2d014d8..1b4d6a9 100755 --- a/deefuzzer.py +++ b/deefuzzer.py @@ -44,9 +44,7 @@ import string import random import Queue import shout -import subprocess import platform -import urllib from threading import Thread from tools import * @@ -247,6 +245,7 @@ class Station(Thread): self.playlist = self.get_playlist() self.lp = len(self.playlist) self.channel.open() + self.channel_delay = self.channel.delay() # Logging self.logger.write('Opening ' + self.short_name + ' - ' + self.channel.name + \ @@ -259,11 +258,11 @@ class Station(Thread): os.makedirs(self.metadata_dir) # OSC - self.osc_control_mode = '0' + self.osc_control_mode = 0 if 'control' in self.station: - self.osc_control_mode = self.station['control']['mode'] + self.osc_control_mode = int(self.station['control']['mode']) self.osc_port = self.station['control']['port'] - if self.osc_control_mode =='1': + if self.osc_control_mode == 1: self.osc_controller = OSCController(self.osc_port) self.osc_controller.start() # OSC paths and callbacks @@ -273,59 +272,64 @@ class Station(Thread): self.osc_controller.add_method('/mode/jingles', 'i', self.jingles_callback) # Twitter - self.twitter_mode = '0' + self.twitter_mode = 0 if 'twitter' in self.station: - self.twitter_mode = self.station['twitter']['mode'] + self.twitter_mode = int(self.station['twitter']['mode']) self.twitter_user = self.station['twitter']['user'] self.twitter_pass = self.station['twitter']['pass'] - if self.twitter_mode == '1': + if self.twitter_mode == 1: self.twitter = Twitter(self.twitter_user, self.twitter_pass) self.twitter_tags = self.station['twitter']['tags'].split(' ') import tinyurl self.tinyurl = tinyurl.create_one(self.channel.url + '/m3u/' + self.m3u.split(os.sep)[-1]) # A jingle between each media - self.jingles_mode = '0' + self.jingles_mode = 0 if 'jingles' in self.station: - self.jingles_mode = self.station['jingles']['mode'] + self.jingles_mode = int(self.station['jingles']['mode']) self.jingles_shuffle = self.station['jingles']['shuffle'] self.jingles_dir = self.station['jingles']['dir'] - if self.jingles_mode =='1': + if self.jingles_mode == 1: self.jingles_list = self.get_jingles() self.jingles_length = len(self.jingles_list) self.jingle_id = 0 - # The station's player - self.player = Player() - # Relay - self.relay_mode = '0' + self.relay_mode = 0 if 'relay' in self.station: - self.relay_mode = self.station['relay']['mode'] + self.relay_mode = int(self.station['relay']['mode']) self.relay_url = self.station['relay']['url'] - self.player.set_relay(self.relay_url) + + # The station's player + self.player = Player() def media_next_callback(self, path, value): value = value[0] - self.next_media = str(value) + self.next_media = value message = "Received OSC message '%s' with arguments '%d'" % (path, value) self.logger.write(message) def relay_callback(self, path, value): value = value[0] - self.relay_mode = str(value) + if value == 1: + self.relay_mode = 1 + self.player.start_relay(self.relay_url) + elif value == 0: + self.relay_mode = 0 + self.player.stop_relay() + self.next_media = 1 message = "Received OSC message '%s' with arguments '%d'" % (path, value) self.logger.write(message) def twitter_callback(self, path, value): value = value[0] - self.twitter_mode = str(value) + self.twitter_mode = value message = "Received OSC message '%s' with arguments '%d'" % (path, value) self.logger.write(message) def jingles_callback(self, path, value): value = value[0] - self.jingles_mode = str(value) + self.jingles_mode = value message = "Received OSC message '%s' with arguments '%d'" % (path, value) self.logger.write(message) @@ -493,17 +497,21 @@ class Station(Thread): def run(self): while True: - it = self.q.get(1) - if self.lp == 0: - self.logger.write('Error : Station ' + self.short_name + ' have no media to stream !') - break - self.next_media = '0' + self.q.get(1) + self.next_media = 0 self.media = self.get_next_media() self.counter += 1 self.q.task_done() - if os.path.exists(self.media) and not os.sep+'.' in self.media: - self.q.get(1) + self.q.get(1) + if self.relay_mode == 1: + self.channel.set_metadata({'song': 'LIVE', 'charset': 'utf8',}) + self.stream = self.player.relay_read() + + elif os.path.exists(self.media) and not os.sep+'.' in self.media: + if self.lp == 0: + self.logger.write('Error : Station ' + self.short_name + ' have no media to stream !') + break self.current_media_obj = self.media_to_objs([self.media]) self.title = self.current_media_obj[0].metadata['title'] self.artist = self.current_media_obj[0].metadata['artist'] @@ -522,144 +530,30 @@ class Station(Thread): self.update_rss(self.current_media_obj, self.rss_current_file, '(currently playing)') self.logger.write('DeeFuzzing this file on %s : id = %s, name = %s' \ % (self.short_name, self.id, self.current_media_obj[0].file_name)) + self.player.set_media(self.media) + self.stream = self.player.file_read_slow() - if not (self.jingles_mode == '1' and (self.counter % 2) == 1): - message = 'Now playing: %s #%s #%s' % (self.song.replace('_', ' '), self.artist.replace(' ', ''), self.short_name) - self.update_twitter(message) - - if self.relay_mode != '0': - self.stream = self.player.relay() - self.channel.set_metadata({'song': 'LIVE', 'charset': 'utf8',}) - else: - self.player.set_media(self.media) - self.stream = self.player.read_slow() - self.q.task_done() - - for __chunk in self.stream: - self.q.get(1) - try: - self.channel.send(__chunk) - self.channel.sync() - if self.next_media != '0': - break - # self.logger.write('Station delay (ms) ' + self.short_name + ' : ' + str(self.channel.delay())) - except: - self.logger.write('ERROR : Station ' + self.short_name + ' : could not send the buffer... ') - self.channel.close() - self.channel.open() - continue - self.q.task_done() - else: - self.logger.write('Error : Station ' + self.short_name + ' : ' + self.media + 'not found !') - - self.channel.close() - - -class Player: - """A file streaming iterator""" - - def __init__(self): - self.main_buffer_size = 0x100000 - self.sub_buffer_size = 0x10000 - - def set_media(self, media): - self.media = media - - def set_relay(self, url): - self.q = Queue.Queue(self.main_buffer_size) - self.r = Relay(self.q, self.sub_buffer_size, url) - self.r.start() - - def read_fast(self): - """Read media and stream data through a generator.""" - m = open(self.media, 'r') - while True: - __main_chunk = m.read(self.sub_buffer_size) - if not __main_chunk: - break - yield __main_chunk - m.close() - - def read_slow(self): - """Read a bigger part of the media and stream the little parts - of the data through a generator""" - m = open(self.media, 'r') - while True: - __main_chunk = m.read(self.main_buffer_size) - if not __main_chunk: - break - i = 0 - while True: - start = i * self.sub_buffer_size - end = self.sub_buffer_size + (i * self.sub_buffer_size) - __sub_chunk = __main_chunk[start:end] - if not __sub_chunk: - break - yield __sub_chunk - i += 1 - m.close() + if not (self.jingles_mode == 1 and (self.counter % 2) == 1): + message = 'Now playing: %s #%s #%s' % (self.song.replace('_', ' '), self.artist.replace(' ', ''), self.short_name) + self.update_twitter(message) - def relay(self): - """Read a distant media through its URL""" - while True: - __chunk = self.q.get(self.sub_buffer_size) - if not __chunk: - break - yield __chunk self.q.task_done() + for self.chunk in self.stream: + self.q.get(1) + try: + self.channel.send(self.chunk) + self.channel.sync() + if self.next_media == 1: + break + except: + self.logger.write('ERROR : Station ' + self.short_name + ' : could not send the buffer... ') + self.channel.close() + self.channel.open() + continue + self.q.task_done() -class Relay(Thread): - - def __init__(self, q, buffer_size, url): - Thread.__init__(self) - self.q = q - self.buffer_size = buffer_size - self.url = url - self.u = urllib.urlopen(self.url) - - def run(self): - while True: - data = self.u.read(self.buffer_size) - self.q.put_nowait(data) - self.u.close() - - -class Twitter: - - def __init__(self, username, password): - import twitter - self.username = username - self.password = password - self.api = twitter.Api(username=self.username, password=self.password) - - def post(self, message): - try: - self.api.PostUpdate(message) - except: - pass - - -class OSCController(Thread): - - def __init__(self, port): - Thread.__init__(self) - import liblo - self.port = port - try: - self.server = liblo.Server(self.port) - except liblo.ServerError, err: - print str(err) - - def add_method(self, path, type, method): - self.server.add_method(path, type, method) - - def server(self): - return self.server - - def run(self): - while True: - self.server.recv(1000) + self.channel.close() def main(): diff --git a/tools/__init__.py b/tools/__init__.py index 3f9d101..f103ac3 100644 --- a/tools/__init__.py +++ b/tools/__init__.py @@ -3,4 +3,8 @@ from xmltodict import * from PyRSS2Gen import * from mp3 import * from ogg import * -from logger import * \ No newline at end of file +from logger import * +from player import * +from osc import * +from twitter import * +from relay import * diff --git a/tools/osc.py b/tools/osc.py new file mode 100644 index 0000000..cdf6b24 --- /dev/null +++ b/tools/osc.py @@ -0,0 +1,61 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2006-2009 Guillaume Pellerin + +# + +# This software is a computer program whose purpose is to stream audio +# and video data through icecast2 servers. + +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". + +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. + +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. + +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Author: Guillaume Pellerin + +from threading import Thread + + +class OSCController(Thread): + + def __init__(self, port): + Thread.__init__(self) + import liblo + self.port = port + try: + self.server = liblo.Server(self.port) + except liblo.ServerError, err: + print str(err) + + def add_method(self, path, type, method): + self.server.add_method(path, type, method) + + def server(self): + return self.server + + def run(self): + while True: + self.server.recv(1000) diff --git a/tools/osc_no_relay.py b/tools/osc_no_relay.py index eaefe1a..4f9a3bf 100644 --- a/tools/osc_no_relay.py +++ b/tools/osc_no_relay.py @@ -11,4 +11,4 @@ except liblo.AddressError, err: sys.exit() # send message "/foo/message1" with int, float and string arguments -liblo.send(target, "/media/relay", 0) +liblo.send(target, "/media/relay", 'stop') diff --git a/tools/osc_relay_start.py b/tools/osc_relay_start.py new file mode 100644 index 0000000..14bcb69 --- /dev/null +++ b/tools/osc_relay_start.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import liblo, sys + +# send all messages to port 1234 on the local machine +try: + target = liblo.Address(1234) +except liblo.AddressError, err: + print str(err) + sys.exit() + +# send message "/foo/message1" with int, float and string arguments +liblo.send(target, "/media/relay", 1) diff --git a/tools/osc_relay_stop.py b/tools/osc_relay_stop.py new file mode 100644 index 0000000..eaefe1a --- /dev/null +++ b/tools/osc_relay_stop.py @@ -0,0 +1,14 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- + +import liblo, sys + +# send all messages to port 1234 on the local machine +try: + target = liblo.Address(1234) +except liblo.AddressError, err: + print str(err) + sys.exit() + +# send message "/foo/message1" with int, float and string arguments +liblo.send(target, "/media/relay", 0) diff --git a/tools/player.py b/tools/player.py new file mode 100644 index 0000000..9d78d8b --- /dev/null +++ b/tools/player.py @@ -0,0 +1,101 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2006-2009 Guillaume Pellerin + +# + +# This software is a computer program whose purpose is to stream audio +# and video data through icecast2 servers. + +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". + +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. + +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. + +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Author: Guillaume Pellerin + +from relay import * + +class Player: + """A file streaming iterator""" + + def __init__(self): + self.main_buffer_size = 0x100000 + self.sub_buffer_size = 0x10000 + + def set_media(self, media): + self.media = media + + def start_relay(self, url): + self.url = url + self.relay = Relay(self.sub_buffer_size, self.main_buffer_size) + self.relay.set_url(self.url) + self.relay.open() + self.relay.start() + self.queue = self.relay.queue + + def stop_relay(self): + self.relay.close() + + def file_read_fast(self): + """Read media and stream data through a generator.""" + m = open(self.media, 'r') + while True: + __main_chunk = m.read(self.sub_buffer_size) + if not __main_chunk: + break + yield __main_chunk + m.close() + + def file_read_slow(self): + """Read a bigger part of the media and stream the little parts + of the data through a generator""" + m = open(self.media, 'r') + while True: + self.main_chunk = m.read(self.main_buffer_size) + if not self.main_chunk: + break + i = 0 + while True: + start = i * self.sub_buffer_size + end = self.sub_buffer_size + (i * self.sub_buffer_size) + self.sub_chunk = self.main_chunk[start:end] + if not self.sub_chunk: + break + yield self.sub_chunk + i += 1 + self.main_chunk = 0 + self.sub_chunk = 0 + m.close() + + def relay_read(self): + """Read a distant media through its URL""" + while True: + self.sub_chunk = self.queue.get(self.sub_buffer_size) + if not self.sub_chunk: + break + yield self.sub_chunk + self.queue.task_done() + self.sub_chunk = 0 diff --git a/tools/relay.py b/tools/relay.py new file mode 100644 index 0000000..831989e --- /dev/null +++ b/tools/relay.py @@ -0,0 +1,75 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2006-2009 Guillaume Pellerin + +# + +# This software is a computer program whose purpose is to stream audio +# and video data through icecast2 servers. + +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". + +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. + +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. + +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Author: Guillaume Pellerin + +from threading import Thread +import Queue +import urllib + +class Relay(Thread): + + def __init__(self, sub_buffer_size, main_buffer_size): + Thread.__init__(self) + self.sub_buffer_size = sub_buffer_size + self.main_buffer_size = main_buffer_size + self.queue = Queue.Queue(self.main_buffer_size) + + def set_url(self, url): + self.url = url + + def open(self): + try: + self.stream = urllib.urlopen(self.url) + self.isopen = True + except: + self.isopen = False + + def close(self): + if self.stream: + self.isopen = False + + def run(self): + while True: + if self.isopen: + self.chunk = self.stream.read(self.sub_buffer_size) + self.queue.put_nowait(self.chunk) + else: + self.stream.close() + break + + + diff --git a/tools/twitter.py b/tools/twitter.py new file mode 100644 index 0000000..6451116 --- /dev/null +++ b/tools/twitter.py @@ -0,0 +1,52 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- +# +# Copyright (C) 2006-2009 Guillaume Pellerin + +# + +# This software is a computer program whose purpose is to stream audio +# and video data through icecast2 servers. + +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". + +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. + +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. + +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + +# Author: Guillaume Pellerin + +class Twitter: + + def __init__(self, username, password): + import twitter + self.username = username + self.password = password + self.api = twitter.Api(username=self.username, password=self.password) + + def post(self, message): + try: + self.api.PostUpdate(message) + except: + pass + -- 2.39.5