Package flumotion :: Package component :: Package consumers :: Package httpstreamer :: Module resources
[hide private]

Source Code for Module flumotion.component.consumers.httpstreamer.resources

  1  # -*- Mode: Python; test-case-name: flumotion.test.test_http -*- 
  2  # vi:si:et:sw=4:sts=4:ts=4 
  3  # 
  4  # Flumotion - a streaming media server 
  5  # Copyright (C) 2004,2005,2006,2007 Fluendo, S.L. (www.fluendo.com). 
  6  # All rights reserved. 
  7   
  8  # This file may be distributed and/or modified under the terms of 
  9  # the GNU General Public License version 2 as published by 
 10  # the Free Software Foundation. 
 11  # This file is distributed without any warranty; without even the implied 
 12  # warranty of merchantability or fitness for a particular purpose. 
 13  # See "LICENSE.GPL" in the source distribution for more information. 
 14   
 15  # Licensees having purchased or holding a valid Flumotion Advanced 
 16  # Streaming Server license may use this file in accordance with the 
 17  # Flumotion Advanced Streaming Server Commercial License Agreement. 
 18  # See "LICENSE.Flumotion" in the source distribution for more information. 
 19   
 20  # Headers in this file shall remain intact. 
 21   
 22  import os 
 23  import socket 
 24  import time 
 25  import errno 
 26  import string 
 27  import resource 
 28  import fcntl 
 29   
 30  import gst 
 31   
 32  try: 
 33      from twisted.web import http 
 34  except ImportError: 
 35      from twisted.protocols import http 
 36   
 37  from twisted.web import server, resource as web_resource 
 38  from twisted.internet import reactor, defer 
 39  from twisted.python import reflect 
 40   
 41  from flumotion.configure import configure 
 42  from flumotion.common import errors 
 43   
 44  from flumotion.common import common, log, keycards 
 45   
 46  from flumotion.component.base import http as httpbase 
 47   
 48  __all__ = ['HTTPStreamingResource', 'MultifdSinkStreamer'] 
 49  __version__ = "$Rev: 8318 $" 
 50   
 51  HTTP_NAME = 'FlumotionHTTPServer' 
 52  HTTP_VERSION = configure.version 
 53   
 54  ERROR_TEMPLATE = """<!doctype html public "-//IETF//DTD HTML 2.0//EN"> 
 55  <html> 
 56  <head> 
 57    <title>%(code)d %(error)s</title> 
 58  </head> 
 59  <body> 
 60  <h2>%(code)d %(error)s</h2> 
 61  </body> 
 62  </html> 
 63  """ 
 64   
 65  HTTP_SERVER = '%s/%s' % (HTTP_NAME, HTTP_VERSION) 
 66   
 67  ### the Twisted resource that handles the base URL 
 68   
 69   
