First commit: New bot with message queue and some services

main
Nicolas Ong 2024-01-30 21:36:36 +01:00
parent ca10381a5a
commit 484e042e17
11 changed files with 1002 additions and 0 deletions

100
ChatBot.py Normal file
View File

@ -0,0 +1,100 @@
from helpers.TwitchIrc import TwitchIrc
from utils.MessageQueue import MessageQueue
class ChatBot:
twitch_irc = TwitchIrc()
queue = MessageQueue()
services = []
users = {}
msg_help = ""
msg_unknown_cmd = ""
msg_hi = ""
msg_bye = ""
running = False
# Settings -----
def set_twitch_irc_settings(self, nickname, password, channel):
self.twitch_irc.join(nickname, password, channel)
def add_service(self, service):
service.set_message_queue(self.queue)
self.services.append(service)
def clear_services(self):
self.services = []
def set_help_message(self, message):
self.msg_help = message
def set_unknown_command_message(self, message):
self.msg_unknown_cmd = message
def set_welcome_message(self, message):
self.msg_hi = message
def set_farewell_message(self, message):
self.msg_bye = message
# Commands -----
def parse_response(self, response):
command = response["message"].split(" ", 1)[0][1:]
if command == "help":
self.display_help()
return
elif command == "stop":
self.stop_command(response["tags"]["badges"])
return
result = None
for service in self.services:
if not service.knows(command):
continue
result = service.eval(command, response, self.users)
return
self.queue.append(f"{self.msg_unknown_cmd}, @{response['username']}")
def display_help(self):
self.queue.append(f"{self.msg_help}{self.list_commands()}")
def list_commands(self):
commands = ["help"]
for service in self.services:
commands.extend(service.list_commands())
return f" !{' !'.join(commands)}"
def stop_command(self, badges):
for badge in badges:
if badge["name"] == "broadcaster":
self.stop()
return
# Listening -----
def start(self):
for service in self.services:
service.start()
self.twitch_irc.request_tags()
self.twitch_irc.start()
if self.msg_hi != "":
self.queue.append(self.msg_hi)
self.running = True
while self.running:
response = self.twitch_irc.receive()
if response and response["message"][:1] == "!":
self.parse_response(response)
while self.queue.pending():
self.twitch_irc.send(self.queue.shift())
def stop(self):
if self.msg_bye != "":
self.queue.append(self.msg_bye)
while self.queue.pending():
continue
self.running = False
self.twitch_irc.stop()
for service in self.services:
service.stop()

83
helpers/TwitchIrc.py Normal file
View File

@ -0,0 +1,83 @@
from utils.Irc import Irc
SERVER = "irc.twitch.tv"
PORT = 6667
CAP_URI = "twitch.tv"
class TwitchIrc:
irc = Irc()
nickname = ""
password = ""
channel = ""
capabilities = {"membership": False, "tags": False, "commands": False}
def join(self, nickname, password, channel):
self.nickname = nickname
self.password = password
self.channel = f"#{channel}"
def request_membership(self):
self.capabilities["membership"] = True
def request_tags(self):
self.capabilities["tags"] = True
def request_commands(self):
self.capabilities["commands"] = True
def start(self):
self.irc.connect(SERVER, PORT)
self.irc.identify(self.nickname, self.password)
self.irc.join(self.channel)
for cap, req in self.capabilities.items():
if req:
self.irc.cap_req(f"{CAP_URI}/{cap}")
def receive(self):
response = self.irc.receive()
if not response:
return
if "PRIVMSG" in response and self.channel in response:
return self.parse(response)
def parse(self, response_str):
response = {}
components = response_str.split(" ", 4 if self.capabilities["tags"] else 3)
response["username"] = components[1 if self.capabilities["tags"] else 0].split("!")[0][1:]
response["message"] = components[4 if self.capabilities["tags"] else 3][1:].strip()
if self.capabilities["tags"]:
response["tags"] = self.parse_tags(components[0][1:])
return response
def parse_tags(self, tags_str):
tags = {}
for tag in tags_str.split(";"):
name, value = tag.split("=")
if name == "badge-info":
if value != "":
value = self.parse_badge(value)
else:
value = None
elif name in ["badges", "emotes"]:
value = value.split(",")
if name == "badges":
badges = []
if value != [""]:
for badge in value:
badges.append(self.parse_badge(badge))
value = badges
else:
value = []
tags[name] = value
return tags
def parse_badge(self, badge_str):
name, value = badge_str.split("/")
return {"name": name, "value": value}
def send(self, msg):
self.irc.send(self.channel, msg)
def stop(self):
self.irc.disconnect()

