EVOLUTION-MANAGER
Edit File: __init__.py
"""Radicale authentication plugin that delegates credential checks to a portal.

Credentials are verified by POSTing them to the ``/is_caldav_authenticated``
endpoint below the ``portal_url`` option of the ``[auth]`` configuration
section. The request is signed with an HMAC-SHA1 token derived from the
``machine_key`` option of the same section.
"""

import base64
import hashlib
import hmac
import json
import platform
import re
import socket
import sys
from datetime import datetime, date, time, timezone
from urllib import request
from urllib.parse import urlsplit

from radicale.auth import BaseAuth

if platform.system() == "Linux":
    # Kept ahead of ``import requests``, exactly as in the original file.
    # NOTE(review): presumably this lets the third-party import below resolve
    # from the distribution's site-packages -- confirm before reordering.
    sys.path.insert(0, '/usr/lib/python3/site-packages')

import requests


class Auth(BaseAuth):
    """Authenticate CalDAV users against the portal API."""

    # Seconds to wait for the portal. Without a timeout ``requests`` waits
    # forever, so a stalled portal would block the request indefinitely.
    REQUEST_TIMEOUT = 30

    def create_auth_token(self, pkey: str, machine_key: str) -> str:
        """Return the ``ASC <pkey>:<timestamp>:<signature>`` header value.

        The signature is the standard-alphabet base64 encoding of
        ``HMAC-SHA1(machine_key, "<timestamp>\\n<pkey>")``, where the
        timestamp is the current UTC time formatted as ``%Y%m%d%H%M%S``.

        :param pkey: key name embedded in the token and in the signed message.
        :param machine_key: shared secret used as the HMAC key.
        :return: the complete ``Authorization`` header value.
        """
        now = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
        message = '{0}\n{1}'.format(now, pkey).encode('UTF-8')
        digest = hmac.new(
            machine_key.encode('UTF-8'), message, hashlib.sha1).digest()
        # The original used urlsafe_b64encode and then mapped '-' -> '+' and
        # '_' -> '/', which is exactly the standard alphabet.
        signature = base64.b64encode(digest).decode('ascii')
        # The token is a credential, so it is deliberately kept out of the log.
        self.logger.debug('Auth token generated for %r at %s', pkey, now)
        return 'ASC {0}:{1}:{2}'.format(pkey, now, signature)

    def is_authenticated(self, user, password) -> bool:
        """Ask the portal whether ``user``/``password`` are valid.

        :return: ``True`` only when the portal answers HTTP 200 with a JSON
            object that has no ``error`` key and whose ``value`` is the
            string ``"true"``. Every other outcome (missing configuration,
            transport failure, non-JSON body, unexpected payload) is logged
            and yields ``False``.
        """
        if not self.configuration.has_option("auth", "portal_url"):
            self.logger.error("Not found portal_url in config file")
            return False

        portal_url = self.configuration.get("auth", "portal_url")
        machine_key = self.configuration.get("auth", "machine_key")
        headers = {
            'Content-type': 'application/json',
            'Authorization': self.create_auth_token("radicale", machine_key),
        }
        payload = {'User': user, 'Password': password}

        try:
            res = requests.post(
                portal_url + "/is_caldav_authenticated",
                data=json.dumps(payload),
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
        except requests.RequestException:
            self.logger.error("Authenticated error. API system")
            return False

        try:
            response = res.json()
        except ValueError:
            # ``Response.json`` signals an undecodable body with a ValueError
            # subclass.
            self.logger.error("Authenticated error. %r ", res.text)
            return False

        self.logger.info("Return auth result")
        accepted = (
            res.status_code == 200
            and isinstance(response, dict)
            and 'error' not in response
            and response.get('value') == "true"
        )
        if not accepted:
            self.logger.error("Error login response: %r", response)
            return False
        return True

    def read_raw_content(self, environ) -> bytes:
        """Read the request body announced by ``CONTENT_LENGTH``.

        :return: ``b""`` when no (or a zero) content length is given,
            otherwise exactly the announced number of bytes.
        :raises RuntimeError: if the stream ends before the announced length.
        """
        content_length = int(environ.get("CONTENT_LENGTH") or 0)
        if not content_length:
            return b""
        content = environ["wsgi.input"].read(content_length)
        if len(content) < content_length:
            raise RuntimeError("Request body too short: %d" % len(content))
        return content

    def get_external_login(self, environ):
        """Derive ``(login, password)`` from the Basic ``Authorization`` header.

        The login is the user name from the header suffixed with ``@`` and
        the lower-cased host taken from the ``X-Rewriter-Url`` header. As a
        side effect, a qualifying PUT on a ``-shared`` calendar is relayed to
        the portal (see ``_forward_shared_event``); a failure there is logged
        and does not affect the returned login.

        :return: ``(login, password)``, or ``()`` when there is no usable
            Basic authorization header or no rewrite host.
        """
        rewrite = urlsplit(environ.get("HTTP_X_REWRITER_URL") or "")
        # A bare "host" value has no netloc and ends up in ``path``.
        host = rewrite.hostname if rewrite.hostname is not None else rewrite.path
        self.logger.info("Rewrite url %r", host)

        authorization = environ.get("HTTP_AUTHORIZATION")
        scheme = environ.get("wsgi.url_scheme") or 'http'
        # Log only the scheme name: the rest of the header is the base64
        # encoded password.
        auth_scheme = (
            authorization.split(" ", 1)[0] if authorization else authorization)
        self.logger.info("Authorization. get_external_login: %r", auth_scheme)

        if authorization is None or not authorization.startswith("Basic"):
            self.logger.error("authorization is null")
            return ()
        if not host:
            # Without a host the qualified login cannot be built.
            self.logger.error("X-Rewriter-Url header is missing or empty")
            return ()

        try:
            raw = base64.b64decode(
                authorization[len("Basic"):].strip().encode("ascii"))
            email, password = self.decode(raw, environ).split(":", 1)
        except ValueError:
            # Covers invalid base64, non-ASCII header text and a missing ':'.
            self.logger.error("Malformed Basic authorization header")
            return ()

        login = email + "@" + host.lower()
        self.logger.info("Authorization")
        try:
            self._forward_shared_event(environ, scheme, host, email, password)
        except Exception:
            # Forwarding is best effort and must never block the login itself.
            self.logger.error(
                "ERROR. Create event on shared calendar", exc_info=True)
        return (login, password)

    def _forward_shared_event(self, environ, scheme, host, email, password):
        """Relay a PUT on a ``-shared`` calendar to the portal calendar API.

        Nothing is sent unless the request is a PUT, the user is not
        ``admin@ascsystem``, the second-to-last path segment ends with
        ``-shared`` and the last one does not end with ``_write.ics``.
        """
        if environ.get("REQUEST_METHOD") != "PUT" or email == "admin@ascsystem":
            return
        segments = (environ.get("PATH_INFO") or "").split('/')
        if len(segments) < 2:
            return
        calendar, event = segments[-2], segments[-1]
        if not calendar.endswith('-shared') or event.endswith('_write.ics'):
            return

        # NOTE(review): the target host and the user's credentials both come
        # from request headers -- confirm X-Rewriter-Url can only be set by a
        # trusted reverse proxy.
        url = scheme + "://" + host + "/api/2.0/calendar/outsideevent.json"
        self.logger.info("---url %r", url)
        # NOTE(review): this consumes ``wsgi.input``; confirm that no later
        # handler is expected to read the request body again.
        content = self.decode(self.read_raw_content(environ), environ)
        payload = {'calendarGuid': calendar, 'eventGuid': event, 'ics': content}
        res = requests.post(
            url,
            data=json.dumps(payload),
            auth=(email, password),
            headers={'Content-type': 'application/json'},
            timeout=self.REQUEST_TIMEOUT,
        )
        if not res.ok:
            self.logger.error(
                "ERROR. Create event on shared calendar: HTTP %s",
                res.status_code)

    def is_authenticated2(self, login, user, password):
        """Authenticate ``user``; ``login`` is accepted but not used."""
        return self.is_authenticated(user, password)

    def decode(self, text, environ) -> str:
        """Try to magically decode ``text`` according to given ``environ``.

        Candidate charsets are tried in order: the ``charset=`` parameter of
        ``CONTENT_TYPE`` (if any), the ``[encoding] request`` configuration
        option, ``utf-8`` and finally ``iso8859-1``.

        :raises UnicodeDecodeError: if no candidate can decode ``text``.
        """
        charsets = []
        content_type = environ.get("CONTENT_TYPE")
        if content_type and "charset=" in content_type:
            declared = content_type.split("charset=")[1].split(";")[0].strip()
            # The parameter value may be quoted, e.g. charset="utf-8".
            charsets.append(declared.strip('"\''))
        charsets.append(self.configuration.get("encoding", "request"))
        charsets.extend(("utf-8", "iso8859-1"))

        for charset in charsets:
            try:
                return text.decode(charset)
            except (UnicodeDecodeError, LookupError):
                # LookupError: the client announced an unknown or non-text
                # codec; fall through to the next candidate.
                continue
        raise UnicodeDecodeError(
            charsets[-1], text, 0, len(text),
            "request content could not be decoded with any known charset")