70 -class HTTPStreamingResource(web_resource.Resource, log.Loggable):
71 72 __reserve_fds__ = 50 # number of fd's to reserve for non-streaming 73 74 logCategory = 'httpstreamer' 75 76 # IResource interface variable; True means it will not chain requests 77 # further down the path to other resource providers through 78 # getChildWithDefault 79 isLeaf = True 80
81 - def __init__(self, streamer, httpauth):
82 """ 83 @param streamer: L{MultifdSinkStreamer} 84 """ 85 self.streamer = streamer 86 self.httpauth = httpauth 87 88 self._requests = {} # request fd -> Request 89 90 self.maxclients = self.getMaxAllowedClients(-1) 91 self.maxbandwidth = -1 # not limited by default 92 93 # If set, a URL to redirect a user to when the limits above are reached 94 self._redirectOnFull = None 95 96 self._removing = {} # Optional deferred notification of client removals 97 98 socket = 'flumotion.component.plugs.request.RequestLoggerPlug' 99 self.loggers = streamer.plugs.get(socket, []) 100 101 socket = \ 102 'flumotion.component.plugs.requestmodifier.RequestModifierPlug' 103 self.modifiers = streamer.plugs.get(socket, []) 104 105 self.logfilter = None 106 107 web_resource.Resource.__init__(self)
108
109 - def clientRemoved(self, sink, fd, reason, stats):
110 # this is the callback attached to our flumotion component, 111 # not the GStreamer element 112 if fd in self._requests: 113 request = self._requests[fd] 114 self._removeClient(request, fd, stats) 115 else: 116 self.warning('[fd %5d] not found in _requests' % fd)
117
118 - def removeAllClients(self):
119 """ 120 Start to remove all the clients connected (this will complete 121 asynchronously from another thread) 122 123 Returns a deferred that will fire once they're all removed. 124 """ 125 l = [] 126 for fd in self._requests: 127 self._removing[fd] = defer.Deferred() 128 l.append(self._removing[fd]) 129 self.streamer.remove_client(fd) 130 131 return defer.DeferredList(l)
132
133 - def setRoot(self, path):
134 self.putChild(path, self)
135
136 - def setLogFilter(self, logfilter):
137 self.logfilter = logfilter
138
139 - def rotateLogs(self):
140 """ 141 Close the logfile, then reopen using the previous logfilename 142 """ 143 for logger in self.loggers: 144 self.debug('rotating logger %r' % logger) 145 logger.rotate()
146
147 - def logWrite(self, fd, ip, request, stats):
148 149 headers = request.getAllHeaders() 150 151 if stats: 152 bytes_sent = stats[0] 153 time_connected = int(stats[3] / gst.SECOND) 154 else: 155 bytes_sent = -1 156 time_connected = -1 157 158 args = {'ip': ip, 159 'time': time.gmtime(), 160 'method': request.method, 161 'uri': request.uri, 162 'username': '-', # FIXME: put the httpauth name 163 'get-parameters': request.args, 164 'clientproto': request.clientproto, 165 'response': request.code, 166 'bytes-sent': bytes_sent, 167 'referer': headers.get('referer', None), 168 'user-agent': headers.get('user-agent', None), 169 'time-connected': time_connected} 170 171 l = [] 172 for logger in self.loggers: 173 l.append(defer.maybeDeferred( 174 logger.event, 'http_session_completed', args)) 175 176 return defer.DeferredList(l)
177
178 - def setUserLimit(self, limit):
179 self.info('setting maxclients to %d' % limit) 180 self.maxclients = self.getMaxAllowedClients(limit) 181 # Log what we actually managed to set it to. 182 self.info('set maxclients to %d' % self.maxclients)
183
184 - def setBandwidthLimit(self, limit):
185 self.maxbandwidth = limit 186 self.info("set maxbandwidth to %d", self.maxbandwidth)
187
188 - def setRedirectionOnLimits(self, url):
189 self._redirectOnFull = url
190 191 # FIXME: rename to writeHeaders 192
193 - def _writeHeaders(self, request):
194 """ 195 Write out the HTTP headers for the incoming HTTP request. 196 197 @rtype: boolean 198 @returns: whether or not the file descriptor can be used further. 199 """ 200 fd = request.transport.fileno() 201 fdi = request.fdIncoming 202 203 # the fd could have been closed, in which case it will be -1 204 if fd == -1: 205 self.info('[fd %5d] Client gone before writing header' % fdi) 206 # FIXME: do this ? del request 207 return False 208 if fd != request.fdIncoming: 209 self.warning('[fd %5d] does not match current fd %d' % (fdi, fd)) 210 # FIXME: do this ? del request 211 return False 212 213 content = self.streamer.get_content_type() 214 request.setHeader('Server', HTTP_SERVER) 215 request.setHeader('Date', http.datetimeToString()) 216 request.setHeader('Connection', 'close') 217 request.setHeader('Cache-Control', 'no-cache') 218 request.setHeader('Cache-Control', 'private') 219 request.setHeader('Content-type', content) 220 221 # Call request modifiers 222 for modifier in self.modifiers: 223 modifier.modify(request) 224 225 # Mimic Twisted as close as possible 226 headers = [] 227 for name, value in request.headers.items(): 228 headers.append('%s: %s\r\n' % (name.capitalize(), value)) 229 for cookie in request.cookies: 230 headers.append('%s: %s\r\n' % ("Set-Cookie", cookie)) 231 232 233 # ASF needs a Pragma header for live broadcasts 234 # Apparently ASF breaks on WMP port 80 if you use the pragma header 235 # - Sep 5 2006 236 #if content in [ 237 # "video/x-ms-asf", 238 # "audio/x-ms-asf", 239 #]: 240 #setHeader('Pragma', 'features=broadcast') 241 242 #self.debug('setting Content-type to %s' % mime) 243 ### FIXME: there's a window where Twisted could have removed the 244 # fd because the client disconnected. Catch EBADF correctly here. 
245 try: 246 # TODO: This is a non-blocking socket, we really should check 247 # return values here, or just let twisted handle all of this 248 # normally, and not hand off the fd until after twisted has 249 # finished writing the headers. 250 os.write(fd, 'HTTP/1.0 200 OK\r\n%s\r\n' % ''.join(headers)) 251 # tell TwistedWeb we already wrote headers ourselves 252 request.startedWriting = True 253 return True 254 except OSError, (no, s): 255 if no == errno.EBADF: 256 self.info('[fd %5d] client gone before writing header' % fd) 257 elif no == errno.ECONNRESET: 258 self.info( 259 '[fd %5d] client reset connection writing header' % fd) 260 else: 261 self.info( 262 '[fd %5d] unhandled write error when writing header: %s' 263 % (fd, s)) 264 # trigger cleanup of request 265 del request 266 return False
267
268 - def isReady(self):
269 if self.streamer.caps == None: 270 self.debug('We have no caps yet') 271 return False 272 273 return True
274
275 - def getMaxAllowedClients(self, maxclients):
276 """ 277 maximum number of allowed clients based on soft limit for number of 278 open file descriptors and fd reservation. Increases soft limit to 279 hard limit if possible. 280 """ 281 (softmax, hardmax) = resource.getrlimit(resource.RLIMIT_NOFILE) 282 import sys 283 version = sys.version_info 284 285 if maxclients != -1: 286 neededfds = maxclients + self.__reserve_fds__ 287 288 # Bug in python 2.4.3, see 289 # http://sourceforge.net/tracker/index.php?func=detail& 290 # aid=1494314&group_id=5470&atid=105470 291 if version[:3] == (2, 4, 3) and \ 292 not hasattr(socket, "has_2_4_3_patch"): 293 self.warning( 294 'Setting hardmax to 1024 due to python 2.4.3 bug') 295 hardmax = 1024 296 297 if neededfds > softmax: 298 lim = min(neededfds, hardmax) 299 resource.setrlimit(resource.RLIMIT_NOFILE, (lim, hardmax)) 300 return lim - self.__reserve_fds__ 301 else: 302 return maxclients 303 else: 304 return softmax - self.__reserve_fds__
305
306 - def reachedServerLimits(self):
307 if self.maxclients >= 0 and len(self._requests) >= self.maxclients: 308 return True 309 elif self.maxbandwidth >= 0: 310 # Reject if adding one more client would take us over the limit. 311 if ((len(self._requests) + 1) * 312 self.streamer.getCurrentBitrate() >= self.maxbandwidth): 313 return True 314 return False
315
316 - def _addClient(self, request):
317 """ 318 Add a request, so it can be used for statistics. 319 320 @param request: the request 321 @type request: twisted.protocol.http.Request 322 """ 323 324 fd = request.transport.fileno() 325 self._requests[fd] = request
326
327 - def _logRequestFromIP(self, ip):
328 """ 329 Returns whether we want to log a request from this IP; allows us to 330 filter requests from automated monitoring systems. 331 """ 332 if self.logfilter: 333 return not self.logfilter.isInRange(ip) 334 else: 335 return True
336
337 - def _removeClient(self, request, fd, stats):
338 """ 339 Removes a request and add logging. 340 Note that it does not disconnect the client; it is called in reaction 341 to a client disconnecting. 342 It also removes the keycard if one was created. 343 344 @param request: the request 345 @type request: L{twisted.protocols.http.Request} 346 @param fd: the file descriptor for the client being removed 347 @type fd: L{int} 348 @param stats: the statistics for the removed client 349 @type stats: GValueArray 350 """ 351 352 # PROBE: finishing request; see httpserver.httpserver 353 self.debug('[fd %5d] (ts %f) finishing request %r', 354 request.transport.fileno(), time.time(), request) 355 356 ip = request.getClientIP() 357 if self._logRequestFromIP(ip): 358 d = self.logWrite(fd, ip, request, stats) 359 else: 360 d = defer.succeed(True) 361 self.info('[fd %5d] Client from %s disconnected' % (fd, ip)) 362 363 # We can't call request.finish(), since we already "stole" the fd, we 364 # just loseConnection on the transport directly, and delete the 365 # Request object, after cleaning up the bouncer bits. 366 self.httpauth.cleanupAuth(fd) 367 368 self.debug('[fd %5d] (ts %f) closing transport %r', fd, time.time(), 369 request.transport) 370 # This will close the underlying socket. We first remove the request 371 # from our fd->request map, since the moment we call this the fd might 372 # get re-added. 373 del self._requests[fd] 374 request.transport.loseConnection() 375 376 self.debug('[fd %5d] closed transport %r' % (fd, request.transport)) 377 378 def _done(_): 379 if fd in self._removing: 380 self.debug("client is removed; firing deferred") 381 removeD = self._removing.pop(fd) 382 removeD.callback(None) 383 384 # PROBE: finished request; see httpserver.httpserver 385 self.debug('[fd %5d] (ts %f) finished request %r', 386 fd, time.time(), request)
387 388 d.addCallback(_done) 389 return d
390
391 - def handleAuthenticatedRequest(self, res, request):
392 393 # PROBE: authenticated request; see httpserver.httpfile 394 self.debug('[fd %5d] (ts %f) authenticated request %r', 395 request.transport.fileno(), time.time(), request) 396 397 if request.method == 'GET': 398 self._handleNewClient(request) 399 elif request.method == 'HEAD': 400 self.debug('handling HEAD request') 401 self._writeHeaders(request) 402 request.finish() 403 else: 404 raise AssertionError 405 406 return res
407 408 ### resource.Resource methods 409 410 # this is the callback receiving the request initially 411
412 - def _render(self, request):
413 fd = request.transport.fileno() 414 # we store the fd again in the request using it as an id for later 415 # on, so we can check when an fd went away (being -1) inbetween 416 request.fdIncoming = fd 417 418 # PROBE: incoming request; see httpserver.httpfile 419 self.debug('[fd %5d] (ts %f) incoming request %r', 420 fd, time.time(), request) 421 422 self.info('[fd %5d] Incoming client connection from %s' % ( 423 fd, request.getClientIP())) 424 self.debug('[fd %5d] _render(): request %s' % ( 425 fd, request)) 426 427 if not self.isReady(): 428 return self._handleNotReady(request) 429 elif self.reachedServerLimits(): 430 return self._handleServerFull(request) 431 432 self.debug('_render(): asked for (possible) authentication') 433 d = self.httpauth.startAuthentication(request) 434 d.addCallback(self.handleAuthenticatedRequest, request) 435 # Authentication has failed and we've written a response; nothing 436 # more to do 437 d.addErrback(lambda x: None) 438 439 # we MUST return this from our _render. 440 return server.NOT_DONE_YET
441
442 - def _handleNotReady(self, request):
443 self.debug('Not sending data, it\'s not ready') 444 return server.NOT_DONE_YET
445
446 - def _handleServerFull(self, request):
447 if self._redirectOnFull: 448 self.debug("Redirecting client, client limit %d reached", 449 self.maxclients) 450 error_code = http.FOUND 451 request.setHeader('location', self._redirectOnFull) 452 else: 453 self.debug('Refusing clients, client limit %d reached' % 454 self.maxclients) 455 error_code = http.SERVICE_UNAVAILABLE 456 457 request.setHeader('content-type', 'text/html') 458 459 request.setHeader('server', HTTP_VERSION) 460 request.setResponseCode(error_code) 461 462 return ERROR_TEMPLATE % {'code': error_code, 463 'error': http.RESPONSES[error_code]}
464
465 - def _handleNewClient(self, request):
466 # everything fulfilled, serve to client 467 fdi = request.fdIncoming 468 if not self._writeHeaders(request): 469 self.debug("[fd %5d] not adding as a client" % fdi) 470 return 471 self._addClient(request) 472 473 # take over the file descriptor from Twisted by removing them from 474 # the reactor 475 # spiv told us to remove* on request.transport, and that works 476 # then we figured out that a new request is only a Reader, so we 477 # remove the removedWriter - this is because we never write to the 478 # socket through twisted, only with direct os.write() calls from 479 # _writeHeaders. 480 481 # see http://twistedmatrix.com/trac/ticket/1796 for a guarantee 482 # that this is a supported way of stealing the socket 483 fd = fdi 484 self.debug("[fd %5d] taking away from Twisted" % fd) 485 reactor.removeReader(request.transport) 486 #reactor.removeWriter(request.transport) 487 488 # check if it's really an open fd (i.e. that twisted didn't close it 489 # before the removeReader() call) 490 try: 491 fcntl.fcntl(fd, fcntl.F_GETFL) 492 except IOError, e: 493 if e.errno == errno.EBADF: 494 self.warning("[fd %5d] is not actually open, ignoring" % fd) 495 else: 496 self.warning("[fd %5d] error during check: %s (%d)" % ( 497 fd, e.strerror, e.errno)) 498 return 499 500 # hand it to multifdsink 501 self.streamer.add_client(fd) 502 ip = request.getClientIP() 503 504 # PROBE: started request; see httpfile.httpfile 505 self.debug('[fd %5d] (ts %f) started request %r', 506 fd, time.time(), request) 507 508 self.info('[fd %5d] Started streaming to %s' % (fd, ip))
509 510 render_GET = _render 511 render_HEAD = _render 512 513
class HTTPRoot(web_resource.Resource, log.Loggable):
    """
    Root resource that looks up children by the full remaining request path
    (the current path segment plus request.postpath, joined with '/'), so
    tree resources can be found directly without their parents.
    """
    logCategory = "httproot"

    def getChildWithDefault(self, path, request):
        # we override this method so that we can look up tree resources
        # directly without having their parents.
        # There's probably a more Twisted way of doing this, but ...
        fullPath = path
        if request.postpath:
            fullPath += '/' + '/'.join(request.postpath)
        self.debug("[fd %5d] Incoming request %r for path %s",
                   request.transport.fileno(), request, fullPath)
        r = web_resource.Resource.getChildWithDefault(self, fullPath, request)
        self.debug("Returning resource %r", r)
        return r
529