97
obschatbot.py Normal file
View File

@ -0,0 +1,97 @@
from ChatBot import ChatBot
from obspython import *
from services.CommandService import CommandService
from services.DiceService import DiceService
from services.SpotifyService import SpotifyService
from services.TimedMessageService import TimedMessageService
from threading import Thread
DESCRIPTION = "A Twitch chat bot that interacts with OBS and other services.\n\nby kholo"
chatbot = ChatBot()
services = {
"command": CommandService(),
"dice": DiceService(),
"spotify": SpotifyService(),
"tmsg": TimedMessageService()
}
def set_services():
chatbot.clear_services()
for name, service in services.items():
if service.enabled:
print(f"- Enabling service {name}")
chatbot.add_service(service)
# Callbacks -----
def start(props, prop):
print("[ STARTING THE BOT ]")
set_services()
thread = Thread(target=chatbot.start)
thread.daemon = True
thread.start()
print("Bot started")
def stop(props, prop):
print("[ STOPPING THE BOT ]")
chatbot.stop()
print("Bot stopped")
# Helpers -----
def create_messages_properties():
props = obs_properties_create()
obs_properties_add_text(props, "msg_help", "Help message", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "msg_unknown_cmd", "Unknown command message", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "msg_hi", "Welcome message", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "msg_bye", "Farewell message", OBS_TEXT_DEFAULT)
return props
def create_twitch_irc_properties():
props = obs_properties_create()
obs_properties_add_text(props, "irc_nickname", "Nickname", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "irc_password", "OAuth password", OBS_TEXT_PASSWORD)
obs_properties_add_text(props, "irc_channel", "Channel", OBS_TEXT_DEFAULT)
return props
# OBS functions -----
def script_description():
return DESCRIPTION
def script_load(settings):
for service in services.values():
service.load(settings)
def script_properties():
props = obs_properties_create()
obs_properties_add_button(props, "start", "Start bot", start)
obs_properties_add_button(props, "stop", "Stop bot", stop)
obs_properties_add_group(props, "messages", "Default messages", OBS_GROUP_NORMAL, create_messages_properties())
obs_properties_add_group(props, "irc", "Twitch IRC", OBS_GROUP_NORMAL, create_twitch_irc_properties())
for name, service in services.items():
obs_properties_add_group(props, name, service.title, OBS_GROUP_CHECKABLE, service.create_properties())
return props
def script_update(settings):
chatbot.set_help_message(obs_data_get_string(settings, "msg_help"))
chatbot.set_unknown_command_message(obs_data_get_string(settings, "msg_unknown_cmd"))
chatbot.set_welcome_message(obs_data_get_string(settings, "msg_hi"))
chatbot.set_farewell_message(obs_data_get_string(settings, "msg_bye"))
chatbot.set_twitch_irc_settings(
obs_data_get_string(settings, "irc_nickname"),
obs_data_get_string(settings, "irc_password"),
obs_data_get_string(settings, "irc_channel")
)
for name, service in services.items():
if obs_data_get_bool(settings, name):
service.enabled = True
service.update(settings)
else:
service.enabled = False
def script_save(settings):
for service in services.values():
service.save(settings)

118
services/CommandService.py Normal file
View File

