git.datanom.net - wpp.git/commitdiff
test access
author Michael Rasmussen <mir@datanom.net>
Wed, 11 Jul 2018 22:09:44 +0000 (00:09 +0200)
committer Michael Rasmussen <mir@datanom.net>
Wed, 11 Jul 2018 22:09:44 +0000 (00:09 +0200)
Signed-off-by: Michael Rasmussen <mir@datanom.net>
app/templates/admin.html [new file with mode: 0644]
app/tools.py [new file with mode: 0644]
db_repository/versions/005_migration.py [new file with mode: 0644]
db_repository/versions/006_migration.py [new file with mode: 0644]
test [new file with mode: 0644]

diff --git a/app/templates/admin.html b/app/templates/admin.html
new file mode 100644 (file)
index 0000000..22331e2
--- /dev/null
@@ -0,0 +1,13 @@
+{% extends "base.html" %}
+
+{% block title %}
+{{ title }}
+{% endblock %}
+
+{% block content %}
+    {% include 'menu.html' %}
+    {% include 'flash.html' %}
+    <div class="container">
+        <h1 class="center">{{ title }}</h1>
+    </div> <!-- /container -->
+{% endblock %}
diff --git a/app/tools.py b/app/tools.py
new file mode 100644 (file)
index 0000000..e10bbc0
--- /dev/null
@@ -0,0 +1,224 @@
+from app import db
+from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl
+
+class DBQueryException(Exception):
+    """Raised by DBQuery when it is handed an argument it does not support."""
+    
+class DBQuery:
+    
+    def get_albums(self, object):
+        """Return the albums related to ``object`` as a list.
+
+        A ``User`` yields the albums of every portfolio whose ``owner`` is
+        that user, a ``Portfolio`` yields its own albums and a ``Photo``
+        yields a one-element list holding ``object.album``.
+
+        Raises ``DBQueryException`` when ``object`` has no ``__tablename__``
+        attribute or is a model of any other type.
+        """
+        if not hasattr(object, '__tablename__'):
+            raise DBQueryException("{}: not supported".format(object))
+
+        if isinstance(object, User):
+            found = []
+            for portfolio in Portfolio.query.filter_by(owner=object):
+                found.extend(Album.query.filter_by(portfolio=portfolio))
+        elif isinstance(object, Portfolio):
+            found = Album.query.filter_by(portfolio=object)
+        elif isinstance(object, Photo):
+            found = [object.album]
+        else:
+            raise DBQueryException("%s: not supported" % object.__tablename__)
+        # Always hand back a plain list, even when ``found`` is a query.
+        return list(found)
+
+    def get_portfolios(self, object):
+        """Return the portfolios related to ``object`` as a list.
+
+        A ``User`` yields every portfolio whose ``owner`` is that user, an
+        ``Album`` yields ``[object.portfolio]`` and a ``Photo`` yields the
+        portfolio of the first album returned by ``get_albums``.
+
+        Raises ``DBQueryException`` when ``object`` has no ``__tablename__``
+        attribute or is a model of any other type.
+        """
+        if not hasattr(object, '__tablename__'):
+            raise DBQueryException("{}: not supported".format(object))
+
+        if isinstance(object, User):
+            return list(Portfolio.query.filter_by(owner=object))
+        if isinstance(object, Album):
+            return [object.portfolio]
+        if isinstance(object, Photo):
+            first_album = self.get_albums(object)[0]
+            return [first_album.portfolio]
+        raise DBQueryException("%s: not supported" % object.__tablename__)
+
+    def get_owner(self, object):
+        """Return the owner of a ``Portfolio``, ``Album`` or ``Photo``.
+
+        The owner is always read from the portfolio: directly for a
+        portfolio, via ``portfolio`` for an album and via
+        ``album.portfolio`` for a photo.
+
+        Raises ``DBQueryException`` when ``object`` has no ``__tablename__``
+        attribute or is a model of any other type.
+        """
+        if not hasattr(object, '__tablename__'):
+            raise DBQueryException("{}: not supported".format(object))
+
+        if isinstance(object, Portfolio):
+            return object.owner
+        if isinstance(object, Album):
+            return object.portfolio.owner
+        if isinstance(object, Photo):
+            return object.album.portfolio.owner
+        raise DBQueryException("%s: not supported" % object.__tablename__)
+
+    def get_users(self, object):
+        """Return the users named by the access rights attached to ``object``.
+
+        ``object`` may be an ``Album``, ``Photo`` or ``Portfolio``; the
+        ``AccessRight`` rows joined to it are looked up by id and the
+        ``user`` of each row is collected, in query order.
+
+        Raises ``DBQueryException`` when ``object`` has no ``__tablename__``
+        attribute or is a model of any other type.
+        """
+        if not hasattr(object, '__tablename__'):
+            raise DBQueryException("{}: not supported".format(object))
+
+        if isinstance(object, Album):
+            rights = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
+        elif isinstance(object, Photo):
+            rights = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
+        elif isinstance(object, Portfolio):
+            rights = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
+        else:
+            raise DBQueryException("%s: not supported" % object.__tablename__)
+        return [right.user for right in rights]
+
+    def has_access(self, user, object):
+        """Tell whether ``user`` holds any access right on ``object``.
+
+        ``get_acl`` is asked about the object itself first; when it returns
+        ``None`` the parents are tried in turn (album, then portfolio, for a
+        photo; portfolio for an album).  Any non-``None`` answer means access.
+
+        Raises ``DBQueryException`` when either argument lacks
+        ``__tablename__``, when ``user`` is not a ``User`` or when ``object``
+        is not a ``Portfolio``, ``Album`` or ``Photo``.
+        """
+        valid = hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User)
+        if not valid:
+            raise DBQueryException("{}: not supported".format(object))
+        if not isinstance(object, (Portfolio, Album, Photo)):
+            raise DBQueryException("%s: not supported" % object.__tablename__)
+
+        if self.get_acl(user, object) is not None:
+            return True
+        if isinstance(object, Portfolio):
+            # A portfolio has no parent to fall back on.
+            return False
+        if isinstance(object, Photo):
+            parent_album = self.get_albums(object)[0]
+            if self.get_acl(user, parent_album) is not None:
+                return True
+        parent_portfolio = self.get_portfolios(object)[0]
+        return self.get_acl(user, parent_portfolio) is not None
+        
+    def can_read(self, user, object):
+        """Tell whether ``user`` may read ``object``; same as ``has_access``."""
+        readable = self.has_access(user, object)
+        return readable
+
+    def can_write(self, user, object):
+        """Tell whether ``user`` holds more than read access on ``object``.
+
+        A right counts as write access when ``get_acl`` returns a value that
+        is not ``None`` and compares greater than ``Role.read``.  The object
+        itself is checked first, then its parents (album, then portfolio,
+        for a photo; portfolio for an album).
+
+        Raises ``DBQueryException`` when either argument lacks
+        ``__tablename__``, when ``user`` is not a ``User`` or when ``object``
+        is not a ``Portfolio``, ``Album`` or ``Photo``.
+        """
+        valid = hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User)
+        if not valid:
+            raise DBQueryException("{}: not supported".format(object))
+        if not isinstance(object, (Portfolio, Album, Photo)):
+            raise DBQueryException("%s: not supported" % object.__tablename__)
+
+        def grants_write(target):
+            right = self.get_acl(user, target)
+            return right is not None and right > Role.read
+
+        if grants_write(object):
+            return True
+        if isinstance(object, Portfolio):
+            # A portfolio has no parent to fall back on.
+            return False
+        if isinstance(object, Photo) and grants_write(self.get_albums(object)[0]):
+            return True
+        return grants_write(self.get_portfolios(object)[0])
+
+    def get_acl(self, user, object):
+        """Return the access right ``user`` holds on ``object``, or ``None``.
+
+        Resolution order:
+
+        1. ``Role.admin`` when ``user.is_admin`` is true or ``user`` is the
+           owner reported by ``get_owner``.
+        2. The ``right`` of the first ``AccessRight`` row attached to
+           ``object`` whose ``user_id`` equals ``user.id``.
+        3. The right inherited from the parent: a photo asks its album, an
+           album asks its portfolio.  A portfolio has no parent, so the
+           result is ``None``.
+
+        Raises ``DBQueryException`` when either argument lacks
+        ``__tablename__``, when ``user`` is not a ``User`` or when ``object``
+        is not an ``Album``, ``Photo`` or ``Portfolio``.
+        """
+        valid = hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User)
+        if not valid:
+            raise DBQueryException("{}: not supported".format(object))
+
+        # Pick the relationship to join on and the model whose id is matched.
+        if isinstance(object, Album):
+            relation, model = AccessRight.album, Album
+        elif isinstance(object, Photo):
+            relation, model = AccessRight.photo, Photo
+        elif isinstance(object, Portfolio):
+            relation, model = AccessRight.portfolio, Portfolio
+        else:
+            raise DBQueryException("%s: not supported" % object.__tablename__)
+
+        if user.is_admin or user == self.get_owner(object):
+            return Role.admin
+
+        rights = AccessRight.query.join(relation).filter(model.id == object.id)
+        for right in rights:
+            # The row is already loaded; no need to fetch it again by id.
+            if right.user_id == user.id:
+                return right.right
+
+        # Nothing granted directly: fall back to the parent, if there is one.
+        if isinstance(object, Photo):
+            return self.get_acl(user, object.album)
+        if isinstance(object, Album):
+            return self.get_acl(user, object.portfolio)
+        return None
+
+    def get_albums_for_user(self, user, owner=False, hidden=False):
+        """Return the albums ``user`` has been granted access to.
+
+        Albums with an access right attached directly to them come first,
+        followed by albums reached through an access right on their
+        portfolio.  Each album appears once, in query order.
+
+        Parameters:
+            user:   the ``User`` whose access rights are looked up.
+            owner:  when false, albums in portfolios owned by ``user`` are
+                    left out.
+            hidden: when false, albums whose ``visible`` flag is falsy are
+                    left out.
+
+        Raises ``DBQueryException`` when ``user`` is not a ``User``.
+        """
+        if not isinstance(user, User):
+            # Report the offending argument itself; it need not be a model.
+            raise DBQueryException("{}: not supported".format(user))
+
+        # Albums with direct access.
+        direct = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
+            Album, AlbumAcl.c.album == Album.id).join(
+            AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
+            Portfolio, Portfolio.id == Album.portfolio_id).filter(
+            AccessRight.user == user).all()
+        # Albums inherited from a portfolio; only this user's rights count.
+        inherited = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
+            Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
+            AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
+            Album, Album.portfolio_id == Portfolio.id).filter(
+            AccessRight.user == user).all()
+
+        albums = []
+        seen = set()
+        for rows in (direct, inherited):
+            for row in rows:
+                if not owner and row.Portfolio.owner == user:
+                    continue
+                if not hidden and not row.Album.visible:
+                    continue
+                # Keep first occurrence only so the result order is stable.
+                if row.Album in seen:
+                    continue
+                seen.add(row.Album)
+                albums.append(row.Album)
+        return albums
diff --git a/db_repository/versions/005_migration.py b/db_repository/versions/005_migration.py
new file mode 100644 (file)
index 0000000..96092d7
--- /dev/null
@@ -0,0 +1,19 @@
+from sqlalchemy import *
+from migrate import *
+
+
+from migrate.changeset import schema
+# Module-level MetaData objects; upgrade() and downgrade() bind both of them
+# to the engine they are given.  No tables are declared on either one here.
+# NOTE(review): names suggest pre-/post-migration schema - confirm.
+pre_meta = MetaData()
+post_meta = MetaData()
+
+def upgrade(migrate_engine):
+    """Bind both metadata objects to ``migrate_engine``.
+
+    Upgrade operations would go here; this revision performs none.  Do not
+    create a separate engine - bind the one passed in to the metadata.
+    """
+    for metadata in (pre_meta, post_meta):
+        metadata.bind = migrate_engine
+
+
+def downgrade(migrate_engine):
+    """Bind both metadata objects to ``migrate_engine``.
+
+    Operations reversing ``upgrade`` would go here; there are none.
+    """
+    for metadata in (pre_meta, post_meta):
+        metadata.bind = migrate_engine
diff --git a/db_repository/versions/006_migration.py b/db_repository/versions/006_migration.py
new file mode 100644 (file)
index 0000000..96092d7
--- /dev/null
@@ -0,0 +1,19 @@
+from sqlalchemy import *
+from migrate import *
+
+
+from migrate.changeset import schema
+# Module-level MetaData objects; upgrade() and downgrade() bind both of them
+# to the engine they are given.  No tables are declared on either one here.
+# NOTE(review): names suggest pre-/post-migration schema - confirm.
+pre_meta = MetaData()
+post_meta = MetaData()
+
+def upgrade(migrate_engine):
+    """Bind both metadata objects to ``migrate_engine``.
+
+    Upgrade operations would go here; this revision performs none.  Do not
+    create a separate engine - bind the one passed in to the metadata.
+    """
+    for metadata in (pre_meta, post_meta):
+        metadata.bind = migrate_engine
+
+
+def downgrade(migrate_engine):
+    """Bind both metadata objects to ``migrate_engine``.
+
+    Operations reversing ``upgrade`` would go here; there are none.
+    """
+    for metadata in (pre_meta, post_meta):
+        metadata.bind = migrate_engine
diff --git a/test b/test
new file mode 100644 (file)
index 0000000..e69de29
This page took 0.043388 seconds and 5 git commands to generate.