from app import db from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl class DBQueryException(Exception): pass class DBQuery: def get_albums(self, object): albums = [] if hasattr(object, '__tablename__'): if isinstance(object, User): portfolios = Portfolio.query.filter_by(owner=object) for p in portfolios: albums.extend(Album.query.filter_by(portfolio=p)) elif isinstance(object, Portfolio): albums = Album.query.filter_by(portfolio=object) elif isinstance(object, Photo): albums.append(object.album) else: raise DBQueryException("%s: not supported" % object.__tablename__) result = [] for album in albums: result.append(album) return result else: raise DBQueryException("{}: not supported".format(object)) def get_portfolios(self, object): portfolios = [] if hasattr(object, '__tablename__'): if isinstance(object, User): portfolios = Portfolio.query.filter_by(owner=object) elif isinstance(object, Album): portfolios.append(object.portfolio) elif isinstance(object, Photo): album = self.get_albums(object) portfolios.append(album[0].portfolio) else: raise DBQueryException("%s: not supported" % object.__tablename__) result = [] for portfolio in portfolios: result.append(portfolio) return result else: raise DBQueryException("{}: not supported".format(object)) def get_owner(self, object): if hasattr(object, '__tablename__'): if isinstance(object, Portfolio): return object.owner elif isinstance(object, Album): p = object.portfolio return p.owner elif isinstance(object, Photo): a = object.album p = a.portfolio return p.owner else: raise DBQueryException("%s: not supported" % object.__tablename__) else: raise DBQueryException("{}: not supported".format(object)) def get_users(self, object): if hasattr(object, '__tablename__'): users = [] if isinstance(object, Album): objects = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id) for o in objects: users.append(o.user) elif isinstance(object, Photo): objects = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id) for o in objects: users.append(o.user) elif isinstance(object, Portfolio): objects = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id) for o in objects: users.append(o.user) else: raise DBQueryException("%s: not supported" % object.__tablename__) return users else: raise DBQueryException("{}: not supported".format(object)) def has_access(self, user, object): if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): result = False if isinstance(object, Portfolio): acl = self.get_acl(user, object) if acl is not None: result = True elif isinstance(object, Album): acl = self.get_acl(user, object) if acl is not None: result = True else: portfolio = self.get_portfolios(object) acl = self.get_acl(user, portfolio[0]) if acl is not None: result = True elif isinstance(object, Photo): acl = self.get_acl(user, object) if acl is not None: result = True else: albums = self.get_albums(object) acl = self.get_acl(user, albums[0]) if acl is not None: result = True else: portfolio = self.get_portfolios(object) acl = self.get_acl(user, portfolio[0]) if acl is not None: result = True else: raise DBQueryException("%s: not supported" % object.__tablename__) return result else: raise DBQueryException("{}: not supported".format(object)) def can_read(self, user, object): return self.has_access(user, object) def can_write(self, user, object): if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): result = False if isinstance(object, Portfolio): acl = self.get_acl(user, object) if acl is not None and acl > Role.read: result = True elif isinstance(object, Album): acl = self.get_acl(user, object) if acl is not None and acl > Role.read: result = True else: portfolio = self.get_portfolios(object) acl = self.get_acl(user, portfolio[0]) if acl is not None and acl > Role.read: result = True elif isinstance(object, Photo): acl = self.get_acl(user, object) if acl is not None and acl > Role.read: result = True else: albums = self.get_albums(object) acl = self.get_acl(user, albums[0]) if acl is not None and acl > Role.read: result = True else: portfolio = self.get_portfolios(object) acl = self.get_acl(user, portfolio[0]) if acl is not None and acl > Role.read: result = True else: raise DBQueryException("%s: not supported" % object.__tablename__) return result else: raise DBQueryException("{}: not supported".format(object)) def get_acl(self, user, object): if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User): if isinstance(object, Album): if user.is_admin or user == self.get_owner(object): return Role.admin acl = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id) for a in acl: if AccessRight.query.get(a.id).user_id == user.id: return a.right # Check if access given by parent (Portfolio) return self.get_acl(user, object.portfolio) elif isinstance(object, Photo): if user.is_admin or user == self.get_owner(object): return Role.admin acl = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id) for a in acl: if AccessRight.query.get(a.id).user_id == user.id: return a.right # Check if access given by parent (Album) return self.get_acl(user, object.album) elif isinstance(object, Portfolio): if user.is_admin or user == self.get_owner(object): return Role.admin acl = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id) for a in acl: if AccessRight.query.get(a.id).user_id == user.id: return a.right else: raise DBQueryException("%s: not supported" % object.__tablename__) return None else: raise DBQueryException("{}: not supported".format(object)) def get_albums_for_user(self, user, owner = False, hidden = False): if isinstance(user, User): albums = [] # Find all albums with direct access objects = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join( Album, AlbumAcl.c.album == Album.id).join( AccessRight, AccessRight.id == AlbumAcl.c.acl).join( Portfolio, Portfolio.id == Album.portfolio_id).filter( AccessRight.user == user).all() for object in objects: if not owner and object.Portfolio.owner == user: continue if not hidden and not object.Album.visible: continue albums.append(object.Album) # Find all albums with access through inherited access rights from portfolio temp = [] objects = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join( Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join( AccessRight, AccessRight.id == PortfolioAcl.c.acl).join( Album, Album.portfolio_id == Portfolio.id).all() for object in objects: if not owner and object.Portfolio.owner == user: continue if not hidden and not object.Album.visible: continue temp.append(object.Album) # Merge albums with temp removing duplicates albums = albums + list(set(temp) - set(albums)) return albums else: raise DBQueryException("%s: not supported" % object.__tablename__)