]>
git.datanom.net - wpp.git/blob - app/tools.py
e10bbc05ce8159c0ffd6b922669cf9f5b81d1382
2 from .models
import User
, Portfolio
, Album
, Photo
, AccessRight
, Role
, AlbumAcl
, PortfolioAcl
4 class DBQueryException(Exception):
9 def get_albums(self
, object):
11 if hasattr(object, '__tablename__'):
12 if isinstance(object, User
):
13 portfolios
= Portfolio
.query
.filter_by(owner
=object)
15 albums
.extend(Album
.query
.filter_by(portfolio
=p
))
16 elif isinstance(object, Portfolio
):
17 albums
= Album
.query
.filter_by(portfolio
=object)
18 elif isinstance(object, Photo
):
19 albums
.append(object.album
)
21 raise DBQueryException("%s: not supported" % object.__tablename
__)
27 raise DBQueryException("{}: not supported".format(object))
29 def get_portfolios(self
, object):
31 if hasattr(object, '__tablename__'):
32 if isinstance(object, User
):
33 portfolios
= Portfolio
.query
.filter_by(owner
=object)
34 elif isinstance(object, Album
):
35 portfolios
.append(object.portfolio
)
36 elif isinstance(object, Photo
):
37 album
= self
.get_albums(object)
38 portfolios
.append(album
[0].portfolio
)
40 raise DBQueryException("%s: not supported" % object.__tablename
__)
42 for portfolio
in portfolios
:
43 result
.append(portfolio
)
46 raise DBQueryException("{}: not supported".format(object))
48 def get_owner(self
, object):
49 if hasattr(object, '__tablename__'):
50 if isinstance(object, Portfolio
):
52 elif isinstance(object, Album
):
55 elif isinstance(object, Photo
):
60 raise DBQueryException("%s: not supported" % object.__tablename
__)
62 raise DBQueryException("{}: not supported".format(object))
64 def get_users(self
, object):
65 if hasattr(object, '__tablename__'):
67 if isinstance(object, Album
):
68 objects
= AccessRight
.query
.join(AccessRight
.album
).filter(Album
.id == object.id)
71 elif isinstance(object, Photo
):
72 objects
= AccessRight
.query
.join(AccessRight
.photo
).filter(Photo
.id == object.id)
75 elif isinstance(object, Portfolio
):
76 objects
= AccessRight
.query
.join(AccessRight
.portfolio
).filter(Portfolio
.id == object.id)
80 raise DBQueryException("%s: not supported" % object.__tablename
__)
83 raise DBQueryException("{}: not supported".format(object))
85 def has_access(self
, user
, object):
86 if hasattr(user
, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user
, User
):
88 if isinstance(object, Portfolio
):
89 acl
= self
.get_acl(user
, object)
92 elif isinstance(object, Album
):
93 acl
= self
.get_acl(user
, object)
97 portfolio
= self
.get_portfolios(object)
98 acl
= self
.get_acl(user
, portfolio
[0])
101 elif isinstance(object, Photo
):
102 acl
= self
.get_acl(user
, object)
106 albums
= self
.get_albums(object)
107 acl
= self
.get_acl(user
, albums
[0])
111 portfolio
= self
.get_portfolios(object)
112 acl
= self
.get_acl(user
, portfolio
[0])
116 raise DBQueryException("%s: not supported" % object.__tablename
__)
119 raise DBQueryException("{}: not supported".format(object))
def can_read(self, user, object):
    """Report whether *user* may read *object*.

    Read permission is not computed separately here: the call is
    forwarded unchanged to ``self.has_access`` and its result is
    returned as-is.

    Args:
        user: The account whose access is being checked; passed through
            to ``has_access`` as its first argument.
        object: The entity being accessed; passed through to
            ``has_access`` as its second argument. (The name shadows the
            builtin, but it is part of the method's signature and is
            kept for caller compatibility.)

    Returns:
        Whatever ``self.has_access(user, object)`` returns.

    Raises:
        Any exception raised by ``has_access`` propagates unchanged.
    """
    return self.has_access(user, object)
