git.datanom.net - wpp.git/blobdiff - app/tools.py
test access
[wpp.git] / app / tools.py
diff --git a/app/tools.py b/app/tools.py
new file mode 100644 (file)
index 0000000..e10bbc0
--- /dev/null
@@ -0,0 +1,224 @@
+from app import db
+from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl
+
+class DBQueryException(Exception):
+    pass
+    
+class DBQuery:
+    
+    def get_albums(self, object):
+        albums = []
+        if hasattr(object, '__tablename__'):
+            if isinstance(object, User):
+                portfolios = Portfolio.query.filter_by(owner=object)
+                for p in portfolios:
+                    albums.extend(Album.query.filter_by(portfolio=p))
+            elif isinstance(object, Portfolio):
+                albums = Album.query.filter_by(portfolio=object)
+            elif isinstance(object, Photo):
+                albums.append(object.album)
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            result = []
+            for album in albums:
+                result.append(album)
+            return result
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def get_portfolios(self, object):
+        portfolios = []
+        if hasattr(object, '__tablename__'):
+            if isinstance(object, User):
+                portfolios = Portfolio.query.filter_by(owner=object)
+            elif isinstance(object, Album):
+                portfolios.append(object.portfolio)
+            elif isinstance(object, Photo):
+                album = self.get_albums(object)
+                portfolios.append(album[0].portfolio)
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            result = []
+            for portfolio in portfolios:
+                result.append(portfolio)
+            return result
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def get_owner(self, object):
+        if hasattr(object, '__tablename__'):
+            if isinstance(object, Portfolio):
+                return object.owner
+            elif isinstance(object, Album):
+                p = object.portfolio
+                return p.owner
+            elif isinstance(object, Photo):
+                a = object.album
+                p = a.portfolio
+                return p.owner
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def get_users(self, object):
+        if hasattr(object, '__tablename__'):
+            users = []
+            if isinstance(object, Album):
+                objects = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
+                for o in objects:
+                    users.append(o.user)
+            elif isinstance(object, Photo):
+                objects = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
+                for o in objects:
+                    users.append(o.user)
+            elif isinstance(object, Portfolio):
+                objects = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
+                for o in objects:
+                    users.append(o.user)
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            return users
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def has_access(self, user, object):
+        if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
+            result = False
+            if isinstance(object, Portfolio):
+                acl = self.get_acl(user, object)
+                if acl is not None:
+                    result = True
+            elif isinstance(object, Album):
+                acl = self.get_acl(user, object)
+                if acl is not None:
+                    result = True
+                else:
+                    portfolio = self.get_portfolios(object)
+                    acl = self.get_acl(user, portfolio[0])
+                    if acl is not None:
+                        result = True
+            elif isinstance(object, Photo):
+                acl = self.get_acl(user, object)
+                if acl is not None:
+                    result = True
+                else:
+                    albums = self.get_albums(object)
+                    acl = self.get_acl(user, albums[0])
+                    if acl is not None:
+                        result = True
+                    else:
+                        portfolio = self.get_portfolios(object)
+                        acl = self.get_acl(user, portfolio[0])
+                        if acl is not None:
+                            result = True
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            return result
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+        
+    def can_read(self, user, object):
+        return self.has_access(user, object)
+
+    def can_write(self, user, object):
+        if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
+            result = False
+            if isinstance(object, Portfolio):
+                acl = self.get_acl(user, object)
+                if acl is not None and acl > Role.read:
+                    result = True
+            elif isinstance(object, Album):
+                acl = self.get_acl(user, object)
+                if acl is not None and acl > Role.read:
+                    result = True
+                else:
+                    portfolio = self.get_portfolios(object)
+                    acl = self.get_acl(user, portfolio[0])
+                    if acl is not None and acl > Role.read:
+                        result = True
+            elif isinstance(object, Photo):
+                acl = self.get_acl(user, object)
+                if acl is not None and acl > Role.read:
+                    result = True
+                else:
+                    albums = self.get_albums(object)
+                    acl = self.get_acl(user, albums[0])
+                    if acl is not None and acl > Role.read:
+                        result = True
+                    else:
+                        portfolio = self.get_portfolios(object)
+                        acl = self.get_acl(user, portfolio[0])
+                        if acl is not None and acl > Role.read:
+                            result = True
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            return result
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def get_acl(self, user, object): 
+        if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
+            if isinstance(object, Album):
+                if user.is_admin or user == self.get_owner(object):
+                    return Role.admin
+                acl = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
+                for a in acl:
+                    if AccessRight.query.get(a.id).user_id == user.id:
+                        return a.right
+                # Check if access given by parent (Portfolio)
+                return self.get_acl(user, object.portfolio)
+            elif isinstance(object, Photo):
+                if user.is_admin or user == self.get_owner(object):
+                    return Role.admin
+                acl = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
+                for a in acl:
+                    if AccessRight.query.get(a.id).user_id == user.id:
+                        return a.right
+                # Check if access given by parent (Album)
+                return self.get_acl(user, object.album)
+            elif isinstance(object, Portfolio):
+                if user.is_admin or user == self.get_owner(object):
+                    return Role.admin
+                acl = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
+                for a in acl:
+                    if AccessRight.query.get(a.id).user_id == user.id:
+                        return a.right
+            else:
+                raise DBQueryException("%s: not supported" % object.__tablename__)
+            return None
+        else:
+            raise DBQueryException("{}: not supported".format(object))
+
+    def get_albums_for_user(self, user, owner = False, hidden = False):
+        if isinstance(user, User):
+            albums = []
+            # Find all albums with direct access
+            objects = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
+                Album, AlbumAcl.c.album == Album.id).join(
+                AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
+                Portfolio, Portfolio.id == Album.portfolio_id).filter(
+                AccessRight.user == user).all()
+            for object in objects:
+                if not owner and object.Portfolio.owner == user:
+                    continue
+                if not hidden and not object.Album.visible:
+                    continue
+                albums.append(object.Album)
+            # Find all albums with access through inherited access rights from portfolio
+            temp = []
+            objects = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
+                Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
+                AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
+                Album, Album.portfolio_id == Portfolio.id).all()
+            for object in objects:
+                if not owner and object.Portfolio.owner == user:
+                    continue
+                if not hidden and not object.Album.visible:
+                    continue
+                temp.append(object.Album)
+            # Merge albums with temp removing duplicates
+            albums = albums + list(set(temp) - set(albums))
+            return albums
+        else:
+            raise DBQueryException("%s: not supported" % object.__tablename__)
This page took 0.03636 seconds and 5 git commands to generate.