]> git.datanom.net - pwp.git/blame - app/tools.py
Half way through migration away from sqlalchemy
[pwp.git] / app / tools.py
CommitLineData
e5424f29 1from .models import User, Portfolio, Album, Photo, AccessRight, Role, MyAnonymous
fc01a3eb 2from app import app
e5424f29
MR
3#from flask_login import mixins
4
class DBQueryException(Exception):
    """Error raised by DBQuery when it is handed an object it cannot work with."""
7
class DBQuery:
    """Ownership and access-right lookups for users, portfolios, albums and photos.

    A value is treated as a model instance when it has a ``__tablename__``
    attribute; the object-taking methods raise ``DBQueryException`` for
    anything else and for model types they do not handle.
    """

    def get_albums(self, object):
        """Return the albums related to *object* as a new list.

        A ``User`` gives the albums of each of its portfolios, a ``Portfolio``
        gives its own albums and a ``Photo`` gives the one album it belongs to.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not a
                ``User``, ``Portfolio`` or ``Photo``.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

        if isinstance(object, User):
            albums = []
            for portfolio in Portfolio.query(user=object.id):
                # Go through the Portfolio branch below instead of the
                # pre-migration ``Album.query.filter_by`` call, so both
                # branches use the same query API.
                albums.extend(self.get_albums(portfolio))
        elif isinstance(object, Portfolio):
            albums = Album.query(portfolio=object.id)
        elif isinstance(object, Photo):
            albums = [object.album]
        else:
            raise DBQueryException("%s: not supported" % object.__tablename__)
        # Always hand back a plain list, whatever iterable the query produced.
        return list(albums)

    def get_portfolios(self, object):
        """Return the portfolios related to *object* as a new list.

        A ``User`` gives the portfolios queried by its id, an ``Album`` gives
        its portfolio and a ``Photo`` gives the portfolio of its album.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not a
                ``User``, ``Album`` or ``Photo``.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

        if isinstance(object, User):
            portfolios = Portfolio.query(user=object.id)
        elif isinstance(object, Album):
            portfolios = [object.portfolio]
        elif isinstance(object, Photo):
            portfolios = [self.get_albums(object)[0].portfolio]
        else:
            raise DBQueryException("%s: not supported" % object.__tablename__)
        return list(portfolios)

    def get_owner(self, object):
        """Return the ``user`` of the portfolio that *object* is, or belongs to.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not a
                ``Portfolio``, ``Album`` or ``Photo``.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

        if isinstance(object, Portfolio):
            return object.user
        if isinstance(object, Album):
            return object.portfolio.user
        if isinstance(object, Photo):
            return object.album.portfolio.user
        raise DBQueryException("%s: not supported" % object.__tablename__)

    def get_users(self, object):
        """Return the ``user`` of every access right joined to *object*.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not an
                ``Album``, ``Photo`` or ``Portfolio``.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

        # TODO: get rid of query.join
        if isinstance(object, Album):
            rights = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
        elif isinstance(object, Photo):
            rights = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
        elif isinstance(object, Portfolio):
            rights = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
        else:
            raise DBQueryException("%s: not supported" % object.__tablename__)
        return [right.user for right in rights]

    def _acl_candidates(self, object):
        """Yield *object*, then its album (photos only), then its portfolio.

        This is a generator so that a parent is only looked up when the
        caller was not satisfied by the previous candidate, and so that the
        ``DBQueryException`` for unsupported input is raised on first use.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

        if isinstance(object, Portfolio):
            yield object
        elif isinstance(object, Album):
            yield object
            yield self.get_portfolios(object)[0]
        elif isinstance(object, Photo):
            yield object
            yield self.get_albums(object)[0]
            yield self.get_portfolios(object)[0]
        else:
            raise DBQueryException("%s: not supported" % object.__tablename__)

    def has_access(self, user, object):
        """Return True when ``get_acl`` yields a role for *object* or a parent.

        The object itself is tried first, then its album (for photos), then
        its portfolio; the first non-None role wins.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not a
                ``Portfolio``, ``Album`` or ``Photo``.
        """
        return any(
            self.get_acl(user, candidate) is not None
            for candidate in self._acl_candidates(object)
        )

    def can_read(self, user, object):
        """Return True when *user* has any access to *object* (see ``has_access``)."""
        return self.has_access(user, object)

    def can_write(self, user, object):
        """Return True when *user* holds a role above ``Role.read`` on *object*.

        The object itself is tried first, then its album (for photos), then
        its portfolio.  A parent is consulted whenever the previous candidate
        gave no role or a role that is not above ``Role.read``.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not a
                ``Portfolio``, ``Album`` or ``Photo``.
        """
        for candidate in self._acl_candidates(object):
            acl = self.get_acl(user, candidate)
            if acl is not None and acl > Role.read:
                return True
        return False

    def get_acl(self, user, object):
        """Return the role *user* holds on *object*, or None when there is none.

        Checked in order:

        1. ``Role.admin`` when ``user.is_admin`` is true or *user* equals the
           owner of *object*.
        2. The ``right`` of the first access right joined to *object* whose
           ``user_id`` equals ``user.id``.
        3. ``Role.read`` when *object* is visible and public (photos and
           portfolios additionally require ``user.is_anonymous``).
        4. For albums and photos, whatever the parent (portfolio or album)
           grants.

        Raises:
            DBQueryException: *object* has no ``__tablename__`` or is not an
                ``Album``, ``Photo`` or ``Portfolio``.
        """
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))
        if not isinstance(object, (Album, Photo, Portfolio)):
            raise DBQueryException("%s: not supported" % object.__tablename__)

        if user.is_admin or user == self.get_owner(object):
            return Role.admin

        # TODO: get rid of query.join
        if isinstance(object, Album):
            rights = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
        elif isinstance(object, Photo):
            rights = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
        else:
            rights = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
        for access in rights:
            # ``access`` is already the AccessRight row, so read ``user_id``
            # from it rather than fetching the same row again by id.
            if access.user_id == user.id:
                return access.right

        if isinstance(object, Album):
            # NOTE(review): unlike photos and portfolios, albums grant public
            # read access without requiring an anonymous user - confirm that
            # this difference is intended.
            if object.visible and object.public:
                return Role.read
            # Check if access given by parent (Portfolio)
            return self.get_acl(user, object.portfolio)
        if isinstance(object, Photo):
            if user.is_anonymous and object.visible and object.public:
                return Role.read
            # Check if access given by parent (Album)
            return self.get_acl(user, object.album)
        if user.is_anonymous and object.visible and object.public:
            return Role.read
        return None

    def get_albums_for_user(self, user, owner = False, hidden = False):
        """Return the albums to list for *user*.

        For a ``User`` the result is the albums from the user's own
        portfolios (filtered by *owner* and *hidden*) followed by the albums
        an anonymous visitor gets, minus the user's own albums.  For any
        other object the result is the public, visible albums when
        ``user.is_anonymous`` is true and an empty list otherwise.

        Args:
            user: a ``User`` or an object exposing ``is_anonymous``.
            owner: when false, albums whose owner equals *user* are left out.
            hidden: when false, albums that are not ``visible`` are left out.

        Raises:
            DBQueryException: *user* is not a ``User`` and has no
                ``is_anonymous`` attribute.
        """
        if isinstance(user, User):
            # TODO: albums shared with the user through direct or
            # portfolio-level access rights are not collected yet; the join
            # queries that did this were disabled and still need porting.

            # Everything an anonymous visitor may see (public and visible).
            public_albums = self.get_albums_for_user(MyAnonymous())
            app.logger.info("1) Other albums: %s", public_albums)

            own_albums = []
            for portfolio in self.get_portfolios(user):
                own_albums.extend(self.get_albums(portfolio))

            albums = []
            for album in own_albums:
                if not owner and self.get_owner(album) == user:
                    continue
                if not hidden and not album.visible:
                    continue
                albums.append(album)
            app.logger.info("2) Own albums: %s", albums)

            # Remove own albums so they are not listed twice after the merge.
            other_albums = list(set(public_albums) - set(own_albums))
            app.logger.info("3) Other albums: %s", other_albums)
            return albums + other_albums

        # Only the attribute probe is guarded, so an AttributeError raised by
        # the album query itself is not misreported as a bad *user*.
        try:
            anonymous = user.is_anonymous
        except AttributeError:
            raise DBQueryException("%s: Not a User object" % user)

        albums = []
        if anonymous:
            # Find all albums which are public and visible
            albums = Album.query(public=True, visible=True)
        app.logger.info("Anonymous albums: %s", albums)
        return albums
261
def is_safe_url(target, allowed_hosts=None):
    """Return True when redirecting to *target* cannot leave this site.

    A target is safe when it is a relative reference (no scheme, no host), or
    an ``http``/``https`` URL whose host is listed in *allowed_hosts*.

    Args:
        target: the redirect destination, typically taken from a request
            parameter.  ``None`` and ``""`` are accepted as safe because there
            is nothing to redirect to; this keeps the previous result for a
            missing target.
        allowed_hosts: optional iterable of host names (as they appear in the
            URL's network location, e.g. ``"example.org"`` or
            ``"example.org:8080"``) that absolute URLs may point to.

    Returns:
        bool: True if the target is considered safe, False otherwise.
    """
    from urllib.parse import urlsplit

    if not target:
        return True
    if not isinstance(target, str):
        return False
    # Control characters are rejected outright rather than guessing how a
    # browser would normalise them inside a URL.
    if any(ch < " " or ch == "\x7f" for ch in target):
        return False

    # Treat "\" like "/" so that "/\host" is recognised as a reference to
    # another host, the same as "//host".
    candidate = target.strip().replace("\\", "/")
    try:
        parts = urlsplit(candidate)
    except ValueError:
        return False

    if parts.scheme and parts.scheme.lower() not in ("http", "https"):
        return False  # e.g. javascript:, data:
    if parts.scheme and not parts.netloc:
        return False  # e.g. "http:evil.example" or "http:///evil.example"
    if parts.netloc:
        hosts = {host.lower() for host in (allowed_hosts or ())}
        return parts.netloc.lower() in hosts
    # No scheme and no host: relative, unless it still starts with "//"
    # (e.g. "///evil.example").
    return not candidate.startswith("//")
265
def dump(obj):
    """Return the names listed by ``dir(obj)``, one per line.

    When *obj* is a list, each element is dumped in turn and an extra
    newline is appended after each element's dump.
    """
    if isinstance(obj, list):
        return "".join(dump(element) + "\n" for element in obj)
    return "".join('{0}'.format(name) + "\n" for name in dir(obj))
This page took 0.062932 seconds and 5 git commands to generate.