bruh
This commit is contained in:
190
filedb.py
Normal file
190
filedb.py
Normal file
@@ -0,0 +1,190 @@
|
||||
import os
|
||||
import uuid
|
||||
import shutil
|
||||
from werkzeug.security import check_password_hash as verify_password
|
||||
from werkzeug.security import generate_password_hash
|
||||
fdb_loc = "./noprinterdata/"
|
||||
|
||||
os.makedirs(fdb_loc,exist_ok=True)
|
||||
|
||||
class PostNotFoundError(FileNotFoundError):
|
||||
pass
|
||||
|
||||
class Post:
|
||||
def __init__(self,id,title,content_location,type,tags,date_created,creator_id):
|
||||
self.id = id
|
||||
self.title = title
|
||||
self.contentloc = content_location
|
||||
self.contentType = type
|
||||
self.tags = tags
|
||||
self.creator = creator_id
|
||||
self.dateCreated = date_created
|
||||
|
||||
class User:
|
||||
def __init__(self,id,username,password_hash):
|
||||
self.id = id
|
||||
self.username = username
|
||||
self.password_hash = password_hash
|
||||
|
||||
def get_post(id):
|
||||
posts_path = os.path.join(fdb_loc,"posts")
|
||||
os.makedirs(posts_path,exist_ok=True)
|
||||
if os.path.isdir(os.path.join(posts_path,id+"/")):
|
||||
title = ""
|
||||
if os.path.isfile(os.path.join(posts_path,id,"title.txt")):
|
||||
with open(os.path.join(posts_path,id,"title.txt"),"r",encoding="utf-8") as f:
|
||||
title = f.read()
|
||||
tags = []
|
||||
if os.path.isfile(os.path.join(posts_path,id,"tags.txt")):
|
||||
with open(os.path.join(posts_path,id,"tags.txt"),"r",encoding="utf-8") as f:
|
||||
tags = f.read().splitlines()
|
||||
date_created = None
|
||||
if os.path.isfile(os.path.join(posts_path,id,"date_created.txt")):
|
||||
with open(os.path.join(posts_path,id,"date_created.txt"),"r",encoding="utf-8") as f:
|
||||
date_created = f.read()
|
||||
# Check if theres any content file (file called post with any extension)
|
||||
content_location = None
|
||||
content_type = None
|
||||
creator_id = None
|
||||
for file in os.listdir(os.path.join(posts_path,id)):
|
||||
if file.startswith("post."):
|
||||
content_location = os.path.join(posts_path,id,file)
|
||||
content_type = file.split(".")[-1]
|
||||
break
|
||||
if os.path.isfile(os.path.join(posts_path,id,"creator_id.txt")):
|
||||
with open(os.path.join(posts_path,id,"creator_id.txt"),"r",encoding="utf-8") as f:
|
||||
creator_id = f.read()
|
||||
return Post(id,title,content_location,content_type,tags,date_created,creator_id)
|
||||
else:
|
||||
raise PostNotFoundError()
|
||||
|
||||
def createPost(title,file_location,creator,tags):
|
||||
posts_path = os.path.join(fdb_loc,"posts")
|
||||
os.makedirs(posts_path,exist_ok=True)
|
||||
id = str(uuid.uuid4())
|
||||
post_path = os.path.join(posts_path,id)
|
||||
os.makedirs(post_path,exist_ok=True)
|
||||
# Save title
|
||||
with open(os.path.join(post_path,"title.txt"),"w",encoding="utf-8") as f:
|
||||
f.write(title)
|
||||
# Save tags
|
||||
with open(os.path.join(post_path,"tags.txt"),"w",encoding="utf-8") as f:
|
||||
f.write("\n".join(tags))
|
||||
# Save creator id
|
||||
with open(os.path.join(post_path,"creator_id.txt"),"w",encoding="utf-8") as f:
|
||||
f.write(creator)
|
||||
# Save date created
|
||||
from datetime import datetime
|
||||
with open(os.path.join(post_path,"date_created.txt"),"w",encoding="utf-8") as f:
|
||||
f.write(datetime.utcnow().isoformat()+"Z")
|
||||
# Save content file
|
||||
ext = file_location.split(".")[-1]
|
||||
content_location = os.path.join(post_path,f"post.{ext}")
|
||||
with open(file_location,"rb") as src:
|
||||
with open(content_location,"wb") as dst:
|
||||
dst.write(src.read())
|
||||
return id
|
||||
|
||||
def deletePost(id):
|
||||
posts_path = os.path.join(fdb_loc,"posts")
|
||||
post_path = os.path.join(posts_path,id)
|
||||
if os.path.isdir(post_path):
|
||||
# Delete all files in the directory
|
||||
for file in os.listdir(post_path):
|
||||
os.remove(os.path.join(post_path,file))
|
||||
# Delete the directory
|
||||
shutil.rmtree(post_path)
|
||||
else:
|
||||
raise PostNotFoundError()
|
||||
|
||||
def listPosts():
|
||||
posts_path = os.path.join(fdb_loc,"posts")
|
||||
os.makedirs(posts_path,exist_ok=True)
|
||||
post_ids = []
|
||||
for entry in os.listdir(posts_path):
|
||||
if os.path.isdir(os.path.join(posts_path,entry)):
|
||||
# Get post id
|
||||
post_ids.append(get_post(entry))
|
||||
return post_ids
|
||||
|
||||
def listPostsByCreator(creator_id):
|
||||
posts_path = os.path.join(fdb_loc,"posts")
|
||||
os.makedirs(posts_path,exist_ok=True)
|
||||
post_ids = []
|
||||
for entry in os.listdir(posts_path):
|
||||
if os.path.isdir(os.path.join(posts_path,entry)):
|
||||
# Check creator id
|
||||
if os.path.isfile(os.path.join(posts_path,entry,"creator_id.txt")):
|
||||
with open(os.path.join(posts_path,entry,"creator_id.txt"),"r",encoding="utf-8") as f:
|
||||
if f.read() == creator_id:
|
||||
post_ids.append(entry)
|
||||
return post_ids
|
||||
|
||||
def createUser(username,password):
|
||||
password_hash = generate_password_hash(password)
|
||||
users_path = os.path.join(fdb_loc,"users")
|
||||
os.makedirs(users_path,exist_ok=True)
|
||||
id = str(uuid.uuid4())
|
||||
user_path = os.path.join(users_path,id)
|
||||
if os.path.isdir(user_path):
|
||||
raise FileExistsError("User already exists")
|
||||
os.makedirs(user_path,exist_ok=True)
|
||||
with open(os.path.join(user_path,"password_hash.txt"),"w",encoding="utf-8") as f:
|
||||
f.write(password_hash)
|
||||
with open(os.path.join(user_path,"username.txt"),"w",encoding="utf-8") as f:
|
||||
f.write(username)
|
||||
return username
|
||||
|
||||
def lookup(id):
|
||||
users_path = os.path.join(fdb_loc,"users")
|
||||
os.makedirs(users_path,exist_ok=True)
|
||||
user_path = os.path.join(users_path,id)
|
||||
if os.path.isdir(user_path):
|
||||
password_hash = None
|
||||
usern = None
|
||||
if os.path.isfile(os.path.join(user_path,"username.txt")):
|
||||
with open(os.path.join(user_path,"username.txt"),"r",encoding="utf-8") as f:
|
||||
usern = f.read()
|
||||
if os.path.isfile(os.path.join(user_path,"password_hash.txt")):
|
||||
with open(os.path.join(user_path,"password_hash.txt"),"r",encoding="utf-8") as f:
|
||||
password_hash = f.read()
|
||||
return User(id,usern,password_hash)
|
||||
else:
|
||||
raise FileNotFoundError("User not found")
|
||||
|
||||
def deleteUser(id):
|
||||
users_path = os.path.join(fdb_loc,"users")
|
||||
user_path = os.path.join(users_path,id)
|
||||
if os.path.isdir(user_path):
|
||||
# Delete all files in the directory
|
||||
for file in os.listdir(user_path):
|
||||
os.remove(os.path.join(user_path,file))
|
||||
# Delete the directory
|
||||
shutil.rmtree(user_path)
|
||||
else:
|
||||
raise FileNotFoundError("User not found")
|
||||
|
||||
def lookupByUsername(username):
|
||||
users_path = os.path.join(fdb_loc,"users")
|
||||
os.makedirs(users_path,exist_ok=True)
|
||||
for entry in os.listdir(users_path):
|
||||
if os.path.isdir(os.path.join(users_path,entry)):
|
||||
user_path = os.path.join(users_path,entry)
|
||||
if os.path.isfile(os.path.join(user_path,"username.txt")):
|
||||
with open(os.path.join(user_path,"username.txt"),"r",encoding="utf-8") as f:
|
||||
if f.read() == username:
|
||||
password_hash = None
|
||||
if os.path.isfile(os.path.join(user_path,"password_hash.txt")):
|
||||
with open(os.path.join(user_path,"password_hash.txt"),"r",encoding="utf-8") as f2:
|
||||
password_hash = f2.read()
|
||||
return User(entry,username,password_hash)
|
||||
raise FileNotFoundError("User not found")
|
||||
|
||||
def listUsers():
|
||||
users_path = os.path.join(fdb_loc,"users")
|
||||
os.makedirs(users_path,exist_ok=True)
|
||||
user_ids = []
|
||||
for entry in os.listdir(users_path):
|
||||
if os.path.isdir(os.path.join(users_path,entry)):
|
||||
user_ids.append(lookup(entry))
|
||||
return user_ids
|
||||
Reference in New Issue
Block a user