EVOLUTION-MANAGER
Edit File: request.py
from __future__ import unicode_literals import base64 import logging import mimetypes import os import random import shutil import sys from collections import OrderedDict from io import BytesIO from json import loads as json_loads import six from six.moves.http_client import UNAUTHORIZED, NOT_MODIFIED from six.moves.http_cookiejar import Cookie, CookieJar, MozillaCookieJar from six.moves.urllib.error import HTTPError, URLError from six.moves.urllib.parse import parse_qsl, urlencode, urlparse, urlunparse from six.moves.urllib.request import ( BaseHandler, HTTPBasicAuthHandler, HTTPCookieProcessor, HTTPDigestAuthHandler, HTTPErrorProcessor, HTTPPasswordMgr, ProxyHandler, Request as URLRequest, build_opener, install_opener, urlopen) from rbtools import get_package_version from rbtools.api.cache import APICache from rbtools.api.errors import APIError, create_api_error, ServerInterfaceError from rbtools.utils.encoding import force_bytes, force_unicode from rbtools.utils.filesystem import get_home_path # Python 2.7.9+ added strict HTTPS certificate validation (finally). These APIs # don't exist everywhere so soft-import them. try: import ssl from six.moves.urllib.request import HTTPSHandler except ImportError: ssl = None HTTPSHandler = None RBTOOLS_COOKIE_FILE = '.rbtools-cookies' RB_COOKIE_NAME = 'rbsessionid' class HttpRequest(object): """A high-level HTTP request. This is used to construct an HTTP request to a Review Board server. It takes in the URL, HTTP method, any query arguments and headers needed to perform the request, and provides methods for building a request payload compatible with the Review Board API. Instances are intentionally generic and not tied to :py:mod:`urllib2`, providing API stability and a path toward eventually interfacing with other HTTP backends. Attributes: headers (dict): Any HTTP headers to provide in the request. url (unicode): The URL to request. """ def __init__(self, url, method='GET', query_args={}, headers={}): """Initialize the HTTP request. Args: url (bytes or unicode): The URL to request. method (bytes or unicode, optional): The HTTP method to send to the server. query_args (dict, optional): Any query arguments to add to the URL. All keys and values are expected to be strings (either byte strings or unicode strings). headers (dict, optional): Any HTTP headers to provide in the request. All keys and values are expected to be strings (either byte strings or unicode strings). """ self.method = method self._fields = OrderedDict() self._files = OrderedDict() # Replace all underscores in each query argument # key with dashes. query_args = { self.encode_url_key(key): self.encode_url_value(key, value) for key, value in six.iteritems(query_args) } # Make sure headers are always in the native string type. self.headers = { str(key): str(value) for key, value in six.iteritems(headers) } # Add the query arguments to the url url_parts = list(urlparse(str(url))) query = dict(parse_qsl(url_parts[4])) query.update(query_args) url_parts[4] = urlencode(query) self.url = urlunparse(url_parts) def encode_url_key(self, key): """Encode the given key for inclusion in a URL. Args: key (unicode): The key that is being encoded. Raises: ValueError: The given key was neither a unicode string or byte string. Returns: unicode: The key encoded as a unicode string. """ return force_unicode(key).replace('_', '-') def encode_url_value(self, key, value): """Encode the given value for inclusion in a URL. Args: key (unicode): The field name for which the value is being encoded. This argument is only used to generate an error message. value (object): The value to be encoded. Raises: ValueError: The given value could not be encoded. Returns: unicode: The value encoded as a unicode string. """ if isinstance(value, bool): if value: value = '1' else: value = '0' elif isinstance(value, six.integer_types + (float,)): value = six.text_type(value) elif isinstance(value, (bytes, six.text_type)): value = force_unicode(value) else: raise ValueError( 'Could not encode value %r for key %s: expected int, float, ' 'bool, or string type; got %s instead' % (key, value, type(value).__name__) ) return value @property def method(self): """The HTTP method to send to the server.""" return self._method @method.setter def method(self, method): """The HTTP method to send to the server. Args: method (bytes or unicode): The HTTP method to send to the server. """ self._method = str(method) def add_field(self, name, value): """Add a form-data field for the request. Args: name (bytes or unicode): The name of the field. value (bytes or unicode): The value to send for the field. For backwards-compatibility, other values will be converted to strings. This may turn into a warning in future releases. Callers are encouraged to only send strings. """ if not isinstance(value, (bytes, six.text_type)): value = str(value) self._fields[force_bytes(name)] = force_bytes(value) def add_file(self, name, filename, content, mimetype=None): """Add an uploaded file for the request. Args: name (bytes or unicode): The name of the field representing the file. filename (bytes or unicode): The filename. content (bytes or unicode): The contents of the file. mimetype (bytes or unicode, optional): The optional mimetype of the content. If not provided, it will be guessed. """ mimetype = force_bytes( mimetypes.guess_type(force_unicode(filename))[0] or b'application/octet-stream') self._files[force_bytes(name)] = { 'filename': force_bytes(filename), 'content': force_bytes(content), 'mimetype': mimetype, } def encode_multipart_formdata(self): """Encode the request into a multi-aprt form-data payload. Returns: tuple: A tuple containing: * The content type (:py:class:`unicode`) * The form-data payload (:py:class:`bytes`) If there are no fields or files in the request, both values will be ``None``. """ if not (self._fields or self._files): return None, None NEWLINE = b'\r\n' BOUNDARY = self._make_mime_boundary() content = BytesIO() for key, value in six.iteritems(self._fields): content.write(b'--%s%s' % (BOUNDARY, NEWLINE)) content.write(b'Content-Disposition: form-data; name="%s"%s' % (key, NEWLINE)) content.write(NEWLINE) content.write(value) content.write(NEWLINE) for key, file_info in six.iteritems(self._files): content.write(b'--%s%s' % (BOUNDARY, NEWLINE)) content.write(b'Content-Disposition: form-data; name="%s"; ' % key) content.write(b'filename="%s"%s' % (file_info['filename'], NEWLINE)) content.write(b'Content-Type: %s%s' % (file_info['mimetype'], NEWLINE)) content.write(NEWLINE) content.write(file_info['content']) content.write(NEWLINE) content.write(b'--%s--%s%s' % (BOUNDARY, NEWLINE, NEWLINE)) content_type = ('multipart/form-data; boundary=%s' % BOUNDARY.decode('utf-8')) return content_type, content.getvalue() def _make_mime_boundary(self): """Create a mime boundary. This exists because :py:func:`mimetools.choose_boundary` is gone in Python 3.x, and :py:func:`email.generator._make_boundary` isn't really appropriate to use here. Returns: bytes: The generated boundary. """ fmt = '%%0%dd' % len(repr(sys.maxsize - 1)) token = random.randrange(sys.maxsize) return (b'=' * 15) + (fmt % token).encode('utf-8') + b'==' class Request(URLRequest): """A request which contains a method attribute.""" def __init__(self, url, body=b'', headers={}, method='PUT'): normalized_headers = { str(key): str(value) for key, value in six.iteritems(headers) } URLRequest.__init__(self, str(url), body, normalized_headers) self.method = str(method) def get_method(self): return self.method class PresetHTTPAuthHandler(BaseHandler): """Handler that presets the use of HTTP Basic Auth.""" handler_order = 480 # After Basic auth AUTH_HEADER = 'Authorization' def __init__(self, url, password_mgr): self.url = url self.password_mgr = password_mgr self.used = False def reset(self, username, password): self.password_mgr.rb_user = username self.password_mgr.rb_pass = password self.used = False def http_request(self, request): if not self.used: if self.password_mgr.api_token: request.add_header(self.AUTH_HEADER, 'token %s' % self.password_mgr.api_token) self.used = True elif self.password_mgr.rb_user: # Note that we call password_mgr.find_user_password to get the # username and password we're working with. username, password = \ self.password_mgr.find_user_password('Web API', self.url) raw = '%s:%s' % (username, password) header = (b'Basic %s' % base64.b64encode(raw.encode('utf-8')).strip()) request.add_header(self.AUTH_HEADER, header.decode('utf-8')) self.used = True return request https_request = http_request class ReviewBoardHTTPErrorProcessor(HTTPErrorProcessor): """Processes HTTP error codes. Python's built-in error processing understands 2XX responses as successful, but processes 3XX as an error. This handler ensures that all valid responses from the API are processed as such. """ def http_response(self, request, response): if not (200 <= response.code < 300 or response.code == NOT_MODIFIED): response = self.parent.error('http', request, response, response.code, response.msg, response.info()) return response https_response = http_response class ReviewBoardHTTPBasicAuthHandler(HTTPBasicAuthHandler): """Custom Basic Auth handler that doesn't retry excessively. urllib's HTTPBasicAuthHandler retries over and over, which is useless. This subclass only retries once to make sure we've attempted with a valid username and password. It will then fail so we can use our own retry handler. This also supports two-factor auth, for Review Board servers that support it. When requested by the server, the client will be prompted for a one-time password token, which would be sent generally through a mobile device. In this case, the client will prompt up to a set number of times until a valid token is entered. """ OTP_TOKEN_HEADER = 'X-ReviewBoard-OTP' MAX_OTP_TOKEN_ATTEMPTS = 2 def __init__(self, *args, **kwargs): """Initialize the Basic Auth handler. Args: *args (tuple): Positional arguments to pass to the parent class. **kwargs (dict): Keyword arguments to pass to the parent class. """ HTTPBasicAuthHandler.__init__(self, *args, **kwargs) self._tried_login = False self._otp_token_method = None self._otp_token_attempts = 0 self._last_otp_token = None def http_error_auth_reqed(self, authreq, host, req, headers): """Handle an HTTP 401 Unauthorized from an API request. This will start by checking whether a two-factor authentication token is required by the server, and which method it will be sent by (SMS or token generator application), before handing back to the parent class, which will then call into our custom :py:meth:`retry_http_basic_auth`. Args: authreq (unicode): The authentication request type. host (unicode): The URL being accessed. req (rbtools.api.request.Request): The API request being made. headers (dict): The headers sent in the Unauthorized error response. Returns: httplib.HTTPResponse: If attempting another request, this will be the HTTP response from that request. This will be ``None`` if not making another request. Raises: urllib2.URLError: The HTTP request resulted in an error. If this is an :http:`401`, it may be handled by this class again. """ otp_header = headers.get(self.OTP_TOKEN_HEADER, '') if otp_header and otp_header.startswith('required'): try: self._otp_token_method = otp_header.split(';')[1].strip() except IndexError: logging.error('Invalid %s header value: "%s". This header ' 'is needed for two-factor authentication to ' 'work. Please report this!', self.OTP_TOKEN_HEADER, otp_header) return None return HTTPBasicAuthHandler.http_error_auth_reqed( self, authreq, host, req, headers) def retry_http_basic_auth(self, host, request, realm): """Attempt another HTTP Basic Auth request. This will determine if another request should be made (based on previous attempts and 2FA requirements. Based on this, it may make another attempt. Args: host (unicode): The URL being accessed. request (rbtools.api.request.Request): The API request being made. realm (unicode): The Basic Auth realm, which will be used to look up any stored passwords. Returns: httplib.HTTPResponse: If attempting another request, this will be the HTTP response from that request. This will be ``None`` if not making another request. Raises: urllib2.URLError: The HTTP request resulted in an error. If this is an :http:`401`, it may be handled by this class again. """ # First, check if we even want to try again. If two-factor # authentication is disabled and we've made one username/password # attempt, or it's enabled and we've made too many 2FA token attempts, # we're done. if (self._otp_token_attempts > self.MAX_OTP_TOKEN_ATTEMPTS or (not self._otp_token_method and self._tried_login)): return None # Next, figure out what credentials we'll be working with. if self._otp_token_attempts > 0: # We've made at least one 2FA attempt. Reuse the login and # password so we don't prompt for it again. user = self.passwd.rb_user password = self.passwd.rb_pass else: # We don't have a login and password recorded for this request. # Request one from the user. user, password = self.passwd.find_user_password(realm, host) if password is None: return None # If the response had sent a X-ReviewBoard-OTP header stating that # a 2FA token is required, request it from the user. if self._otp_token_method: otp_token = ( self.passwd.get_otp_token(request.get_full_url(), self._otp_token_method) .encode('utf-8') ) else: otp_token = None # Prepare some auth headers and then check if we've already made an # attempt with them. raw = '%s:%s' % (user, password) auth = b'Basic %s' % base64.b64encode(raw.encode('utf-8')).strip() if (request.get_header(self.auth_header) == auth and (not otp_token or otp_token == self._last_otp_token)): # We've already tried with these credentials/token, and the # attempt failed. No point trying again and wasting a login # attempt. return None # Based on the above, set the headers for the next login attempt and # try again. If it fails, we'll end up back in http_error_auth_reqed(), # starting again but with the recorded state. request.add_unredirected_header(self.auth_header, auth.decode('utf-8')) if otp_token: request.add_unredirected_header(self.OTP_TOKEN_HEADER, otp_token) self._otp_token_attempts += 1 self._last_otp_token = otp_token self._tried_login = True return self.parent.open(request, timeout=request.timeout) class ReviewBoardHTTPPasswordMgr(HTTPPasswordMgr): """Adds HTTP authentication support for URLs.""" def __init__(self, reviewboard_url, rb_user=None, rb_pass=None, api_token=None, auth_callback=None, otp_token_callback=None): HTTPPasswordMgr.__init__(self) self.passwd = {} self.rb_url = reviewboard_url self.rb_user = rb_user self.rb_pass = rb_pass self.api_token = api_token self.auth_callback = auth_callback self.otp_token_callback = otp_token_callback def find_user_password(self, realm, uri): if realm == 'Web API': if self.auth_callback: username, password = self.auth_callback(realm, uri, username=self.rb_user, password=self.rb_pass) self.rb_user = username self.rb_pass = password return self.rb_user, self.rb_pass else: # If this is an auth request for some other domain (since HTTP # handlers are global), fall back to standard password management. return HTTPPasswordMgr.find_user_password(self, realm, uri) def get_otp_token(self, uri, method): if self.otp_token_callback: return self.otp_token_callback(uri, method) def create_cookie_jar(cookie_file=None): """Return a cookie jar backed by cookie_file If cooie_file is not provided, we will default it. If the cookie_file does not exist, we will create it with the proper permissions. In the case where we default cookie_file, and it does not exist, we will attempt to copy the .post-review-cookies.txt file. """ home_path = get_home_path() if not cookie_file: cookie_file = os.path.join(home_path, RBTOOLS_COOKIE_FILE) post_review_cookies = os.path.join(home_path, '.post-review-cookies.txt') if (not os.path.isfile(cookie_file) and os.path.isfile(post_review_cookies)): try: shutil.copyfile(post_review_cookies, cookie_file) os.chmod(cookie_file, 0o600) except IOError as e: logging.warning('There was an error while copying ' 'legacy post-review cookies: %s', e) if not os.path.isfile(cookie_file): try: open(cookie_file, 'w').close() os.chmod(cookie_file, 0o600) except IOError as e: logging.warning('There was an error while creating a ' 'cookie file: %s', e) return MozillaCookieJar(cookie_file), cookie_file class ReviewBoardServer(object): """Represents a Review Board server we are communicating with. Provides methods for executing HTTP requests on a Review Board server's Web API. The ``auth_callback`` parameter can be used to specify a callable which will be called when authentication fails. This callable will be passed the realm, and url of the Review Board server and should return a 2-tuple of username, password. The user can be prompted for their credentials using this mechanism. """ def __init__(self, url, cookie_file=None, username=None, password=None, api_token=None, agent=None, session=None, disable_proxy=False, auth_callback=None, otp_token_callback=None, verify_ssl=True, save_cookies=True, ext_auth_cookies=None, ca_certs=None, client_key=None, client_cert=None): if not url.endswith('/'): url += '/' self.url = url + 'api/' self.save_cookies = save_cookies self.ext_auth_cookies = ext_auth_cookies if self.save_cookies: self.cookie_jar, self.cookie_file = create_cookie_jar( cookie_file=cookie_file) try: self.cookie_jar.load(ignore_expires=True) except IOError: pass else: self.cookie_jar = CookieJar() self.cookie_file = None if self.ext_auth_cookies: try: self.cookie_jar.load(ext_auth_cookies, ignore_expires=True) except IOError as e: logging.critical('There was an error while loading a ' 'cookie file: %s', e) pass # Get the cookie domain from the url. If the domain # does not contain a '.' (e.g. 'localhost'), we assume # it is a local domain and suffix it (See RFC 2109). parsed_url = urlparse(url) self.domain = parsed_url[1].partition(':')[0] # Remove Port. if self.domain.count('.') < 1: self.domain = '%s.local' % self.domain if session: cookie = Cookie( version=0, name=RB_COOKIE_NAME, value=session, port=None, port_specified=False, domain=self.domain, domain_specified=True, domain_initial_dot=True, path=parsed_url[2], path_specified=True, secure=False, expires=None, discard=False, comment=None, comment_url=None, rest={'HttpOnly': None}) self.cookie_jar.set_cookie(cookie) if self.save_cookies: self.cookie_jar.save() if username: # If the username parameter is given, we have to clear the session # cookie manually or it will override the username:password # combination retrieved from the authentication callback. try: self.cookie_jar.clear(self.domain, parsed_url[2], RB_COOKIE_NAME) except KeyError: pass # Set up the HTTP libraries to support all of the features we need. password_mgr = ReviewBoardHTTPPasswordMgr(self.url, username, password, api_token, auth_callback, otp_token_callback) self.preset_auth_handler = PresetHTTPAuthHandler(self.url, password_mgr) handlers = [] if not verify_ssl: context = ssl._create_unverified_context() else: context = ssl.create_default_context(cafile=ca_certs) if client_cert or client_key: context.load_cert_chain(client_cert, client_key) handlers.append(HTTPSHandler(context=context)) if disable_proxy: handlers.append(ProxyHandler({})) handlers += [ HTTPCookieProcessor(self.cookie_jar), ReviewBoardHTTPBasicAuthHandler(password_mgr), HTTPDigestAuthHandler(password_mgr), self.preset_auth_handler, ReviewBoardHTTPErrorProcessor(), ] if agent: self.agent = agent else: self.agent = ('RBTools/' + get_package_version()).encode('utf-8') opener = build_opener(*handlers) opener.addheaders = [ (str('User-agent'), str(self.agent)), ] install_opener(opener) self._cache = None self._urlopen = urlopen def enable_cache(self, cache_location=None, in_memory=False): """Enable caching for all future HTTP requests. The cache will be created at the default location if none is provided. If the in_memory parameter is True, the cache will be created in memory instead of on disk. This overrides the cache_location parameter. """ if not self._cache: self._cache = APICache(create_db_in_memory=in_memory, db_location=cache_location) self._urlopen = self._cache.make_request def login(self, username, password): """Reset the user information""" self.preset_auth_handler.reset(username, password) def logout(self): """Logs the user out of the session.""" self.preset_auth_handler.reset(None, None) self.make_request(HttpRequest('%ssession/' % self.url, method='DELETE')) self.cookie_jar.clear(self.domain) if self.save_cookies: self.cookie_jar.save() def process_error(self, http_status, data): """Processes an error, raising an APIError with the information.""" # In Python 3, the data can be bytes, not str, and json.loads # explicitly requires decoded strings. data = force_unicode(data) try: rsp = json_loads(data) assert rsp['stat'] == 'fail' logging.debug('Got API Error %d (HTTP code %d): %s', rsp['err']['code'], http_status, rsp['err']['msg']) logging.debug('Error data: %r', rsp) raise create_api_error(http_status, rsp['err']['code'], rsp, rsp['err']['msg']) except ValueError: logging.debug('Got HTTP error: %s: %s', http_status, data) raise APIError(http_status, None, None, data) def make_request(self, request): """Perform an http request. The request argument should be an instance of 'rbtools.api.request.HttpRequest'. """ try: content_type, body = request.encode_multipart_formdata() headers = request.headers if body: headers.update({ 'Content-Type': content_type, 'Content-Length': str(len(body)), }) else: headers['Content-Length'] = '0' rsp = self._urlopen(Request( request.url, body, headers, request.method)) except HTTPError as e: self.process_error(e.code, e.read()) except URLError as e: raise ServerInterfaceError('%s' % e.reason) if self.save_cookies: try: self.cookie_jar.save() except IOError: pass return rsp