]>
Commit | Line | Data |
---|---|---|
1 | from app import db | |
2 | from .models import User, Portfolio, Album, Photo, AccessRight, Role, AlbumAcl, PortfolioAcl | |
3 | ||
class DBQueryException(Exception):
    """Raised by DBQuery when it is handed an object it does not support."""
6 | ||
class DBQuery:
    """Query helpers for the User -> Portfolio -> Album -> Photo hierarchy.

    The methods navigate between the model types and resolve the access
    rights a user holds on a portfolio, album or photo.  Every method raises
    DBQueryException when it is given an object it does not support.

    NOTE: several public methods take a parameter named ``object``.  It
    shadows the builtin, but the name is kept so keyword callers keep working.
    """

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _require_model(object):
        """Raise DBQueryException unless ``object`` has a ``__tablename__``."""
        if not hasattr(object, '__tablename__'):
            raise DBQueryException("{}: not supported".format(object))

    @staticmethod
    def _require_user_and_model(user, object):
        """Raise DBQueryException unless ``user`` is a User and ``object`` is a model."""
        valid = (hasattr(user, '__tablename__')
                 and hasattr(object, '__tablename__')
                 and isinstance(user, User))
        if not valid:
            raise DBQueryException("{}: not supported".format(object))

    @staticmethod
    def _unsupported_model(object):
        """Build the exception used for a model type a method cannot handle."""
        return DBQueryException("%s: not supported" % object.__tablename__)

    @staticmethod
    def _direct_access_rights(object):
        """Return the query of AccessRight rows attached directly to ``object``."""
        if isinstance(object, Album):
            relation, model = AccessRight.album, Album
        elif isinstance(object, Photo):
            relation, model = AccessRight.photo, Photo
        elif isinstance(object, Portfolio):
            relation, model = AccessRight.portfolio, Portfolio
        else:
            raise DBQueryException("%s: not supported" % object.__tablename__)
        return AccessRight.query.join(relation).filter(model.id == object.id)

    @staticmethod
    def _lineage(object):
        """Yield ``object`` and then each of its ancestors, nearest first.

        Ancestors are read lazily so a caller that stops early never touches
        the parent relationships it did not need.
        """
        node = object
        while node is not None:
            yield node
            if isinstance(node, Photo):
                node = node.album
            elif isinstance(node, Album):
                node = node.portfolio
            else:
                node = None

    # ------------------------------------------------------------------
    # Navigation
    # ------------------------------------------------------------------

    def get_albums(self, object):
        """Return the albums related to ``object`` as a list.

        * User      -> every album of every portfolio owned by the user
        * Portfolio -> the albums of that portfolio
        * Photo     -> a one-element list holding the photo's album

        Raises:
            DBQueryException: ``object`` is not a model or is of another type.
        """
        self._require_model(object)
        if isinstance(object, User):
            albums = []
            for portfolio in Portfolio.query.filter_by(owner=object):
                albums.extend(Album.query.filter_by(portfolio=portfolio))
            return albums
        if isinstance(object, Portfolio):
            return list(Album.query.filter_by(portfolio=object))
        if isinstance(object, Photo):
            return [object.album]
        raise self._unsupported_model(object)

    def get_portfolios(self, object):
        """Return the portfolios related to ``object`` as a list.

        * User  -> the portfolios owned by the user
        * Album -> a one-element list holding the album's portfolio
        * Photo -> a one-element list holding the portfolio of the photo's album

        Raises:
            DBQueryException: ``object`` is not a model or is of another type.
        """
        self._require_model(object)
        if isinstance(object, User):
            return list(Portfolio.query.filter_by(owner=object))
        if isinstance(object, Album):
            return [object.portfolio]
        if isinstance(object, Photo):
            return [object.album.portfolio]
        raise self._unsupported_model(object)

    def get_owner(self, object):
        """Return the owner of the portfolio that ``object`` is or belongs to.

        Supports Portfolio, Album and Photo.

        Raises:
            DBQueryException: ``object`` is not a model or is of another type.
        """
        self._require_model(object)
        if isinstance(object, Portfolio):
            return object.owner
        if isinstance(object, Album):
            return object.portfolio.owner
        if isinstance(object, Photo):
            return object.album.portfolio.owner
        raise self._unsupported_model(object)

    def get_users(self, object):
        """Return the users named by access rights attached directly to ``object``.

        Supports Album, Photo and Portfolio.  Rights inherited from a parent
        are not included.

        Raises:
            DBQueryException: ``object`` is not a model or is of another type.
        """
        self._require_model(object)
        return [right.user for right in self._direct_access_rights(object)]

    # ------------------------------------------------------------------
    # Access checks
    # ------------------------------------------------------------------

    def has_access(self, user, object):
        """Return True when ``user`` holds any access right on ``object``.

        get_acl() already falls back to the parent album/portfolio, so a
        single lookup covers inherited rights as well.

        Raises:
            DBQueryException: ``user`` is not a User, or ``object`` is not a
                Portfolio, Album or Photo.
        """
        return self.get_acl(user, object) is not None

    def can_read(self, user, object):
        """Return True when ``user`` may read ``object`` (same as has_access)."""
        return self.has_access(user, object)

    def can_write(self, user, object):
        """Return True when ``user`` holds a right above ``Role.read``.

        The right is looked up on ``object`` and then on each ancestor; a
        sufficient right on any level grants write access.

        Raises:
            DBQueryException: ``user`` is not a User, or ``object`` is not a
                Portfolio, Album or Photo.
        """
        self._require_user_and_model(user, object)
        if not isinstance(object, (Portfolio, Album, Photo)):
            raise self._unsupported_model(object)
        for node in self._lineage(object):
            acl = self.get_acl(user, node)
            if acl is None:
                # get_acl() has already consulted every ancestor of ``node``,
                # so nothing further up the chain can grant access either.
                return False
            if acl > Role.read:
                return True
        return False

    def get_acl(self, user, object):
        """Return the right ``user`` holds on ``object``, or None.

        Resolution order:
          1. ``Role.admin`` when the user is an admin or owns the object.
          2. The ``right`` of an AccessRight attached directly to the object.
          3. For an Album the result for its portfolio; for a Photo the
             result for its album.

        Raises:
            DBQueryException: ``user`` is not a User, or ``object`` is not a
                Portfolio, Album or Photo.
        """
        self._require_user_and_model(user, object)
        if not isinstance(object, (Album, Photo, Portfolio)):
            raise self._unsupported_model(object)

        if user.is_admin or user == self.get_owner(object):
            return Role.admin

        # The rows yielded here are already AccessRight instances, so their
        # user_id can be read directly without re-fetching each row by id.
        for access_right in self._direct_access_rights(object):
            if access_right.user_id == user.id:
                return access_right.right

        # No direct right: fall back to whatever the parent grants.
        if isinstance(object, Album):
            return self.get_acl(user, object.portfolio)
        if isinstance(object, Photo):
            return self.get_acl(user, object.album)
        return None

    def get_albums_for_user(self, user, owner=False, hidden=False):
        """Return the albums ``user`` was granted access to.

        Both albums with an access right of their own and albums whose
        portfolio carries an access right for the user are returned.  Directly
        granted albums come first and each album appears once.

        Args:
            user: the User whose albums are listed.
            owner: when False, albums in portfolios owned by ``user`` are skipped.
            hidden: when False, albums whose ``visible`` flag is false are skipped.

        Raises:
            DBQueryException: ``user`` is not a User.
        """
        if not isinstance(user, User):
            raise DBQueryException("{}: not supported".format(user))

        # Albums carrying an access right of their own for this user.
        direct = db.session.query(AlbumAcl, Album, AccessRight, Portfolio).join(
            Album, AlbumAcl.c.album == Album.id).join(
            AccessRight, AccessRight.id == AlbumAcl.c.acl).join(
            Portfolio, Portfolio.id == Album.portfolio_id).filter(
            AccessRight.user == user).all()

        # Albums inherited from a portfolio-level access right.  The user
        # filter is required here too, otherwise rights granted to other
        # users would leak into the result.
        inherited = db.session.query(PortfolioAcl, Album, AccessRight, Portfolio).join(
            Portfolio, PortfolioAcl.c.portfolio == Portfolio.id).join(
            AccessRight, AccessRight.id == PortfolioAcl.c.acl).join(
            Album, Album.portfolio_id == Portfolio.id).filter(
            AccessRight.user == user).all()

        albums = []
        seen = set()
        for row in direct + inherited:
            album = row.Album
            if not owner and row.Portfolio.owner == user:
                continue
            if not hidden and not album.visible:
                continue
            if album in seen:
                continue
            seen.add(album)
            albums.append(album)
        return albums