Edit File: svn.py
"""A client for Subversion.""" from __future__ import unicode_literals import logging import os import posixpath import re import sys from xml.etree import ElementTree import six from six.moves import map from six.moves.urllib.parse import unquote from rbtools.api.errors import APIError from rbtools.clients import PatchResult, RepositoryInfo, SCMClient from rbtools.clients.errors import (AuthenticationError, InvalidRevisionSpecError, MinimumVersionError, OptionsCheckError, SCMError, TooManyRevisionsError) from rbtools.utils.checks import (check_gnu_diff, check_install, is_valid_version) from rbtools.utils.console import get_pass from rbtools.utils.diffs import (filename_match_any_patterns, filter_diff, normalize_patterns) from rbtools.utils.filesystem import (make_empty_files, make_tempfile, walk_parents) from rbtools.utils.process import execute _fs_encoding = sys.getfilesystemencoding() class SVNClient(SCMClient): """A client for Subversion. This is a wrapper around the svn executable that fetches repository information and generates compatible diffs. """ name = 'Subversion' supports_diff_exclude_patterns = True supports_patch_revert = True INDEX_SEP = b'=' * 67 INDEX_FILE_RE = re.compile(b'^Index: (.+?)(?:\t\((added|deleted)\))?\n$') # Match the diff control lines generated by 'svn diff'. DIFF_ORIG_FILE_LINE_RE = re.compile(br'^---\s+.*\s+\(.*\)') DIFF_NEW_FILE_LINE_RE = re.compile(br'^\+\+\+\s+.*\s+\(.*\)') DIFF_COMPLETE_REMOVAL_RE = re.compile(br'^@@ -1,\d+ \+0,0 @@$') ADDED_FILES_RE = re.compile(br'^Index:\s+(\S+)\t\(added\)$', re.M) DELETED_FILES_RE = re.compile(br'^Index:\s+(\S+)\t\(deleted\)$', re.M) REVISION_WORKING_COPY = '--rbtools-working-copy' REVISION_CHANGELIST_PREFIX = '--rbtools-changelist:' VERSION_NUMBER_RE = re.compile(r'(\d+)\.(\d+)\.(\d+)') SHOW_COPIES_AS_ADDS_MIN_VERSION = (1, 7, 0) PATCH_MIN_VERSION = (1, 7, 0) def __init__(self, **kwargs): """Initialize the client. Args: **kwargs (dict): Keyword arguments to pass through to the superclass. """ super(SVNClient, self).__init__(**kwargs) self._svn_info_cache = {} self._svn_repository_info_cache = None def get_repository_info(self): """Return repository information for the current SVN working tree. Returns: rbtools.clients.RepositoryInfo: The repository info structure. """ if self._svn_repository_info_cache: return self._svn_repository_info_cache if not check_install(['svn', 'help']): logging.debug('Unable to execute "svn help": skipping SVN') return None # Get the SVN repository path (either via a working copy or # a supplied URI) svn_info_params = ['info'] if getattr(self.options, 'repository_url', None): svn_info_params.append(self.options.repository_url) data = self._run_svn(svn_info_params, ignore_errors=True, log_output_on_error=False) m = re.search('^Repository Root: (.+)$', data, re.M) if not m: return None path = m.group(1) m = re.search('^Working Copy Root Path: (.+)$', data, re.M) if m: local_path = m.group(1) else: local_path = None m = re.search('^URL: (.+)$', data, re.M) if not m: return None base_path = m.group(1)[len(path):] or '/' m = re.search('^Repository UUID: (.+)$', data, re.M) if not m: return None uuid = m.group(1) # Now that we know it's SVN, make sure we have GNU diff installed, # and error out if we don't. 
        check_gnu_diff()

        # Grab version of SVN client and store as a tuple in the form:
        # (major_version, minor_version, micro_version)
        ver_string = self._run_svn(['--version', '-q'], ignore_errors=True)
        m = self.VERSION_NUMBER_RE.match(ver_string)

        if not m:
            logging.warn('Unable to parse SVN client version triple from '
                         '"%s". Assuming version 0.0.0.',
                         ver_string.strip())
            self.subversion_client_version = (0, 0, 0)
        else:
            self.subversion_client_version = tuple(map(int, m.groups()))

        self._svn_repository_info_cache = SVNRepositoryInfo(
            path=path,
            base_path=base_path,
            local_path=local_path,
            uuid=uuid)

        return self._svn_repository_info_cache

    def parse_revision_spec(self, revisions=[]):
        """Parse the given revision spec.

        Args:
            revisions (list of unicode, optional):
                A list of revisions as specified by the user. Items in the
                list do not necessarily represent a single revision, since
                the user can use SCM-native syntaxes such as ``r1..r2`` or
                ``r1:r2``. SCMTool-specific overrides of this method are
                expected to deal with such syntaxes.

        Raises:
            rbtools.clients.errors.InvalidRevisionSpecError:
                The given revisions could not be parsed.

            rbtools.clients.errors.TooManyRevisionsError:
                The specified revisions list contained too many revisions.

        Returns:
            dict:
            A dictionary with the following keys:

            ``base`` (:py:class:`unicode`):
                A revision to use as the base of the resulting diff.

            ``tip`` (:py:class:`unicode`):
                A revision to use as the tip of the resulting diff.

            These will be used to generate the diffs to upload to Review
            Board (or print). The diff for review will include the changes
            in (base, tip].

            If a single revision is passed in, this will return the parent
            of that revision for "base" and the passed-in revision for
            "tip".

            If zero revisions are passed in, this will return the most
            recently checked-out revision for 'base' and a special string
            indicating the working copy for "tip".

            The SVN SCMClient never fills in the 'parent_base' key. Users
            who are using other patch-stack tools who want to use parent
            diffs with SVN will have to generate their diffs by hand.
        """
        n_revisions = len(revisions)

        if n_revisions == 1 and ':' in revisions[0]:
            revisions = revisions[0].split(':')
            n_revisions = len(revisions)

        if n_revisions == 0:
            # Most recent checked-out revision -- working copy

            # TODO: this should warn about mixed-revision working copies that
            # affect the list of files changed (see bug 2392).
            return {
                'base': 'BASE',
                'tip': self.REVISION_WORKING_COPY,
            }
        elif n_revisions == 1:
            # Either a numeric revision (n-1:n) or a changelist
            revision = revisions[0]

            try:
                revision = self._convert_symbolic_revision(revision)

                return {
                    'base': revision - 1,
                    'tip': revision,
                }
            except ValueError:
                # It's not a revision--let's try a changelist. This only makes
                # sense if we have a working copy.
                if not self.options.repository_url:
                    status = self._run_svn(['status', '--cl',
                                            six.text_type(revision),
                                            '--ignore-externals', '--xml'],
                                           results_unicode=False)
                    cl = ElementTree.fromstring(status).find('changelist')

                    if cl is not None:
                        # TODO: this should warn about mixed-revision working
                        # copies that affect the list of files changed (see
                        # bug 2392).
                        return {
                            'base': 'BASE',
                            'tip': self.REVISION_CHANGELIST_PREFIX + revision
                        }

                raise InvalidRevisionSpecError(
                    '"%s" does not appear to be a valid revision or '
                    'changelist name' % revision)
        elif n_revisions == 2:
            # Diff between two numeric revisions
            try:
                return {
                    'base': self._convert_symbolic_revision(revisions[0]),
                    'tip': self._convert_symbolic_revision(revisions[1]),
                }
            except ValueError:
                raise InvalidRevisionSpecError(
                    'Could not parse specified revisions: %s' % revisions)
        else:
            raise TooManyRevisionsError

    def _convert_symbolic_revision(self, revision):
        """Convert a symbolic revision to a numbered revision.

        Args:
            revision (unicode):
                The name of a symbolic revision.

        Raises:
            ValueError:
                The given revision could not be converted.

        Returns:
            int:
            The revision number.
        """
        command = ['-r', six.text_type(revision), '-l', '1']

        if getattr(self.options, 'repository_url', None):
            command.append(self.options.repository_url)

        log = self.svn_log_xml(command)

        if log is not None:
            try:
                root = ElementTree.fromstring(log)
            except ValueError as e:
                # _convert_symbolic_revision() nominally raises a ValueError
                # to indicate any failure to determine the revision number
                # from the log entry. Here, we explicitly catch a ValueError
                # from ElementTree and raise a generic SCMError so that this
                # specific failure to parse the XML log output is
                # differentiated from the nominal case.
                raise SCMError('Failed to parse svn log - %s.' % e)

            logentry = root.find('logentry')

            if logentry is not None:
                return int(logentry.attrib['revision'])

        raise ValueError

    def scan_for_server(self, repository_info):
        """Find the Review Board server matching this repository.

        Args:
            repository_info (rbtools.clients.RepositoryInfo):
                The repository information structure.

        Returns:
            unicode:
            The Review Board server URL, if available.
        """
        # Scan first for dot files, since it's faster and will cover the
        # user's $HOME/.reviewboardrc
        server_url = super(SVNClient, self).scan_for_server(repository_info)

        if server_url:
            return server_url

        return self.scan_for_server_property(repository_info)

    def scan_for_server_property(self, repository_info):
        """Scan for the reviewboard:url property in the repository.

        This method looks for the reviewboard:url property, which is an
        alternate (legacy) way of configuring the Review Board server URL
        inside a subversion repository.

        Args:
            repository_info (rbtools.clients.RepositoryInfo):
                The repository information structure.

        Returns:
            unicode:
            The Review Board server URL, if available.
        """
        def get_url_prop(path):
            url = self._run_svn(['propget', 'reviewboard:url', path],
                                with_errors=False,
                                extra_ignore_errors=(1,)).strip()

            return url or None

        for path in walk_parents(os.getcwd()):
            if not os.path.exists(os.path.join(path, '.svn')):
                break

            prop = get_url_prop(path)

            if prop:
                return prop

        return get_url_prop(repository_info.path)

    def get_raw_commit_message(self, revisions):
        """Return the raw commit message(s) for the given revisions.

        Args:
            revisions (dict):
                Revisions to get the commit messages for. This will contain
                ``tip`` and ``base`` keys.

        Returns:
            unicode:
            The commit messages for all the requested revisions.
""" base = six.text_type(revisions['base']) tip = six.text_type(revisions['tip']) if (tip == SVNClient.REVISION_WORKING_COPY or tip.startswith(SVNClient.REVISION_CHANGELIST_PREFIX)): return '' command = ['-r', '%s:%s' % (base, tip)] if getattr(self.options, 'repository_url', None): command.append(self.options.repository_url) log = self.svn_log_xml(command) try: root = ElementTree.fromstring(log) except ValueError as e: raise SCMError('Failed to parse svn log: %s' % e) # We skip the first commit message, because we want commit messages # corresponding to the changes that will be included in the diff. messages = root.findall('.//msg')[1:] return '\n\n'.join(message.text for message in messages) def diff(self, revisions, include_files=[], exclude_patterns=[], no_renames=False, extra_args=[]): """Perform a diff in a Subversion repository. If the given revision spec is empty, this will do a diff of the modified files in the working directory. If the spec is a changelist, it will do a diff of the modified files in that changelist. If the spec is a single revision, it will show the changes in that revision. If the spec is two revisions, this will do a diff between the two revisions. SVN repositories do not support branches of branches in a way that makes parent diffs possible, so we never return a parent diff. Args: revisions (dict): A dictionary of revisions, as returned by :py:meth:`parse_revision_spec`. include_files (list of unicode, optional): A list of files to whitelist during the diff generation. exclude_patterns (list of unicode, optional): A list of shell-style glob patterns to blacklist during diff generation. extra_args (list, unused): Additional arguments to be passed to the diff generation. Unused for SVN. Returns: dict: A dictionary containing the following keys: ``diff`` (:py:class:`bytes`): The contents of the diff to upload. """ repository_info = self.get_repository_info() # SVN paths are always relative to the root of the repository, so we # compute the current path we are checked out at and use that as the # current working directory. We use / for the base_dir because we do # not normalize the paths to be filesystem paths, but instead use SVN # paths. exclude_patterns = normalize_patterns(exclude_patterns, '/', repository_info.base_path) # Keep track of information needed for handling empty files later. empty_files_revisions = { 'base': None, 'tip': None, } base = six.text_type(revisions['base']) tip = six.text_type(revisions['tip']) diff_cmd = ['diff', '--diff-cmd=diff', '--notice-ancestry'] changelist = None if tip == self.REVISION_WORKING_COPY: # Posting the working copy diff_cmd.extend(['-r', base]) elif tip.startswith(self.REVISION_CHANGELIST_PREFIX): # Posting a changelist changelist = tip[len(self.REVISION_CHANGELIST_PREFIX):] diff_cmd.extend(['--changelist', changelist]) else: # Diff between two separate revisions. Behavior depends on whether # or not there's a working copy if self.options.repository_url: # No working copy--create 'old' and 'new' URLs if len(include_files) == 1: # If there's a single file or directory passed in, we use # that as part of the URL instead of as a separate # filename. repository_info.set_base_path(include_files[0]) include_files = [] new_url = (repository_info.path + repository_info.base_path + '@' + tip) # When the source revision is '0', assume the user wants to # upload a diff containing all the files in 'base_path' as # new files. 
                # If the base path within the repository is added to both
                # the old and new URLs, `svn diff` will error out, since the
                # base_path didn't exist at revision 0. To avoid that error,
                # use the repository's root URL as the source for the diff.
                if base == '0':
                    old_url = repository_info.path + '@' + base
                else:
                    old_url = (repository_info.path +
                               repository_info.base_path + '@' + base)

                diff_cmd.extend([old_url, new_url])

                empty_files_revisions['base'] = '(revision %s)' % base
                empty_files_revisions['tip'] = '(revision %s)' % tip
            else:
                # Working copy--do a normal range diff
                diff_cmd.extend(['-r', '%s:%s' % (base, tip)])

                empty_files_revisions['base'] = '(revision %s)' % base
                empty_files_revisions['tip'] = '(revision %s)' % tip

        diff_cmd.extend(include_files)

        # Check for and validate --svn-show-copies-as-adds option, or evaluate
        # working copy to determine if scheduled commit will contain
        # addition-with-history commit. When this case occurs then
        # --svn-show-copies-as-adds must be specified. Note: this only
        # pertains to local modifications in a working copy and not diffs
        # between specific numeric revisions.
        if (((tip == self.REVISION_WORKING_COPY) or changelist) and
            is_valid_version(self.subversion_client_version,
                             self.SHOW_COPIES_AS_ADDS_MIN_VERSION)):
            svn_show_copies_as_adds = getattr(
                self.options, 'svn_show_copies_as_adds', None)

            if svn_show_copies_as_adds is None:
                if self.history_scheduled_with_commit(changelist,
                                                      include_files,
                                                      exclude_patterns):
                    sys.stderr.write(
                        'One or more files in your changeset has history '
                        'scheduled with commit. Please try again with '
                        '"--svn-show-copies-as-adds=y/n".\n')
                    sys.exit(1)
            else:
                if svn_show_copies_as_adds in 'Yy':
                    diff_cmd.append('--show-copies-as-adds')

        diff = self._run_svn(diff_cmd,
                             split_lines=True,
                             results_unicode=False,
                             log_output_on_error=False)
        diff = self.handle_renames(diff)

        if self.supports_empty_files():
            diff = self._handle_empty_files(diff, diff_cmd,
                                            empty_files_revisions)

        diff = self.convert_to_absolute_paths(diff, repository_info)

        if exclude_patterns:
            diff = filter_diff(diff, self.INDEX_FILE_RE, exclude_patterns)

        return {
            'diff': b''.join(diff),
        }

    def history_scheduled_with_commit(self, changelist, include_files,
                                      exclude_patterns):
        """Return whether any files have history scheduled.

        Args:
            changelist (unicode):
                The changelist name, if specified.

            include_files (list of unicode):
                A list of files to whitelist during the diff generation.

            exclude_patterns (list of unicode):
                A list of shell-style glob patterns to blacklist during diff
                generation.

        Returns:
            bool:
            ``True`` if any new files have been scheduled including their
            history.
        """
        status_cmd = ['status', '-q', '--ignore-externals']

        if changelist:
            status_cmd.extend(['--changelist', changelist])

        if include_files:
            status_cmd.extend(include_files)

        for p in self._run_svn(status_cmd,
                               split_lines=True,
                               results_unicode=False):
            try:
                if p[3:4] == b'+':
                    if exclude_patterns:
                        # We found a file with history, but first we must make
                        # sure that it is not being excluded.
                        filename = p[8:].rstrip().decode(_fs_encoding)

                        should_exclude = filename_match_any_patterns(
                            filename,
                            exclude_patterns,
                            self.get_repository_info().base_path)

                        if not should_exclude:
                            return True
                    else:
                        return True
            except IndexError:
                # This may be some other output, or just doesn't have the
                # data we're looking for. Move along.
                pass

        return False

    def find_copyfrom(self, path):
        """Find the source filename for copied files.

        The output of 'svn info' reports the "Copied From" header when
        invoked on the exact path that was copied.
        If the current file was copied as a part of a parent or any further
        ancestor directory, 'svn info' will not report the origin. Thus we
        need to ascend from the path until either a copied path is found or
        there are no more path components to try.

        Args:
            path (unicode):
                The filename of the copied file.

        Returns:
            unicode:
            The filename of the source of the copy.
        """
        def smart_join(p1, p2):
            if p2:
                return os.path.join(p1, p2)
            else:
                return p1

        path1 = path
        path2 = None

        while path1:
            info = self.svn_info(path1, ignore_errors=True) or {}
            url = info.get('Copied From URL', None)

            if url:
                root = info['Repository Root']
                from_path1 = unquote(url[len(root):])
                return smart_join(from_path1, path2)

            if info.get('Schedule', None) != 'normal':
                # Not added as a part of the parent directory, bail out
                return None

            # Strip one component from path1 to path2
            path1, tmp = os.path.split(path1)

            if path1 == '' or path1 == '/':
                path1 = None
            else:
                path2 = smart_join(tmp, path2)

        return None

    def handle_renames(self, diff_content):
        """Fix up diff headers to properly show renames.

        The output of :command:`svn diff` is incorrect when the file in
        question came into being via svn mv/cp. Although the patch for these
        files is relative to its parent, the diff header doesn't reflect
        this. This function fixes the relevant section headers of the patch
        to portray this relationship.

        Args:
            diff_content (bytes):
                The content of the diffs.

        Returns:
            bytes:
            The processed diff.
        """
        # svn diff against a repository URL on two revisions appears to
        # handle moved files properly, so only adjust the diff file names
        # if they were created using a working copy.
        if self.options.repository_url:
            return diff_content

        result = []

        num_lines = len(diff_content)
        i = 0

        while i < num_lines:
            if (i + 4 < num_lines and
                self.INDEX_FILE_RE.match(diff_content[i]) and
                diff_content[i + 1][:-1] == self.INDEX_SEP and
                self.DIFF_ORIG_FILE_LINE_RE.match(diff_content[i + 2]) and
                self.DIFF_NEW_FILE_LINE_RE.match(diff_content[i + 3])):
                from_line = diff_content[i + 2]
                to_line = diff_content[i + 3]

                # If the file is marked completely removed, bail out with the
                # original diff. The reason for this is that
                # ``svn diff --notice-ancestry`` generates two diffs for a
                # replaced file: one as a complete deletion, and one as a new
                # addition. If it was replaced with history, though, we need
                # to preserve the file name in the "deletion" part, or the
                # patch won't apply.
                if self.DIFF_COMPLETE_REMOVAL_RE.match(diff_content[i + 4]):
                    result.extend(diff_content[i:i + 5])
                else:
                    to_file, _ = self.parse_filename_header(to_line[4:])
                    copied_from = self.find_copyfrom(to_file)

                    result.append(diff_content[i])
                    result.append(diff_content[i + 1])

                    if copied_from is not None:
                        result.append(from_line.replace(
                            to_file.encode(_fs_encoding),
                            copied_from.encode(_fs_encoding)))
                    else:
                        result.append(from_line)

                    result.append(to_line)
                    result.append(diff_content[i + 4])

                i += 5
            else:
                result.append(diff_content[i])
                i += 1

        return result

    def _handle_empty_files(self, diff_content, diff_cmd, revisions):
        """Handle added and deleted 0-length files in the diff output.

        Since the diff output from :command:`svn diff` does not give enough
        context for 0-length files, we add extra information to the patch.

        For example, the original diff output of an added 0-length file is::

            Index: foo\\n
            ===================================================================\\n

        The modified diff of an added 0-length file will be::

            Index: foo\\t(added)\\n
            ===================================================================\\n
            --- foo\\t(<base_revision>)\\n
            +++ foo\\t(<tip_revision>)\\n

        Args:
            diff_content (list of bytes):
                The content of the diff, split into lines.

            diff_cmd (list of unicode):
                A partial command line to run :command:`svn diff`.

            revisions (dict):
                A dictionary of revisions, as returned by
                :py:meth:`parse_revision_spec`.

        Returns:
            list of bytes:
            The processed diff lines.
        """
        # Get a list of all deleted files in this diff so we can differentiate
        # between added empty files and deleted empty files.
        diff_cmd.append('--no-diff-deleted')
        diff_with_deleted = self._run_svn(diff_cmd,
                                          ignore_errors=True,
                                          none_on_ignored_error=True,
                                          results_unicode=False)

        if not diff_with_deleted:
            return diff_content

        deleted_files = re.findall(br'^Index:\s+(\S+)\s+\(deleted\)$',
                                   diff_with_deleted, re.M)

        result = []
        i = 0
        num_lines = len(diff_content)

        while i < num_lines:
            line = diff_content[i]

            if (line.startswith(b'Index: ') and
                (i + 2 == num_lines or
                 (i + 2 < num_lines and
                  diff_content[i + 2].startswith(b'Index: ')))):
                # An empty file. Get and add the extra diff information.
                index_line = line.strip()
                filename = index_line.split(b' ', 1)[1].strip()

                if filename in deleted_files:
                    # Deleted empty file.
                    result.append(b'%s\t(deleted)\n' % index_line)

                    if not revisions['base'] and not revisions['tip']:
                        tip = '(working copy)'
                        info = self.svn_info(filename.decode(_fs_encoding),
                                             ignore_errors=True)

                        if info and 'Revision' in info:
                            base = '(revision %s)' % info['Revision']
                        else:
                            continue
                    else:
                        base = revisions['base']
                        tip = revisions['tip']
                else:
                    # Added empty file.
                    result.append(b'%s\t(added)\n' % index_line)

                    if not revisions['base'] and not revisions['tip']:
                        base = tip = '(revision 0)'
                    else:
                        base = revisions['base']
                        tip = revisions['tip']

                result.append(b'%s\n' % self.INDEX_SEP)
                result.append(b'--- %s\t%s\n' % (filename,
                                                 base.encode('utf-8')))
                result.append(b'+++ %s\t%s\n' % (filename,
                                                 tip.encode('utf-8')))

                # Skip the next line (the index separator) since we've already
                # copied it.
                i += 2
            else:
                result.append(line)
                i += 1

        return result

    def convert_to_absolute_paths(self, diff_content, repository_info):
        """Convert relative paths in a diff output to absolute paths.

        This handles paths that have been svn switched to other parts of the
        repository.

        Args:
            diff_content (bytes):
                The content of the diff.

            repository_info (SVNRepositoryInfo):
                The repository info.

        Returns:
            bytes:
            The processed diff.
        """
        result = []

        for line in diff_content:
            front = None
            orig_line = line

            if (self.DIFF_NEW_FILE_LINE_RE.match(line) or
                self.DIFF_ORIG_FILE_LINE_RE.match(line) or
                line.startswith(b'Index: ')):
                front, line = line.split(b' ', 1)

            if front:
                if line.startswith(b'/'):
                    # Already absolute
                    line = front + b' ' + line
                else:
                    # Filename and rest of line (usually the revision
                    # component)
                    file, rest = self.parse_filename_header(line)

                    # If working with a diff generated outside of a working
                    # copy, then file paths are already absolute, so just
                    # add initial slash.
                    if self.options.repository_url:
                        path = unquote(
                            posixpath.join(repository_info.base_path, file))
                    else:
                        info = self.svn_info(file, True)

                        if info is None:
                            result.append(orig_line)
                            continue

                        url = info['URL']
                        root = info['Repository Root']
                        path = unquote(url[len(root):])

                    line = b'%s %s%s' % (front, path.encode(_fs_encoding),
                                         rest)

            result.append(line)

        return result

    def svn_info(self, path, ignore_errors=False):
        """Return a dict which is the result of 'svn info' at a given path.

        Args:
            path (unicode):
                The path to the file being accessed.

            ignore_errors (bool, optional):
                Whether to ignore errors returned by ``svn info``.

        Returns:
            dict:
            The parsed ``svn info`` output.
        """
        # SVN's internal path recognizers think that any file path that
        # includes an '@' character will be path@rev, and skips everything
        # that comes after the '@'. This makes it hard to do operations on
        # files which include '@' in the name (such as image@2x.png).
        if '@' in path and not path[-1] == '@':
            path += '@'

        if path not in self._svn_info_cache:
            result = self._run_svn(['info', path],
                                   split_lines=True,
                                   ignore_errors=ignore_errors,
                                   none_on_ignored_error=True)

            if result is None:
                self._svn_info_cache[path] = None
            else:
                svninfo = {}

                for info in result:
                    parts = info.strip().split(': ', 1)

                    if len(parts) == 2:
                        key, value = parts
                        svninfo[key] = value

                self._svn_info_cache[path] = svninfo

        return self._svn_info_cache[path]

    def parse_filename_header(self, diff_line):
        """Parse the filename header from a diff.

        Args:
            diff_line (bytes):
                The line of the diff being parsed.

        Returns:
            tuple of (unicode, bytes):
            The parsed header line. The filename will be decoded using the
            system filesystem encoding.
        """
        parts = None

        if b'\t' in diff_line:
            # There's a \t separating the filename and info. This is the
            # best case scenario, since it allows for filenames with spaces
            # without much work. The info can also contain tabs after the
            # initial one; ignore those when splitting the string.
            parts = diff_line.split(b'\t', 1)

        if b' ' in diff_line:
            # There are spaces being used to separate the filename and info.
            # This is technically wrong, so all we can do is assume that
            # 1) the filename won't have multiple consecutive spaces, and
            # 2) there are at least 2 spaces separating the filename and
            #    info.
            parts = re.split(b' +', diff_line, 1)

        if parts:
            return (parts[0].decode(_fs_encoding), b'\t' + parts[1])

        # Strip off ending newline, and return it as the second component.
        return (diff_line.split(b'\n')[0].decode(_fs_encoding), b'\n')

    def _get_p_number(self, base_path, base_dir):
        """Return the argument for --strip in svn patch.

        This determines the number of path components to remove from file
        paths in the diff to be applied.
        """
        if base_path == '/':
            # We always need to strip off the leading forward slash.
            return 1
        else:
            # We strip all leading directories from base_path. The last
            # directory will not be suffixed with a slash.
            return base_path.count('/') + 1

    def _exclude_files_not_in_tree(self, patch_file, base_path):
        """Process a diff and remove entries not in the current directory.

        Args:
            patch_file (unicode):
                The filename of the patch file to process. This file will be
                overwritten by the processed patch.

            base_path (unicode):
                The relative path between the root of the repository and the
                directory that the patch was created in.

        Returns:
            tuple of bool:
            A tuple with two values, representing whether any files have been
            excluded and whether the resulting diff is empty.
""" excluded_files = False empty_patch = True # If our base path does not have a trailing slash (which it won't # unless we are at a checkout root), we append a slash so that we can # determine if files are under the base_path. We do this so that files # like /trunkish (which begins with /trunk) do not mistakenly get # placed in /trunk if that is the base_path. if not base_path.endswith('/'): base_path += '/' filtered_patch_name = make_tempfile() with open(filtered_patch_name, 'w') as filtered_patch: with open(patch_file, 'r') as original_patch: include_file = True for line in original_patch.readlines(): m = self.INDEX_FILE_RE.match(line) if m: filename = m.group(1).decode('utf-8') include_file = filename.startswith(base_path) if not include_file: excluded_files = True else: empty_patch = False if include_file: filtered_patch.write(line) os.rename(filtered_patch_name, patch_file) return (excluded_files, empty_patch) def apply_patch(self, patch_file, base_path, base_dir, p=None, revert=False): """Apply the patch and return a PatchResult indicating its success. Args: patch_file (unicode): The name of the patch file to apply. base_path (unicode): The base path that the diff was generated in. base_dir (unicode): The path of the current working directory relative to the root of the repository. p (unicode, optional): The prefix level of the diff. revert (bool, optional): Whether the patch should be reverted rather than applied. Returns: rbtools.clients.PatchResult: The result of the patch operation. """ if not is_valid_version(self.subversion_client_version, self.PATCH_MIN_VERSION): raise MinimumVersionError( 'Using "rbt patch" with the SVN backend requires at least ' 'svn 1.7.0') if base_dir and not base_dir.startswith(base_path): # The patch was created in either a higher level directory or a # directory not under this one. We should exclude files from the # patch that are not under this directory. excluded, empty = self._exclude_files_not_in_tree(patch_file, base_path) if excluded: logging.warn('This patch was generated in a different ' 'directory. To prevent conflicts, all files ' 'not under the current directory have been ' 'excluded. To apply all files in this ' 'patch, apply this patch from the %s directory.', base_dir) if empty: logging.warn('All files were excluded from the patch.') cmd = ['patch'] p_num = p or self._get_p_number(base_path, base_dir) if p_num >= 0: cmd.append('--strip=%s' % p_num) if revert: cmd.append('--reverse-diff') cmd.append(six.text_type(patch_file)) rc, patch_output = self._run_svn(cmd, return_error_code=True) if self.supports_empty_files(): try: with open(patch_file, 'rb') as f: patch = f.read() except IOError as e: logging.error('Unable to read file %s: %s', patch_file, e) return self.apply_patch_for_empty_files(patch, p_num, revert=revert) # TODO: What is svn's equivalent of a garbage patch message? return PatchResult(applied=(rc == 0), patch_output=patch_output) def apply_patch_for_empty_files(self, patch, p_num, revert=False): """Return whether any empty files in the patch are applied. Args: patch (bytes): The contents of the patch. p_num (unicode): The prefix level of the diff. revert (bool, optional): Whether the patch should be reverted rather than applied. Returns: ``True`` if there are empty files in the patch. ``False`` if there were no empty files, or if an error occurred while applying the patch. 
""" patched_empty_files = False if revert: added_files = self.DELETED_FILES_RE.findall(patch) deleted_files = self.ADDED_FILES_RE.findall(patch) else: added_files = self.ADDED_FILES_RE.findall(patch) deleted_files = self.DELETED_FILES_RE.findall(patch) if added_files: added_files = self._strip_p_num_slashes(added_files, int(p_num)) make_empty_files(added_files) # We require --force here because svn will complain if we run # `svn add` on a file that has already been added or deleted. result = self._run_svn(['add', '--force'] + added_files, ignore_errors=True, none_on_ignored_error=True) if result is None: logging.error('Unable to execute "svn add" on: %s', ', '.join(added_files)) else: patched_empty_files = True if deleted_files: deleted_files = self._strip_p_num_slashes(deleted_files, int(p_num)) # We require --force here because svn will complain if we run # `svn delete` on a file that has already been added or deleted. result = self._run_svn(['delete', '--force'] + deleted_files, ignore_errors=True, none_on_ignored_error=True) if result is None: logging.error('Unable to execute "svn delete" on: %s', ', '.join(deleted_files)) else: patched_empty_files = True return patched_empty_files def supports_empty_files(self): """Check if the server supports added/deleted empty files. Returns: bool: Whether the Review Board server supports empty added or deleted files. """ return (self.capabilities and self.capabilities.has_capability('scmtools', 'svn', 'empty_files')) def _run_svn(self, svn_args, *args, **kwargs): """Run the ``svn`` command. Args: svn_args (list of unicode): A list of additional arguments to add to the SVN command line. *args (list): Additional positional arguments to pass through to :py:func:`rbtools.utils.process.execute`. **kwargs (dict): Additional keyword arguments to pass through to :py:func:`rbtools.utils.process.execute`. Returns: tuple: The value returned by :py:func:`rbtools.utils.process.execute`. """ cmdline = ['svn', '--non-interactive'] + svn_args if getattr(self.options, 'svn_username', None): cmdline += ['--username', self.options.svn_username] if getattr(self.options, 'svn_prompt_password', None): self.options.svn_prompt_password = False self.options.svn_password = get_pass('SVN Password: ') if getattr(self.options, 'svn_password', None): cmdline += ['--password', self.options.svn_password] return execute(cmdline, *args, **kwargs) def svn_log_xml(self, svn_args, *args, **kwargs): """Run SVN log non-interactively and retrieve XML output. We cannot run SVN log interactively and retrieve XML output because the authentication prompts will be intermixed with the XML output and cause XML parsing to fail. This function returns None (as if ``none_on_ignored_error`` were ``True``) if an error occurs that is not an authentication error. Args: svn_args (list of unicode): A list of additional arguments to add to the SVN command line. *args (list): Additional positional arguments to pass through to :py:func:`rbtools.utils.process.execute`. **kwargs (dict): Additional keyword arguments to pass through to :py:func:`rbtools.utils.process.execute`. Returns: bytes: The resulting log output. Raises: rbtools.clients.errors.AuthenticationError: Authentication to the remote repository failed. 
""" command = ['log', '--xml'] + svn_args rc, result, errors = self._run_svn(command, *args, return_error_code=True, with_errors=False, return_errors=True, ignore_errors=True, results_unicode=False, **kwargs) if rc: # SVN Error E215004: --non-interactive was passed but the remote # repository requires authentication. if errors.startswith(b'svn: E215004'): raise AuthenticationError( 'Could not authenticate against remote SVN repository. ' 'Please provide the --svn-username and either the ' '--svn-password or --svn-prompt-password command line ' 'options.') return None return result def check_options(self): """Verify the command line options. Raises: rbtools.clients.errors.OptionsCheckError: The supplied command line options were incorrect. In particular, if a file has history scheduled with the commit, the user needs to explicitly choose what behavior they want. """ if getattr(self.options, 'svn_show_copies_as_adds', None): if (len(self.options.svn_show_copies_as_adds) > 1 or self.options.svn_show_copies_as_adds not in 'YyNn'): raise OptionsCheckError( 'Invalid value \'%s\' for --svn-show-copies-as-adds ' 'option. Valid values are \'y\' or \'n\'.' % self.options.svn_show_copies_as_adds) class SVNRepositoryInfo(RepositoryInfo): """Information on a Subversion repository. This stores information on the path and, optionally, UUID of a Subversion repository. It can match a local repository against those on a Review Board server. Attributes: repository_id (int): ID of the repository in the API. This is used primarily for testing purposes, and is not guaranteed to be set. uuid (unicode): UUID of the Subversion repository. """ def __init__(self, path=None, base_path=None, uuid=None, local_path=None, supports_parent_diffs=False, repository_id=None): """Initialize the repository information. Args: path (unicode): Subversion checkout path. base_path (unicode): Root of the Subversion repository. local_path (unicode): The local filesystem path for the repository. This can sometimes be the same as ``path``, but may not be (since that can contain a remote repository path). uuid (unicode): UUID of the Subversion repository. supports_parent_diffs (bool, optional): Whether or not the repository supports parent diffs. repository_id (int, optional): ID of the repository in the API. This is used primarily for testing purposes, and is not guaranteed to be set. """ super(SVNRepositoryInfo, self).__init__( path=path, base_path=base_path, local_path=local_path, supports_parent_diffs=supports_parent_diffs) self.uuid = uuid self.repository_id = repository_id def find_server_repository_info(self, server): """Return server-side information on the current Subversion repository. The point of this function is to find a repository on the server that matches self, even if the paths aren't the same. (For example, if self uses an 'http' path, but the server uses a 'file' path for the same repository.) It does this by comparing repository UUIDs. If the repositories use the same path, you'll get back self, otherwise you'll get a different SVNRepositoryInfo object (with a different path). Args: server (rbtools.api.resource.RootResource): The root resource for the Review Board server. Returns: SVNRepositoryInfo: The server-side information for this repository. """ # Since all_items is a generator, and we need to process the list of # repositories twice, we're going to keep a cached list of repositories # that we'll add to as we iterate through the first time. 
        # That way, we can iterate through a second time, without performing
        # another call to the server.
        #
        # Hopefully we'll match a repository in the first (less expensive)
        # loop and won't need it.
        #
        # Note also that we're not fetching all pages up-front, as that could
        # lead to a lot of unnecessary API requests if the repository in
        # question is found before the last page of results in the first for
        # loop.
        repositories = server.get_repositories(tool='Subversion').all_items
        cached_repos = []

        # Do two passes. The first will be to try to find a matching entry
        # by path/mirror path. If we don't find anything, then the second
        # will be to find a matching UUID.
        for repository in repositories:
            if (self.path == repository['path'] or
                ('mirror_path' in repository and
                 self.path == repository['mirror_path'])):
                self.repository_id = repository.id
                return self

            cached_repos.append(repository)

        # We didn't find our locally matched repository, so scan based on
        # UUID.
        for repository in cached_repos:
            try:
                info = repository.get_info()

                if not info or self.uuid != info['uuid']:
                    continue
            except APIError:
                continue

            repos_base_path = info['url'][len(info['root_url']):]
            relpath = self._get_relative_path(self.base_path,
                                              repos_base_path)

            if relpath:
                return SVNRepositoryInfo(
                    path=info['url'],
                    base_path=relpath,
                    local_path=self.local_path,
                    uuid=self.uuid,
                    repository_id=repository.id)

        # We didn't find a matching repository on the server. We'll just
        # return self and hope for the best. In reality, we'll likely fail,
        # but we did all we could really do.
        return self

    def _get_repository_info(self, server, repository):
        try:
            return server.get_repository_info(repository['id'])
        except APIError as e:
            # If the server couldn't fetch the repository info, it will
            # return code 210. Ignore those.
            #
            # Other more serious errors should still be raised, though.
            if e.error_code == 210:
                return None

            raise e

    def _get_relative_path(self, path, root):
        pathdirs = self._split_on_slash(path)
        rootdirs = self._split_on_slash(root)

        # root is empty, so anything relative to that is itself
        if len(rootdirs) == 0:
            return path

        # If one of the directories doesn't match, then path is not relative
        # to root.
        if rootdirs != pathdirs[:len(rootdirs)]:
            return None

        # All the directories matched, so the relative path is whatever
        # directories are left over. The base_path can't be empty, though, so
        # if the paths are the same, return '/'
        if len(pathdirs) == len(rootdirs):
            return '/'
        else:
            return '/' + '/'.join(pathdirs[len(rootdirs):])

    def _split_on_slash(self, path):
        # Split on slashes, but ignore multiple slashes and throw away any
        # trailing slashes.
        split = re.split('/+', path)

        if split[-1] == '':
            split = split[:-1]

        return split
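
# Usage sketch (illustrative only, not part of the original module): a
# commented-out outline of how the client above is typically driven by an
# rbt command. The ``options`` object stands in for the argparse namespace
# that rbt normally supplies (constructing the client with ``options=`` is
# assumed to match the SCMClient superclass signature); only attributes read
# via getattr() above, such as repository_url and svn_username, are relevant.
# The return values shown are taken from the method docstrings and code
# above, assuming a working copy on a repository where revision 42 exists.
#
#     client = SVNClient(options=options)
#     repository_info = client.get_repository_info()
#
#     # Zero revisions: post the working copy against BASE.
#     revisions = client.parse_revision_spec([])
#     # -> {'base': 'BASE', 'tip': SVNClient.REVISION_WORKING_COPY}
#
#     # A single numeric revision N diffs (N - 1, N].
#     revisions = client.parse_revision_spec(['42'])
#     # -> {'base': 41, 'tip': 42}
#
#     result = client.diff(revisions)
#     # result['diff'] is a bytes blob suitable for uploading to Review
#     # Board.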