Module mongoset.database
Expand source code
from typing import List, Optional, Union
import pymongo
import pymongo.results
from bson import ObjectId
class Database:
    """
    Handle on one MongoDB database, addressed by name through a shared
    ``pymongo.MongoClient``. Indexing the instance (``db["users"]``) returns a
    ``Table`` wrapping the collection of that name.
    """

    def __init__(self, uri="", db_name="database"):
        """
        Creates the MongoClient for `uri` and remembers `db_name`.

        Write and read behaviour is configured through the client options
        below: majority write concern with fsync, no write-concern timeout,
        and reads from the primary.
        """
        self.client = pymongo.MongoClient(
            uri, w="majority", fsync=True, wTimeoutMS=0, readPreference="primary"
        )
        self.db_name = db_name

    def __getitem__(self, table):
        """
        Returns a Table for the collection named `table` in this database.
        """
        return Table(self.client, db_name=self.db_name, table_name=table)

    def drop(self):
        """
        Drops the entire database (every collection in it), not a single collection.
        """
        self.client.drop_database(self.db_name)
class Table:
    """
    Convenience wrapper around a single MongoDB collection.

    Documents returned by the read methods carry ``_id`` as a string; an
    ``_id`` supplied in a filter or in a row passed to ``update``/``upsert``
    is converted to ``ObjectId`` before it is sent to MongoDB.
    """

    def __init__(
        self,
        client: pymongo.MongoClient,
        db_name: str = "database",
        table_name: str = "table",
    ):
        self.client = client
        self.db_name = db_name
        self.table_name = table_name
        # The underlying pymongo collection every method operates on.
        self.table = self.client[self.db_name][self.table_name]

    def insert(self, row: dict) -> Union[bool, str]:
        """
        Inserts a row. Returns the new document's _id as a string if the write is acknowledged, otherwise False.
        """
        insert_response = self.table.insert_one(row)
        if insert_response.acknowledged:
            return str(insert_response.inserted_id)
        return False

    def upsert(self, row: dict, key: Optional[List[str]] = None) -> bool:
        """
        Upserts a row: updates the document matching the `key` fields of `row`, or inserts `row` if none matches.

        If `key` is not given, the row's _id is used as the key. Raises ValueError if there is no key and no _id.
        Returns True if an existing document was modified, False if a document was inserted or nothing changed.
        """
        # Work on a shallow copy so the caller's dict keeps its original _id value.
        row = self._convert_id_to_obj(dict(row))
        update_response = self.table.update_one(
            self._key_filter(row, key), {"$set": row}, upsert=True
        )
        return bool(update_response.modified_count)

    def update(self, row: dict, key: Optional[List[str]] = None) -> bool:
        """
        Updates the document matching the `key` fields of `row`; never inserts. Good for locking and other sensitive operations.

        If `key` is not given, the row's _id is used as the key. Raises ValueError if there is no key and no _id.
        Returns True if a document was modified, otherwise False.
        """
        # Work on a shallow copy so the caller's dict keeps its original _id value.
        row = self._convert_id_to_obj(dict(row))
        update_response = self.table.update_one(
            self._key_filter(row, key), {"$set": row}
        )
        return bool(update_response.modified_count)

    def find_one(self, projection=None, **filter_expr) -> Optional[dict]:
        """
        Returns the first document matching the filter, or None if nothing matches.
        """
        filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr))
        response = self.table.find_one(filter_expr, projection)
        if response:
            return self._convert_id_to_str(dict(response))
        return None

    def find(self, projection=None, **filter_expr) -> List[dict]:
        """
        Returns every document matching the filter.

        Filter values may be plain values, an Expression, or a tuple of Expressions (merged into one dict).
        """
        filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr))
        return [
            self._convert_id_to_str(dict(i))
            for i in self.table.find(filter_expr, projection)
        ]

    def all(self) -> List[dict]:
        """
        Returns everything in the table
        """
        return [dict(self._convert_id_to_str(i)) for i in self.table.find()]

    def delete(self, **filter_expr) -> int:
        """
        Deletes everything that matches. Returns the number of items deleted.

        Raises ValueError on an empty filter so a bare delete() cannot wipe the table; use clear() for that.
        """
        if not filter_expr:
            raise ValueError(
                "Error! Empty filter expression! Call db.clear() if you want to delete everything"
            )
        delete_response = self.table.delete_many(
            self._convert_id_to_obj(self._eval_filter_expr(filter_expr))
        )
        return delete_response.deleted_count

    def clear(self) -> int:
        """
        Clears the entire table. Returns the number of items that were deleted.
        """
        return self.table.delete_many({}).deleted_count

    def count(self, **filter_expr) -> int:
        """
        Counts the number of items that match the filter expression
        """
        # Convert _id like find()/delete() do, so a string _id matches the stored ObjectId.
        filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr))
        return int(self.table.count_documents(filter_expr))

    def index(self, key: str, unique=True, index_type=pymongo.ASCENDING):
        """
        Creates an index named "<key>_index" on `key`, in the direction given by `index_type`.
        If unique is set to True, a unique index is created and enforced.
        """
        # create_index takes a key specification (not an IndexModel) plus index options.
        self.table.create_index(
            [(key, index_type)], name=key + "_index", unique=unique
        )

    def deindex(self, key: str):
        """
        Removes the index named "<key>_index" (the name index() assigns) from the collection.
        """
        self.table.drop_index(key + "_index")

    def deindex_all(self):
        """
        Deletes all indexes
        """
        self.table.drop_indexes()

    # len(table) is the unfiltered document count.
    __len__ = count

    @staticmethod
    def _key_filter(row: dict, key: Optional[List[str]]) -> dict:
        """
        Builds the match filter for update()/upsert() from the `key` fields of `row`, defaulting to _id.
        """
        if not key:
            if "_id" not in row:
                raise ValueError("No key provided")
            key = ["_id"]
        return {field: row[field] for field in key}

    @staticmethod
    def _eval_filter_expr(filer_expr: dict) -> dict:
        """
        Replaces Expression values in the filter (in place) with plain dicts and returns the filter.

        A single Expression becomes its to_dict(); a tuple of Expressions is merged into one dict.
        Raises TypeError if a tuple contains anything other than Expression instances.
        """
        # Only values of existing keys are reassigned, so iterating items() here is safe.
        for key, val in filer_expr.items():
            if isinstance(val, Expression):
                filer_expr[key] = val.to_dict()
            elif isinstance(val, tuple):
                merged = {}
                for expr in val:
                    if not isinstance(expr, Expression):
                        raise TypeError(
                            "tuple filter values must contain only Expression instances"
                        )
                    merged.update(expr.to_dict())
                filer_expr[key] = merged
        return filer_expr

    @staticmethod
    def _convert_id_to_str(data: dict) -> dict:
        """
        Replaces data["_id"], if present, with its string form (in place) and returns data.
        """
        if "_id" in data:
            data["_id"] = str(data["_id"])
        return data

    @staticmethod
    def _convert_id_to_obj(data: dict) -> dict:
        """
        Replaces data["_id"], if present, with ObjectId(data["_id"]) (in place) and returns data.
        """
        if "_id" in data:
            data["_id"] = ObjectId(data["_id"])
        return data
