0001
0002import logging, sys, time, os, thread, re, urllib2, base64
0003from SimpleXMLRPCServer import SimpleXMLRPCServer, list_public_methods
0004from xmlrpclib import MAXINT
0005from aggregator.main import Aggregator
0006
0007
0008import gettext
0009gettext.install('bittorrent', 'locale')
0010
0011from BitTorrent.launchmanycore import LaunchMany
0012from BitTorrent.defaultargs import get_defaults
0013from BitTorrent import configfile
0014from BitTorrent import BTFailure
0015
# Default address the XML/RPC server binds to when none is given on the
# command line.
HOST, PORT = 'localhost', 8000

__author__ = "Philippe Normand (phil at base-art dot net)"
__version__ = '0.1'
__license__ = 'GPL'

# Module documentation doubles as the usage text printed by usage(); the
# %(...)s placeholders are filled in once, at import time.
__doc__ = """
FeedMyTorrent Master is the XML/RPC server which runs the BitTorrent
client. The former acts as a frontend to the later, through a simple
XML/RPC API. The API is self documented.

This code was inspired by an ASPN Cookbook recipe found at:
http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440555

Author: %(author)s
Version: %(version)s

Usage: %(prog)s directory [[host:]port] [classic BitTorrent options]

- directory is the place where BitTorrent will download/seed files
- host is the hostname from which the XML/RPC server will be bound (default: %(HOST)s)
- port is the port under which the XML/RPC server will listen (default: %(PORT)s)

Examples:
 %(prog)s /data/download 9000
 %(prog)s /data/download macrosoft.com:9000
 %(prog)s /data/download localfoo --max_upload_rate 50

""" % dict(author=__author__,
           version=__version__,
           prog=sys.argv[0],
           HOST=HOST,
           PORT=PORT)
