190 lines
7.5 KiB
Python
190 lines
7.5 KiB
Python
|
|
import os
|
||
|
|
import uuid
|
||
|
|
import shutil
|
||
|
|
from werkzeug.security import check_password_hash as verify_password
|
||
|
|
from werkzeug.security import generate_password_hash
|
||
|
|
# Root directory of the file-based data store. The path is relative, so it
# resolves against the process's current working directory. The functions
# in this module keep posts under "<fdb_loc>/posts" and users under
# "<fdb_loc>/users".
fdb_loc = "./noprinterdata/"

# Create the root directory at import time if it does not exist yet.
os.makedirs(fdb_loc,exist_ok=True)
|
||
|
|
|
||
|
|
class PostNotFoundError(FileNotFoundError):
    """Raised when no directory exists for a requested post id.

    Subclasses ``FileNotFoundError`` so callers that already handle a
    missing file also handle a missing post.
    """
|
||
|
|
|
||
|
|
class Post:
    """In-memory record describing one stored post.

    Attributes:
        id: Identifier of the post.
        title: Title text.
        contentloc: Value passed as ``content_location``.
        contentType: Value passed as ``type``.
        tags: Value passed as ``tags``.
        creator: Value passed as ``creator_id``.
        dateCreated: Value passed as ``date_created``.
    """

    def __init__(self, id, title, content_location, type, tags, date_created, creator_id):
        # Identity and descriptive metadata.
        self.id, self.title, self.tags = id, title, tags
        # Where the content lives and what kind of content it is.
        self.contentloc, self.contentType = content_location, type
        # Provenance.
        self.creator, self.dateCreated = creator_id, date_created
|
||
|
|
|
||
|
|
class User:
    """In-memory record describing one stored user.

    Attributes:
        id: Identifier of the user.
        username: The user's name.
        password_hash: The stored password hash (never the plain password).
    """

    def __init__(self, id, username, password_hash):
        self.id, self.username = id, username
        self.password_hash = password_hash
|
||
|
|
|
||
|
|
def _read_post_text(path, default):
    """Return the UTF-8 text of the file at *path*, or *default* if it is not a file."""
    if not os.path.isfile(path):
        return default
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def get_post(id):
    """Load the post stored in ``<fdb_loc>/posts/<id>``.

    Each piece of metadata lives in its own text file inside the post
    directory; a missing file yields the default shown below.

    Args:
        id: Post identifier; must be a single path component.

    Returns:
        A ``Post`` with
        ``title`` from ``title.txt`` (default ``""``),
        ``tags`` from the lines of ``tags.txt`` (default ``[]``),
        ``dateCreated`` from ``date_created.txt`` (default ``None``),
        ``creator`` from ``creator_id.txt`` (default ``None``), and
        ``contentloc`` / ``contentType`` taken from the first file named
        ``post.<ext>`` (both ``None`` when there is no such file).

    Raises:
        PostNotFoundError: If *id* is empty, is not a plain directory name,
            or no directory exists for it.
    """
    posts_path = os.path.join(fdb_loc, "posts")
    os.makedirs(posts_path, exist_ok=True)

    # Reject ids that would resolve to the posts directory itself or escape
    # it ("", ".", "..", or anything containing a path separator).
    if id in ("", ".", "..") or os.path.basename(id) != id:
        raise PostNotFoundError()

    post_path = os.path.join(posts_path, id)
    if not os.path.isdir(post_path):
        raise PostNotFoundError()

    title = _read_post_text(os.path.join(post_path, "title.txt"), "")
    tags_text = _read_post_text(os.path.join(post_path, "tags.txt"), None)
    tags = [] if tags_text is None else tags_text.splitlines()
    date_created = _read_post_text(os.path.join(post_path, "date_created.txt"), None)
    creator_id = _read_post_text(os.path.join(post_path, "creator_id.txt"), None)

    # The content file is called "post" with any extension. Sorting makes
    # the choice deterministic if more than one such file is present.
    content_location = None
    content_type = None
    for name in sorted(os.listdir(post_path)):
        if name.startswith("post."):
            content_location = os.path.join(post_path, name)
            content_type = name.split(".")[-1]
            break

    return Post(id, title, content_location, content_type, tags, date_created, creator_id)