124 def can_write(self
, user
, object):
125 if hasattr(user
, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user
, User
):
127 if isinstance(object, Portfolio
):
128 acl
= self
.get_acl(user
, object)
129 if acl
is not None and acl
> Role
.read
:
131 elif isinstance(object, Album
):
132 acl
= self
.get_acl(user
, object)
133 if acl
is not None and acl
> Role
.read
:
136 portfolio
= self
.get_portfolios(object)
137 acl
= self
.get_acl(user
, portfolio
[0])
138 if acl
is not None and acl
> Role
.read
:
140 elif isinstance(object, Photo
):
141 acl
= self
.get_acl(user
, object)
142 if acl
is not None and acl
> Role
.read
:
145 albums
= self
.get_albums(object)
146 acl
= self
.get_acl(user
, albums
[0])
147 if acl
is not None and acl
> Role
.read
:
150 portfolio
= self
.get_portfolios(object)
151 acl
= self
.get_acl(user
, portfolio
[0])
152 if acl
is not None and acl
> Role
.read
:
155 raise DBQueryException("%s: not supported" % object.__tablename
__)
158 raise DBQueryException("{}: not supported".format(object))
160 def get_acl(self
, user
, object):
161 if hasattr(user
, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user
, User
):
162 if isinstance(object, Album
):
163 if user
.is_admin
or user
== self
.get_owner(object):
165 acl
= AccessRight
.query
.join(AccessRight
.album
).filter(Album
.id == object.id)
167 if AccessRight
.query
.get(a
.id).user_id
== user
.id:
169 # Check if access given by parent (Portfolio)
170 return self
.get_acl(user
, object.portfolio
)
171 elif isinstance(object, Photo
):
172 if user
.is_admin
or user
== self
.get_owner(object):
174 acl
= AccessRight
.query
.join(AccessRight
.photo
).filter(Photo
.id == object.id)
176 if AccessRight
.query
.get(a
.id).user_id
== user
.id:
178 # Check if access given by parent (Album)
179 return self
.get_acl(user
, object.album
)
180 elif isinstance(object, Portfolio
):
181 if user
.is_admin
or user
== self
.get_owner(object):
183 acl
= AccessRight
.query
.join(AccessRight
.portfolio
).filter(Portfolio
.id == object.id)
185 if AccessRight
.query
.get(a
.id).user_id
== user
.id:
188 raise DBQueryException("%s: not supported" % object.__tablename
__)
191 raise DBQueryException("{}: not supported".format(object))
193 def get_albums_for_user(self
, user
, owner
= False, hidden
= False):
194 if isinstance(user
, User
):
196 # Find all albums with direct access
197 objects
= db
.session
.query(AlbumAcl
, Album
, AccessRight
, Portfolio
).join(
198 Album
, AlbumAcl
.c
.album
== Album
.id).join(
199 AccessRight
, AccessRight
.id == AlbumAcl
.c
.acl
).join(
200 Portfolio
, Portfolio
.id == Album
.portfolio_id
).filter(
201 AccessRight
.user
== user
).all()
202 for object in objects
:
203 if not owner
and object.Portfolio
.owner
== user
:
205 if not hidden
and not object.Album
.visible
:
207 albums
.append(object.Album
)
208 # Find all albums with access through inherited access rights from portfolio
210 objects
= db
.session
.query(PortfolioAcl
, Album
, AccessRight
, Portfolio
).join(
211 Portfolio
, PortfolioAcl
.c
.portfolio
== Portfolio
.id).join(
212 AccessRight
, AccessRight
.id == PortfolioAcl
.c
.acl
).join(
213 Album
, Album
.portfolio_id
== Portfolio
.id).all()
214 for object in objects
:
215 if not owner
and object.Portfolio
.owner
== user
:
217 if not hidden
and not object.Album
.visible
:
219 temp
.append(object.Album
)
220 # Merge albums with temp removing duplicates
221 albums
= albums
+ list(set(temp
) - set(albums
))
224 raise DBQueryException("%s: not supported" % object.__tablename
__)
This page took 0.125704 seconds and 6 git commands to generate.