0046
0047def usage(btException=None):
0048 if btException:
0049 print
0050 print 'BitTorrent exception occurred : %s' % btException
0051 print __doc__
0052
0053class Master(SimpleXMLRPCServer):
0054
0055 log = None
0056
0057 def __init__(self, (host,port), directory):
0058 SimpleXMLRPCServer.__init__(self,(host,port))
0059 self.logRequests = False
0060 self.isPaused = False
0061 self.torrents = {}
0062 self.torrentDir = directory
0063 self.setupLog()
0064 self.register_instance(self)
0065 self.register_introspection_functions()
0066 self.aggregator = Aggregator()
0067 Master.log.info('Starting Master in http://%s:%s' % (host, port))
0068
0069 def setupLog(self):
0070 level = logging.DEBUG
0071 format = '%(asctime)s %(levelname)-8s %(message)s'
0072 datefmt = '%a, %d %b %Y %H:%M:%S'
0073 fname = 'fmtorrent_master.log'
0074
0075 formatter = logging.Formatter(format, datefmt)
0076 handler = logging.FileHandler(fname)
0077 handler.setFormatter(formatter)
0078
0079 Master.log = logging.getLogger('FmTorrent')
0080 Master.log.addHandler(handler)
0081 Master.log.setLevel(level)
0082
0083 logging.basicConfig()
0084 root = logging.getLogger()
0085 handler = root.handlers[0]
0086 handler.setFormatter(formatter)
0087
0088 def _dispatch(self, method, params):
0089 if method.startswith('system'):
0090 return SimpleXMLRPCServer._dispatch(self, method, params)
0091 try:
0092
0093
0094
0095 func = getattr(self, 'export_' + method)
0096 except AttributeError:
0097 raise Exception('method "%s" is not supported' % method)
0098 else:
0099 if method != 'getStatus':
0100 Master.log.info("calling %s" % method)
0101 return func(*params)
0102
0103 def _listMethods(self):
0104 return [ name.split('export_')[1] for name in dir(self) if name.startswith('export')]
0105
0106 def _methodHelp(self, name):
0107 try:
0108 method = getattr(self, 'export_%s' % name)
0109 except AttributeError:
0110 help = ""
0111 else:
0112 import pydoc
0113 help = pydoc.getdoc(method)
0114 return help
0115
0116
0117
0118
0119
0120 def export_pauseAll(self):
0121 """ pauseAll() -> int
0122
0123 Pause all current torrents
0124 """
0125 self.isPaused = True
0126 return 1
0127
0128 def export_resumeAll(self):
0129 """ resumeAll() -> int
0130
0131 Resume all currently paused torrents
0132 """
0133 self.isPaused = False
0134 for name in self.torrents.keys():
0135 self.export_resumeTorrent(name)
0136 return 0
0137
0138 def export_getStatus(self):
0139 """ getStatus() -> dict keyed by torrent name
0140
0141 Each item of the dictionnary is another dictionnary:
0142
0143 ::
0144
0145 {'state': str, 'status': status,
0146 'progress': str, 'peers': int,
0147 'seeds': int, 'seedsmsg': str,
0148 'dist': int, 'uprate': float,
0149 'dnrate': float, 'upamt': int,
0150 'dnamt': int, 'size': int, 't': int,
0151 'msg': str}
0152
0153
0154 Return informations about current torrents, as a dictionnary, hashed by
0155 torrent name
0156 """
0157 status = {}
0158 for torrentName, infos in self.torrents.iteritems():
0159 tmp = {}
0160 torrentName = os.path.basename(torrentName)
0161 for k,v in infos.iteritems():
0162 if type(v) == long and v >= MAXINT:
0163 v = MAXINT -1
0164 tmp[k] = v
0165 status[torrentName] = tmp
0166 return status
0167
0168 def export_pauseTorrent(self, name):
0169 """ pauseTorrent(torrentName) -> int
0170
0171 Pause a torrent, given its name (cf. getStatus() for
0172 torrent names)
0173 """
0174 try:
0175 self._rename(name, '%s.paused' % name)
0176 except:
0177 return 0
0178 else:
0179 return self._torrentState(name,'pause')
0180
0181 def export_resumeTorrent(self, name):
0182 """ resumeTorrent(torrentName) -> int
0183
0184 Resume a torrent, given its name (cf. getStatus() for
0185 torrent names)
0186 """
0187 try:
0188 self._rename('%s.paused' % name, name)
0189 except:
0190 return 0
0191 else:
0192 return self._torrentState(name,'resume')
0193
0194 def export_stopTorrent(self, name):
0195 """ stopTorrent(torrentName) -> int
0196
0197 Stop a torrent, given its name (cf. getStatus() for
0198 torrent names)
0199 """
0200 return self._torrentState(name,'stop')
0201
0202 def export_addTorrentFromData(self, fname, data,local=0):
0203 """ addTorrentFromData(torrentFilename, str) -> 1
0204
0205 Add a new torrent to the queue, given the destination
0206 filename and its content.
0207 """
0208 if not local:
0209 data = base64.decodestring(data)
0210 if not fname.endswith('.torrent'):
0211 fname = "%s.torrent" % fname
0212
0213 full = os.path.join(self.torrentDir, fname)
0214 f = open(full,'w')
0215 f.write(data)
0216 f.close()
0217 return 1
0218
0219 def export_addTorrentFromURL(self, url):
0220 """ addTorrentFromURL(str) -> str
0221
0222 Add a new torrent to the queue, given its full URL. This
0223 method returns the filename of the new torrent.
0224 """
0225 filename, data = self._getTorrent(url)
0226 self.export_addTorrentFromData(filename, data,local=1)
0227 print filename
0228 return filename
0229
0230 def export_getFeeds(self):
0231 """ getFeeds() -> list of feed names
0232
0233 Fetch the feeds list and return it as a list of string
0234 identifiers (like ['foo','bar'])
0235 """
0236 return self.aggregator.getFeeds()
0237
0238 def export_updateFeed(self, feed):
0239 """ updateFeed(feedName) -> 0|1
0240
0241 Updates the feed identified by its name. If some torrents
0242 are found during update, add them to the queue and returns
0243 1. Either returns 0.
0244 """
0245 torrents = self.aggregator.updateFeed(feed)[feed]
0246 if torrents:
0247 self._newTorrents(feed, torrents)
0248 return 1
0249 return 0
0250
0251
0252
0253
0254
0255 def _getTorrent(self, url):
0256 req = urllib2.Request(url)
0257 handle = urllib2.urlopen(req)
0258 info = handle.info()
0259 url = handle.geturl()
0260 filename = os.path.split(urllib2.urlparse.urlsplit(url)[2])[1]
0261 if info.has_key('content-disposition'):
0262 disp = info['content-disposition']
0263 match = re.match('attachment; filename="(.*)"', disp)
0264 if match:
0265 filename = match.groups()[0]
0266 return (filename, handle.read())
0267
0268 def live(self):
0269 uiname = 'btlaunchmany'
0270
0271 wantUsage = False
0272 for opt in ('-h','--help'):
0273 if opt in sys.argv[1:]:
0274 del sys.argv[sys.argv.index(opt)]
0275 wantUsage = True
0276 try:
0277 config, args = configfile.parse_configuration_and_args(get_defaults(uiname),
0278 uiname, sys.argv[1:], 0, 1)
0279 config['torrent_dir'] = self.torrentDir
0280 config['parse_dir_interval'] = 10
0281 config['saveas_style'] = 2
0282 self.config = config
0283 except BTFailure, e:
0284 usage(e)
0285 else:
0286 if wantUsage:
0287 usage()
0288 elif self.torrentDir:
0289 thread.start_new_thread(self.serve_forever,())
0290 thread.start_new_thread(self.aggregate,())
0291 self.runTorrents()
0292 Master.log.debug('Exiting')
0293
0294 def runTorrents(self):
0295 Master.log.debug('Waiting for torrents in "%s"' % self.torrentDir)
0296
0297 LaunchMany(self.config, self, 'btlaunchmany')
0298
0299 def display(self, data):
0300 """ this is called by the launchmanycore.stats() function """
0301 while self.isPaused:
0302 time.sleep(1.0)
0303
0304 for xx in data:
0305 ( name, status, progress, peers, seeds, seedsmsg, dist,
0306 uprate, dnrate, upamt, dnamt, size, t, msg) = xx
0307
0308 stopstatus = self.torrents.get(name,{'state':''})['state']
0309 if stopstatus == 'stop':
0310 try:
0311 os.remove(name)
0312 except:
0313 traceback.print_exc()
0314 del self.torrents[name]
0315 continue
0316 elif not os.path.exists(name):
0317 try:
0318 del self.torrents[name]
0319 except KeyError: pass
0320 continue
0321 elif stopstatus == 'resume':
0322 stopstatus = 'in_queue'
0323 if not stopstatus or stopstatus == 'in_queue':
0324 self.torrents[name] = {'state': 'in_queue', 'status': status,
0325 'progress': progress, 'peers': peers,
0326 'seeds': seeds, 'seedsmsg': seedsmsg,
0327 'dist': dist, 'uprate': uprate,
0328 'dnrate': dnrate, 'upamt': upamt,
0329 'dnamt': dnamt, 'size': size, 't': t,
0330 'msg': msg}
0331 return False
0332
0333 def aggregate(self):
0334 lastUpdate = None
0335 while 1:
0336 if not lastUpdate :
0337 for feed, torrents in self.aggregator.update().iteritems():
0338 self._newTorrents(feed, torrents)
0339 lastUpdate = time.time()
0340 else:
0341 if time.time() - lastUpdate > (60 * 2):
0342 lastUpdate = None
0343 else:
0344 time.sleep(1.0)
0345
0346 def message(self, str):
0347 Master.log.info('message: %s'%str)
0348
0349 def exception(self, str):
0350 Master.log.warn('exception: %s'%str)
0351
0352 def _rename(self, old, new):
0353 Master.log.debug('"%s" -> "%s"' % (old, new))
0354 old = os.path.join(self.torrentDir, old)
0355 new = os.path.join(self.torrentDir, new)
0356 os.rename(old, new)
0357
0358 def _torrentState(self, name, s):
0359 try:
0360 self.torrents[name]['state'] = s
0361 except KeyError:
0362 return 0
0363 else:
0364 return 1
0365
0366 def _newTorrents(self, feed, torrents):
0367 """
0368 Placing .torrents in sub-dirs won't work if there's
0369 at least one torrent in data_dir ;(
0370 """
0371
0372
0373
0374 for name, url in torrents.iteritems():
0375 fname, torrent = self._getTorrent(url)
0376
0377 self.export_addTorrentFromData(name, torrent,local=1)
0378
0379
0380def main(args):
0381
0382 try:
0383 directory = sys.argv[1]
0384 except IndexError:
0385 usage()
0386 sys.exit(0)
0387 else:
0388 if not os.path.isdir(directory):
0389 usage()
0390 sys.exit(0)
0391 del sys.argv[1]
0392
0393 host = HOST
0394 port = PORT
0395
0396 if len(sys.argv) > 1:
0397 addr = ''
0398 if sys.argv[1][0] != '-':
0399 addr = sys.argv[1]
0400 infos = addr.split(':')
0401 if len(infos) == 1:
0402 infos = infos[0]
0403 try:
0404 int(infos)
0405 except ValueError:
0406 host = infos
0407 else:
0408 port = int(infos)
0409 elif len(infos) == 2:
0410 host = infos[0]
0411 port = int(infos[1])
0412 del sys.argv[1]
0413
0414 try:
0415 master = Master((host, port), directory)
0416 except RuntimeError, err:
0417 print "\n%s\n" % err
0418 else:
0419 try:
0420 master.live()
0421 except KeyboardInterrupt:
0422 print 'Master died. Exiting gracefully :-)'
0423
# Script entry point: start the server only when run directly, not on import.
if __name__ == "__main__":
    main(sys.argv)