@ -0,0 +1,118 @@
from obspython import *
from services.Service import Service
import os.path
import random
import re
KNOWN_COMMANDS = ["cmd"]
SEPARATOR = ";"
SUBST_SEP = ".,"
PATTERN_ARGS = "\{[0-9]+\}"
class CommandService(Service):
title = "Command Service"
enabled = False
msg_cant_manage = ""
msg_unknown = ""
msg_wrong_arg_count = ""
file = ""
commands = {}
# Service -----
def knows(self, command):
return command.lower() in self.list_commands()
def eval(self, command, response, users):
if command == "cmd":
self.queue.append(self.manage(response))
else:
self.queue.append(self.custom(response["message"]))
def list_commands(self):
return KNOWN_COMMANDS + list(self.commands.keys())
def start(self):
if self.file == "" or not os.path.isfile(self.file):
return
with open(self.file, "r", encoding="utf-8") as file:
content = file.read().splitlines()
for line in content:
self.load_command(line)
def load_command(self, line):
components = line.split(SEPARATOR)
self.commands[components[0].lower()] = components[1].replace(SUBST_SEP, SEPARATOR).split("||")
# OBS subset -----
def create_properties(self):
props = obs_properties_create()
obs_properties_add_path(props, "cmd_file", "Commands list", OBS_PATH_FILE, "Text file (*.txt)", None)
obs_properties_add_text(props, "cmd_msg_cant_manage", "Message: Unauthorized", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "cmd_msg_unknown", "Message: Unknown command", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "cmd_msg_wrong_arg_count", "Message: Wrong argument count", OBS_TEXT_DEFAULT)
return props
def update(self, settings):
self.file = obs_data_get_string(settings, "cmd_file")
# Commands -----
def manage(self, response):
can_manage = False
for badge in response["tags"]["badges"]:
if badge["name"] in ["broadcaster", "moderator"]:
can_manage = True
if not can_manage:
return f"{self.msg_cant_manage}, @{response['username']}"
components = response["message"].split(" ", 3)
if len(components) < 3:
return f"{self.msg_wrong_arg_count}, @{response['username']}"
command, action, target_command = components[0], components[1], components[2].lower()
if len(components) == 4:
args = components[3]
if action in ["add", "edit", "update"]:
return self.add_command(target_command, args) + f", @{response['username']}"
elif action in ["del", "delete", "remove"]:
return self.del_command(target_command) + f", @{response['username']}"
else:
return f"{self.msg_unknown}, @{response['username']}"
def add_command(self, command, args):
self.commands[command.lstrip("!")] = args.split("||")
self.write()
return f"La commande '!{command}' a été ajoutée"
def del_command(self, command):
if command not in self.commands:
return f"La commande '!{command}' n'existe pas"
self.commands.pop(command)
self.write()
return f"La commande '!{command}' a été supprimée"
def write(self):
with open(self.file, "w", encoding="utf-8") as file:
for name in self.commands:
file.write(f"{name}{SEPARATOR}{'||'.join(self.commands[name]).replace(SEPARATOR, SUBST_SEP)}\n")
def custom(self, message):
components = message.split(" ", 1)
command_name = components[0][1:].lower()
command = self.commands[command_name][random.randint(0, len(self.commands[command_name]) - 1)]
args = list(dict.fromkeys(re.findall(PATTERN_ARGS, command)))
args.sort()
count = len(args)
if count >= 1:
msg_components = []
if len(components) > 1:
msg_components = components[1].split(" ")
if len(msg_components) != count:
return self.msg_wrong_arg_count
for i in range(0, count):
command = command.replace(args[i], msg_components[i].lstrip("@"))
return command

71
services/DiceService.py Normal file
View File

