#!/usr/bin/env python3
"""WebSocket chat server supporting a legacy plain-text protocol and a JSON protocol."""
import asyncio
import json

import websockets
from better_profanity import profanity
from html_sanitizer import Sanitizer

# NOTE(review): the configuration file is executed as Python code via exec().
# It must only ever contain trusted, operator-controlled content.
with open("config/xss-config.py", "r") as f:
    configs = {}
    exec(f.read(), configs)  # run configuration script

# Sanitizer built from the "config" name defined by the configuration script.
sanitizer = Sanitizer(configs["config"])

# When True, profanity is neither censored nor punished.
ignore_profanity = True

# One dict per connected client: socket, username, legacy flag, extradata.
connected = []

profanity.load_censor_words()
extraswears = ["hitler"]
profanity.add_censor_words(extraswears)


class Kick(Exception):
    """Raised inside a client handler to disconnect that client with a reason."""


async def sendwithappropriate(ws, clientdata, msg):
    """Sanitize *msg* and send it to *ws* using the protocol the client speaks.

    Args:
        ws: The client's WebSocket connection.
        clientdata: The client's record; its "legacy" flag selects the protocol.
        msg: Text to deliver; it is passed through the sanitizer first.
    """
    msg = sanitizer.sanitize(msg)
    if clientdata["legacy"]:
        # Legacy protocol aka "V1 Protocol", which has fewer features.
        await ws.send(msg)
    else:
        # New protocol aka "V2 Protocol".
        await ws.send(json.dumps({"type": "message", "content": msg}))


async def handle_client(websocket, path=None):
    """Serve one client connection from handshake until disconnect.

    The first frame is the handshake: either a JSON object carrying
    "username" (and optionally "clientdata"), or any other text, which is
    treated as a legacy handshake whose whole payload is the username.
    Subsequent frames are JSON packets with a "type" field; frames that are
    not JSON objects are wrapped as legacy chat messages.

    Args:
        websocket: The client's WebSocket connection.
        path: Request path. Unused; optional so the handler works whether or
            not the websockets library passes it.
    """
    clientdata = {"socket": websocket, "username": "Unknown", "legacy": False, "extradata": None}
    connected.append(clientdata)
    strikes = 0
    maxstrikes = 5
    username = None
    joined = False  # becomes True once the join has been announced
    try:
        # Get the username from the first message.
        raw_handshake = await websocket.recv()
        try:
            handshake = json.loads(raw_handshake)
        except json.JSONDecodeError:
            handshake = None

        if isinstance(handshake, dict):
            username = handshake.get("username")
            if not isinstance(username, str):
                raise Kick("malformed handshake")
            clientdata["extradata"] = handshake.get("clientdata")
        else:
            # Not JSON, or JSON that is not an object (e.g. a username such
            # as "42" parses as a number): treat it as a legacy handshake.
            print("Legacy handshake detected!")
            clientdata["legacy"] = True
            clientdata["extradata"] = {}
            username = raw_handshake
            await sendwithappropriate(websocket, clientdata, "SYSTEM: Attention! Your client is outdated (or does not support new handshake yet). Please update to use new features. Legacy support is still available for now, so you're okay for now!")

        if profanity.contains_profanity(username) and not ignore_profanity:
            await sendwithappropriate(websocket, clientdata, f"SYSTEM: Your username '{username}' contains profanity. Please change it and reload the page to try again...")
            raise Kick("profane username")

        # Record the name before announcing so "/online" never lists a
        # joined user as "Unknown", then notify clients about the new user.
        clientdata["username"] = username
        joined = True
        await joinMessage(username)

        async for jsonpacket in websocket:
            try:
                packet = json.loads(jsonpacket)
            except json.JSONDecodeError:
                packet = None
            if not isinstance(packet, dict):
                # Plain text, or text that happens to be valid non-object JSON.
                print("Legacy message, forging one that complies with the new standards")
                packet = {"type": "message", "message": jsonpacket}

            packet_type = packet.get("type")
            if packet_type == "message":
                message = packet.get("message")
                if not isinstance(message, str):
                    continue  # malformed packet: nothing to broadcast

                if message.strip() == "/online":
                    await sendwithappropriate(websocket, clientdata, "People online:")
                    # Snapshot: the list may change while we await sends.
                    for client in list(connected):
                        await sendwithappropriate(websocket, clientdata, client["username"])

                if profanity.contains_profanity(message) and not ignore_profanity:
                    strikes += 1
                    await sendwithappropriate(websocket, clientdata, f"SYSTEM: {strikes}/{maxstrikes} strikes. Go above and you are kicked.")
                    if strikes > maxstrikes:
                        raise Kick("excessive swearing")

                # Broadcast the message to all clients.
                await broadcast(f"{username}: {message}")
            elif packet_type == "unhandled":
                current_name = clientdata["username"]
                unhandled = packet.get("packet")
                print(f"{current_name}'s client couldn't handle packet {unhandled}")
    except websockets.exceptions.ConnectionClosed:
        # Covers both clean and abnormal closes; cleanup happens below.
        pass
    except Kick as e:
        print("Kicked due to", e)
        try:
            await websocket.send("You have been kicked due to " + str(e))
        except websockets.exceptions.ConnectionClosed:
            pass  # the client is already gone; nothing left to tell them
    finally:
        # Remove client from connected clients.
        connected.remove(clientdata)
        # Only announce a departure for users whose arrival was announced.
        if joined:
            await leaveMessage(username)
