"""Lookup and access-control helpers for the User/Portfolio/Album/Photo models."""
from .models import User, Portfolio, Album, Photo, AccessRight, Role, MyAnonymous
from app import app
#from flask_login import mixins


class DBQueryException(Exception):
    """Raised when a DBQuery method is given an object it cannot handle."""


def _require_model(obj):
    """Raise DBQueryException unless *obj* exposes ``__tablename__``."""
    if not hasattr(obj, '__tablename__'):
        raise DBQueryException("{}: not supported".format(obj))


def _unsupported(obj):
    """Build the exception for a model type that a lookup does not cover."""
    return DBQueryException("%s: not supported" % obj.__tablename__)


class DBQuery:
    """Navigation and permission queries over the model hierarchy.

    The methods treat the models as a chain: a Photo has an ``album``, an
    Album has a ``portfolio`` and a Portfolio has a ``user`` (its owner).
    Every method that takes ``object`` raises DBQueryException when the
    argument has no ``__tablename__`` or is a model type the method does not
    handle.
    """

    def get_albums(self, object):
        """Return a list with the albums related to *object*.

        * User: the albums of every portfolio belonging to that user.
        * Portfolio: the albums of that portfolio.
        * Photo: a one-element list holding the photo's album.
        """
        _require_model(object)
        if isinstance(object, User):
            albums = []
            # NOTE(review): filters on the relationship with the object itself,
            # like the ``filter_by(portfolio=p)`` call below; confirm that
            # ``Portfolio.user`` is a relationship in .models.
            for portfolio in Portfolio.query.filter_by(user=object):
                albums.extend(Album.query.filter_by(portfolio=portfolio))
            return albums
        if isinstance(object, Portfolio):
            return Album.query.filter_by(portfolio=object).all()
        if isinstance(object, Photo):
            return [object.album]
        raise _unsupported(object)

    def get_portfolios(self, object):
        """Return a list with the portfolios related to *object*.

        * User: the portfolios belonging to that user.
        * Album: a one-element list holding the album's portfolio.
        * Photo: a one-element list holding the portfolio of the photo's album.
        """
        _require_model(object)
        if isinstance(object, User):
            return Portfolio.query.filter_by(user=object).all()
        if isinstance(object, Album):
            return [object.portfolio]
        if isinstance(object, Photo):
            return [object.album.portfolio]
        raise _unsupported(object)

    def get_owner(self, object):
        """Return the user owning a Portfolio, Album or Photo.

        Ownership is always read from the enclosing portfolio's ``user``.
        """
        _require_model(object)
        if isinstance(object, Portfolio):
            return object.user
        if isinstance(object, Album):
            return object.portfolio.user
        if isinstance(object, Photo):
            return object.album.portfolio.user
        raise _unsupported(object)

    def _grants_for(self, object):
        """Return the AccessRight query selecting the rights attached to *object*.

        Supports Album, Photo and Portfolio; any other model raises
        DBQueryException. Callers iterate the returned query.
        """
        if isinstance(object, Album):
            relation, model = AccessRight.album, Album
        elif isinstance(object, Photo):
            relation, model = AccessRight.photo, Photo
        elif isinstance(object, Portfolio):
            relation, model = AccessRight.portfolio, Portfolio
        else:
            raise _unsupported(object)
        # TODO: get rid of query.join
        return AccessRight.query.join(relation).filter(model.id == object.id)

    def _access_chain(self, object):
        """Yield *object* followed by its ancestors, nearest first.

        This is a generator so that an ancestor is only looked up when the
        caller actually needs to fall back to it.
        """
        _require_model(object)
        if isinstance(object, Portfolio):
            yield object
        elif isinstance(object, Album):
            yield object
            yield object.portfolio
        elif isinstance(object, Photo):
            yield object
            yield object.album
            yield object.album.portfolio
        else:
            raise _unsupported(object)

    def get_users(self, object):
        """Return the users named in the access rights attached to *object*."""
        _require_model(object)
        return [grant.user for grant in self._grants_for(object)]

    def has_access(self, user, object):
        """Return True if *user* holds any role on *object* or an ancestor.

        The object itself is checked first, then (for albums and photos) each
        enclosing container in turn; the first non-None result of get_acl wins.
        """
        for target in self._access_chain(object):
            if self.get_acl(user, target) is not None:
                return True
        return False

    def can_read(self, user, object):
        """Return True if *user* may read *object* (same test as has_access)."""
        return self.has_access(user, object)

    def can_write(self, user, object):
        """Return True if *user* holds a role above ``Role.read``.

        Checks *object* first and then each enclosing container, so a
        read-only role on a photo does not hide a stronger role granted on
        its album or portfolio.
        """
        for target in self._access_chain(object):
            acl = self.get_acl(user, target)
            if acl is not None and acl > Role.read:
                return True
        return False

    def get_acl(self, user, object):
        """Return the role *user* holds on *object*, or None if there is none.

        Resolution order:

        1. ``Role.admin`` for admins and for the owner of the object.
        2. The ``right`` of the first access-right entry whose ``user_id``
           matches ``user.id``.
        3. ``Role.read`` for visible, public objects (see the note below).
        4. For photos and albums, whatever the parent container grants.
        """
        _require_model(object)
        # Also validates the model type before anything is read from *user*.
        grants = self._grants_for(object)

        if user.is_admin or user == self.get_owner(object):
            return Role.admin

        for grant in grants:
            # The row is already loaded; fetching it again by id is redundant.
            if grant.user_id == user.id:
                return grant.right

        if isinstance(object, Album):
            # NOTE(review): albums grant public read access to every user,
            # whereas photos and portfolios only do so for anonymous users.
            # Kept as found; confirm which rule is intended.
            if object.visible and object.public:
                return Role.read
            # Check if access given by parent (Portfolio)
            return self.get_acl(user, object.portfolio)

        if user.is_anonymous and object.visible and object.public:
            return Role.read
        if isinstance(object, Photo):
            # Check if access given by parent (Album)
            return self.get_acl(user, object.album)
        return None

    def get_albums_for_user(self, user, owner=False, hidden=False):
        """Return the albums that should be listed for *user*.

        For an anonymous user (anything that is not a ``User`` but has a truthy
        ``is_anonymous``) this is every album that is both public and visible.

        For a ``User`` the result is the user's own albums followed by the
        anonymous-visible albums that are not the user's own. Own albums are
        only included when ``owner`` is true, and invisible own albums only
        when ``hidden`` is also true.

        Raises DBQueryException if *user* is neither a ``User`` nor an object
        with an ``is_anonymous`` attribute.

        TODO: albums shared with the user through AccessRight entries, either
        directly or inherited from the portfolio, are not included yet.
        """
        if not isinstance(user, User):
            # Only the attribute lookup is guarded, so an AttributeError raised
            # inside the query is not misreported as "Not a User object".
            try:
                anonymous = user.is_anonymous
            except AttributeError:
                raise DBQueryException("%s: Not a User object" % user)
            albums = []
            if anonymous:
                # Find all albums which are public and visible
                albums = Album.query.filter_by(public=True, visible=True).all()
            app.logger.info("Anonymous albums: %s", albums)
            return albums

        # Everything an anonymous visitor could see.
        public_albums = self.get_albums_for_user(MyAnonymous())
        app.logger.info("1) Other albums: %s", public_albums)

        own_albums = []
        for portfolio in self.get_portfolios(user):
            own_albums.extend(self.get_albums(portfolio))

        albums = []
        for album in own_albums:
            if not owner and self.get_owner(album) == user:
                continue
            if not hidden and not album.visible:
                continue
            albums.append(album)
        app.logger.info("2) Own albums: %s", albums)

        # Remove own albums from the public ones, keeping the query order.
        own_lookup = set(own_albums)
        others = [album for album in public_albums if album not in own_lookup]
        app.logger.info("3) Other albums: %s", others)

        return albums + others


def is_safe_url(target):
    """Placeholder URL check: currently accepts every *target*.

    NOTE(review): no validation is performed, so callers get no protection
    from this function until the TODO below is implemented.
    """
    # TODO: Implement test to decide whether url is safe or not
    return True


def dump(obj):
    """Return the attribute names of *obj*, one per line.

    For a list, each element is dumped in turn and followed by an extra
    newline; for anything else the names come from ``dir(obj)``.
    """
    if isinstance(obj, list):
        return "".join(dump(element) + "\n" for element in obj)
    return "".join('{0}'.format(name) + "\n" for name in dir(obj))