70 lines
2.9 KiB
Python
70 lines
2.9 KiB
Python
|
|
from openai import OpenAI
|
||
|
|
from PIL import Image
|
||
|
|
import random
|
||
|
|
from tqdm import tqdm
|
||
|
|
from PIL import ImageSequence
|
||
|
|
import os
|
||
|
|
import base64
|
||
|
|
import mimetypes
|
||
|
|
import config
|
||
|
|
# Module-level OpenAI API client shared by the moderation functions in this
# file; the API key is read from the local `config` module (`config.oaikey`).
client = OpenAI(api_key=config.oaikey)
|
||
|
|
|
||
|
|
def check_image(floc):
    """Ask the OpenAI moderation endpoint whether an image file is sexual content.

    The file is sent inline as a base64 ``data:`` URL, so its MIME type must
    be known up front.

    Args:
        floc: Path of the image file to check.

    Returns:
        True if the image is flagged in the "sexual" or "sexual_minors"
        category, or if its MIME type cannot be determined (fail closed, so
        an unrecognised file is never reported as clean); False otherwise.
    """
    print("MODERATION: Checking image", floc)

    mime, _ = mimetypes.guess_type(floc)
    if mime is None:
        # Strict lookup skips the table of common-but-unregistered types;
        # on some Python versions that is the only place .webp is listed.
        mime, _ = mimetypes.guess_type(floc, strict=False)
    if mime is None:
        # Returning "not flagged" here would let an unmoderated file through.
        return True

    # Read the bytes first so the file handle is not held open for the
    # duration of the network request.
    with open(floc, "rb") as f:
        encoded = base64.b64encode(f.read()).decode()

    response = client.moderations.create(
        model="omni-moderation-latest",
        input=[
            {
                "type": "image_url",
                "image_url": {
                    "url": f"data:{mime};base64,{encoded}"
                },
            },
        ],
    )
    results = response.results[0]
    flagged_categories = vars(results.categories)
    print("MODDEBUG: Flagged categories for image:", flagged_categories)
    # Some models may not have the "sexual/minors" category, hence .get().
    return bool(
        flagged_categories["sexual"]
        or flagged_categories.get("sexual_minors", False)
    )
|
||
|
|
|
||
|
|
|
||
|
|
|
||
|
|
def moderate(content_path):
    """Decide whether a file is safe to show, using OpenAI moderation.

    Supported inputs are still images (.png, .jpg, .jpeg, .bmp, .webp),
    plain text (.txt) and GIFs (.gif); the extension check is
    case-insensitive.

    Args:
        content_path: Path of the file to moderate.

    Returns:
        True if the content was not flagged as sexual; False if it was
        flagged or if the file type is unsupported.
    """
    lowered = content_path.lower()
    if lowered.endswith(('.png', '.jpg', '.jpeg', '.bmp', '.webp')):
        return not check_image(content_path)
    if lowered.endswith('.txt'):
        # The helper reports "flagged"; this function reports "safe".
        return not _text_is_flagged(content_path)
    if lowered.endswith('.gif'):
        return not _gif_is_flagged(content_path)
    return False  # Unsupported file type, assume unsafe to protect users


def _text_is_flagged(text_path):
    """Return True if the text file's content is flagged as sexual."""
    # Decode as UTF-8 and replace undecodable bytes so an oddly encoded
    # upload is still moderated instead of raising UnicodeDecodeError.
    with open(text_path, 'r', encoding='utf-8', errors='replace') as file:
        text = file.read()
    response = client.moderations.create(
        model="omni-moderation-latest",
        input=text,
    )
    flagged_categories = vars(response.results[0].categories)
    # Some models may not have the "sexual/minors" category, hence .get().
    return bool(
        flagged_categories["sexual"]
        or flagged_categories.get("sexual_minors", False)
    )


def _gif_is_flagged(gif_path):
    """Return True if any frame of the GIF is flagged by check_image."""
    import tempfile  # Local import: only the GIF path needs it.

    # Currently, OpenAI does not support moderation for GIFs, so we use a
    # workaround: moderate every frame individually and flag the GIF as soon
    # as one frame is flagged. Frames are written inside a temporary
    # directory that is removed on exit, so nothing is left on disk.
    with tempfile.TemporaryDirectory() as frame_dir, Image.open(gif_path) as img:
        frame_path = os.path.join(frame_dir, "frame.png")
        for frame in tqdm(ImageSequence.Iterator(img)):
            # Reusing one path is fine: check_image reads the whole file
            # before the next frame overwrites it.
            frame.save(frame_path)
            if check_image(frame_path):
                return True
    return False
|
||
|
|
|
||
|
|
def dummy_moderate(content_path):
    """Testing stand-in for a moderator: approves whatever path it is given."""
    # content_path is accepted but never inspected.
    approved = True
    return approved
|
||
|
|
|
||
|
|
def dummy_moderate_schizo(content_path):
    """Testing stand-in for a moderator that returns a random verdict.

    The path is ignored; each call picks True or False via random.choice.
    """
    outcomes = [True, False]
    verdict = random.choice(outcomes)
    return verdict
|