AlekSIS
AlekSIS

Source code for aleksis.core.util.core_helpers

import os
from datetime import datetime, timedelta
from importlib import import_module, metadata
from itertools import groupby
from operator import itemgetter
from typing import Any, Callable, Dict, Optional, Sequence, Union
from warnings import warn

from django.conf import settings
from django.core.exceptions import ImproperlyConfigured
from django.core.files import File
from django.db.models import Model, QuerySet
from django.http import HttpRequest
from django.shortcuts import get_object_or_404
from django.utils import timezone
from django.utils.crypto import get_random_string
from django.utils.functional import lazy
from django.utils.module_loading import import_string

from cache_memoize import cache_memoize





[docs]def dt_show_toolbar(request: HttpRequest) -> bool: """Add a helper to determin if Django debug toolbar should be displayed. Extends the default behaviour by enabling DJDT for superusers independent of source IP. """ from debug_toolbar.middleware import show_toolbar # noqa if not settings.DEBUG: return False if show_toolbar(request): return True elif hasattr(request, "user") and request.user.is_superuser: return True return False
[docs]def get_app_packages() -> Sequence[str]: """Find all registered apps from the setuptools entrypoint.""" return [f"{ep.module}.{ep.attr}" for ep in metadata.entry_points().get("aleksis.app", [])]
[docs]def merge_app_settings( setting: str, original: Union[dict, list], deduplicate: bool = False ) -> Union[dict, list]: """Merge app settings. Get a named settings constant from all apps and merge it into the original. To use this, add a settings.py file to the app, in the same format as Django's main settings.py. Note: Only selected names will be imported frm it to minimise impact of potentially malicious apps! """ for app in get_app_packages(): pkg = ".".join(app.split(".")[:-2]) mod_settings = None while "." in pkg: try: mod_settings = import_module(pkg + ".settings") except ImportError: # Import errors are non-fatal. pkg = ".".join(pkg.split(".")[:-1]) continue break if not mod_settings: # The app does not have settings continue app_setting = getattr(mod_settings, setting, None) if not app_setting: # The app might not have this setting or it might be empty. Ignore it in that case. continue for entry in app_setting: if entry in original: if not deduplicate: raise AttributeError(f"{entry} already set in original.") else: if isinstance(original, list): original.append(entry) elif isinstance(original, dict): original[entry] = app_setting[entry] else: raise TypeError("Only dict and list settings can be merged.")
[docs]def get_site_preferences(): """Get the preferences manager of the current site.""" from django.contrib.sites.models import Site # noqa return Site.objects.get_current().preferences
[docs]def lazy_preference(section: str, name: str) -> Callable[[str, str], Any]: """Lazily get a config value from dynamic preferences. Useful to bind preferences to other global settings to make them available to third-party apps that are not aware of dynamic preferences. """ def _get_preference(section: str, name: str) -> Any: return get_site_preferences()[f"{section}__{name}"] # The type is guessed from the default value to improve lazy()'s behaviour # FIXME Reintroduce the behaviour described above return lazy(_get_preference, str)(section, name)
[docs]def get_or_create_favicon(title: str, default: str, is_favicon: bool = False) -> "Favicon": """Ensure that there is always a favicon object.""" from favicon.models import Favicon # noqa if not os.path.exists(default): warn("staticfiles are not ready yet, not creating default icons") return elif os.path.isdir(default): raise ImproperlyConfigured(f"staticfiles are broken: unexpected directory at {default}") favicon, created = Favicon.on_site.get_or_create( title=title, defaults={"isFavicon": is_favicon} ) changed = False if favicon.isFavicon != is_favicon: favicon.isFavicon = True changed = True if created: favicon.faviconImage.save(os.path.basename(default), File(open(default, "rb"))) changed = True if changed: favicon.save() return favicon
def get_pwa_icons(): from django.conf import settings # noqa favicon = get_or_create_favicon("pwa_icon", settings.DEFAULT_FAVICON_PATHS["pwa_icon"]) favicon_imgs = favicon.get_favicons(config_override=settings.PWA_ICONS_CONFIG) return favicon_imgs
[docs]def is_impersonate(request: HttpRequest) -> bool: """Check whether the user was impersonated by an admin.""" if hasattr(request, "user"): return getattr(request.user, "is_impersonate", False) else: return False
[docs]def has_person(obj: Union[HttpRequest, Model]) -> bool: """Check wehether a model object has a person attribute linking it to a Person object. The passed object can also be a HttpRequest object, in which case its associated User object is unwrapped and tested. """ if isinstance(obj, HttpRequest): if hasattr(obj, "user"): obj = obj.user else: return False if obj.is_anonymous: return False person = getattr(obj, "person", None) if person is None: return False elif getattr(person, "is_dummy", False): return False else: return True
[docs]def custom_information_processor(request: Union[HttpRequest, None]) -> dict: """Provide custom information in all templates.""" from ..models import CustomMenu pwa_icons = get_pwa_icons() regrouped_pwa_icons = {} for pwa_icon in pwa_icons: regrouped_pwa_icons.setdefault(pwa_icon.rel, {}) regrouped_pwa_icons[pwa_icon.rel][pwa_icon.size] = pwa_icon context = { "FOOTER_MENU": CustomMenu.get_default("footer"), "ADMINS": settings.ADMINS, "PWA_ICONS": regrouped_pwa_icons, "SENTRY_ENABLED": settings.SENTRY_ENABLED, "SITE_PREFERENCES": get_site_preferences(), } if settings.SENTRY_ENABLED: context["SENTRY_SETTINGS"] = settings.SENTRY_SETTINGS import sentry_sdk span = sentry_sdk.Hub.current.scope.span if span is not None: context["SENTRY_TRACE_ID"] = span.to_traceparent() return context
[docs]def now_tomorrow() -> datetime: """Return current time tomorrow.""" return timezone.now() + timedelta(days=1)
[docs]def objectgetter_optional( model: Model, default: Optional[Any] = None, default_eval: bool = False ) -> Callable[[HttpRequest, Optional[int]], Model]: """Get an object by pk, defaulting to None.""" def get_object(request: HttpRequest, id_: Optional[int] = None, **kwargs) -> Optional[Model]: if id_ is not None: return get_object_or_404(model, pk=id_) else: try: return eval(default) if default_eval else default # noqa:S307 except (AttributeError, KeyError, IndexError): return None return get_object
@cache_memoize(3600) def get_content_type_by_perm(perm: str) -> Union["ContentType", None]: from django.contrib.contenttypes.models import ContentType # noqa try: return ContentType.objects.get( app_label=perm.split(".", 1)[0], permission__codename=perm.split(".", 1)[1] ) except ContentType.DoesNotExist: return None
[docs]@cache_memoize(3600) def queryset_rules_filter( obj: Union[HttpRequest, Model], queryset: QuerySet, perm: str ) -> QuerySet: """Filter queryset by user and permission.""" wanted_objects = set() if isinstance(obj, HttpRequest) and hasattr(obj, "user"): obj = obj.user for item in queryset: if obj.has_perm(perm, item): wanted_objects.add(item.pk) return queryset.filter(pk__in=wanted_objects)
[docs]def generate_random_code(length, packet_size) -> str: """Generate random code for e.g. invitations.""" return get_random_string(packet_size * length).lower()
[docs]def unread_notifications_badge(request: HttpRequest) -> int: """Generate badge content with the number of unread notifications.""" return request.user.person.unread_notifications_count
[docs]def monkey_patch() -> None: # noqa """Monkey-patch dependencies for special behaviour.""" # Unwrap promises in JSON serializer instead of stringifying from django.core.serializers import json from django.utils.functional import Promise class DjangoJSONEncoder(json.DjangoJSONEncoder): def default(self, o: Any) -> Any: if isinstance(o, Promise) and hasattr(o, "copy"): return o.copy() return super().default(o) json.DjangoJSONEncoder = DjangoJSONEncoder
[docs]def get_allowed_object_ids(request: HttpRequest, models: list) -> list: """Get all objects of all given models the user of a given request is allowed to view.""" allowed_object_ids = [] for model in models: app_label = model._meta.app_label model_name = model.__name__.lower() # Loop through the pks of all objects of the current model the user is allowed to view # and put the corresponding ids into a django-haystack-style-formatted list allowed_object_ids += [ f"{app_label}.{model_name}.{pk}" for pk in queryset_rules_filter( request, model.objects.all(), f"{app_label}.view_{model_name}_rule" ).values_list("pk", flat=True) ] return allowed_object_ids
[docs]def process_custom_context_processors(context_processors: list) -> Dict[str, Any]: """Process custom context processors.""" context = {} processors = tuple(import_string(path) for path in context_processors) for processor in processors: context.update(processor(None)) return context