async def _announce(event_type, username, legacy_text):
    """Send a join/leave style event to every connected client.

    Legacy clients receive *legacy_text* as a chat line; other clients
    receive a JSON packet ``{"type": event_type, "username": username}``.
    """
    # NOTE(review): the username in the JSON packet is not passed through the
    # sanitizer, unlike the legacy text path - confirm clients escape it.
    # Iterate over a snapshot: other handlers add to and remove from
    # `connected` while this coroutine is suspended on a send.
    for clientdata in list(connected):
        client = clientdata["socket"]
        try:
            if clientdata["legacy"]:
                await sendwithappropriate(client, clientdata, legacy_text)
            else:
                await client.send(json.dumps({"type": event_type, "username": username}))
        except websockets.exceptions.ConnectionClosed:
            # A recipient that has gone away must not block the others.
            continue


async def joinMessage(username):
    """Announce to all connected clients that *username* joined the chat."""
    print(f"{username} joined the chat")
    await _announce("join", username, f"{username} joined the chat")


async def leaveMessage(username):
    """Announce to all connected clients that *username* left the chat."""
    print(f"{username} left the chat")
    await _announce("leave", username, f"{username} left the chat")


async def broadcast(message):
    """Send a chat line of the form "<username>: <text>" to every client.

    Lines whose text part is empty once newlines are removed are dropped.
    The line is censored before delivery unless ``ignore_profanity`` is set;
    the copy printed to the server log is always censored.
    """
    # Split off the "<username>: " prefix once, rather than replacing every
    # occurrence of it inside the message.
    _, separator, body = message.partition(": ")
    if not separator:
        body = message
    if body.replace("\n", "") == "":
        return

    # Not even the server host wants to see stuff like that.
    print(profanity.censor(message.replace("\n", " ")))

    outgoing = message if ignore_profanity else profanity.censor(message)
    # Snapshot for the same reason as in _announce.
    for clientdata in list(connected):
        try:
            await sendwithappropriate(clientdata["socket"], clientdata, outgoing)
        except websockets.exceptions.ConnectionClosed:
            # Skip recipients that disconnected; keep delivering to the rest.
            continue


async def _serve_forever():
    """Run the WebSocket server on 0.0.0.0:8765 until the process is stopped."""
    async with websockets.serve(handle_client, "0.0.0.0", 8765):
        await asyncio.Future()  # never completes; keeps the server alive


# Start the WebSocket server.
print("Serving...")
print("Adding to loop...")
asyncio.run(_serve_forever())