# 2025-08-20 20:20:07 +02:00
#!/usr/bin/env python3
import asyncio
# 2025-11-02 01:09:03 +00:00
# Create a dedicated event loop and install it as the current one. The
# startup code at the bottom of this file schedules the console task and the
# server on this `loop` object and then runs it forever.
loop = asyncio . new_event_loop ( )
asyncio . set_event_loop ( loop )
# 2025-08-20 20:20:07 +02:00
import websockets
import json
from better_profanity import profanity
from html_sanitizer import Sanitizer
# --- HTML sanitizer -------------------------------------------------------
# The sanitizer settings live in a Python script that is executed here; the
# script must leave a `config` name behind, which is handed to Sanitizer.
# NOTE(review): exec() runs whatever is in that file with full privileges, so
# config/xss-config.py must only ever be writable by the server operator.
configs = {}
with open("config/xss-config.py", "r") as f:
    exec(f.read(), configs)  # run configuration script
sanitizer = Sanitizer(configs["config"])

# --- Shared server state --------------------------------------------------
# One dict per connected client (socket, username, legacy flag, extradata).
connected = []
# When True, profanity is neither censored nor counted as a strike.
ignore_profanity = True

# --- Profanity filter -----------------------------------------------------
# Default word list plus a few server-specific additions.
extraswears = ["hitler"]
profanity.load_censor_words()
profanity.add_censor_words(extraswears)
class Kick(Exception):
    """Raised inside a client handler to eject that client.

    The exception message is the human-readable reason; the handler prints it
    and sends it to the client before the connection ends.
    """
async def sendwithappropriate(ws, clientdata, msg):
    """Sanitize *msg* and send it over *ws* in the protocol the client speaks.

    Args:
        ws: The socket object to send on (must provide an awaitable ``send``).
        clientdata: The client's state dict; only its ``"legacy"`` flag is read.
        msg: The text to deliver. It is always passed through the module-level
            HTML sanitizer first.
    """
    clean = sanitizer.sanitize(msg)
    if clientdata["legacy"]:
        # Legacy protocol aka "V1 Protocol" which has less features:
        # the sanitized text is sent as-is.
        payload = clean
    else:
        # New protocol aka "V2 Protocol": the text is wrapped in a JSON
        # "message" packet.
        payload = json.dumps({"type": "message", "content": clean})
    await ws.send(payload)
def _load_json_object(raw):
    """Return *raw* parsed as a JSON object (dict), or None if it is not one."""
    try:
        parsed = json.loads(raw)
    except ValueError:
        # json.JSONDecodeError (and UnicodeDecodeError for binary frames)
        # both derive from ValueError.
        return None
    # Valid JSON that is not an object (e.g. the legacy text "123" or "null")
    # is not a V2 packet either.
    return parsed if isinstance(parsed, dict) else None