@ -0,0 +1,71 @@
from obspython import *
from services.Service import Service
import random
import re
KNOWN_COMMANDS = ["lastrolls", "clearrolls"]
USER_KEY = "last_dice_roll"
PATTERN_DICE = "^([0-9]+)d([0-9]+)$"
class DiceService(Service):
title = "Dice Service"
enabled = False
msg_cleared = ""
# Service -----
def knows(self, command):
return command in KNOWN_COMMANDS or re.match(PATTERN_DICE, command)
def eval(self, command, response, users):
if command == "lastrolls":
self.queue.append(self.get_last_rolls(users))
elif command == "clearrolls":
self.queue.append(self.clear_rolls(users) + f", @{response['username']}")
else:
m = re.match(PATTERN_DICE, command)
dices = self.roll_dices(int(m.group(1)), int(m.group(2)))
self.save_last_roll(users, response["username"], dices)
self.queue.append(f"🎲 {response['username']}: {' + '.join(dices)} 🎲")
def list_commands(self):
return ["<n>d<f> (n=nombre de dés, f=nombre de faces)"] + KNOWN_COMMANDS
# OBS subset -----
def create_properties(self):
props = obs_properties_create()
obs_properties_add_text(props, "dice_msg_cleared", "Message: rolls cleared", OBS_TEXT_DEFAULT)
return props
def update(self, settings):
self.msg_cleared = obs_data_get_string(settings, "dice_msg_cleared")
# Commands -----
def get_last_rolls(self, users):
results = []
for user in users:
if USER_KEY in users[user]:
results.append(f"{user}: {' + '.join(users[user][USER_KEY])}")
return " 🎲".join(results)
def clear_rolls(self, users):
for username in users:
if USER_KEY in users[username]:
users[username].pop(USER_KEY)
return self.msg_cleared
def roll_dices(self, number, faces):
dices = []
for i in range(0, number):
dices.append(str(random.randint(1, faces)))
return (dices)
def save_last_roll(self, users, username, dices):
if username not in users:
users[username] = {}
users[username][USER_KEY] = dices

42
services/Service.py Normal file
View File

@ -0,0 +1,42 @@
from abc import ABC, abstractmethod
from obspython import *
class Service(ABC):
queue = None
def set_message_queue(self, queue):
self.queue = queue
@abstractmethod
def knows(self, command):
pass
@abstractmethod
def eval(self, command, response, user):
pass
@abstractmethod
def list_commands(self):
pass
def start(self):
pass
def stop(self):
pass
# OBS subset -----
def load(self, settings):
pass
def create_properties(self):
props = obs_properties_create()
return props
def update(self, settings):
pass
def save(self, settings):
pass

163
services/SpotifyService.py Normal file
View File