class Expression:
    """
    One ``key: val`` fragment of a filter document. Table's filter handling
    substitutes ``to_dict()`` for an Expression passed as a filter value.
    NOTE(review): presumably `key` is a MongoDB operator such as "$gt" - confirm against callers.
    """

    def __init__(self, key, val):
        self.key, self.val = key, val

    def to_dict(self) -> dict:
        """
        Returns a new single-entry dict mapping key to val.
        """
        fragment = {}
        fragment[self.key] = self.val
        return fragment
Classes
class Database (uri='', db_name='database')-
Expand source code
class Database: def __init__(self, uri="", db_name="database"): # write_concern = pymongo.write_concern.WriteConcern(w="majority", fsync=True) write_concern = pymongo.write_concern.WriteConcern(w=3, fsync=True) read_concern = pymongo.read_concern.ReadConcern(level="majority") self.client = pymongo.MongoClient( uri, w="majority", fsync=True, wTimeoutMS=0, readPreference="primary" ) self.db_name = db_name def __getitem__(self, table): return Table(self.client, db_name=self.db_name, table_name=table) def drop(self): """ Drops collection """ self.client.drop_database(self.db_name)Methods
def drop(self)-
Drops the entire database (every collection in it), not a single collection.
Expand source code
def drop(self): """ Drops collection """ self.client.drop_database(self.db_name)
class Expression (key, val)-
Expand source code
class Expression: def __init__(self, key, val): self.key: str = key self.val: str = val def to_dict(self) -> dict: return {self.key: self.val}Methods
def to_dict(self) ‑> dict-
Expand source code
def to_dict(self) -> dict: return {self.key: self.val}
class Table (client: pymongo.mongo_client.MongoClient, db_name: str = 'database', table_name: str = 'table')-
Expand source code
class Table: def __init__( self, client: pymongo.MongoClient, db_name: str = "database", table_name: str = "table", ): self.client = client self.db_name = db_name self.table_name = table_name self.table = self.client[self.db_name][self.table_name] def insert(self, row: dict) -> Union[bool, str]: """ Inserts row. Returns true if the write is acknowledged. """ insert_response = self.table.insert_one(row) if insert_response.acknowledged: return str(insert_response.inserted_id) else: return False def upsert(self, row: dict, key: List[str] = None) -> bool: """ Upserts a row. Returns the number of documents modified. By default, if _id is not passed, it'll attempt to insert. Will return 0 if a document was inserted. """ row = self._convert_id_to_obj(row) if (not key) and "_id" in row: key = ["_id"] elif not key: raise ValueError("No key provided") f = {a: b for a, b in [(i, row[i]) for i in key]} # update_response = self.table.update_one(f, {"$set": row}, upsert=True) update_response = self.table.update_one(f, {"$set": row}, upsert=True) # if (not update_response.modified_count) and (not update_response.matched_count): # return self.insert(row) # else: # return update_response.modified_count return bool(update_response.modified_count) def update(self, row: dict, key: List[str] = None) -> bool: """ Updates a row. Returns the number of documents modified. Good for locking and other sensitive operations. 
""" row = self._convert_id_to_obj(row) if (not key) and "_id" in row: key = ["_id"] elif not key: raise ValueError("No key provided") f = {a: b for a, b in [(i, row[i]) for i in key]} # update_response = self.table.update_one(f, {"$set": row}, upsert=True) update_response = self.table.update_one(f, {"$set": row}) return bool(update_response.modified_count) def find_one(self, projection=None, **filter_expr) -> dict: """ Returns the first match """ filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) response = self.table.find_one(filter_expr, projection) if response: return self._convert_id_to_str(dict(response)) return None def find(self, projection=None, **filter_expr) -> List[dict]: """ Searches. Does not support comparison operators yet. """ filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) return [ self._convert_id_to_str(dict(i)) for i in self.table.find(filter_expr, projection) ] def all(self) -> List[dict]: """ Returns everything in the table """ return [dict(self._convert_id_to_str(i)) for i in self.table.find()] def delete(self, **filter_expr) -> int: """ Deletes everything that matches. Returns the number of items deleted. """ if not filter_expr: raise ValueError( "Error! Empty filter expression! Call db.clear() if you want to delete everything" ) delete_response = self.table.delete_many( self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) ) return delete_response.deleted_count def clear(self) -> int: """ Clears the entire table. Returns the number of items that were deleted. """ return self.table.delete_many({}).deleted_count def count(self, **filter_expr) -> int: """ Counts the number of items that match the filter expression """ return int(self.table.count_documents(self._eval_filter_expr(filter_expr))) def index(self, key: str, unique=True, index_type=pymongo.ASCENDING): """ Creates an index on the collection. If unique is set to True, an unique index is created and enforced. 
""" index = pymongo.operations.IndexModel((key), name=key + "_index", unique=unique) self.table.create_index(index) def deindex(self, key: str): """ Removes an index in the collection. """ self.table.drop_index(key + "_index") def deindex_all(self): """ Deletes all indexes """ self.table.drop_indexes() __len__ = count @staticmethod def _eval_filter_expr(filer_expr: dict) -> dict: for key, val in filer_expr.items(): if isinstance(val, Expression): val = val.to_dict() filer_expr[key] = val if isinstance(val, tuple): new_val = dict() for expr in val: assert isinstance(expr, Expression) new_val.update(expr.to_dict()) filer_expr[key] = new_val return filer_expr @staticmethod def _convert_id_to_str(data: dict) -> dict: if "_id" in data: data["_id"] = str(data["_id"]) return data @staticmethod def _convert_id_to_obj(data: dict) -> dict: if "_id" in data: data["_id"] = ObjectId(data["_id"]) return dataMethods
def all(self) ‑> List[dict]-
Returns everything in the table
Expand source code
def all(self) -> List[dict]: """ Returns everything in the table """ return [dict(self._convert_id_to_str(i)) for i in self.table.find()] def clear(self) ‑> int-
Clears the entire table. Returns the number of items that were deleted.
Expand source code
def clear(self) -> int: """ Clears the entire table. Returns the number of items that were deleted. """ return self.table.delete_many({}).deleted_count def count(self, **filter_expr) ‑> int-
Counts the number of items that match the filter expression
Expand source code
def count(self, **filter_expr) -> int: """ Counts the number of items that match the filter expression """ return int(self.table.count_documents(self._eval_filter_expr(filter_expr))) def deindex(self, key: str)-
Removes an index in the collection.
Expand source code
def deindex(self, key: str): """ Removes an index in the collection. """ self.table.drop_index(key + "_index") def deindex_all(self)-
Deletes all indexes
Expand source code
def deindex_all(self): """ Deletes all indexes """ self.table.drop_indexes() def delete(self, **filter_expr) ‑> int-
Deletes everything that matches. Returns the number of items deleted.
Expand source code
def delete(self, **filter_expr) -> int: """ Deletes everything that matches. Returns the number of items deleted. """ if not filter_expr: raise ValueError( "Error! Empty filter expression! Call db.clear() if you want to delete everything" ) delete_response = self.table.delete_many( self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) ) return delete_response.deleted_count def find(self, projection=None, **filter_expr) ‑> List[dict]-
Searches. Does not support comparison operators yet.
Expand source code
def find(self, projection=None, **filter_expr) -> List[dict]: """ Searches. Does not support comparison operators yet. """ filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) return [ self._convert_id_to_str(dict(i)) for i in self.table.find(filter_expr, projection) ] def find_one(self, projection=None, **filter_expr) ‑> dict-
Returns the first match
Expand source code
def find_one(self, projection=None, **filter_expr) -> dict: """ Returns the first match """ filter_expr = self._convert_id_to_obj(self._eval_filter_expr(filter_expr)) response = self.table.find_one(filter_expr, projection) if response: return self._convert_id_to_str(dict(response)) return None def index(self, key: str, unique=True, index_type=1)-
Creates an index on the collection. If unique is set to True, a unique index is created and enforced.
Expand source code
def index(self, key: str, unique=True, index_type=pymongo.ASCENDING): """ Creates an index on the collection. If unique is set to True, an unique index is created and enforced. """ index = pymongo.operations.IndexModel((key), name=key + "_index", unique=unique) self.table.create_index(index) def insert(self, row: dict) ‑> Union[bool, str]-
Inserts a row. Returns the new document's _id as a string if the write is acknowledged, otherwise False.
Expand source code
def insert(self, row: dict) -> Union[bool, str]: """ Inserts row. Returns true if the write is acknowledged. """ insert_response = self.table.insert_one(row) if insert_response.acknowledged: return str(insert_response.inserted_id) else: return False def update(self, row: dict, key: List[str] = None) ‑> bool-
Updates a row. Returns True if a document was modified, otherwise False. Good for locking and other sensitive operations.
Expand source code
def update(self, row: dict, key: List[str] = None) -> bool: """ Updates a row. Returns the number of documents modified. Good for locking and other sensitive operations. """ row = self._convert_id_to_obj(row) if (not key) and "_id" in row: key = ["_id"] elif not key: raise ValueError("No key provided") f = {a: b for a, b in [(i, row[i]) for i in key]} # update_response = self.table.update_one(f, {"$set": row}, upsert=True) update_response = self.table.update_one(f, {"$set": row}) return bool(update_response.modified_count) def upsert(self, row: dict, key: List[str] = None) ‑> bool-
Upserts a row. Returns True if an existing document was modified, and False if a new document was inserted or nothing changed. If no key is passed, the row's _id is used as the key; a ValueError is raised if the row has no _id either.
Expand source code
def upsert(self, row: dict, key: List[str] = None) -> bool: """ Upserts a row. Returns the number of documents modified. By default, if _id is not passed, it'll attempt to insert. Will return 0 if a document was inserted. """ row = self._convert_id_to_obj(row) if (not key) and "_id" in row: key = ["_id"] elif not key: raise ValueError("No key provided") f = {a: b for a, b in [(i, row[i]) for i in key]} # update_response = self.table.update_one(f, {"$set": row}, upsert=True) update_response = self.table.update_one(f, {"$set": row}, upsert=True) # if (not update_response.modified_count) and (not update_response.matched_count): # return self.insert(row) # else: # return update_response.modified_count return bool(update_response.modified_count)