async def handle_client(websocket, path=None):
    """Serve one client connection from handshake to disconnect.

    The first frame is the handshake. A JSON object with a string
    ``"username"`` (and optionally ``"clientdata"``) selects the V2 protocol;
    anything else is treated as a legacy handshake whose whole text is the
    username. After that, every frame is handled as a packet: ``"message"``
    packets are relayed to all clients, ``"unhandled"`` packets are logged.
    Frames that are not JSON objects are wrapped as legacy chat messages.

    Args:
        websocket: The connection object handed over by the websockets server.
        path: Request path, unused. It defaults to None so the handler works
            whether or not the websockets release passes a second argument.
    """
    # Add client to connected clients
    clientdata = {"socket": websocket, "username": "Unknown", "legacy": False, "extradata": None}
    connected.append(clientdata)
    strikes = 0
    maxstrikes = 5
    joined = False  # set once the join has been announced, so "left" is only sent for real members
    try:
        # Get the username from the first message
        raw_handshake = await websocket.recv()
        handshake = _load_json_object(raw_handshake)
        if handshake is None:
            print("Legacy handshake detected!")
            clientdata["legacy"] = True
            clientdata["extradata"] = {}
            username = raw_handshake
            await sendwithappropriate(
                websocket, clientdata,
                "SYSTEM: Attention! Your client is outdated (or does not support new handshake yet). "
                "Please update to use new features. Legacy support is still available for now, "
                "so you're okay for now!")
        else:
            username = handshake.get("username")
            if not isinstance(username, str):
                raise Kick("malformed handshake")
            clientdata["extradata"] = handshake.get("clientdata")

        if profanity.contains_profanity(username) and not ignore_profanity:
            await sendwithappropriate(
                websocket, clientdata,
                f"SYSTEM: Your username '{username}' contains profanity. "
                "Please change it and reload the page to try again...")
            raise Kick("profane username")

        # Notify other clients about the new user
        joined = True
        await joinMessage(username)
        clientdata["username"] = username

        async for jsonpacket in websocket:
            packet = _load_json_object(jsonpacket)
            if packet is None:
                print("Legacy message, forging one that complies with the new standards")
                packet = {"type": "message", "message": jsonpacket}

            packet_type = packet.get("type")
            if packet_type == "message":
                message = packet.get("message")
                if not isinstance(message, str):
                    continue  # malformed packet: nothing sensible to relay

                if message.strip() == "/online":
                    await sendwithappropriate(websocket, clientdata, "People online:")
                    # Snapshot the names first: `connected` may change while
                    # we are awaiting the individual sends.
                    for name in [client["username"] for client in connected]:
                        await sendwithappropriate(websocket, clientdata, name)
                    # NOTE(review): the "/online" text itself is still relayed
                    # to everyone below, as before — confirm this is intended.

                if profanity.contains_profanity(message) and not ignore_profanity:
                    strikes += 1
                    await sendwithappropriate(
                        websocket, clientdata,
                        f"SYSTEM: {strikes}/{maxstrikes} strikes. Go above and you are kicked.")
                    if strikes > maxstrikes:
                        raise Kick("excessive swearing")

                # Broadcast the message to all clients
                await broadcast(f"{username}: {message}")
            elif packet_type == "unhandled":
                # clientdata and packet are dicts, so use key access here.
                print(f"{clientdata['username']}'s client couldn't handle packet {packet.get('packet')}")
    except websockets.exceptions.ConnectionClosed:
        # Covers both clean and abnormal closes, including a client that
        # leaves before finishing the handshake.
        pass
    except Kick as e:
        print("Kicked due to", e)
        try:
            await websocket.send("You have been kicked due to " + str(e))
        except websockets.exceptions.ConnectionClosed:
            pass  # the client is already gone; nothing to tell it
    finally:
        # Remove client from connected clients
        connected.remove(clientdata)
        # Notify other clients about the user leaving, but only if the join
        # was announced (a client may disconnect or be kicked before that).
        if joined:
            await leaveMessage(username)
async def joinMessage(username):
    """Announce to every connected client that *username* joined.

    Legacy clients receive the plain text line through ``sendwithappropriate``;
    V2 clients receive a JSON ``{"type": "join", "username": ...}`` packet.

    NOTE(review): the V2 packet carries the username unsanitized, unlike
    everything routed through ``sendwithappropriate`` — confirm clients
    escape it before rendering.
    """
    # Send special kind of message when someone joins
    print(f"{username} joined the chat")
    # Iterate over a snapshot: handlers may add/remove clients while we await.
    for clientdata in list(connected):
        client = clientdata["socket"]
        try:
            if clientdata["legacy"]:
                await sendwithappropriate(client, clientdata, f"{username} joined the chat")
            else:
                await client.send(json.dumps({"type": "join", "username": username}))
        except websockets.exceptions.ConnectionClosed:
            # That peer is disconnecting; its own handler removes it. Keep
            # notifying the remaining clients instead of aborting.
            continue