@ -0,0 +1,163 @@
from obspython import *
from services.Service import Service
from utils.OAuth2 import OAuth2
import base64
import requests
KNOWN_COMMANDS = ["addsong", "dellast", "playlist"]
USER_KEY = "last_song_added"
AUTH_URL = "https://accounts.spotify.com"
API_URL = "https://api.spotify.com/v1"
SPOTIFY_SCOPES = "playlist-modify-public playlist-modify-private user-read-private"
class SpotifyService(OAuth2, Service):
title = "Spotify Service"
enabled = False
msg_missing_args = ""
msg_not_found = ""
msg_not_added = ""
msg_no_song = ""
msg_deleted = ""
code_url = f"{AUTH_URL}/authorize"
authorization_url = f"{AUTH_URL}/api/token"
refresh_url = f"{AUTH_URL}/api/token"
scope = SPOTIFY_SCOPES
playlist = ""
msg_not_deleted = ""
# OAuth -----
def get_client_headers(self):
return base64.b64encode(bytes(f"{self.client_id}:{self.client_secret}", "ascii")).decode("ascii")
# Service -----
def knows(self, command):
return command in KNOWN_COMMANDS
def eval(self, command, response, users):
if command == "addsong":
user = response["username"]
components = response["message"].split(" ", 1)
if len(components) != 2:
self.queue.append(f"🎵 {self.msg_missing_args}, @{user}🎵")
return
query = components[1]
self.queue.append(self.add_song(user, query, users))
elif command == "dellast":
self.queue.append(self.del_last(response["username"], users))
elif command == "playlist":
self.queue.append(self.get_playlist_url())
def list_commands(self):
return KNOWN_COMMANDS
def start(self):
if self.refresh_token == "":
self.get_authorization()
# OBS subset -----
def load(self, settings):
self.refresh_token = obs_data_get_string(settings, "spotify_token")
def create_properties(self):
props = obs_properties_create()
obs_properties_add_text(props, "spotify_playlist_id", "Playlist ID", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_client_id", "Client ID", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_client_secret", "Client secret", OBS_TEXT_PASSWORD)
token = obs_properties_add_text(props, "spotify_token", "Refresh token", OBS_TEXT_DEFAULT)
obs_property_set_visible(token, False)
obs_properties_add_text(props, "spotify_msg_missing_args", "Message: Missing arguments", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_msg_not_found", "Message: Song not found", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_msg_not_added", "Message: Song not added", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_msg_no_song", "Message: No song added", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_msg_deleted", "Message: Song deleted", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "spotify_msg_not_deleted", "Message: Song not deleted", OBS_TEXT_DEFAULT)
return props
def update(self, settings):
self.playlist = obs_data_get_string(settings, "spotify_playlist_id")
self.set_credentials(
obs_data_get_string(settings, "spotify_client_id"),
obs_data_get_string(settings, "spotify_client_secret")
)
if obs_data_get_string(settings, "spotify_token") != "":
self.set_refresh_token(obs_data_get_string(settings, "spotify_token"))
self.msg_missing_args = obs_data_get_string(settings, "spotify_msg_missing_args")
self.msg_not_found = obs_data_get_string(settings, "spotify_msg_not_found")
self.msg_not_added = obs_data_get_string(settings, "spotify_msg_not_added")
self.msg_no_song = obs_data_get_string(settings, "spotify_msg_no_song")
self.msg_deleted = obs_data_get_string(settings, "spotify_msg_deleted")
self.msg_not_deleted = obs_data_get_string(settings, "spotify_msg_not_deleted")
def save(self, settings):
obs_data_set_string(settings, "spotify_token", self.refresh_token)
# Commands -----
def add_song(self, user, query, users):
if self.has_expired():
self.refresh()
track = self.search(query)
if track is None:
return f"🎵 {self.msg_not_found} @{user} 🎵"
if self.add_to_playlist(track):
artist = track["artists"][0]["name"]
title = track["name"]
if user not in users:
users[user] = {}
users[user][USER_KEY] = track["uri"]
return f"🎵 Le morceau '{title}' de {artist} a été ajouté 🎵"
else:
return f"🎵 {self.msg_not_added}, @{user} 🎵"
def search(self, query):
track = None
response = requests.get(
f"{API_URL}/search",
params={
"q": query,
"type": "track",
"limit": 1
},
headers={"Authorization": f"Bearer {self.access_token}"}
)
if response.ok and response.json()["tracks"]["total"] > 0:
track = response.json()["tracks"]["items"][0]
return track
def add_to_playlist(self, track):
response = requests.post(
f"{API_URL}/playlists/{self.playlist}/tracks",
json={"uris": [track["uri"]]},
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
)
return response.ok
def del_last(self, user, users):
if user not in users or USER_KEY not in users[user]:
return f"🎵 {self.msg_no_song}, @{user} 🎵"
if self.has_expired():
self.refresh()
response = requests.delete(
f"{API_URL}/playlists/{self.playlist}/tracks",
json={"tracks": [{"uri": users[user][USER_KEY]}]},
headers={
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json"
}
)
if response.ok:
users[user].pop(USER_KEY)
return f"🎵 {self.msg_deleted if response.ok else self.msg_not_deleted}, @{user} 🎵"
def get_playlist_url(self):
return f"🎵 https://open.spotify.com/playlist/{self.playlist} 🎵"

View File

