]> git.datanom.net - wpp.git/blob - app/tools.py
test access
[wpp.git] / app / tools.py
1 from app import db
2 from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl
3
4 class DBQueryException(Exception):
5 pass
6
7 class DBQuery:
8
9 def get_albums(self, object):
10 albums = []
11 if hasattr(object, '__tablename__'):
12 if isinstance(object, User):
13 portfolios = Portfolio.query.filter_by(owner=object)
14 for p in portfolios:
15 albums.extend(Album.query.filter_by(portfolio=p))
16 elif isinstance(object, Portfolio):
17 albums = Album.query.filter_by(portfolio=object)
18 elif isinstance(object, Photo):
19 albums.append(object.album)
20 else:
21 raise DBQueryException("%s: not supported" % object.__tablename__)
22 result = []
23 for album in albums:
24 result.append(album)
25 return result
26 else:
27 raise DBQueryException("{}: not supported".format(object))
28
29 def get_portfolios(self, object):
30 portfolios = []
31 if hasattr(object, '__tablename__'):
32 if isinstance(object, User):
33 portfolios = Portfolio.query.filter_by(owner=object)
34 elif isinstance(object, Album):
35 portfolios.append(object.portfolio)
36 elif isinstance(object, Photo):
37 album = self.get_albums(object)
38 portfolios.append(album[0].portfolio)
39 else:
40 raise DBQueryException("%s: not supported" % object.__tablename__)
41 result = []
42 for portfolio in portfolios:
43 result.append(portfolio)
44 return result
45 else:
46 raise DBQueryException("{}: not supported".format(object))
47
48 def get_owner(self, object):
49 if hasattr(object, '__tablename__'):
50 if isinstance(object, Portfolio):
51 return object.owner
52 elif isinstance(object, Album):
53 p = object.portfolio
54 return p.owner
55 elif isinstance(object, Photo):
56 a = object.album
57 p = a.portfolio
58 return p.owner
59 else:
60 raise DBQueryException("%s: not supported" % object.__tablename__)
61 else:
62 raise DBQueryException("{}: not supported".format(object))
63
64 def get_users(self, object):
65 if hasattr(object, '__tablename__'):
66 users = []
67 if isinstance(object, Album):
68 objects = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
69 for o in objects:
70 users.append(o.user)
71 elif isinstance(object, Photo):
72 objects = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
73 for o in objects:
74 users.append(o.user)
75 elif isinstance(object, Portfolio):
76 objects = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
77 for o in objects:
78 users.append(o.user)
79 else:
80 raise DBQueryException("%s: not supported" % object.__tablename__)
81 return users
82 else:
83 raise DBQueryException("{}: not supported".format(object))
84
85 def has_access(self, user, object):
86 if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
87 result = False
88 if isinstance(object, Portfolio):
89 acl = self.get_acl(user, object)
90 if acl is not None:
91 result = True
92 elif isinstance(object, Album):
93 acl = self.get_acl(user, object)
94 if acl is not None:
95 result = True
96 else:
97 portfolio = self.get_portfolios(object)
98 acl = self.get_acl(user, portfolio[0])
99 if acl is not None:
100 result = True
101 elif isinstance(object, Photo):
102 acl = self.get_acl(user, object)
103 if acl is not None:
104 result = True
105 else:
106 albums = self.get_albums(object)
107 acl = self.get_acl(user, albums[0])
108 if acl is not None:
109 result = True
110 else:
111 portfolio = self.get_portfolios(object)
112 acl = self.get_acl(user, portfolio[0])
113 if acl is not None:
114 result = True
115 else:
116 raise DBQueryException("%s: not supported" % object.__tablename__)
117 return result
118 else:
119 raise DBQueryException("{}: not supported".format(object))
120
121 def can_read(self, user, object):
122 return self.has_access(user, object)
123
124 def can_write(self, user, object):
125 if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
126 result = False
127 if isinstance(object, Portfolio):
128 acl = self.get_acl(user, object)
129 if acl is not None and acl > Role.read:
130 result = True
131 elif isinstance(object, Album):
132 acl = self.get_acl(user, object)
133 if acl is not None and acl > Role.read:
134 result = True
135 else:
136 portfolio = self.get_portfolios(object)
137 acl = self.get_acl(user, portfolio[0])
138 if acl is not None and acl > Role.read:
139 result = True
140 elif isinstance(object, Photo):
141 acl = self.get_acl(user, object)
142 if acl is not None and acl > Role.read:
143 result = True
144 else:
145 albums = self.get_albums(object)
146 acl = self.get_acl(user, albums[0])
147 if acl is not None and acl > Role.read:
148 result = True
149 else:
150 portfolio = self.get_portfolios(object)
151 acl = self.get_acl(user, portfolio[0])
152 if acl is not None and acl > Role.read:
153 result = True
154 else:
155 raise DBQueryException("%s: not supported" % object.__tablename__)
156 return result
157 else:
158 raise DBQueryException("{}: not supported".format(object))
159
160 def get_acl(self, user, object):
161 if hasattr(user, '__tablename__') and hasattr(object, '__tablename__') and isinstance(user, User):
162 if isinstance(object, Album):
163 if user.is_admin or user == self.get_owner(object):
164 return Role.admin
165 acl = AccessRight.query.join(AccessRight.album).filter(Album.id == object.id)
166 for a in acl:
167 if AccessRight.query.get(a.id).user_id == user.id:
168 return a.right
169 # Check if access given by parent (Portfolio)
170 return self.get_acl(user, object.portfolio)
171 elif isinstance(object, Photo):
172 if user.is_admin or user == self.get_owner(object):
173 return Role.admin
174 acl = AccessRight.query.join(AccessRight.photo).filter(Photo.id == object.id)
175 for a in acl:
176 if AccessRight.query.get(a.id).user_id == user.id:
177 return a.right
178 # Check if access given by parent (Album)
179 return self.get_acl(user, object.album)
180 elif isinstance(object, Portfolio):
181 if user.is_admin or user == self.get_owner(object):
182 return Role.admin
183 acl = AccessRight.query.join(AccessRight.portfolio).filter(Portfolio.id == object.id)
184 for a in acl:
185 if AccessRight.query.get(a.id).user_id == user.id:
186 return a.right
187 else:
188 raise DBQueryException("%s: not supported" % object.__tablename__)
189 return None
190 else:
191 raise DBQueryException("{}: not supported".format(object))
192
193 def get_albums_for_user(self, user, owner = False, hidden = False):
194 if isinstance(user, User):
195 albums = []
196 # Find all albums with direct access
197 objects = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
198 Album, AlbumAcl.c.album == Album.id).join(
199 AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
200 Portfolio, Portfolio.id == Album.portfolio_id).filter(
201 AccessRight.user == user).all()
202 for object in objects:
203 if not owner and object.Portfolio.owner == user:
204 continue
205 if not hidden and not object.Album.visible:
206 continue
207 albums.append(object.Album)
208 # Find all albums with access through inherited access rights from portfolio
209 temp = []
210 objects = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
211 Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
212 AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
213 Album, Album.portfolio_id == Portfolio.id).all()
214 for object in objects:
215 if not owner and object.Portfolio.owner == user:
216 continue
217 if not hidden and not object.Album.visible:
218 continue
219 temp.append(object.Album)
220 # Merge albums with temp removing duplicates
221 albums = albums + list(set(temp) - set(albums))
222 return albums
223 else:
224 raise DBQueryException("%s: not supported" % object.__tablename__)
This page took 0.104685 seconds and 6 git commands to generate.