twitools/dbtools/__init__.py
2017-03-23 18:14:22 +01:00

200 lines
5.4 KiB
Python

import setuptools
import sqlite3, pymysql, pymysql.cursors
# Supported database backends (values accepted by dbObject's ``dbtype``).
SQLITE = 0
MYSQL = 1
MARIADB = MYSQL  # alias: MariaDB goes through the same pymysql code path

# Selectors for dbObject.getFLDate(): earliest (MIN) or latest (MAX) date.
MIN = 0
MAX = 1


class dbObject:
    """Thin wrapper around one SQLite or MySQL connection and one cursor.

    The object keeps a single connection (``self.conn``) and a single cursor
    (``self.cur``).  Query results are read back through ``getNext()`` /
    ``getAll()`` after ``executeQuery()``.

    Rows differ by backend: the SQLite cursor returns tuples, while the MySQL
    connection is opened with ``DictCursor`` and returns dicts.  Methods that
    read a single column therefore go through ``_firstColumn()`` instead of
    indexing with ``[0]``.
    """

    # --------------------------------------------- Initialization -------------------------------------------------

    def initMySQL(self, host, port, user, pwd, db):
        """Open a MySQL connection (utf8mb4, dict rows) and remember its settings."""
        self.conn = pymysql.connect(
            host=host,
            port=port,
            user=user,
            password=pwd,
            db=db,
            charset="utf8mb4",
            cursorclass=pymysql.cursors.DictCursor,
        )
        self.cur = self.conn.cursor()
        self.dbtype = MYSQL
        self.host = host
        self.port = port
        self.user = user
        self.pwd = pwd
        self.db = db

    def initSQLite(self, path):
        """Open the SQLite database file at ``path``."""
        self.conn = sqlite3.connect(path)
        self.cur = self.conn.cursor()
        self.dbtype = SQLITE
        self.path = path

    def __init__(self, dbtype = SQLITE, path = None, host = None, port = None, user = None, pwd = None, db = None):
        """Connect to the selected backend.

        Falsy ``path`` falls back to ``'Database.db'``; falsy ``host`` / ``port``
        fall back to ``'localhost'`` / ``3306``.

        Raises:
            ValueError: if ``dbtype`` is neither SQLITE nor MYSQL.
        """
        if dbtype == SQLITE:
            self.initSQLite(path or 'Database.db')
        elif dbtype == MYSQL:
            self.initMySQL(host or 'localhost', port or 3306, user, pwd, db)
        else:
            raise ValueError("Unknown database type %s." % str(dbtype))

    # ---------------------------------------------- No more initialization ----------------------------------------

    # ---- private helpers ----

    @property
    def _marker(self):
        """Parameter placeholder of the active driver ('?' for sqlite3, '%s' for pymysql)."""
        return "%s" if self.dbtype == MYSQL else "?"

    @staticmethod
    def _firstColumn(row):
        """Return the first column of ``row`` for tuple and dict rows alike.

        Returns None when ``row`` is None (no result) or an empty dict.
        """
        if row is None:
            return None
        if isinstance(row, dict):
            return next(iter(row.values()), None)
        return row[0]

    def _tokenColumn(self, column, cid):
        """Fetch ``column`` of the tokens row for ``cid``.

        ``column`` is always one of this class's own literal column names, so
        it is safe to splice into the statement; ``cid`` is bound as a parameter.

        Raises:
            LookupError: if there is no row for ``cid``.
        """
        self.executeQuery(
            "SELECT %s FROM tokens WHERE cid = %s;" % (column, self._marker),
            (int(cid),),
        )
        row = self.getNext()
        if row is None:
            raise LookupError("no tokens row for cid %i" % int(cid))
        return self._firstColumn(row)

    def _flagStatus(self, column, cid):
        """Return True when the 0/1 flag ``column`` is 1 for ``cid``.

        Raises:
            ValueError: if the flag cannot be read (kept as the single error
                type callers of the get?Status methods already handle).
        """
        try:
            return int(self._tokenColumn(column, cid)) == 1
        except Exception as err:
            raise ValueError("No such user: %i" % int(cid)) from err

    def _flipFlag(self, column, cid):
        """Invert the boolean flag ``column`` for ``cid`` and commit."""
        self.executeQuery(
            "UPDATE tokens SET {c} = NOT {c} WHERE cid = {p};".format(c=column, p=self._marker),
            (int(cid),),
        )
        self.commit()

    # ---- connection / cursor plumbing ----

    def closeConnection(self):
        """Close the underlying connection."""
        return self.conn.close()

    def commit(self):
        """Commit the current transaction."""
        return self.conn.commit()

    def executeQuery(self, query, params = None):
        """Execute ``query`` on the shared cursor and return the driver's result.

        Args:
            query: SQL text.  When ``params`` is given it must use the active
                driver's placeholder style (see ``_marker``).
            params: optional sequence of values to bind.  Omit it to run the
                statement exactly as written.
        """
        if params is None:
            return self.cur.execute(query)
        return self.cur.execute(query, params)

    def getAll(self):
        """Return all remaining rows of the last query."""
        return self.cur.fetchall()

    def getNext(self):
        """Return the next row of the last query, or None when exhausted."""
        return self.cur.fetchone()

    def isInitialized(self):
        """Return True if the ``tweets`` table can be queried, else False."""
        try:
            self.executeQuery("SELECT * FROM tweets")
            return True
        except Exception:
            # Driver errors of both backends derive from Exception; unlike a
            # bare except this lets KeyboardInterrupt/SystemExit through.
            return False

    # ---- archive queries ----

    def getFLDate(self, val = MIN):
        """Return the first (``MIN``) or last (any other ``val``) tweet date.

        The date is the leading 10 characters of ``timestamp``, passed through
        ``setuptools.getDate`` as a string.
        """
        mode = "MIN" if val == MIN else "MAX"
        # Start position 1 / length 10 selects the same characters on SQLite
        # that (0, 11) did, and also works on MySQL, where position 0 yields
        # an empty string.
        self.executeQuery("SELECT %s(SUBSTR(timestamp,1,10)) FROM tweets" % mode)
        return setuptools.getDate(str(self._firstColumn(self.getNext())))

    def getFollowers(db):
        """Yield the ids (as int) of follower rows whose ``until`` is 0."""
        db.executeQuery("SELECT id FROM followers WHERE `until` = 0;")
        for row in db.getAll():
            yield int(db._firstColumn(row))

    def getFollowing(db):
        """Yield the ids (as int) of following rows whose ``until`` is 0."""
        db.executeQuery("SELECT id FROM following WHERE `until` = 0;")
        for row in db.getAll():
            yield int(db._firstColumn(row))

    def getLatestMessage(db):
        """Return the highest message id, or 0 when there is none."""
        db.executeQuery("SELECT max(id) FROM messages")
        try:
            return int(db._firstColumn(db.getNext()))
        except (TypeError, ValueError):
            # max() over an empty table is NULL -> None.
            return 0

    def getLatestTweet(db):
        """Return the highest tweet id, or 0 when there is none."""
        db.executeQuery("SELECT max(tweet_id) FROM tweets")
        try:
            return int(db._firstColumn(db.getNext()))
        except (TypeError, ValueError):
            return 0

    def matchNameID(db, name, id):
        """Return True if ``names`` has a current (``until = 0``) row for this id/name pair."""
        # Values are bound as strings, matching the quoted literals the
        # statement used before, but without the injection risk.
        db.executeQuery(
            "SELECT COUNT(*) FROM names WHERE id = {p} AND name = {p} AND until = 0;".format(p=db._marker),
            (str(id), str(name)),
        )
        try:
            return int(db._firstColumn(db.getNext())) != 0
        except (TypeError, ValueError):
            return False

    # ---- token / user management ----

    def deleteUser(self, cid):
        """Delete the tokens row for ``cid`` and commit."""
        self.executeQuery("DELETE FROM tokens WHERE cid = %s;" % self._marker, (int(cid),))
        self.commit()

    def storeUser(self, cid, ato, ase):
        """Replace the tokens row for ``cid`` with the given token pair and commit."""
        marker = self._marker
        self.executeQuery("DELETE FROM tokens WHERE cid = %s;" % marker, (int(cid),))
        self.executeQuery(
            "INSERT INTO tokens(cid, ato, ase) VALUES({p}, {p}, {p});".format(p=marker),
            (int(cid), str(ato), str(ase)),
        )
        self.commit()

    def ato(self, cid):
        """Return the ``ato`` value stored for ``cid``, or False if it cannot be read."""
        try:
            return self._tokenColumn("ato", cid)
        except Exception:
            # Best-effort lookup: any failure is reported as False.
            return False

    def ase(self, cid):
        """Return the ``ase`` value stored for ``cid``, or False if it cannot be read."""
        try:
            return self._tokenColumn("ase", cid)
        except Exception:
            return False

    def getBStatus(self, cid):
        """Return whether ``broadcast`` is 1 for ``cid``; raise ValueError if unreadable."""
        return self._flagStatus("broadcast", cid)

    def toggleBroadcasts(self, cid):
        """Invert ``broadcast`` for ``cid`` and return the new status."""
        self._flipFlag("broadcast", cid)
        return self.getBStatus(cid)

    def getTStatus(self, cid):
        """Return whether ``tweet`` is 1 for ``cid``; raise ValueError if unreadable."""
        return self._flagStatus("tweet", cid)

    def toggleTweet(self, cid):
        """Invert ``tweet`` for ``cid`` and return the new status."""
        self._flipFlag("tweet", cid)
        return self.getTStatus(cid)

    def getMStatus(self, cid):
        """Return whether ``mentions`` is 1 for ``cid``; raise ValueError if unreadable."""
        return self._flagStatus("mentions", cid)

    def toggleMentions(self, cid):
        """Invert ``mentions`` for ``cid`` and return the new status."""
        self._flipFlag("mentions", cid)
        return self.getMStatus(cid)

    def mentionsOn(self):
        """Yield the ``cid`` of every tokens row with a truthy ``mentions`` flag."""
        self.executeQuery("SELECT cid FROM tokens WHERE mentions;")
        for row in self.getAll():
            yield self._firstColumn(row)

    def allUsers(self):
        """Yield the ``cid`` of every tokens row."""
        self.executeQuery("SELECT cid FROM tokens;")
        for row in self.getAll():
            yield self._firstColumn(row)

    def addFish(self, cid):
        """Increment the ``fish`` counter for ``cid`` and commit."""
        self.executeQuery("UPDATE tokens SET fish = fish + 1 WHERE cid = %s;" % self._marker, (int(cid),))
        self.commit()

    def storeToken(self, cid, ato):
        """Insert a tokens row holding only ``cid`` and ``ato``, then commit."""
        self.executeQuery(
            "INSERT INTO tokens(cid, ato) VALUES({p}, {p});".format(p=self._marker),
            (int(cid), str(ato)),
        )
        self.commit()
def dbHelper():
    """Build a dbObject from the project's ``setuptools`` configuration.

    Reads the configured backend via ``setuptools.dbtype()`` and passes the
    matching connection settings on to ``dbObject``.

    Raises:
        setuptools.SetupException: if the configured backend is neither
            SQLITE nor MYSQL.
    """
    # SQLite only needs the database file path.
    if setuptools.dbtype() == SQLITE:
        return dbObject(dbtype=SQLITE, path=setuptools.dbpath())

    # MySQL takes host and credentials; no port is passed here.
    if setuptools.dbtype() == MYSQL:
        settings = {
            "host": setuptools.dbhost(),
            "user": setuptools.dbuser(),
            "pwd": setuptools.dbpass(),
            "db": setuptools.dbname(),
        }
        return dbObject(dbtype=MYSQL, **settings)

    raise setuptools.SetupException()