@ -0,0 +1,148 @@
from datetime import datetime, timedelta
from obspython import *
from services.Service import Service
from threading import Thread
import os.path
import time
KNOWN_COMMANDS = ["addmsg", "delmsg", "listmsg"]
SEPARATOR = ";"
SUBST_SEP = ".,"
class TimedMessageService(Service):
title = "Timed Message Service"
enabled = False
msg_added = ""
msg_deleted = ""
msg_not_deleted = ""
file = ""
messages = {}
interval = 60
running = False
def knows(self, command):
return command in KNOWN_COMMANDS
def eval(self, command, response, users):
can_manage = False
for badge in response["tags"]["badges"]:
if badge["name"] in ["broadcaster", "moderator"]:
can_manage = True
if not can_manage:
return
if command == "addmsg":
self.queue.append(self.add(response))
elif command == "delmsg":
self.queue.append(self.delete(response))
elif command == "listmsg":
self.queue.append(self.list(response))
def list_commands(self):
return KNOWN_COMMANDS
def start(self):
self.load_file()
self.init_time()
self.running = True
thread = Thread(target=self.dispatch)
thread.daemon = True
thread.start()
def load_file(self):
if self.file == "" or not os.path.isfile(self.file):
return
with open(self.file, "r", encoding="utf-8") as file:
content = file.read().splitlines()
for line in content:
self.load_message(line)
def load_message(self, line):
components = line.split(SEPARATOR)
starting_time = datetime.now()
self.messages[components[0]] = {
"interval": int(components[1]),
"next_time": "",
"message": components[2].replace(SUBST_SEP, SEPARATOR)
}
def init_time(self):
starting_time = datetime.now()
times = []
for msg_id in self.messages:
new_time = starting_time + timedelta(minutes = self.messages[msg_id]["interval"])
formatted = new_time.strftime("%H:%M")
while formatted in times:
new_time += timedelta(minutes = 3)
formatted = new_time.strftime("%H:%M")
times.append(formatted)
self.messages[msg_id]["next_time"] = formatted
def stop(self):
self.running = False
# OBS subset -----
def create_properties(self):
props = obs_properties_create()
obs_properties_add_path(props, "tmsg_file", "Messages list", OBS_PATH_FILE, "Text file (*.txt)", None)
obs_properties_add_text(props, "tmsg_msg_added", "Message: Message added", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "tmsg_msg_deleted", "Message: Message deleted", OBS_TEXT_DEFAULT)
obs_properties_add_text(props, "tmsg_msg_not_deleted", "Message: Message not deleted", OBS_TEXT_DEFAULT)
return props
def update(self, settings):
self.file = obs_data_get_string(settings, "tmsg_file")
self.msg_added = obs_data_get_string(settings, "tmsg_msg_added")
self.msg_deleted = obs_data_get_string(settings, "tmsg_msg_deleted")
self.msg_not_deleted = obs_data_get_string(settings, "tmsg_msg_not_deleted")
# Commands -----
def add(self, response):
current_time = datetime.now()
components = response["message"].split(" ", 3)
print(components)
self.messages[components[1]] = {
"interval": int(components[2]),
"next_time": "",
"message": components[3],
}
next_time = current_time + timedelta(minutes = self.messages[components[1]]["interval"])
self.messages[components[1]]["next_time"] = next_time.strftime("%H:%M")
self.write()
return f"{self.msg_added}"
def delete(self, response):
msg_id = response["message"].split(" ", 2)[1]
if msg_id not in self.messages:
return f"{self.msg_not_deleted}"
self.messages.pop(msg_id)
self.write()
return f"{self.msg_deleted}"
def list(self, response):
return " ".join(self.messages)
def write(self):
with open(self.file, "w", encoding="utf-8") as file:
for msg_id, message in self.messages.items():
file.write(f"{msg_id}{SEPARATOR}{message['interval']}{SEPARATOR}{message['message'].replace(SEPARATOR, SUBST_SEP)}\n");
# Main function -----
def dispatch(self):
while self.running:
current_time = datetime.now()
formatted = current_time.strftime("%H:%M")
for msg_id in self.messages:
if self.messages[msg_id]["next_time"] != formatted:
continue
self.queue.append(self.messages[msg_id]["message"])
new_time = current_time + timedelta(minutes = self.messages[msg_id]["interval"])
self.messages[msg_id]["next_time"] = new_time.strftime("%H:%M")
break
time.sleep(self.interval)

47
utils/Irc.py Normal file
View File

@ -0,0 +1,47 @@
import select
import socket
DEFAULT_PORT = 6667
TIMEOUT = 1
BUFFER_SIZE = 2048
ENCODING = "utf-8"
class Irc:
socket = None
def connect(self, server, port = DEFAULT_PORT):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.socket.connect((server, port))
def identify(self, nickname, password):
self.send_raw(f"PASS {password}")
self.send_raw(f"NICK {nickname}")
def join(self, channel):
self.send_raw(f"JOIN {channel}")
def cap_req(self, cap):
self.send_raw(f"CAP REQ :{cap}")
def receive(self):
rlist, wlist, elist = select.select([self.socket], [], [], TIMEOUT)
response = None
if rlist:
try:
response = self.socket.recv(BUFFER_SIZE).decode(ENCODING)
if response.find("PING") != -1:
self.send_raw(f"PONG {response.split()[1]}")
except:
pass
return response
def send(self, channel, msg):
self.send_raw(f"PRIVMSG {channel} :{msg}")
def send_raw(self, msg):
self.socket.send(f"{msg}\n".encode(ENCODING))
def disconnect(self):
self.socket.shutdown(socket.SHUT_RDWR)
self.socket.close()

