# -*- coding: utf-8 -*-
# Copyright (c) 2018 Michael Rasmussen

# This file is part of SecureMail.

# SecureMail is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# SecureMail is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with SecureMail.  If not, see <https://www.gnu.org/licenses/>.

# sqlite
# create table account (
#     id int auto_increment,
#     token char(128) unique not null,
#     cipher text not null,
#     primary key (id));
#
# mysql
# create table account (
#     id int auto_increment,
#     token char(128) unique not null,
#     cipher text not null,
#     primary key (id));
#
# postgresql
# create table account (
#     id serial,
#     token char(128) unique not null,
#     cipher bytea not null,
#     primary key (id));

"""Storage of encrypted account records in the ``account`` table.

The backend is chosen by ``config.DBTYPE`` ('mysql', 'postgresql' or
'sqlite'); the matching driver module is imported lazily, on first use.
"""

import base64

from config import DBTYPE, DBNAME

# Optional settings: each one falls back to a default when config does not
# define it.
try:
    from config import DBUID
except ImportError:
    DBUID = 'backend'

try:
    from config import DBPWD
except ImportError:
    # NOTE(review): hard-coded fallback credential kept for backward
    # compatibility; deployments should define DBPWD in config instead.
    DBPWD = 'clV77B2ZJQxr'

try:
    from config import DBHOST
except ImportError:
    DBHOST = 'localhost'

try:
    from config import DBPORT
except ImportError:
    if DBTYPE == 'mysql':
        DBPORT = 3306
    elif DBTYPE == 'postgresql':
        DBPORT = 5432
    else:
        # Only the mysql and postgresql branches of DB.get_connection read
        # DBPORT; bind the name anyway so it always exists in this module.
        DBPORT = None

from cryptonize import Cryptonize


def _placeholder():
    """Return the positional parameter marker for the configured driver.

    The sqlite branch (apsw) is given ``?``; the other drivers used here
    (MySQLdb, psycopg2) are given ``%s``.
    """
    return '?' if DBTYPE == 'sqlite' else '%s'


class Singleton:
    """Class decorator that hands out one shared instance of ``klass``.

    The wrapped class is instantiated on the first call only; the arguments
    of any later call are ignored and the cached instance is returned.
    """

    def __init__(self, klass):
        self.klass = klass
        self.instance = None

    def __call__(self, *args, **kwargs):
        if self.instance is None:
            self.instance = self.klass(*args, **kwargs)
        return self.instance


@Singleton
class DB:
    """Holder of the lazily opened database connection."""

    # Connection object of the selected driver; None until first requested.
    conn = None

    def get_connection(self):
        """Return the connection, opening it on first use.

        Raises:
            ValueError: if DBTYPE is not 'mysql', 'postgresql' or 'sqlite'.
        """
        if self.conn is None:
            if DBTYPE == 'mysql':
                import MySQLdb
                self.conn = MySQLdb.connect(host=DBHOST, port=DBPORT,
                                            user=DBUID, password=DBPWD,
                                            database=DBNAME)
            elif DBTYPE == 'postgresql':
                import psycopg2
                self.conn = psycopg2.connect(host=DBHOST, port=DBPORT,
                                             user=DBUID, password=DBPWD,
                                             dbname=DBNAME)
            elif DBTYPE == 'sqlite':
                import apsw
                self.conn = apsw.Connection('./{0}.db'.format(DBNAME))
            else:
                raise ValueError('{0}: Unsupported database'.format(DBTYPE))
        return self.conn

    def __del__(self):
        if self.conn is not None:
            self.conn.close()
            # Drop the handle so a closed connection is never handed out
            # or closed a second time.
            self.conn = None


class DBInterface:
    """Static helpers that read and write rows of the ``account`` table."""

    @staticmethod
    def load_user(key):
        """Fetch the record stored under token ``key``.

        Args:
            key: token value to look up.

        Returns:
            None when no row matches; otherwise the result of
            ``Cryptonize().create_EncryptedMessage`` applied to the
            base64-decoded ``cipher`` column.
        """
        conn = DB().get_connection()
        cursor = conn.cursor()
        try:
            # Bind the token as a parameter instead of formatting it into
            # the SQL text, so a crafted key cannot alter the statement.
            cursor.execute(
                "select a.cipher from account a where token = {0}".format(
                    _placeholder()),
                (key,))
            row = cursor.fetchone()
        finally:
            # Release the cursor even when the query raises.
            cursor.close()

        if row is None:
            return None
        msg = base64.b64decode(row[0])
        return Cryptonize().create_EncryptedMessage(msg)

    @staticmethod
    def store_user(key, cipher):
        """Insert a new account row holding ``cipher`` under token ``key``.

        The cipher is stored base64-encoded. On a driver error the error is
        printed, the transaction is rolled back and the error is re-raised.

        Args:
            key: token value identifying the row.
            cipher: bytes-like payload passed to ``base64.b64encode``.

        Raises:
            ValueError: if DBTYPE is not a supported database.
            The selected driver's ``Error``: if the insert or commit fails.
        """
        if DBTYPE == 'mysql':
            from MySQLdb import Error as DBError
        elif DBTYPE == 'postgresql':
            from psycopg2 import Error as DBError
        elif DBTYPE == 'sqlite':
            from apsw import Error as DBError
        else:
            # Same failure get_connection reports; raised here so DBError
            # is never left unbound.
            raise ValueError('{0}: Unsupported database'.format(DBTYPE))

        # Encode before opening a cursor so a bad payload cannot leak one.
        raw = base64.b64encode(cipher)
        sql = "insert into account(token, cipher) values({0}, {0})".format(
            _placeholder())

        conn = DB().get_connection()
        cursor = conn.cursor()
        try:
            if DBTYPE != 'sqlite':
                cursor.execute(sql, (key, raw))
                conn.commit()
            else:
                # The sqlite branch drives the transaction with explicit
                # SQL statements rather than connection methods.
                cursor.execute('begin')
                cursor.execute(sql, (key, raw))
                cursor.execute('commit')
        except DBError as e:
            print(e)
            if DBTYPE != 'sqlite':
                conn.rollback()
            else:
                cursor.execute('rollback')
            raise
        finally:
            cursor.close()