From b1f721474b7d0fd386f607d531a3a92f9ac11e52 Mon Sep 17 00:00:00 2001 From: Michael Rasmussen Date: Thu, 12 Jul 2018 00:09:44 +0200 Subject: [PATCH] test access Signed-off-by: Michael Rasmussen --- app/templates/admin.html | 13 ++ app/tools.py | 224 ++++++++++++++++++++++++ db_repository/versions/005_migration.py | 19 ++ db_repository/versions/006_migration.py | 19 ++ test | 0 5 files changed, 275 insertions(+) create mode 100644 app/templates/admin.html create mode 100644 app/tools.py create mode 100644 db_repository/versions/005_migration.py create mode 100644 db_repository/versions/006_migration.py create mode 100644 test diff --git a/app/templates/admin.html b/app/templates/admin.html new file mode 100644 index 0000000..22331e2 --- /dev/null +++ b/app/templates/admin.html @@ -0,0 +1,13 @@ +{% extends "base.html" %} + +{% block title %} +{{ title }} +{% endblock %} + +{% block content %} + {% include 'menu.html' %} + {% include 'flash.html' %} +
+

{{ title }}

+
+{% endblock %} diff --git a/app/tools.py b/app/tools.py new file mode 100644 index 0000000..e10bbc0 --- /dev/null +++ b/app/tools.py @@ -0,0 +1,224 @@ +from app import db +from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl + +class DBQueryException(Exception): + pass + +class DBQuery: + + def get_albums(self, object): + albums = [] + if hasattr(object, '__tablename__'): + if isinstance(object, User): + portfolios = Portfolio.query.filter_by(owner=object) + for p in portfolios: + albums.extend(Album.query.filter_by(portfolio=p)) + elif isinstance(object, Portfolio): + albums = Album.query.filter_by(portfolio=object) + elif isinstance(object, Photo): + albums.append(object.album) + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + result = [] + for album in albums: + result.append(album) + return result + else: + raise DBQueryException("{}: not supported".format(object)) + + def get_portfolios(self, object): + portfolios = [] + if hasattr(object, '__tablename__'): + if isinstance(object, User): + portfolios = Portfolio.query.filter_by(owner=object) + elif isinstance(object, Album): + portfolios.append(object.portfolio) + elif isinstance(object, Photo): + album = self.get_albums(object) + portfolios.append(album[0].portfolio) + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + result = [] + for portfolio in portfolios: + result.append(portfolio) + return result + else: + raise DBQueryException("{}: not supported".format(object)) + + def get_owner(self, object): + if hasattr(object, '__tablename__'): + if isinstance(object, Portfolio): + return object.owner + elif isinstance(object, Album): + p = object.portfolio + return p.owner + elif isinstance(object, Photo): + a = object.album + p = a.portfolio + return p.owner + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + else: + raise DBQueryException("{}: not supported".format(object)) + + def get_users(self, object): + if hasattr(object, '__tablename__'): + users = [] + if isinstance(object, Album): + objects = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id) + for o in objects: + users.append(o.user) + elif isinstance(object, Photo): + objects = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id) + for o in objects: + users.append(o.user) + elif isinstance(object, Portfolio): + objects = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id) + for o in objects: + users.append(o.user) + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + return users + else: + raise DBQueryException("{}: not supported".format(object)) + + def has_access(self, user, object): + if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): + result = False + if isinstance(object, Portfolio): + acl = self.get_acl(user, object) + if acl is not None: + result = True + elif isinstance(object, Album): + acl = self.get_acl(user, object) + if acl is not None: + result = True + else: + portfolio = self.get_portfolios(object) + acl = self.get_acl(user, portfolio[0]) + if acl is not None: + result = True + elif isinstance(object, Photo): + acl = self.get_acl(user, object) + if acl is not None: + result = True + else: + albums = self.get_albums(object) + acl = self.get_acl(user, albums[0]) + if acl is not None: + result = True + else: + portfolio = self.get_portfolios(object) + acl = self.get_acl(user, portfolio[0]) + if acl is not None: + result = True + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + return result + else: + raise DBQueryException("{}: not supported".format(object)) + + def can_read(self, user, object): + return self.has_access(user, object) + + def can_write(self, user, object): + if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): + result = False + if isinstance(object, Portfolio): + acl = self.get_acl(user, object) + if acl is not None and acl > Role.read: + result = True + elif isinstance(object, Album): + acl = self.get_acl(user, object) + if acl is not None and acl > Role.read: + result = True + else: + portfolio = self.get_portfolios(object) + acl = self.get_acl(user, portfolio[0]) + if acl is not None and acl > Role.read: + result = True + elif isinstance(object, Photo): + acl = self.get_acl(user, object) + if acl is not None and acl > Role.read: + result = True + else: + albums = self.get_albums(object) + acl = self.get_acl(user, albums[0]) + if acl is not None and acl > Role.read: + result = True + else: + portfolio = self.get_portfolios(object) + acl = self.get_acl(user, portfolio[0]) + if acl is not None and acl > Role.read: + result = True + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + return result + else: + raise DBQueryException("{}: not supported".format(object)) + + def get_acl(self, user, object): + if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): + if isinstance(object, Album): + if user.is_admin or user == self.get_owner(object): + return Role.admin + acl = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id) + for a in acl: + if AccessRight.query.get(a.id).user_id == user.id: + return a.right + # Check if access given by parent (Portfolio) + return self.get_acl(user, object.portfolio) + elif isinstance(object, Photo): + if user.is_admin or user == self.get_owner(object): + return Role.admin + acl = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id) + for a in acl: + if AccessRight.query.get(a.id).user_id == user.id: + return a.right + # Check if access given by parent (Album) + return self.get_acl(user, object.album) + elif isinstance(object, Portfolio): + if user.is_admin or user == self.get_owner(object): + return Role.admin + acl = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id) + for a in acl: + if AccessRight.query.get(a.id).user_id == user.id: + return a.right + else: + raise DBQueryException("%s: not supported" % object.__tablename__) + return None + else: + raise DBQueryException("{}: not supported".format(object)) + + def get_albums_for_user(self, user, owner = False, hidden = False): + if isinstance(user, User): + albums = [] + # Find all albums with direct access + objects = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join( + Album, AlbumAcl.c.album == Album.id).join( + AccessRight, AccessRight.id == AlbumAcl.c.acl).join( + Portfolio, Portfolio.id == Album.portfolio_id).filter( + AccessRight.user == user).all() + for object in objects: + if not owner and object.Portfolio.owner == user: + continue + if not hidden and not object.Album.visible: + continue + albums.append(object.Album) + # Find all albums with access through inherited access rights from portfolio + temp = [] + objects = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join( + Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join( + AccessRight, AccessRight.id == PortfolioAcl.c.acl).join( + Album, Album.portfolio_id == Portfolio.id).all() + for object in objects: + if not owner and object.Portfolio.owner == user: + continue + if not hidden and not object.Album.visible: + continue + temp.append(object.Album) + # Merge albums with temp removing duplicates + albums = albums + list(set(temp) - set(albums)) + return albums + else: + raise DBQueryException("%s: not supported" % object.__tablename__) diff --git a/db_repository/versions/005_migration.py b/db_repository/versions/005_migration.py new file mode 100644 index 0000000..96092d7 --- /dev/null +++ b/db_repository/versions/005_migration.py @@ -0,0 +1,19 @@ +from sqlalchemy import * +from migrate import * + + +from migrate.changeset import schema +pre_meta = MetaData() +post_meta = MetaData() + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; bind + # migrate_engine to your metadata + pre_meta.bind = migrate_engine + post_meta.bind = migrate_engine + + +def downgrade(migrate_engine): + # Operations to reverse the above upgrade go here. + pre_meta.bind = migrate_engine + post_meta.bind = migrate_engine diff --git a/db_repository/versions/006_migration.py b/db_repository/versions/006_migration.py new file mode 100644 index 0000000..96092d7 --- /dev/null +++ b/db_repository/versions/006_migration.py @@ -0,0 +1,19 @@ +from sqlalchemy import * +from migrate import * + + +from migrate.changeset import schema +pre_meta = MetaData() +post_meta = MetaData() + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; bind + # migrate_engine to your metadata + pre_meta.bind = migrate_engine + post_meta.bind = migrate_engine + + +def downgrade(migrate_engine): + # Operations to reverse the above upgrade go here. + pre_meta.bind = migrate_engine + post_meta.bind = migrate_engine diff --git a/test b/test new file mode 100644 index 0000000..e69de29 -- 2.39.2