12
utils/MessageQueue.py Normal file
View File

@ -0,0 +1,12 @@
class MessageQueue:
messages = []
def append(self, message):
self.messages.append(message)
def shift(self):
return self.messages.pop(0)
def pending(self):
return len(self.messages) > 0

121
utils/OAuth2.py Normal file
View File

@ -0,0 +1,121 @@
import requests
import socket
import time
import webbrowser
BUFFER_SIZE = 2048
ENCODING = "utf-8"
HTML_RESPONSE = """
<!doctype>
<html>
<head>
<title>Authorization granted</title>
<script>window.close();</script>
</head>
<body>
<h1>Authorization granted</h1>
<p>You can close this window.</p>
</body>
</html>
"""
REDIRECT_HOST = "localhost"
REDIRECT_PORT = 8000
REDIRECT_URI = f"http://{REDIRECT_HOST}:{str(REDIRECT_PORT)}"
class OAuth2:
code_url = ""
authorization_url = ""
refresh_url = ""
client_id = ""
client_secret = ""
scope = ""
access_token = ""
refresh_token = ""
grant_type = ""
expiration_date = 0
def set_credentials(self, client_id, client_secret):
self.client_id = client_id
self.client_secret = client_secret
def set_refresh_token(self, refresh_token):
self.refresh_token = refresh_token
def get_authorization(self):
params = {
"client_id": self.client_id,
"redirect_uri": REDIRECT_URI,
"response_type": "code",
"scope": self.scope.replace(" ", "%20")
}
url = f"{self.code_url}?{'&'.join([f'{key}={value}' for key, value in params.items()])}"
webbrowser.open(url, 2)
response = self.wait_for_code()
return self.authorize(response["code"])
def wait_for_code(self):
httpd = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
httpd.bind((REDIRECT_HOST, REDIRECT_PORT))
httpd.listen()
waiting = True
while waiting:
client, addr = httpd.accept()
response = client.recv(BUFFER_SIZE).decode(ENCODING)
if "GET" in response and "code=" in response:
client.send(bytes("HTTP/1.0 200 OK\n", ENCODING))
client.send(bytes("Content-Type: text/html\n", ENCODING))
client.send(bytes("\n", ENCODING))
client.send(bytes(HTML_RESPONSE, ENCODING))
client.close()
waiting = False
httpd.close()
return self.parse_code_response(response)
def parse_code_response(self, response_str):
response = {}
for param in response_str.split("\n")[0].split()[1][2:].split("&"):
name, value = param.split("=")
response[name] = value
return response
def authorize(self, code):
response = requests.post(
self.authorization_url,
data={
"grant_type": "authorization_code",
"code": code,
"redirect_uri": REDIRECT_URI
},
headers={"Authorization": f"Basic {self.get_client_headers()}"}
)
if not response.ok:
return
self.access_token = response.json()["access_token"]
self.refresh_token = response.json()["refresh_token"]
self.token_type = response.json()["token_type"]
self.expiration_date = time.time() + response.json()["expires_in"]
return self.refresh_token
def refresh(self):
response = requests.post(
self.refresh_url,
data={
"grant_type": "refresh_token",
"refresh_token": self.refresh_token
},
headers={"Authorization": f"Basic {self.get_client_headers()}"}
)
if not response.ok:
return
self.access_token = response.json()["access_token"]
self.token_type = response.json()["token_type"]
self.expiration_date = time.time() + response.json()["expires_in"]
def get_client_headers(self):
pass
def has_expired(self):
return time.time() > self.expiration_date