Files
old-webchat-server/main.py

158 lines
6.4 KiB
Python
Executable File

#!/usr/bin/env python3
import asyncio
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
import websockets
import json
from better_profanity import profanity
from html_sanitizer import Sanitizer
with open("config/xss-config.py","r") as f:
configs = {}
exec(f.read(),configs) # run configuration script
sanitizer = Sanitizer(configs["config"]) # default configuration
# Store connected clients
ignore_profanity = True
connected = []
profanity.load_censor_words()
extraswears = ["hitler"]
profanity.add_censor_words(extraswears)
class Kick(Exception):
pass
async def sendwithappropriate(ws, clientdata, msg):
msg = sanitizer.sanitize(msg)
if clientdata["legacy"]:
await ws.send(msg) # Legacy protocol aka "V1 Protocol" which has less features
else:
await ws.send(json.dumps({"type": "message", "content": msg})) # New protocol aka "V2 Protocol2
async def handle_client(websocket, path):
# Add client to connected clients
clientdata = {"socket": websocket, "username": "Unknown", "legacy": False, "extradata": None}
connected.append(clientdata)
strikes = 0
maxstrikes = 5
try:
# Get the username from the first message
handshake = await websocket.recv()
try:
handshake = json.loads(handshake)
username = handshake["username"]
clientdata["extradata"] = handshake["clientdata"]
except json.JSONDecodeError as e:
print("Legacy handshake detected!")
clientdata["legacy"] = True
clientdata["extradata"] = {}
username = handshake
await sendwithappropriate(websocket, clientdata,
"SYSTEM: Attention! Your client is outdated (or does not support new handshake yet). Please update to use new features. Legacy support is still available for now, so you're okay for now!")
if profanity.contains_profanity(username) and not ignore_profanity:
await sendwithappropriate(websocket, clientdata,
f"SYSTEM: Your username '{username}' contains profanity. Please change it and reload the page to try again...")
raise Kick("profane username")
# Notify other clients about the new user
await joinMessage(username)
clientdata["username"] = username
async for jsonpacket in websocket:
try:
packet = json.loads(jsonpacket)
except json.JSONDecodeError as e:
print("Legacy message, forging one that complies with the new standards")
packet = {"type": "message", "message": jsonpacket}
if packet["type"] == "message":
message = packet["message"]
# Broadcast the message to all clients
if message.strip() == "/online":
await sendwithappropriate(websocket, clientdata, "People online:")
for client in connected:
await sendwithappropriate(websocket, clientdata, client["username"])
if profanity.contains_profanity(message) and not ignore_profanity:
strikes += 1
await sendwithappropriate(websocket, clientdata,
f"SYSTEM: {strikes}/{maxstrikes} strikes. Go above and you are kicked.")
if strikes > maxstrikes:
raise Kick("excessive swearing")
await broadcast(f"{username}: {message}")
elif packet["type"] == "unhandled":
print(f"{clientdata.username}'s client couldn't handle packet {packet.packet}")
except websockets.exceptions.ConnectionClosedError:
pass
except Kick as e:
print("Kicked due to", e)
await websocket.send("You have been kicked due to " + str(e))
finally:
# Remove client from connected clients
connected.remove(clientdata)
# Notify other clients about the user leaving
try:
await leaveMessage(username)
except UnboundLocalError:
pass # This causes an error if the user closes the tab using an old client just before joining fully
# so we catch it.
async def joinMessage(username):
# Send special kind of message when someone joins
print(f"{username} joined the chat")
if connected:
for clientdata in connected:
client = clientdata["socket"]
if clientdata["legacy"] == True:
await sendwithappropriate(client, clientdata, f"{username} joined the chat")
else:
await client.send(json.dumps({"type": "join", "username": username}))
async def leaveMessage(username):
# Send special kind of message when someone joins
print(f"{username} left the chat")
if connected:
for clientdata in connected:
client = clientdata["socket"]
if clientdata["legacy"] == True:
await sendwithappropriate(client, clientdata, f"{username} left the chat")
else:
await client.send(json.dumps({"type": "leave", "username": username}))
async def broadcast(message):
if message.replace(message[:message.find(": ") + 2], "").replace("\n", "") == "":
return
# Send message to all connected clients
print(profanity.censor(message.replace("\n", " "))) # Not even the server host wants to see stuff like that
if connected:
for clientdata in connected:
client = clientdata["socket"]
if True:
if ignore_profanity:
await sendwithappropriate(client, clientdata, message)
else:
await sendwithappropriate(client, clientdata, profanity.censor(message))
# GLUE
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
import re,sys
async def ainput(string: str) -> str:
await asyncio.get_event_loop().run_in_executor(
None, lambda s=string: sys.stdout.write(s+' '))
return await asyncio.get_event_loop().run_in_executor(
None, sys.stdin.readline)
async def glueer():
while True:
st = await ainput("CONS>")
if st.startswith("say "):
await broadcast(re.sub("say ","CONSOLE: ",st))
# Start the WebSocket server
print("Serving...")
start_server = websockets.serve(handle_client, "0.0.0.0", 8765)
print("Adding to loop...")
loop.create_task(glueer())
loop.run_until_complete(start_server)
loop.run_forever()