async def leaveMessage(username):
    """Announce to every connected client that *username* left.

    Legacy clients receive the plain text line through ``sendwithappropriate``;
    V2 clients receive a JSON ``{"type": "leave", "username": ...}`` packet.

    NOTE(review): the V2 packet carries the username unsanitized, unlike
    everything routed through ``sendwithappropriate`` — confirm clients
    escape it before rendering.
    """
    # Send special kind of message when someone leaves
    print(f"{username} left the chat")
    # Iterate over a snapshot: handlers may add/remove clients while we await.
    for clientdata in list(connected):
        client = clientdata["socket"]
        try:
            if clientdata["legacy"]:
                await sendwithappropriate(client, clientdata, f"{username} left the chat")
            else:
                await client.send(json.dumps({"type": "leave", "username": username}))
        except websockets.exceptions.ConnectionClosed:
            # That peer is disconnecting too; its own handler removes it.
            # Keep notifying the remaining clients instead of aborting.
            continue
async def broadcast(message):
    """Relay *message* to every connected client and log it on the console.

    *message* is expected to look like ``"<sender>: <text>"``. If the text
    after the first ``": "`` is empty once newlines are removed, nothing is
    sent. The console log is always censored; clients get the censored text
    only when ``ignore_profanity`` is off.

    Args:
        message: The full chat line, sender prefix included.
    """
    # Look only at the part after the first "sender: " prefix. Slicing (rather
    # than str.replace) keeps later repeats of the prefix inside the text, and
    # a message without any separator is treated as all body.
    separator = message.find(": ")
    body = message[separator + 2:] if separator != -1 else message
    if body.replace("\n", "") == "":
        return

    # Send message to all connected clients
    print(profanity.censor(message.replace("\n", "")))  # Not even the server host wants to see stuff like that

    # The outgoing text is the same for every client, so compute it once.
    outgoing = message if ignore_profanity else profanity.censor(message)
    # Iterate over a snapshot: handlers may add/remove clients while we await.
    for clientdata in list(connected):
        try:
            await sendwithappropriate(clientdata["socket"], clientdata, outgoing)
        except websockets.exceptions.ConnectionClosed:
            # That peer is disconnecting; its own handler removes it. Keep
            # delivering to the remaining clients instead of aborting.
            continue
# 2025-11-02 01:09:03 +00:00
# GLUE
#loop = asyncio.new_event_loop()
#asyncio.set_event_loop(loop)
import re , sys
async def ainput(string: str) -> str:
    """Prompt on stdout and read one line from stdin without blocking the loop.

    Both the write and the read run in the event loop's default executor.

    Args:
        string: Prompt text; a single space is appended before it is written.

    Returns:
        The line as returned by ``sys.stdin.readline`` — it keeps its trailing
        newline, and is ``""`` at end of input.
    """
    def _show_prompt(s=string):
        sys.stdout.write(s + ' ')
        # The prompt has no newline, so flush it explicitly or it may stay
        # in the buffer until after the user has already typed.
        sys.stdout.flush()

    event_loop = asyncio.get_event_loop()
    await event_loop.run_in_executor(None, _show_prompt)
    return await event_loop.run_in_executor(None, sys.stdin.readline)
async def glueer():
    """Run the operator console: relay ``say ...`` lines to the chat.

    Reads lines from stdin in a loop. A line starting with ``say`` is
    broadcast with that leading word replaced by ``CONSOLE:``; other lines
    are ignored. The loop ends when stdin reaches end of input.
    """
    command = "say"
    while True:
        st = await ainput("CONS>")
        if st == "":
            # readline() returns "" only at EOF (a blank line is "\n").
            # Without this exit the loop would spin forever on a closed stdin.
            break
        if st.startswith(command):
            # Swap only the leading command word; any later "say" inside the
            # text must reach the chat unchanged.
            await broadcast("CONSOLE:" + st[len(command):])
# 2025-08-20 20:20:07 +02:00
# Start the WebSocket server
# serve() only prepares the server here; it starts listening once the loop
# runs it below. It accepts connections on every interface, port 8765.
print("Serving...")
start_server = websockets.serve(handle_client, host="0.0.0.0", port=8765)
print("Adding to loop...")
# 2025-11-02 01:09:03 +00:00
# Queue the operator console first, then bring the server up and keep the
# loop alive so both keep running.
loop.create_task(glueer())
loop.run_until_complete(start_server)
loop.run_forever()