from __future__ import annotations
import time
from typing import TYPE_CHECKING, Any, Callable, ClassVar
from django.conf import settings
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AbstractBaseUser, BaseUserManager
from django.db import connection, models, transaction
from django.dispatch import Signal
from django.utils.translation import gettext_lazy as _
from django_tenants.models import TenantMixin
from django_tenants.utils import (
get_public_schema_name,
get_tenant_model,
tenant_context,
)
from tenant_users.constants import TENANT_CACHE_NAME, TENANT_DELETE_ERROR_MESSAGE
from tenant_users.permissions.models import (
PermissionsMixinFacade,
UserTenantPermissions,
)
# An existing user removed from a tenant
tenant_user_removed = Signal()
# An existing user added to a tenant
tenant_user_added = Signal()
# A new user is created
tenant_user_created = Signal()
# An existing user is deleted
tenant_user_deleted = Signal()
if TYPE_CHECKING:
from typing import TypeVar
# TypeVar for user profile models - only for type checkers
UserProfileT = TypeVar("UserProfileT", bound="UserProfile")
class InactiveError(Exception):
pass
class ExistsError(Exception):
pass
class DeleteError(Exception):
pass
class SchemaError(Exception):
pass
def schema_required(func: Callable[..., Any]) -> Callable[..., Any]:
"""Decorator to ensure a tenant method executes within its own schema context.
This decorator automatically manages tenant schema switching for methods that
need to operate on tenant-specific data. It temporarily switches the database
connection to the tenant's schema, executes the decorated method, and properly
restores the original connection state.
Why this exists:
- TenantBase methods like add_user(), remove_user() must create and access
UserTenantPermissions instances within the tenant's schema
- Without this decorator, callers would need to manually manage tenant context
before calling these methods, risking data corruption or missing records
- Uses django-tenants' tenant_context to properly manage both
connection.schema_name and connection.tenant state
Args:
func: The tenant method to decorate. Should be a method on a tenant instance.
Returns:
The decorated function that executes within the tenant's schema context.
"""
def inner(self, *args, **kwargs):
with tenant_context(self):
return func(self, *args, **kwargs)
return inner
[docs]
class TenantBase(TenantMixin):
"""Contains global data and settings for the tenant model."""
slug = models.SlugField(_("Tenant URL Name"), blank=True)
# The owner of the tenant. Only they can delete it. This can be changed,
# but it can't be blank. There should always be an owner.
owner = models.ForeignKey(
settings.AUTH_USER_MODEL,
on_delete=models.CASCADE,
)
created = models.DateTimeField(auto_now_add=True)
modified = models.DateTimeField(auto_now=True)
# Schema will be automatically created and synced when it is saved
auto_create_schema = True
# Schema will be automatically deleted when related tenant is deleted
auto_drop_schema = True
[docs]
def delete(self, *args, force_drop: bool = False, **kwargs) -> None:
"""Override deleting of Tenant object.
Args:
force_drop (bool): If True, forces the deletion of the object. Defaults to False.
*args: Variable length argument list.
**kwargs: Arbitrary keyword arguments.
"""
if force_drop:
super().delete(force_drop, *args, **kwargs)
else:
raise DeleteError(TENANT_DELETE_ERROR_MESSAGE)
[docs]
@schema_required
@transaction.atomic
def add_user(
self, user_obj, *, is_superuser: bool = False, is_staff: bool = False
) -> None:
"""Add user to tenant.
Args:
user_obj: The user object to be added to the tenant.
is_superuser (bool): If True, assigns superuser privileges to the user. Defaults to False.
is_staff (bool): If True, assigns staff status to the user. Defaults to False.
"""
# User already is linked here..
if self.user_set.filter(pk=user_obj.pk).exists():
raise ExistsError(
f"User already added to tenant: {user_obj}",
)
# User not linked to this tenant, so we need to create
# tenant permissions
UserTenantPermissions.objects.create(
profile=user_obj,
is_staff=is_staff,
is_superuser=is_superuser,
)
# Link user to tenant
user_obj.tenants.add(self)
tenant_user_added.send(
sender=self.__class__,
user=user_obj,
tenant=self,
)
[docs]
@schema_required
@transaction.atomic
def remove_user(self, user_obj) -> None:
"""Remove user from tenant."""
# Test that user is already in the tenant
self.user_set.get(pk=user_obj.pk)
# Dont allow removing an owner from a tenant. This must be done
# Through delete tenant or transfer_ownership
if user_obj.pk == self.owner.pk:
raise DeleteError(
f"Cannot remove owner from tenant: {self.owner}",
)
user_tenant_perms = UserTenantPermissions.objects.get(profile=user_obj)
# Remove all current groups from user..
groups = user_tenant_perms.groups
groups.clear()
# Unlink from tenant
UserTenantPermissions.objects.filter(pk=user_tenant_perms.pk).delete()
user_obj.tenants.remove(self)
# Remove tenant specific cached attributes
if TENANT_CACHE_NAME in user_obj.__dict__:
cache = user_obj.__dict__[TENANT_CACHE_NAME]
if self.schema_name in cache:
del cache[self.schema_name]
tenant_user_removed.send(
sender=self.__class__,
user=user_obj,
tenant=self,
)
[docs]
@transaction.atomic
def delete_tenant(self) -> None:
"""Mark tenant for deletion.
We don't actually delete the tenant out of the database, but we
associate them with a the public schema user and change their url
to reflect their delete datetime and previous owner
The caller should verify that the user deleting the tenant owns
the tenant.
"""
# Prevent public tenant schema from being deleted
if self.schema_name == get_public_schema_name():
raise ValueError("Cannot delete public tenant schema")
for user_obj in self.user_set.all():
# Don't delete owner at this point
if user_obj.pk == self.owner.pk:
continue
self.remove_user(user_obj)
# Seconds since epoch, time() returns a float, so we convert to
# an int first to truncate the decimal portion
time_string = str(int(time.time()))
new_url = f"{time_string}-{self.owner.pk!s}-{self.domain_url}" # type: ignore[has-type]
self.domain_url = new_url
# The schema generated each time (even with same url slug) will
# be unique so we do not have to worry about a conflict with that
# Set the owner to the system user (public schema owner)
public_tenant = get_tenant_model().objects.get(
schema_name=get_public_schema_name(),
)
old_owner = self.owner
# Transfer ownership to system
self.transfer_ownership(public_tenant.owner)
# Remove old owner as a user if the owner still exists after
# the transfer
if self.user_set.filter(pk=user_obj.pk).exists():
self.remove_user(old_owner)
[docs]
@schema_required
@transaction.atomic
def transfer_ownership(self, new_owner) -> None:
old_owner = self.owner
# Remove current owner superuser status but retain any assigned role(s)
old_owner_tenant_permissions = old_owner.tenant_perms
old_owner_tenant_permissions.is_superuser = False
old_owner_tenant_permissions.save(update_fields=["is_superuser"])
self.owner = new_owner
# If original has no permissions left, remove user from tenant
if not old_owner_tenant_permissions.groups.exists():
self.remove_user(old_owner)
try:
# Set new user as superuser in this tenant if user already exists
user = self.user_set.get(pk=new_owner.pk)
user_tenant = user.usertenantpermissions
user_tenant.is_superuser = True
user_tenant.save(update_fields=["is_superuser"])
except get_user_model().DoesNotExist:
# New user is not a part of the system, add them as a user..
self.add_user(new_owner, is_superuser=True)
self.save(update_fields=["owner"])
[docs]
class UserProfileManager(BaseUserManager):
@transaction.atomic
def _create_user(
self,
email: str,
password: str | None,
*,
is_staff: bool = False,
is_superuser: bool = False,
is_verified: bool = False,
**extra_fields,
):
# Do some schema validation to protect against calling create user from
# inside a tenant. Must create public tenant permissions during user
# creation. This happens during assign role. This function cannot be
# used until a public schema already exists
if connection.schema_name != get_public_schema_name(): # type: ignore[attr-defined]
raise SchemaError(
"Schema must be public for UserProfileManager user creation",
)
if not email:
raise ValueError("Users must have an email address.")
email = self.normalize_email(email)
UserModel = get_user_model()
profile = UserModel.objects.filter(email=email).first()
if profile and profile.is_active:
raise ExistsError("User already exists!")
# Profile might exist but not be active. If a profile does exist
# all previous history logs will still be associated with the user,
# but will not be accessible because the user won't be linked to
# any tenants from the user's previous membership. There are two
# exceptions to this. 1) The user gets re-invited to a tenant it
# previously had access to (this is good thing IMO). 2) The public
# schema if they had previous activity associated would be available
profile = profile if profile else UserModel()
profile.email = email
profile.is_active = True
profile.is_verified = is_verified
profile.set_password(password)
for attr, value in extra_fields.items():
setattr(profile, attr, value)
profile.save()
# Get public tenant tenant and link the user (no perms)
public_tenant = get_tenant_model().objects.get(
schema_name=get_public_schema_name(),
)
public_tenant.add_user(profile)
# Public tenant permissions object was created when we assigned a
# role to the user above, if we are a staff/superuser we set it here
if is_staff or is_superuser:
user_tenant = profile.usertenantpermissions
user_tenant.is_staff = is_staff
user_tenant.is_superuser = is_superuser
user_tenant.save()
tenant_user_created.send(sender=self.__class__, user=profile)
return profile
[docs]
def create_user(
self,
email: str | None = None,
password: str | None = None,
*,
is_staff: bool = False,
**extra_fields,
):
if not email:
raise ValueError("Users must have an email address.")
return self._create_user(
email=email,
password=password,
is_staff=is_staff,
is_superuser=False,
is_verified=False,
**extra_fields,
)
[docs]
def create_superuser(self, password: str, email: str, **extra_fields):
if not email:
raise ValueError("Users must have an email address.")
return self._create_user(
email=email,
password=password,
is_staff=True,
is_superuser=True,
is_verified=True,
**extra_fields,
)
[docs]
@transaction.atomic
def delete_user(self, user_obj) -> None:
# Check to make sure we don't try to delete the public tenant owner
# that would be bad....
public_tenant = get_tenant_model().objects.get(
schema_name=get_public_schema_name(),
)
if user_obj.pk == public_tenant.owner.pk:
raise DeleteError("Cannot delete the public tenant owner!")
# This includes the linked public tenant 'tenant'. It will delete the
# Tenant permissions and unlink when user is deleted
for tenant in user_obj.tenants.all():
# If user owns the tenant, we call delete on the tenant
# which will delete the user from the tenant as well
if tenant.owner.pk == user_obj.pk:
# Delete tenant will handle any other linked users to
# that tenant
tenant.delete_tenant()
else:
# Unlink user from all roles in any tenant it doesn't own
tenant.remove_user(user_obj)
# Set is_active, don't actually delete the object
user_obj.is_active = False
user_obj.save(update_fields=["is_active"])
tenant_user_deleted.send(sender=self.__class__, user=user_obj)
# This cant be located in the users app otherwise it would get loaded into
# both the public schema and all tenant schemas. We want profiles only
# in the public schema alongside the TenantBase model
[docs]
class UserProfile(AbstractBaseUser, PermissionsMixinFacade):
"""Authentication model for django-tenant-users stored in the public tenant schema.
This class represents an authentication-only model that is centrally located in the public tenant schema,
yet maintains a link to the UserTenantPermissions model for authorization. It enables a singular global
user profile across all tenants while allowing permissions to be managed on a per-tenant basis. This design
ensures a unified user identity across different tenants with distinct permission sets in each tenant context.
Access to a user's permissions requires routing the request through the relevant tenant. The implementation
necessitates using the ModelBackend for proper integration.
Inherits:
AbstractBaseUser: Django's base class for user models, providing core user authentication features.
PermissionsMixinFacade: A facade to adapt Django's PermissionMixin for multi-tenant environments.
"""
USERNAME_FIELD = "email"
objects: ClassVar[UserProfileManager] = UserProfileManager()
tenants: models.ManyToManyField[Any, Any] = models.ManyToManyField(
settings.TENANT_MODEL,
verbose_name=_("tenants"),
blank=True,
help_text=_("The tenants this user belongs to."),
related_name="user_set",
)
email = models.EmailField(
_("Email Address"),
unique=True,
db_index=True,
)
is_active = models.BooleanField(_("active"), default=True)
# Tracks whether the user's email has been verified
is_verified = models.BooleanField(_("verified"), default=False)
[docs]
def has_verified_email(self) -> bool:
return self.is_verified
[docs]
def delete(
self, *args, force_drop: bool = False, **kwargs
) -> tuple[int, dict[str, int]]:
if force_drop:
return super().delete(*args, **kwargs)
raise DeleteError(
"UserProfile.objects.delete_user() should be used.",
)
def __str__(self) -> str:
return self.email
[docs]
def get_short_name(self) -> str:
return self.email
[docs]
def get_full_name(self) -> str:
"""Return string representation."""
return str(self)