]> git.datanom.net - pwp.git/blame - app/tools.py
initial commit
[pwp.git] / app / tools.py
CommitLineData
e5424f29
MR
1from .models import User, Portfolio, Album, Photo, AccessRight, Role, MyAnonymous
2#from flask_login import mixins
3
4class DBQueryException(Exception):
5 pass
6
7class DBQuery:
8
9 def get_albums(self, object):
10 albums = []
11 if hasattr(object, '__tablename__'):
12 if isinstance(object, User):
13 portfolios = Portfolio.query.filter_by(owner=object)
14 for p in portfolios:
15 albums.extend(Album.query.filter_by(portfolio=p))
16 elif isinstance(object, Portfolio):
17 albums = Album.query.filter_by(portfolio=object)
18 elif isinstance(object, Photo):
19 albums.append(object.album)
20 else:
21 raise DBQueryException("%s: not supported" % object.__tablename__)
22 result = []
23 for album in albums:
24 result.append(album)
25 return result
26 else:
27 raise DBQueryException("{}: not supported".format(object))
28
29 def get_portfolios(self, object):
30 portfolios = []
31 if hasattr(object, '__tablename__'):
32 if isinstance(object, User):
33 portfolios = Portfolio.query.filter_by(owner=object)
34 elif isinstance(object, Album):
35 portfolios.append(object.portfolio)
36 elif isinstance(object, Photo):
37 album = self.get_albums(object)
38 portfolios.append(album[0].portfolio)
39 else:
40 raise DBQueryException("%s: not supported" % object.__tablename__)
41 result = []
42 for portfolio in portfolios:
43 result.append(portfolio)
44 return result
45 else:
46 raise DBQueryException("{}: not supported".format(object))
47
48 def get_owner(self, object):
49 if hasattr(object, '__tablename__'):
50 if isinstance(object, Portfolio):
51 return object.owner
52 elif isinstance(object, Album):
53 p = object.portfolio
54 return p.owner
55 elif isinstance(object, Photo):
56 a = object.album
57 p = a.portfolio
58 return p.owner
59 else:
60 raise DBQueryException("%s: not supported" % object.__tablename__)
61 else:
62 raise DBQueryException("{}: not supported".format(object))
63
64 def get_users(self, object):
65 if hasattr(object, '__tablename__'):
66 users = []
67 if isinstance(object, Album):
68 objects = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
69 for o in objects:
70 users.append(o.user)
71 elif isinstance(object, Photo):
72 objects = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
73 for o in objects:
74 users.append(o.user)
75 elif isinstance(object, Portfolio):
76 objects = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
77 for o in objects:
78 users.append(o.user)
79 else:
80 raise DBQueryException("%s: not supported" % object.__tablename__)
81 return users
82 else:
83 raise DBQueryException("{}: not supported".format(object))
84
85 def has_access(self, user, object):
86 #if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
87 if hasattr(object, '__tablename__'):
88 result = False
89 if isinstance(object, Portfolio):
90 acl = self.get_acl(user, object)
91 if acl is not None:
92 result = True
93 elif isinstance(object, Album):
94 acl = self.get_acl(user, object)
95 if acl is not None:
96 result = True
97 else:
98 portfolio = self.get_portfolios(object)
99 acl = self.get_acl(user, portfolio[0])
100 if acl is not None:
101 result = True
102 elif isinstance(object, Photo):
103 acl = self.get_acl(user, object)
104 if acl is not None:
105 result = True
106 else:
107 albums = self.get_albums(object)
108 acl = self.get_acl(user, albums[0])
109 if acl is not None:
110 result = True
111 else:
112 portfolio = self.get_portfolios(object)
113 acl = self.get_acl(user, portfolio[0])
114 if acl is not None:
115 result = True
116 else:
117 raise DBQueryException("%s: not supported" % object.__tablename__)
118 return result
119 else:
120 raise DBQueryException("{}: not supported".format(object))
121
122 def can_read(self, user, object):
123 return self.has_access(user, object)
124
125 def can_write(self, user, object):
126 #if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
127 if hasattr(object, '__tablename__'):
128 result = False
129 if isinstance(object, Portfolio):
130 acl = self.get_acl(user, object)
131 if acl is not None and acl > Role.read:
132 result = True
133 elif isinstance(object, Album):
134 acl = self.get_acl(user, object)
135 if acl is not None and acl > Role.read:
136 result = True
137 else:
138 portfolio = self.get_portfolios(object)
139 acl = self.get_acl(user, portfolio[0])
140 if acl is not None and acl > Role.read:
141 result = True
142 elif isinstance(object, Photo):
143 acl = self.get_acl(user, object)
144 if acl is not None and acl > Role.read:
145 result = True
146 else:
147 albums = self.get_albums(object)
148 acl = self.get_acl(user, albums[0])
149 if acl is not None and acl > Role.read:
150 result = True
151 else:
152 portfolio = self.get_portfolios(object)
153 acl = self.get_acl(user, portfolio[0])
154 if acl is not None and acl > Role.read:
155 result = True
156 else:
157 raise DBQueryException("%s: not supported" % object.__tablename__)
158 return result
159 else:
160 raise DBQueryException("{}: not supported".format(object))
161
162 def get_acl(self, user, object):
163 #if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
164 if hasattr(object, '__tablename__'):
165 if isinstance(object, Album):
166 if user.is_admin or user == self.get_owner(object):
167 return Role.admin
168 acl = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
169 for a in acl:
170 if AccessRight.query.get(a.id).user_id == user.id:
171 return a.right
172 if object.visible and object.public:
173 return Role.read
174 # Check if access given by parent (Portfolio)
175 return self.get_acl(user, object.portfolio)
176 elif isinstance(object, Photo):
177 if user.is_admin or user == self.get_owner(object):
178 return Role.admin
179 acl = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
180 for a in acl:
181 if AccessRight.query.get(a.id).user_id == user.id:
182 return a.right
183 if user.is_anonymous and object.visible and object.public:
184 return Role.read
185 # Check if access given by parent (Album)
186 return self.get_acl(user, object.album)
187 elif isinstance(object, Portfolio):
188 if user.is_admin or user == self.get_owner(object):
189 return Role.admin
190 acl = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
191 for a in acl:
192 if AccessRight.query.get(a.id).user_id == user.id:
193 return a.right
194 if user.is_anonymous and object.visible and object.public:
195 return Role.read
196 else:
197 raise DBQueryException("%s: not supported" % object.__tablename__)
198 return None
199 else:
200 raise DBQueryException("{}: not supported".format(object))
201
202 def get_albums_for_user(self, user, owner = False, hidden = False):
203 albums = []
204 if isinstance(user, User):
205 # Find all albums with direct access
206 #objects = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
207 # Album, AlbumAcl.c.album == Album.id).join(
208 # AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
209 # Portfolio, Portfolio.id == Album.portfolio_id).filter(
210 # AccessRight.user == user).all()
211 #for object in objects:
212 #if not owner and object.Portfolio.owner == user:
213 # continue
214 #if not hidden and not object.Album.visible:
215 # continue
216 # albums.append(object.Album)
217 # Find all albums with access through inherited access rights from portfolio
218 #temp = []
219 #objects = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
220 # Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
221 # AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
222 # Album, Album.portfolio_id == Portfolio.id).all()
223 #for object in objects:
224 # if not owner and object.Portfolio.owner == user:
225 # continue
226 # if not hidden and not object.Album.visible:
227 # continue
228 # temp.append(object.Album)
229 # Last get all visible and public albums
230 u = MyAnonymous()
231 temp = self.get_albums_for_user(u)
232 print("1) Other albums: %s" % temp)
233 albs = []
234 portfolios = self.get_portfolios(user)
235 for p in portfolios:
236 albs = albs + self.get_albums(p)
237 for a in albs:
238 if not owner and self.get_owner(a) == user:
239 continue
240 if not hidden and not a.visible:
241 continue
242 albums.append(a)
243 print("2) Own albums: %s" % albums)
244 # Remove own albums
245 temp = list(set(temp) - set(albs))
246 print("3) Other albums: %s" % temp)
247 # Merge albums with temp removing duplicates
248 albums = albums + temp
249 else:
250 try:
251 # Anonymous user
252 if user.is_anonymous:
253 # Find all albums which is public and visible
254 albums = Album.query.filter_by(public=True,visible=True).all()
255 except AttributeError:
256 raise DBQueryException("%s: Not a User object" % user)
257 print("Anonymous albums: %s" % albums)
258 return albums
259
260def is_safe_url(target):
261 # TODO: Implement test to deside whether url is safe or not
262 return True
263
264def dump(obj):
265 s = ''
266 if isinstance(obj, list):
267 for o in obj:
268 s += dump(o) + "\n"
269 else:
270 items = dir(obj)
271 for item in items:
272 s += '{0}'.format(item) + "\n"
273
274 return s
This page took 0.060302 seconds and 5 git commands to generate.