EVOLUTION-MANAGER
Edit File: __init__.py
from radicale.storage import BaseCollection
from contextlib import contextmanager
import base64
import hashlib
import hmac
import logging
import posixpath
import threading
from datetime import datetime
import time
import os
import sys
import platform
if platform.system() == "Linux":
    sys.path.insert(0, '/usr/lib/python3/site-packages')
import requests
from urllib import request
from radicale.storage import (
    BaseCollection, md5, ITEM_CACHE_VERSION, left_encode_int, sanitize_path,
    Item, ComponentNotFoundError, get_etag, UnsafePathError, groupby, get_uid,
    os, FileBackedRwLock, path_to_filesystem, json, check_and_sanitize_props,
    check_and_sanitize_item, scandir, is_safe_filesystem_path_component,
    pickle, chain, NamedTemporaryFile, subprocess, shlex, get_uid_from_object,
    xmlutils, binascii, TemporaryDirectory, contextlib, random_uuid4)
if os.name == "posix":
    from radicale.storage import fcntl
import vobject


class Collection(BaseCollection):
    """Collection stored in several files per calendar."""

    @classmethod
    def static_init(cls):
        # init storage lock
        folder = os.path.expanduser(cls.configuration.get(
            "storage", "filesystem_folder"))
        cls._makedirs_synced(folder)
        lock_path = None
        if cls.configuration.getboolean("storage", "filesystem_locking"):
            lock_path = os.path.join(folder, ".Radicale.lock")
        close_lock_file = cls.configuration.getboolean(
            "storage", "filesystem_close_lock_file")
        cls._lock = FileBackedRwLock(lock_path, close_lock_file)
        # init cache lock
        cls._cache_locks = {}
        cls._cache_locks_lock = threading.Lock()

    def create_auth_token(self, pkey, machine_key):
        machine_key = bytes(machine_key, 'UTF-8')
        now = datetime.strftime(datetime.now(), "%Y%m%d%H%M%S")
        message = bytes('{0}\n{1}'.format(now, pkey), 'UTF-8')
        _hmac = hmac.new(machine_key, message, hashlib.sha1)
        signature = str(base64.urlsafe_b64encode(_hmac.digest()), 'UTF-8')
        signature = signature.replace('-', '+')
        signature = signature.replace('_', '/')
        token = 'ASC {0}:{1}:{2}'.format(pkey, now, signature)
        self.logger.info('Auth token: %r', token)
        return token

    def __init__(self, path, principal=None, folder=None,
                 filesystem_path=None):
        # DEPRECATED: Remove principal and folder attributes
        if folder is None:
            folder = self._get_collection_root_folder()
        # Path should already be sanitized
        self.path = sanitize_path(path).strip("/")
        self._encoding = self.configuration.get("encoding", "stock")
        # DEPRECATED: Use ``self._encoding`` instead
        self.encoding = self._encoding
        if filesystem_path is None:
            filesystem_path = path_to_filesystem(folder, self.path)
        self._filesystem_path = filesystem_path
        self._props_path = os.path.join(
            self._filesystem_path, ".Radicale.props")
        self._meta_cache = None
        self._etag_cache = None
        self._item_cache_cleaned = False

    @classmethod
    def _get_collection_root_folder(cls):
        filesystem_folder = os.path.expanduser(
            cls.configuration.get("storage", "filesystem_folder"))
        return os.path.join(filesystem_folder, "collection-root")

    @contextmanager
    def _atomic_write(self, path, mode="w", newline=None,
                      sync_directory=True):
        directory = os.path.dirname(path)
        tmp = NamedTemporaryFile(
            mode=mode, dir=directory, delete=False, prefix=".Radicale.tmp-",
            newline=newline, encoding=None if "b" in mode else self._encoding)
        try:
            yield tmp
            tmp.flush()
            try:
                self._fsync(tmp.fileno())
            except OSError as e:
                raise RuntimeError("Fsync'ing file %r failed: %s" %
                                   (path, e)) from e
            tmp.close()
            os.replace(tmp.name, path)
        except BaseException:
            tmp.close()
            os.remove(tmp.name)
            raise
        if sync_directory:
            self._sync_directory(directory)

    @staticmethod
suffix=""): # Prevent infinite loop for _ in range(1000): file_name = random_uuid4() + suffix if not exists_fn(file_name): return file_name # something is wrong with the PRNG raise RuntimeError("No unique random sequence found") @classmethod def _fsync(cls, fd): if cls.configuration.getboolean("storage", "filesystem_fsync"): if os.name == "posix" and hasattr(fcntl, "F_FULLFSYNC"): fcntl.fcntl(fd, fcntl.F_FULLFSYNC) else: os.fsync(fd) @classmethod def _sync_directory(cls, path): """Sync directory to disk. This only works on POSIX and does nothing on other systems. """ if not cls.configuration.getboolean("storage", "filesystem_fsync"): return if os.name == "posix": try: fd = os.open(path, 0) try: cls._fsync(fd) finally: os.close(fd) except OSError as e: raise RuntimeError("Fsync'ing directory %r failed: %s" % (path, e)) from e @classmethod def _makedirs_synced(cls, filesystem_path): """Recursively create a directory and its parents in a sync'ed way. This method acts silently when the folder already exists. """ if os.path.isdir(filesystem_path): return parent_filesystem_path = os.path.dirname(filesystem_path) # Prevent infinite loop if filesystem_path != parent_filesystem_path: # Create parent dirs recursively cls._makedirs_synced(parent_filesystem_path) # Possible race! os.makedirs(filesystem_path, exist_ok=True) cls._sync_directory(parent_filesystem_path) @classmethod def discover(cls, path, depth="0", child_context_manager=( lambda path, href=None: contextlib.ExitStack())): # Path should already be sanitized sane_path = sanitize_path(path).strip("/") attributes = sane_path.split("/") if sane_path else [] folder = cls._get_collection_root_folder() # Create the root collection cls._makedirs_synced(folder) try: filesystem_path = path_to_filesystem(folder, sane_path) except ValueError as e: # Path is unsafe cls.logger.debug("Unsafe path %r requested from storage: %s", sane_path, e, exc_info=True) return # Check if the path exists and if it leads to a collection or an item if not os.path.isdir(filesystem_path): if attributes and os.path.isfile(filesystem_path): href = attributes.pop() else: return else: href = None sane_path = "/".join(attributes) collection = cls(sane_path) if href: yield collection.get(href) return yield collection if depth == "0": return for href in collection.list(): with child_context_manager(sane_path, href): yield collection.get(href) for href in scandir(filesystem_path, only_dirs=True): if not is_safe_filesystem_path_component(href): if not href.startswith(".Radicale"): cls.logger.debug("Skipping collection %r in %r", href, sane_path) continue child_path = posixpath.join(sane_path, href) with child_context_manager(child_path): yield cls(child_path) @classmethod def verify(cls): item_errors = collection_errors = 0 @contextlib.contextmanager def exception_cm(path, href=None): nonlocal item_errors, collection_errors try: yield except Exception as e: if href: item_errors += 1 name = "item %r in %r" % (href, path.strip("/")) else: collection_errors += 1 name = "collection %r" % path.strip("/") cls.logger.error("Invalid %s: %s", name, e, exc_info=True) remaining_paths = [""] while remaining_paths: path = remaining_paths.pop(0) cls.logger.debug("Verifying collection %r", path) with exception_cm(path): saved_item_errors = item_errors collection = None for item in cls.discover(path, "1", exception_cm): if not collection: collection = item collection.get_meta() continue if isinstance(item, BaseCollection): remaining_paths.append(item.path) else: cls.logger.debug("Verified item %r 
in %r", item.href, path) if item_errors == saved_item_errors: collection.sync() return item_errors == 0 and collection_errors == 0 @classmethod def create_collection(cls, href, collection=None, props=None): folder = cls._get_collection_root_folder() # Path should already be sanitized sane_path = sanitize_path(href).strip("/") filesystem_path = path_to_filesystem(folder, sane_path) if not props: cls._makedirs_synced(filesystem_path) return cls(sane_path) parent_dir = os.path.dirname(filesystem_path) cls._makedirs_synced(parent_dir) # Create a temporary directory with an unsafe name with TemporaryDirectory( prefix=".Radicale.tmp-", dir=parent_dir) as tmp_dir: # The temporary directory itself can't be renamed tmp_filesystem_path = os.path.join(tmp_dir, "collection") os.makedirs(tmp_filesystem_path) self = cls(sane_path, filesystem_path=tmp_filesystem_path) self.set_meta_all(props) if collection: if props.get("tag") == "VCALENDAR": collection, = collection items = [] for content in ("vevent", "vtodo", "vjournal"): items.extend( getattr(collection, "%s_list" % content, [])) items_by_uid = groupby(sorted(items, key=get_uid), get_uid) vobject_items = {} for uid, items in items_by_uid: new_collection = vobject.iCalendar() for item in items: new_collection.add(item) # href must comply to is_safe_filesystem_path_component # and no file name collisions must exist between hrefs href = self._find_available_file_name( vobject_items.get, suffix=".ics") vobject_items[href] = new_collection self._upload_all_nonatomic(vobject_items) elif props.get("tag") == "VADDRESSBOOK": vobject_items = {} for card in collection: # href must comply to is_safe_filesystem_path_component # and no file name collisions must exist between hrefs href = self._find_available_file_name( vobject_items.get, suffix=".vcf") vobject_items[href] = card self._upload_all_nonatomic(vobject_items) # This operation is not atomic on the filesystem level but it's # very unlikely that one rename operations succeeds while the # other fails or that only one gets written to disk. if os.path.exists(filesystem_path): os.rename(filesystem_path, os.path.join(tmp_dir, "delete")) os.rename(tmp_filesystem_path, filesystem_path) cls._sync_directory(parent_dir) return cls(sane_path) def upload_all_nonatomic(self, vobject_items): """DEPRECATED: Use ``_upload_all_nonatomic``""" return self._upload_all_nonatomic(vobject_items) def _upload_all_nonatomic(self, vobject_items): """Upload a new set of items. This takes a mapping of href and vobject items and uploads them nonatomic and without existence checks. 
""" cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "item") self._makedirs_synced(cache_folder) for href, vobject_item in vobject_items.items(): if not is_safe_filesystem_path_component(href): raise UnsafePathError(href) try: cache_content = self._item_cache_content(href, vobject_item) _, _, _, text, _, _, _, _ = cache_content except Exception as e: raise ValueError( "Failed to store item %r in temporary collection %r: %s" % (href, self.path, e)) from e with self._atomic_write(os.path.join(cache_folder, href), "wb", sync_directory=False) as f: pickle.dump(cache_content, f) path = path_to_filesystem(self._filesystem_path, href) with self._atomic_write( path, newline="", sync_directory=False) as f: f.write(text) self._sync_directory(cache_folder) self._sync_directory(self._filesystem_path) @classmethod def move(cls, item, to_collection, to_href): if not is_safe_filesystem_path_component(to_href): raise UnsafePathError(to_href) os.replace( path_to_filesystem(item.collection._filesystem_path, item.href), path_to_filesystem(to_collection._filesystem_path, to_href)) cls._sync_directory(to_collection._filesystem_path) if item.collection._filesystem_path != to_collection._filesystem_path: cls._sync_directory(item.collection._filesystem_path) # Move the item cache entry cache_folder = os.path.join(item.collection._filesystem_path, ".Radicale.cache", "item") to_cache_folder = os.path.join(to_collection._filesystem_path, ".Radicale.cache", "item") cls._makedirs_synced(to_cache_folder) try: os.replace(os.path.join(cache_folder, item.href), os.path.join(to_cache_folder, to_href)) except FileNotFoundError: pass else: cls._makedirs_synced(to_cache_folder) if cache_folder != to_cache_folder: cls._makedirs_synced(cache_folder) # Track the change to_collection._update_history_etag(to_href, item) item.collection._update_history_etag(item.href, None) to_collection._clean_history_cache() if item.collection._filesystem_path != to_collection._filesystem_path: item.collection._clean_history_cache() @classmethod def _clean_cache(cls, folder, names, max_age=None): """Delete all ``names`` in ``folder`` that are older than ``max_age``. """ age_limit = time.time() - max_age if max_age is not None else None modified = False for name in names: if not is_safe_filesystem_path_component(name): continue if age_limit is not None: try: # Race: Another process might have deleted the file. mtime = os.path.getmtime(os.path.join(folder, name)) except FileNotFoundError: continue if mtime > age_limit: continue cls.logger.debug("Found expired item in cache: %r", name) # Race: Another process might have deleted or locked the # file. try: os.remove(os.path.join(folder, name)) except (FileNotFoundError, PermissionError): continue modified = True if modified: cls._sync_directory(folder) def _update_history_etag(self, href, item): """Updates and retrieves the history etag from the history cache. The history cache contains a file for each current and deleted item of the collection. These files contain the etag of the item (empty string for deleted items) and a history etag, which is a hash over the previous history etag and the etag separated by "/". 
""" history_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "history") try: with open(os.path.join(history_folder, href), "rb") as f: cache_etag, history_etag = pickle.load(f) except (FileNotFoundError, pickle.UnpicklingError, ValueError) as e: if isinstance(e, (pickle.UnpicklingError, ValueError)): self.logger.warning( "Failed to load history cache entry %r in %r: %s", href, self.path, e, exc_info=True) cache_etag = "" # Initialize with random data to prevent collisions with cleaned # expired items. history_etag = binascii.hexlify(os.urandom(16)).decode("ascii") etag = item.etag if item else "" if etag != cache_etag: self._makedirs_synced(history_folder) history_etag = get_etag(history_etag + "/" + etag).strip("\"") try: # Race: Other processes might have created and locked the file. with self._atomic_write(os.path.join(history_folder, href), "wb") as f: pickle.dump([etag, history_etag], f) except PermissionError: pass return history_etag def _get_deleted_history_hrefs(self): """Returns the hrefs of all deleted items that are still in the history cache.""" history_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "history") try: for href in scandir(history_folder): if not is_safe_filesystem_path_component(href): continue if os.path.isfile(os.path.join(self._filesystem_path, href)): continue yield href except FileNotFoundError: pass def _clean_history_cache(self): # Delete all expired cache entries of deleted items. history_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "history") self._clean_cache(history_folder, self._get_deleted_history_hrefs(), max_age=self.configuration.getint( "storage", "max_sync_token_age")) def sync(self, old_token=None): # The sync token has the form http://radicale.org/ns/sync/TOKEN_NAME # where TOKEN_NAME is the md5 hash of all history etags of present and # past items of the collection. def check_token_name(token_name): if len(token_name) != 32: return False for c in token_name: if c not in "0123456789abcdef": return False return True old_token_name = None if old_token: # Extract the token name from the sync token if not old_token.startswith("http://radicale.org/ns/sync/"): raise ValueError("Malformed token: %r" % old_token) old_token_name = old_token[len("http://radicale.org/ns/sync/"):] if not check_token_name(old_token_name): raise ValueError("Malformed token: %r" % old_token) # Get the current state and sync-token of the collection. state = {} token_name_hash = md5() # Find the history of all existing and deleted items for href, item in chain( ((item.href, item) for item in self.get_all()), ((href, None) for href in self._get_deleted_history_hrefs())): history_etag = self._update_history_etag(href, item) state[href] = history_etag token_name_hash.update((href + "/" + history_etag).encode("utf-8")) token_name = token_name_hash.hexdigest() token = "http://radicale.org/ns/sync/%s" % token_name if token_name == old_token_name: # Nothing changed return token, () token_folder = os.path.join(self._filesystem_path, ".Radicale.cache", "sync-token") token_path = os.path.join(token_folder, token_name) old_state = {} if old_token_name: # load the old token state old_token_path = os.path.join(token_folder, old_token_name) try: # Race: Another process might have deleted the file. 
                with open(old_token_path, "rb") as f:
                    old_state = pickle.load(f)
            except (FileNotFoundError, pickle.UnpicklingError,
                    ValueError) as e:
                if isinstance(e, (pickle.UnpicklingError, ValueError)):
                    self.logger.warning(
                        "Failed to load stored sync token %r in %r: %s",
                        old_token_name, self.path, e, exc_info=True)
                    # Delete the damaged file
                    try:
                        os.remove(old_token_path)
                    except (FileNotFoundError, PermissionError):
                        pass
                raise ValueError("Token not found: %r" % old_token)

        # write the new token state or update the modification time of
        # existing token state
        if not os.path.exists(token_path):
            self._makedirs_synced(token_folder)
            try:
                # Race: Other processes might have created and locked the
                # file.
                with self._atomic_write(token_path, "wb") as f:
                    pickle.dump(state, f)
            except PermissionError:
                pass
            else:
                # clean up old sync tokens and item cache
                self._clean_cache(token_folder, os.listdir(token_folder),
                                  max_age=self.configuration.getint(
                                      "storage", "max_sync_token_age"))
                self._clean_history_cache()
        else:
            # Try to update the modification time
            try:
                # Race: Another process might have deleted the file.
                os.utime(token_path)
            except FileNotFoundError:
                pass

        changes = []
        # Find all new, changed and deleted (that are still in the item
        # cache) items
        for href, history_etag in state.items():
            if history_etag != old_state.get(href):
                changes.append(href)
        # Find all deleted items that are no longer in the item cache
        for href, history_etag in old_state.items():
            if href not in state:
                changes.append(href)

        return token, changes

    def list(self):
        for href in scandir(self._filesystem_path, only_files=True):
            if not is_safe_filesystem_path_component(href):
                if not href.startswith(".Radicale"):
                    self.logger.debug(
                        "Skipping item %r in %r", href, self.path)
                continue
            yield href

    def get(self, href, verify_href=True):
        item, metadata = self._get_with_metadata(
            href, verify_href=verify_href)
        return item

    def _item_cache_hash(self, raw_text):
        _hash = md5()
        _hash.update(left_encode_int(ITEM_CACHE_VERSION))
        _hash.update(raw_text)
        return _hash.hexdigest()

    def _item_cache_content(self, href, vobject_item, cache_hash=None):
        text = vobject_item.serialize()
        if cache_hash is None:
            cache_hash = self._item_cache_hash(text.encode(self._encoding))
        etag = get_etag(text)
        uid = get_uid_from_object(vobject_item)
        name = vobject_item.name
        tag, start, end = xmlutils.find_tag_and_time_range(vobject_item)
        return cache_hash, uid, etag, text, name, tag, start, end

    def _store_item_cache(self, href, vobject_item, cache_hash=None):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        content = self._item_cache_content(href, vobject_item, cache_hash)
        self._makedirs_synced(cache_folder)
        try:
            # Race: Other processes might have created and locked the
            # file.
            with self._atomic_write(os.path.join(cache_folder, href),
                                    "wb") as f:
                pickle.dump(content, f)
        except PermissionError:
            pass
        return content

    _cache_locks = {}
    _cache_locks_lock = threading.Lock()

    @contextmanager
    def _acquire_cache_lock(self, ns=""):
        with contextlib.ExitStack() as lock_stack:
            with contextlib.ExitStack() as locks_lock_stack:
                locks_lock_stack.enter_context(self._cache_locks_lock)
                lock_id = ns + "/" + self.path
                lock = self._cache_locks.get(lock_id)
                if not lock:
                    cache_folder = os.path.join(self._filesystem_path,
                                                ".Radicale.cache")
                    self._makedirs_synced(cache_folder)
                    lock_path = None
                    if self.configuration.getboolean(
                            "storage", "filesystem_locking"):
                        lock_path = os.path.join(
                            cache_folder,
                            ".Radicale.lock" + (".%s" % ns if ns else ""))
                    lock = FileBackedRwLock(lock_path)
                    self._cache_locks[lock_id] = lock
                lock_stack.enter_context(lock.acquire_lock(
                    "w", lambda: locks_lock_stack.pop_all().close()))
            try:
                yield
            finally:
                with self._cache_locks_lock:
                    lock_stack.pop_all().close()
                    if not lock.in_use():
                        del self._cache_locks[lock_id]

    def _load_item_cache(self, href, input_hash):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        cache_hash = uid = etag = text = name = tag = start = end = None
        try:
            with open(os.path.join(cache_folder, href), "rb") as f:
                cache_hash, *content = pickle.load(f)
            if cache_hash == input_hash:
                uid, etag, text, name, tag, start, end = content
        except FileNotFoundError:
            pass
        except (pickle.UnpicklingError, ValueError) as e:
            self.logger.warning(
                "Failed to load item cache entry %r in %r: %s",
                href, self.path, e, exc_info=True)
        return cache_hash, uid, etag, text, name, tag, start, end

    def _clean_item_cache(self):
        cache_folder = os.path.join(self._filesystem_path, ".Radicale.cache",
                                    "item")
        self._clean_cache(cache_folder, (
            href for href in scandir(cache_folder)
            if not os.path.isfile(os.path.join(self._filesystem_path, href))))

    def _get_with_metadata(self, href, verify_href=True):
        """Like ``get`` but additionally returns the following metadata:
        tag, start, end: see ``xmlutils.find_tag_and_time_range``. If
        extraction of the metadata failed, the values are all ``None``."""
        if verify_href:
            try:
                if not is_safe_filesystem_path_component(href):
                    raise UnsafePathError(href)
                path = path_to_filesystem(self._filesystem_path, href)
            except ValueError as e:
                self.logger.debug(
                    "Can't translate name %r safely to filesystem in %r: %s",
                    href, self.path, e, exc_info=True)
                return None, None
        else:
            path = os.path.join(self._filesystem_path, href)
        try:
            with open(path, "rb") as f:
                raw_text = f.read()
        except (FileNotFoundError, IsADirectoryError):
            return None, None
        except PermissionError:
            # Windows raises ``PermissionError`` when ``path`` is a directory
            if (os.name == "nt" and
                    os.path.isdir(path) and os.access(path, os.R_OK)):
                return None, None
            raise
        # The hash of the component in the file system. This is used to check,
        # if the entry in the cache is still valid.
        input_hash = self._item_cache_hash(raw_text)
        cache_hash, uid, etag, text, name, tag, start, end = \
            self._load_item_cache(href, input_hash)
        vobject_item = None
        if input_hash != cache_hash:
            with contextlib.ExitStack() as lock_stack:
                # Lock the item cache to prevent multiple processes from
                # generating the same data in parallel.
                # This improves the performance for multiple requests.
                if self._lock.locked() == "r":
                    lock_stack.enter_context(self._acquire_cache_lock("item"))
                    # Check if another process created the file in the
                    # meantime
                    cache_hash, uid, etag, text, name, tag, start, end = \
                        self._load_item_cache(href, input_hash)
                if input_hash != cache_hash:
                    try:
                        vobject_items = tuple(vobject.readComponents(
                            raw_text.decode(self._encoding)))
                        if len(vobject_items) != 1:
                            raise RuntimeError(
                                "Content contains %d components" %
                                len(vobject_items))
                        vobject_item = vobject_items[0]
                        check_and_sanitize_item(vobject_item, uid=uid,
                                                tag=self.get_meta("tag"))
                        cache_hash, uid, etag, text, name, tag, start, end = \
                            self._store_item_cache(
                                href, vobject_item, input_hash)
                    except Exception as e:
                        raise RuntimeError(
                            "Failed to load item %r in %r: %s" %
                            (href, self.path, e)) from e
                # Clean cache entries once after the data in the file
                # system was edited externally.
                if not self._item_cache_cleaned:
                    self._item_cache_cleaned = True
                    self._clean_item_cache()
        last_modified = time.strftime(
            "%a, %d %b %Y %H:%M:%S GMT",
            time.gmtime(os.path.getmtime(path)))
        return Item(
            self, href=href, last_modified=last_modified, etag=etag,
            text=text, item=vobject_item, uid=uid, name=name,
            component_name=tag), (tag, start, end)

    def get_multi2(self, hrefs):
        # It's faster to check for file name collisions here, because
        # we only need to call os.listdir once.
        files = None
        for href in hrefs:
            if files is None:
                # List dir after hrefs returned one item, the iterator may be
                # empty and the for-loop is never executed.
                files = os.listdir(self._filesystem_path)
            path = os.path.join(self._filesystem_path, href)
            if (not is_safe_filesystem_path_component(href) or
                    href not in files and os.path.lexists(path)):
                self.logger.debug(
                    "Can't translate name safely to filesystem: %r", href)
                yield (href, None)
            else:
                yield (href, self.get(href, verify_href=False))

    def get_all(self):
        # We don't need to check for collisions, because the file names
        # are from os.listdir.
        return (self.get(href, verify_href=False) for href in self.list())

    def get_all_filtered(self, filters):
        tag, start, end, simple = xmlutils.simplify_prefilters(
            filters, collection_tag=self.get_meta("tag"))
        if not tag:
            # no filter
            yield from ((item, simple) for item in self.get_all())
            return
        for item, (itag, istart, iend) in (
                self._get_with_metadata(href, verify_href=False)
                for href in self.list()):
            if tag == itag and istart < end and iend > start:
                yield item, simple and (start <= istart or iend <= end)

    def set_to_portals(self, path):
        portal_url = self.configuration.get("storage", "portal_url")
        machine_key = self.configuration.get("auth", "machine_key")
        auth_token = self.create_auth_token("radicale", machine_key)
        headers = {'Authorization': auth_token}
        url = portal_url + "/change_to_storage?change={}".format(path)
        resp = requests.get(url, headers=headers)

    def delete_event_portals(self, path):
        portal_url = self.configuration.get("storage", "portal_url")
        machine_key = self.configuration.get("auth", "machine_key")
        auth_token = self.create_auth_token("radicale", machine_key)
        headers = {'Authorization': auth_token}
        url = portal_url + "/caldav_delete_event?eventInfo={}".format(path)
        resp = requests.get(url, headers=headers)

    def upload(self, href, vobject_item):
        if not is_safe_filesystem_path_component(href):
            raise UnsafePathError(href)
        try:
            cache_hash, uid, etag, text, name, tag, _, _ = \
                self._store_item_cache(href, vobject_item)
        except Exception as e:
            raise ValueError("Failed to store item %r in collection %r: %s" %
                             (href, self.path, e)) from e
        path = path_to_filesystem(self._filesystem_path, href)
        with self._atomic_write(path, newline="") as fd:
            fd.write(text)
        # Clean the cache after the actual item is stored, or the cache entry
        # will be removed again.
        self._clean_item_cache()
        item = Item(self, href=href, etag=etag, text=text, item=vobject_item,
                    uid=uid, name=name, component_name=tag)
        self.set_to_portals(self.path + "/" + href)
        # Track the change
        self._update_history_etag(href, item)
        self._clean_history_cache()
        return item

    def delete(self, href=None):
        if href is not None:
            self.delete_event_portals(self.path + "/" + href)
        if href is None:
            # Delete the collection
            parent_dir = os.path.dirname(self._filesystem_path)
            try:
                os.rmdir(self._filesystem_path)
            except OSError:
                with TemporaryDirectory(
                        prefix=".Radicale.tmp-", dir=parent_dir) as tmp:
                    os.rename(self._filesystem_path, os.path.join(
                        tmp, os.path.basename(self._filesystem_path)))
                    self._sync_directory(parent_dir)
            else:
                self._sync_directory(parent_dir)
        else:
            # Delete an item
            if not is_safe_filesystem_path_component(href):
                raise UnsafePathError(href)
            path = path_to_filesystem(self._filesystem_path, href)
            if not os.path.isfile(path):
                raise ComponentNotFoundError(href)
            os.remove(path)
            self._sync_directory(os.path.dirname(path))
            # Track the change
            self._update_history_etag(href, None)
            self._clean_history_cache()

    def get_meta(self, key=None):
        # reuse cached value if the storage is read-only
        if self._lock.locked() == "w" or self._meta_cache is None:
            try:
                try:
                    with open(self._props_path, encoding=self._encoding) as f:
                        self._meta_cache = json.load(f)
                except FileNotFoundError:
                    self._meta_cache = {}
                check_and_sanitize_props(self._meta_cache)
            except ValueError as e:
                raise RuntimeError("Failed to load properties of collection "
                                   "%r: %s" % (self.path, e)) from e
        return self._meta_cache.get(key) if key else self._meta_cache

    def set_meta_all(self, props):
        with self._atomic_write(self._props_path, "w") as f:
            json.dump(props, f, sort_keys=True)

    @property
    def last_modified(self):
        relevant_files = chain(
            (self._filesystem_path,),
            (self._props_path,) if os.path.exists(self._props_path) else (),
            (os.path.join(self._filesystem_path, h) for h in self.list()))
        last = max(map(os.path.getmtime, relevant_files))
        return time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(last))

    @property
    def etag(self):
        # reuse cached value if the storage is read-only
        if self._lock.locked() == "w" or self._etag_cache is None:
            self._etag_cache = super().etag
        return self._etag_cache

    @classmethod
    @contextmanager
    def acquire_lock(cls, mode, user=None):
        with cls._lock.acquire_lock(mode):
            yield
            # execute hook
            hook = cls.configuration.get("storage", "hook")
            if mode == "w" and hook:
                folder = os.path.expanduser(cls.configuration.get(
                    "storage", "filesystem_folder"))
                cls.logger.debug("Running hook")
                debug = cls.logger.isEnabledFor(logging.DEBUG)
                p = subprocess.Popen(
                    hook % {"user": shlex.quote(user or "Anonymous")},
                    stdin=subprocess.DEVNULL,
                    stdout=subprocess.PIPE if debug else subprocess.DEVNULL,
                    stderr=subprocess.PIPE if debug else subprocess.DEVNULL,
                    shell=True, universal_newlines=True, cwd=folder)
                stdout_data, stderr_data = p.communicate()
                if stdout_data:
                    cls.logger.debug(
                        "Captured stdout hook:\n%s", stdout_data)
                if stderr_data:
                    cls.logger.debug(
                        "Captured stderr hook:\n%s", stderr_data)
                if p.returncode != 0:
                    raise subprocess.CalledProcessError(p.returncode, p.args)
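
The portal requests made by set_to_portals and delete_event_portals carry the "ASC" header built by create_auth_token above: an HMAC-SHA1 over "timestamp\npkey" keyed with the shared machine_key, base64-encoded with "-" and "_" mapped back to "+" and "/", and formatted as "ASC pkey:timestamp:signature". The snippet below is a minimal sketch of how the receiving side could validate such a header; the function name verify_auth_token and the 15-minute freshness window are illustrative assumptions, only the token layout and signature construction come from the file above.

import base64
import hashlib
import hmac
from datetime import datetime, timedelta


def verify_auth_token(header, machine_key, max_age=timedelta(minutes=15)):
    """Validate an "ASC pkey:timestamp:signature" header (sketch)."""
    scheme, _, payload = header.partition(" ")
    if scheme != "ASC":
        return False
    try:
        pkey, timestamp, signature = payload.split(":", 2)
    except ValueError:
        return False
    # Recompute the signature exactly as create_auth_token() does
    message = "{0}\n{1}".format(timestamp, pkey).encode("utf-8")
    digest = hmac.new(machine_key.encode("utf-8"), message,
                      hashlib.sha1).digest()
    expected = base64.urlsafe_b64encode(digest).decode("utf-8")
    expected = expected.replace("-", "+").replace("_", "/")
    if not hmac.compare_digest(expected, signature):
        return False
    # Reject stale tokens (the freshness window is an assumption)
    try:
        issued = datetime.strptime(timestamp, "%Y%m%d%H%M%S")
    except ValueError:
        return False
    return datetime.now() - issued <= max_age

Because the timestamp is part of the signed message, replaying an old header past the freshness window fails, and any mismatch between the portal's machine_key and the one configured in the [auth] section of Radicale makes verification fail immediately.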