|
||
|
|
|
||
|
|
def createPost(title, file_location, creator, tags):
    """Create a new post directory and copy the content file into it.

    The post is stored in ``<fdb_loc>/posts/<uuid4>`` as ``title.txt``,
    ``tags.txt`` (one tag per line), ``creator_id.txt``,
    ``date_created.txt`` (UTC, ISO 8601 with a trailing ``Z``) and
    ``post.<ext>``, a copy of *file_location*.

    Args:
        title: Title text.
        file_location: Path of the content file to copy into the post.
        creator: Creator id, written verbatim to ``creator_id.txt``.
        tags: Iterable of tag strings.

    Returns:
        The id (UUID4 string) of the new post.

    Raises:
        OSError: If a metadata file cannot be written or the content file
            cannot be copied (for example ``FileNotFoundError`` when
            *file_location* does not exist). The partially written post
            directory is removed before the error propagates.
    """
    from datetime import datetime, timezone

    posts_path = os.path.join(fdb_loc, "posts")
    os.makedirs(posts_path, exist_ok=True)
    id = str(uuid.uuid4())
    post_path = os.path.join(posts_path, id)
    os.makedirs(post_path, exist_ok=True)

    try:
        with open(os.path.join(post_path, "title.txt"), "w", encoding="utf-8") as f:
            f.write(title)
        with open(os.path.join(post_path, "tags.txt"), "w", encoding="utf-8") as f:
            f.write("\n".join(tags))
        with open(os.path.join(post_path, "creator_id.txt"), "w", encoding="utf-8") as f:
            f.write(creator)

        # datetime.utcnow() is deprecated; take an aware UTC time and drop
        # the tzinfo so the stored text keeps the "<naive ISO>Z" format.
        created = datetime.now(timezone.utc).replace(tzinfo=None)
        with open(os.path.join(post_path, "date_created.txt"), "w", encoding="utf-8") as f:
            f.write(created.isoformat() + "Z")

        # splitext only looks at the final path component, so a dot in a
        # directory name (e.g. "./uploads/file") is never mistaken for an
        # extension. Files without an extension are stored as "post.bin".
        ext = os.path.splitext(file_location)[1].lstrip(".") or "bin"
        content_location = os.path.join(post_path, f"post.{ext}")
        # copyfile streams the data instead of loading the whole file.
        shutil.copyfile(file_location, content_location)
    except Exception:
        # Do not leave a half-created post behind.
        shutil.rmtree(post_path, ignore_errors=True)
        raise

    return id
|
||
|
|
|
||
|
|
def deletePost(id):
    """Delete the post directory ``<fdb_loc>/posts/<id>`` and everything in it.

    Args:
        id: Post identifier; must be a single path component.

    Raises:
        PostNotFoundError: If *id* is empty, is not a plain directory name,
            or no directory exists for it.
    """
    # An empty id resolves to the posts directory itself and ".." to the
    # data root; removing either would wipe unrelated data, so only a plain
    # directory name is accepted.
    if id in ("", ".", "..") or os.path.basename(id) != id:
        raise PostNotFoundError()

    post_path = os.path.join(fdb_loc, "posts", id)
    if not os.path.isdir(post_path):
        raise PostNotFoundError()

    # rmtree removes the files and any nested directories in one step.
    shutil.rmtree(post_path)
|
||
|
|
|
||
|
|
def listPosts():
    """Return a ``Post`` object for every directory under ``<fdb_loc>/posts``.

    The posts directory is created first if it does not exist. Note that
    the result holds loaded ``Post`` objects (via ``get_post``), not ids.
    """
    root = os.path.join(fdb_loc, "posts")
    os.makedirs(root, exist_ok=True)
    return [
        get_post(name)
        for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name))
    ]
|
||
|
|
|
||
|
|
def listPostsByCreator(creator_id):
    """Return the ids of all posts whose ``creator_id.txt`` equals *creator_id*.

    The posts directory is created first if it does not exist. Post
    directories without a ``creator_id.txt`` file are skipped. The
    comparison is an exact string match on the whole file content.
    """
    root = os.path.join(fdb_loc, "posts")
    os.makedirs(root, exist_ok=True)

    matches = []
    for name in os.listdir(root):
        if not os.path.isdir(os.path.join(root, name)):
            continue
        creator_file = os.path.join(root, name, "creator_id.txt")
        if not os.path.isfile(creator_file):
            continue
        with open(creator_file, "r", encoding="utf-8") as fh:
            stored_creator = fh.read()
        if stored_creator == creator_id:
            matches.append(name)
    return matches
