0001#!/usr/bin/env python
0002import logging, sys, time, os, thread, re, urllib2, base64
0003from SimpleXMLRPCServer import SimpleXMLRPCServer, list_public_methods
0004from xmlrpclib import MAXINT
0005from aggregator.main import Aggregator
0006
0007# need this prior to BT imports
0008import gettext
0009gettext.install('bittorrent', 'locale')
0010
0011from BitTorrent.launchmanycore import LaunchMany
0012from BitTorrent.defaultargs import get_defaults
0013from BitTorrent import configfile
0014from BitTorrent import BTFailure
0015
0016HOST = 'localhost'
0017PORT = 8000
0018
0019__author__ = "Philippe Normand (phil at base-art dot net)"
0020__version__ = '0.1'
0021__license__ = 'GPL'
0022__doc__ = """
0023FeedMyTorrent Master is the XML/RPC server which runs the BitTorrent
0024client. The former acts as a frontend to the later, through a simple
0025XML/RPC API. The API is self documented.
0026
0027This code was inspired by an ASPN Cookbook recipe found at:
0028http://aspn.activestate.com/ASPN/Cookbook/Python/Recipe/440555
0029
0030Author: %(author)s
0031Version: %(version)s
0032
0033Usage: %(prog)s directory [[host:]port] [classic BitTorrent options]
0034
0035- directory is the place where BitTorrent will download/seed files
0036- host is the hostname from which the XML/RPC server will be bound (default: %(HOST)s)
0037- port is the port under which the XML/RPC server will listen (default: %(PORT)s)
0038
0039Examples:
0040       %(prog)s /data/download 9000
0041       %(prog)s /data/download macrosoft.com:9000
0042       %(prog)s /data/download localfoo --max_upload_rate 50
0043       
0044"""  % { 'author': __author__, 'version': __version__, 'prog': sys.argv[0],
0045         'HOST': HOST, 'PORT': PORT }
0046
0047def usage(btException=None):
0048    if btException:
0049        print
0050        print 'BitTorrent exception occurred : %s' % btException
0051    print __doc__
0052
0053class Master(SimpleXMLRPCServer):
0054
0055    log = None
0056
0057    def __init__(self, (host,port), directory):
0058        SimpleXMLRPCServer.__init__(self,(host,port))
0059        self.logRequests = False
0060        self.isPaused = False
0061        self.torrents = {}
0062        self.torrentDir = directory
0063        self.setupLog()
0064        self.register_instance(self)
0065        self.register_introspection_functions()
0066        self.aggregator = Aggregator()
0067        Master.log.info('Starting Master in http://%s:%s' % (host, port))
0068
0069    def setupLog(self):
0070        level = logging.DEBUG
0071        format = '%(asctime)s %(levelname)-8s %(message)s'
0072        datefmt = '%a, %d %b %Y %H:%M:%S'
0073        fname = 'fmtorrent_master.log'
0074
0075        formatter = logging.Formatter(format, datefmt)
0076        handler = logging.FileHandler(fname)
0077        handler.setFormatter(formatter)
0078
0079        Master.log = logging.getLogger('FmTorrent')
0080        Master.log.addHandler(handler)
0081        Master.log.setLevel(level)
0082
0083        logging.basicConfig()
0084        root = logging.getLogger()
0085        handler = root.handlers[0]
0086        handler.setFormatter(formatter)
0087
0088    def _dispatch(self, method, params):
0089        if method.startswith('system'):
0090            return SimpleXMLRPCServer._dispatch(self, method, params)
0091        try:
0092            # We are forcing the 'export_' prefix on methods that are
0093            # callable through XML-RPC to prevent potential security
0094            # problems
0095            func = getattr(self, 'export_' + method)
0096        except AttributeError:
0097            raise Exception('method "%s" is not supported' % method)
0098        else:
0099            if method != 'getStatus':
0100                Master.log.info("calling %s" % method)
0101            return func(*params)
0102
0103    def _listMethods(self):
0104        return [ name.split('export_')[1] for name in dir(self) if name.startswith('export')]
0105
0106    def _methodHelp(self, name):
0107        try:
0108            method = getattr(self, 'export_%s' % name)
0109        except AttributeError:
0110            help = ""
0111        else:
0112            import pydoc
0113            help = pydoc.getdoc(method)
0114        return help
0115
0116    ##################################################################
0117    ### XML/RPC API
0118    ##################################################################
0119
0120    def export_pauseAll(self):
0121        """ pauseAll() -> int
0122
0123            Pause all current torrents
0124        """
0125        self.isPaused = True
0126        return 1
0127
0128    def export_resumeAll(self):
0129        """ resumeAll() -> int
0130
0131            Resume all currently paused torrents
0132        """
0133        self.isPaused = False
0134        for name in self.torrents.keys():
0135            self.export_resumeTorrent(name)
0136        return 0
0137
0138    def export_getStatus(self):
0139        """ getStatus() -> dict keyed by torrent name
0140
0141            Each item of the dictionnary is another dictionnary:
0142
0143            ::
0144
0145              {'state': str, 'status': status,
0146               'progress': str, 'peers': int,
0147               'seeds': int, 'seedsmsg': str,
0148               'dist': int, 'uprate': float,
0149               'dnrate': float, 'upamt': int,
0150               'dnamt': int, 'size': int, 't': int,
0151               'msg': str}
0152               
0153                                           
0154            Return informations about current torrents, as a dictionnary, hashed by
0155            torrent name
0156        """
0157        status = {}
0158        for torrentName, infos in self.torrents.iteritems():
0159            tmp = {}
0160            torrentName = os.path.basename(torrentName)
0161            for  k,v in infos.iteritems():
0162                if type(v) == long and v >= MAXINT:
0163                    v = MAXINT -1
0164                tmp[k] = v
0165            status[torrentName] = tmp
0166        return status
0167
0168    def export_pauseTorrent(self, name):
0169        """ pauseTorrent(torrentName) -> int
0170        
0171            Pause a torrent, given its name (cf. getStatus() for
0172            torrent names)
0173        """
0174        try:
0175            self._rename(name, '%s.paused' % name)
0176        except:
0177            return 0
0178        else:
0179            return self._torrentState(name,'pause')
0180
0181    def export_resumeTorrent(self, name):
0182        """ resumeTorrent(torrentName) -> int
0183        
0184            Resume a torrent, given its name (cf. getStatus() for
0185            torrent names)
0186        """
0187        try:
0188            self._rename('%s.paused' % name, name)
0189        except:
0190            return 0
0191        else:
0192            return self._torrentState(name,'resume')
0193
0194    def export_stopTorrent(self, name):
0195        """ stopTorrent(torrentName) -> int
0196
0197            Stop a torrent, given its name (cf. getStatus() for
0198            torrent names)
0199        """
0200        return self._torrentState(name,'stop')
0201
0202    def export_addTorrentFromData(self, fname, data,local=0):
0203        """ addTorrentFromData(torrentFilename, str) -> 1
0204
0205            Add a new torrent to the queue, given the destination
0206            filename and its content.
0207        """
0208        if not local:
0209            data = base64.decodestring(data)
0210        if not fname.endswith('.torrent'):
0211            fname = "%s.torrent" % fname
0212        #fname = fname.replace('/','_')
0213        full = os.path.join(self.torrentDir, fname)
0214        f = open(full,'w')
0215        f.write(data)
0216        f.close()
0217        return 1
0218
0219    def export_addTorrentFromURL(self, url):
0220        """ addTorrentFromURL(str) -> str
0221
0222            Add a new torrent to the queue, given its full URL. This
0223            method returns the filename of the new torrent.
0224        """
0225        filename, data = self._getTorrent(url)
0226        self.export_addTorrentFromData(filename, data,local=1)
0227        print filename
0228        return filename
0229
0230    def export_getFeeds(self):
0231        """ getFeeds() -> list of feed names
0232
0233            Fetch the feeds list and return it as a list of string
0234            identifiers (like ['foo','bar'])
0235        """
0236        return self.aggregator.getFeeds()
0237
0238    def export_updateFeed(self, feed):
0239        """ updateFeed(feedName) -> 0|1
0240
0241            Updates the feed identified by its name. If some torrents
0242            are found during update, add them to the queue and returns
0243            1. Either returns 0.
0244        """
0245        torrents = self.aggregator.updateFeed(feed)[feed]
0246        if torrents:
0247            self._newTorrents(feed, torrents)
0248            return 1
0249        return 0
0250
0251    ##################################################################
0252    ### / XML/RPC API
0253    ##################################################################
0254
0255    def _getTorrent(self, url):
0256        req = urllib2.Request(url)
0257        handle = urllib2.urlopen(req)
0258        info = handle.info()
0259        url = handle.geturl()
0260        filename = os.path.split(urllib2.urlparse.urlsplit(url)[2])[1]
0261        if info.has_key('content-disposition'):
0262            disp = info['content-disposition']
0263            match = re.match('attachment; filename="(.*)"', disp)
0264            if match:
0265                filename = match.groups()[0]
0266        return (filename, handle.read())
0267
0268    def live(self):
0269        uiname = 'btlaunchmany'
0270        # try to hook our usage msg over Bt's one
0271        wantUsage = False
0272        for opt in ('-h','--help'):
0273            if opt in sys.argv[1:]:
0274                del sys.argv[sys.argv.index(opt)]
0275                wantUsage = True
0276        try:
0277            config, args = configfile.parse_configuration_and_args(get_defaults(uiname),
0278                                                                   uiname, sys.argv[1:], 0, 1)
0279            config['torrent_dir'] = self.torrentDir
0280            config['parse_dir_interval'] = 10
0281            config['saveas_style'] = 2
0282            self.config = config
0283        except BTFailure, e:
0284            usage(e)
0285        else:
0286            if wantUsage:
0287                usage()
0288            elif self.torrentDir:
0289                thread.start_new_thread(self.serve_forever,())
0290                thread.start_new_thread(self.aggregate,())
0291                self.runTorrents()
0292        Master.log.debug('Exiting')
0293
0294    def runTorrents(self):
0295        Master.log.debug('Waiting for torrents in "%s"' % self.torrentDir)
0296        # pass in self to be the display delegate
0297        LaunchMany(self.config, self, 'btlaunchmany')
0298
0299    def display(self, data):
0300        """ this is called by the launchmanycore.stats() function """
0301        while self.isPaused:
0302            time.sleep(1.0)
0303
0304        for xx in data:
0305            ( name, status, progress, peers, seeds, seedsmsg, dist,
0306              uprate, dnrate, upamt, dnamt, size, t, msg) = xx
0307
0308            stopstatus = self.torrents.get(name,{'state':''})['state']
0309            if stopstatus == 'stop':
0310                try:
0311                    os.remove(name)
0312                except:
0313                    traceback.print_exc()
0314                del self.torrents[name]
0315                continue
0316            elif not os.path.exists(name):
0317                try:
0318                    del self.torrents[name]
0319                except KeyError: pass
0320                continue
0321            elif stopstatus == 'resume':
0322                stopstatus = 'in_queue'
0323            if not stopstatus or stopstatus == 'in_queue':
0324                self.torrents[name] = {'state': 'in_queue', 'status': status,
0325                                       'progress': progress, 'peers': peers,
0326                                       'seeds': seeds, 'seedsmsg': seedsmsg,
0327                                       'dist': dist, 'uprate': uprate,
0328                                       'dnrate': dnrate, 'upamt': upamt,
0329                                       'dnamt': dnamt, 'size': size, 't': t,
0330                                       'msg': msg}
0331        return False
0332
0333    def aggregate(self):
0334        lastUpdate = None
0335        while 1:
0336            if not lastUpdate :
0337                for feed, torrents in self.aggregator.update().iteritems():
0338                    self._newTorrents(feed, torrents)
0339                lastUpdate = time.time()
0340            else:
0341                if time.time() - lastUpdate > (60 * 2):
0342                    lastUpdate = None
0343                else:
0344                    time.sleep(1.0)
0345
0346    def message(self, str):
0347        Master.log.info('message: %s'%str)
0348
0349    def exception(self, str):
0350        Master.log.warn('exception: %s'%str)
0351
0352    def _rename(self, old, new):
0353        Master.log.debug('"%s" -> "%s"' % (old, new))
0354        old = os.path.join(self.torrentDir, old)
0355        new = os.path.join(self.torrentDir, new)
0356        os.rename(old, new)
0357
0358    def _torrentState(self, name, s):
0359        try:
0360            self.torrents[name]['state'] = s
0361        except KeyError:
0362            return 0
0363        else:
0364            return 1
0365
0366    def _newTorrents(self, feed, torrents):
0367        """
0368        Placing .torrents in sub-dirs won't work if there's
0369        at least one torrent in data_dir ;(
0370        """
0371        # if not os.path.exists(feed):
0372        #     os.mkdir(feed)
0373        #import pdb; pdb.set_trace()
0374        for name, url in torrents.iteritems():
0375            fname, torrent = self._getTorrent(url)
0376            # name = os.path.join(feed, name)
0377            self.export_addTorrentFromData(name, torrent,local=1)
0378
0379
0380def main(args):
0381
0382    try:
0383        directory = sys.argv[1]
0384    except IndexError:
0385        usage()
0386        sys.exit(0)
0387    else:
0388        if not os.path.isdir(directory):
0389            usage()
0390            sys.exit(0)
0391        del sys.argv[1]
0392
0393    host = HOST
0394    port = PORT
0395
0396    if len(sys.argv) > 1:
0397        addr = ''
0398        if sys.argv[1][0] != '-':
0399            addr = sys.argv[1]
0400            infos = addr.split(':')
0401            if len(infos) == 1:
0402                infos = infos[0]
0403                try:
0404                    int(infos)
0405                except ValueError:
0406                    host = infos
0407                else:
0408                    port = int(infos)
0409            elif len(infos) == 2:
0410                host = infos[0]
0411                port = int(infos[1])
0412            del sys.argv[1]
0413
0414    try:
0415        master = Master((host, port), directory)
0416    except RuntimeError, err:
0417        print "\n%s\n" % err
0418    else:
0419        try:
0420            master.live()
0421        except KeyboardInterrupt:
0422            print 'Master died. Exiting gracefully :-)'
0423
0424if __name__ == "__main__":
0425    main(sys.argv)