--- /dev/null
+from app import db
+from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl
+
class DBQueryException(Exception):
    """Raised by DBQuery when it is handed an object it cannot work with."""
+
class DBQuery:
    """Lookup helpers over the User / Portfolio / Album / Photo models.

    Every public method validates its arguments and raises
    DBQueryException for anything it does not know how to handle.
    Arguments are recognised as models by the presence of a
    ``__tablename__`` attribute.
    """

    # ------------------------------------------------------------------
    # Validation helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require_model(object):
        """Raise DBQueryException unless *object* has ``__tablename__``."""
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

    @staticmethod
    def _require_user_and_model(user, object):
        """Raise DBQueryException unless *user* is a User and *object* a model."""
        if not (hasattr(user, '__tablename__')
                and hasattr(object, '__tablename__')
                and isinstance(user, User)):
            raise DBQueryException("{}: not supported".format(object))

    @staticmethod
    def _unsupported_model(object):
        """Build the exception for a model of a type the caller cannot handle."""
        return DBQueryException("%s: not supported" % object.__tablename__)

    # ------------------------------------------------------------------
    # Shared query helpers
    # ------------------------------------------------------------------

    def _acl_entries(self, object):
        """Return the AccessRight query for rights attached directly to *object*.

        Raises DBQueryException if *object* is not an Album, Photo or
        Portfolio.
        """
        if isinstance(object, Album):
            relationship, model = AccessRight.album, Album
        elif isinstance(object, Photo):
            relationship, model = AccessRight.photo, Photo
        elif isinstance(object, Portfolio):
            relationship, model = AccessRight.portfolio, Portfolio
        else:
            raise self._unsupported_model(object)
        return AccessRight.query.join(relationship).filter(model.id == object.id)

    def _access_chain(self, object):
        """Yield *object* followed by its ancestors, nearest first.

        Portfolio -> itself; Album -> album, portfolio;
        Photo -> photo, album, portfolio.  Evaluation is lazy, so parents
        are only touched when the caller keeps iterating.
        """
        if isinstance(object, Portfolio):
            yield object
        elif isinstance(object, Album):
            yield object
            yield object.portfolio
        elif isinstance(object, Photo):
            yield object
            yield object.album
            yield object.album.portfolio
        else:
            raise self._unsupported_model(object)

    # ------------------------------------------------------------------
    # Hierarchy navigation
    # ------------------------------------------------------------------

    def get_albums(self, object):
        """Return the albums related to *object* as a list.

        * User      -> albums of every portfolio owned by the user
        * Portfolio -> albums of that portfolio
        * Photo     -> single-element list holding the photo's album

        Raises DBQueryException for any other argument.
        """
        self._require_model(object)
        if isinstance(object, User):
            albums = []
            for portfolio in Portfolio.query.filter_by(owner=object):
                albums.extend(Album.query.filter_by(portfolio=portfolio))
            return albums
        if isinstance(object, Portfolio):
            return list(Album.query.filter_by(portfolio=object))
        if isinstance(object, Photo):
            return [object.album]
        raise self._unsupported_model(object)

    def get_portfolios(self, object):
        """Return the portfolios related to *object* as a list.

        * User  -> portfolios owned by the user
        * Album -> single-element list holding the album's portfolio
        * Photo -> single-element list holding the portfolio of its album

        Raises DBQueryException for any other argument.
        """
        self._require_model(object)
        if isinstance(object, User):
            return list(Portfolio.query.filter_by(owner=object))
        if isinstance(object, Album):
            return [object.portfolio]
        if isinstance(object, Photo):
            return [object.album.portfolio]
        raise self._unsupported_model(object)

    def get_owner(self, object):
        """Return the owner of the portfolio that contains *object*.

        Accepts a Portfolio, Album or Photo; raises DBQueryException for
        any other argument.
        """
        self._require_model(object)
        if isinstance(object, Portfolio):
            return object.owner
        if isinstance(object, Album):
            return object.portfolio.owner
        if isinstance(object, Photo):
            return object.album.portfolio.owner
        raise self._unsupported_model(object)

    def get_users(self, object):
        """Return the users named in access rights attached directly to *object*.

        Accepts an Album, Photo or Portfolio; raises DBQueryException for
        any other argument.  Rights inherited from parents are not included.
        """
        self._require_model(object)
        return [entry.user for entry in self._acl_entries(object)]

    # ------------------------------------------------------------------
    # Access control
    # ------------------------------------------------------------------

    def has_access(self, user, object):
        """Return True if *user* holds any role on *object*.

        get_acl already falls back to the parent album / portfolio, so a
        single call covers the whole chain.

        Raises DBQueryException if *user* is not a User or *object* is not
        a Portfolio, Album or Photo.
        """
        self._require_user_and_model(user, object)
        return self.get_acl(user, object) is not None

    def can_read(self, user, object):
        """Return True if *user* may read *object* (same as has_access)."""
        return self.has_access(user, object)

    def can_write(self, user, object):
        """Return True if *user* holds a role greater than Role.read on
        *object* or on any of its ancestors.

        Each level is checked on its own: get_acl returns the nearest
        matching right, so a read-only right on a photo or album would
        otherwise hide a stronger right granted on its parent.

        Raises DBQueryException if *user* is not a User or *object* is not
        a Portfolio, Album or Photo.
        """
        self._require_user_and_model(user, object)
        for node in self._access_chain(object):
            acl = self.get_acl(user, node)
            if acl is not None and acl > Role.read:
                return True
        return False

    def get_acl(self, user, object):
        """Return the role *user* holds on *object*, or None if there is none.

        Resolution order:
        1. Role.admin if the user is an admin or owns the object.
        2. The right of the first AccessRight attached directly to the
           object whose user_id matches the user.
        3. For an Album the portfolio's result, for a Photo the album's
           result (recursively); for a Portfolio, None.

        Raises DBQueryException if *user* is not a User or *object* is not
        a Portfolio, Album or Photo.
        """
        self._require_user_and_model(user, object)
        if not isinstance(object, (Album, Photo, Portfolio)):
            raise self._unsupported_model(object)

        if user.is_admin or user == self.get_owner(object):
            return Role.admin

        # The joined rows are AccessRight instances already; no need to
        # fetch each one again by primary key.
        for entry in self._acl_entries(object):
            if entry.user_id == user.id:
                return entry.right

        # Nothing granted directly: fall back to the parent, if any.
        if isinstance(object, Album):
            return self.get_acl(user, object.portfolio)
        if isinstance(object, Photo):
            return self.get_acl(user, object.album)
        return None

    def get_albums_for_user(self, user, owner=False, hidden=False):
        """Return the albums *user* has been granted access to.

        Albums with a right attached directly come first, followed by
        albums reachable through a right on their portfolio that are not
        already listed.

        owner  -- when False, albums in portfolios owned by *user* are skipped.
        hidden -- when False, albums whose ``visible`` flag is falsy are skipped.

        Raises DBQueryException if *user* is not a User.
        """
        if not isinstance(user, User):
            # Name the offending argument itself; fall back to its repr
            # when it is not a model at all.
            raise DBQueryException(
                "{}: not supported".format(getattr(user, '__tablename__', user)))

        def wanted(row):
            if not owner and row.Portfolio.owner == user:
                return False
            if not hidden and not row.Album.visible:
                return False
            return True

        # Albums with a right attached directly to them.
        direct_rows = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
            Album, AlbumAcl.c.album == Album.id).join(
            AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
            Portfolio, Portfolio.id == Album.portfolio_id).filter(
            AccessRight.user == user).all()

        # Albums reachable through a right on their portfolio.  The query
        # is restricted to this user's rights, exactly like the direct
        # one, so other users' portfolio grants are not picked up.
        inherited_rows = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
            Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
            AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
            Album, Album.portfolio_id == Portfolio.id).filter(
            AccessRight.user == user).all()

        albums = [row.Album for row in direct_rows if wanted(row)]

        # Append inherited albums in query order, skipping anything
        # already listed.
        seen = set(albums)
        for row in inherited_rows:
            if wanted(row) and row.Album not in seen:
                seen.add(row.Album)
                albums.append(row.Album)
        return albums