|
||
|
|
|
||
|
|
def createUser(username, password):
    """Create a new user directory holding the username and password hash.

    The user is stored in ``<fdb_loc>/users/<uuid4>`` as
    ``password_hash.txt`` and ``username.txt``.

    Args:
        username: Name of the new user; must not already be taken.
        password: Plain-text password; only its hash is written to disk.

    Returns:
        The *username* that was passed in (not the generated id).

    Raises:
        FileExistsError: If a user with this username already exists.
    """
    users_path = os.path.join(fdb_loc, "users")
    os.makedirs(users_path, exist_ok=True)

    # Usernames must be unique: lookupByUsername returns the first match it
    # finds, so a duplicate would make it ambiguous which account is meant.
    # Checking the freshly generated uuid directory alone can never detect
    # an existing user.
    try:
        lookupByUsername(username)
    except FileNotFoundError:
        pass
    else:
        raise FileExistsError("User already exists")

    password_hash = generate_password_hash(password)
    id = str(uuid.uuid4())
    user_path = os.path.join(users_path, id)
    try:
        # mkdir fails atomically if the directory is already there.
        os.mkdir(user_path)
    except FileExistsError:
        raise FileExistsError("User already exists") from None

    with open(os.path.join(user_path, "password_hash.txt"), "w", encoding="utf-8") as f:
        f.write(password_hash)
    with open(os.path.join(user_path, "username.txt"), "w", encoding="utf-8") as f:
        f.write(username)
    return username
|
||
|
|
|
||
|
|
def _read_user_text(path):
    """Return the UTF-8 text of the file at *path*, or ``None`` if it is not a file."""
    if not os.path.isfile(path):
        return None
    with open(path, "r", encoding="utf-8") as f:
        return f.read()


def lookup(id):
    """Load the user stored in ``<fdb_loc>/users/<id>``.

    Args:
        id: User identifier; must be a single path component.

    Returns:
        A ``User`` whose ``username`` and ``password_hash`` come from
        ``username.txt`` and ``password_hash.txt``; each is ``None`` when
        its file is missing.

    Raises:
        FileNotFoundError: If *id* is empty, is not a plain directory name,
            or no directory exists for it.
    """
    users_path = os.path.join(fdb_loc, "users")
    os.makedirs(users_path, exist_ok=True)

    # Reject ids that would resolve to the users directory itself or escape
    # it ("", ".", "..", or anything containing a path separator).
    if id in ("", ".", "..") or os.path.basename(id) != id:
        raise FileNotFoundError("User not found")

    user_path = os.path.join(users_path, id)
    if not os.path.isdir(user_path):
        raise FileNotFoundError("User not found")

    usern = _read_user_text(os.path.join(user_path, "username.txt"))
    password_hash = _read_user_text(os.path.join(user_path, "password_hash.txt"))
    return User(id, usern, password_hash)
|
||
|
|
|
||
|
|
def deleteUser(id):
    """Delete the user directory ``<fdb_loc>/users/<id>`` and everything in it.

    Args:
        id: User identifier; must be a single path component.

    Raises:
        FileNotFoundError: If *id* is empty, is not a plain directory name,
            or no directory exists for it.
    """
    # An empty id resolves to the users directory itself and ".." to the
    # data root; removing either would wipe unrelated data, so only a plain
    # directory name is accepted.
    if id in ("", ".", "..") or os.path.basename(id) != id:
        raise FileNotFoundError("User not found")

    user_path = os.path.join(fdb_loc, "users", id)
    if not os.path.isdir(user_path):
        raise FileNotFoundError("User not found")

    # rmtree removes the files and any nested directories in one step.
    shutil.rmtree(user_path)
|
||
|
|
|
||
|
|
def lookupByUsername(username):
    """Return the first stored user whose ``username.txt`` equals *username*.

    Scans every directory under ``<fdb_loc>/users`` (created if missing)
    in ``os.listdir`` order. The returned ``User`` carries the directory
    name as its id and the content of ``password_hash.txt``, or ``None``
    when that file is absent.

    Raises:
        FileNotFoundError: If no user directory has a matching username.
    """
    root = os.path.join(fdb_loc, "users")
    os.makedirs(root, exist_ok=True)

    for user_id in os.listdir(root):
        user_dir = os.path.join(root, user_id)
        if not os.path.isdir(user_dir):
            continue

        name_file = os.path.join(user_dir, "username.txt")
        if not os.path.isfile(name_file):
            continue
        with open(name_file, "r", encoding="utf-8") as fh:
            stored_name = fh.read()
        if stored_name != username:
            continue

        stored_hash = None
        hash_file = os.path.join(user_dir, "password_hash.txt")
        if os.path.isfile(hash_file):
            with open(hash_file, "r", encoding="utf-8") as fh:
                stored_hash = fh.read()
        return User(user_id, username, stored_hash)

    raise FileNotFoundError("User not found")
|
||
|
|
|
||
|
|
def listUsers():
    """Return a ``User`` object for every directory under ``<fdb_loc>/users``.

    The users directory is created first if it does not exist. Each entry
    is loaded through ``lookup``, so the result holds ``User`` objects
    rather than bare ids.
    """
    root = os.path.join(fdb_loc, "users")
    os.makedirs(root, exist_ok=True)
    return [
        lookup(name)
        for name in os.listdir(root)
        if os.path.isdir(os.path.join(root, name))
    ]
|