From 484e042e17a4bca423e8ef1cd607d28d523af6a0 Mon Sep 17 00:00:00 2001 From: kholo Date: Tue, 30 Jan 2024 21:36:36 +0100 Subject: [PATCH] First commit: New bot with message queue and some services --- ChatBot.py | 100 ++++++++++++++++++++ helpers/TwitchIrc.py | 83 ++++++++++++++++ obschatbot.py | 97 +++++++++++++++++++ services/CommandService.py | 118 +++++++++++++++++++++++ services/DiceService.py | 71 ++++++++++++++ services/Service.py | 42 ++++++++ services/SpotifyService.py | 163 ++++++++++++++++++++++++++++++++ services/TimedMessageService.py | 148 +++++++++++++++++++++++++++++ utils/Irc.py | 47 +++++++++ utils/MessageQueue.py | 12 +++ utils/OAuth2.py | 121 ++++++++++++++++++++++++ 11 files changed, 1002 insertions(+) create mode 100644 ChatBot.py create mode 100644 helpers/TwitchIrc.py create mode 100644 obschatbot.py create mode 100644 services/CommandService.py create mode 100644 services/DiceService.py create mode 100644 services/Service.py create mode 100644 services/SpotifyService.py create mode 100644 services/TimedMessageService.py create mode 100644 utils/Irc.py create mode 100644 utils/MessageQueue.py create mode 100644 utils/OAuth2.py diff --git a/ChatBot.py b/ChatBot.py new file mode 100644 index 0000000..c15977e --- /dev/null +++ b/ChatBot.py @@ -0,0 +1,100 @@ +from helpers.TwitchIrc import TwitchIrc +from utils.MessageQueue import MessageQueue + +class ChatBot: + + twitch_irc = TwitchIrc() + queue = MessageQueue() + + services = [] + users = {} + + msg_help = "" + msg_unknown_cmd = "" + msg_hi = "" + msg_bye = "" + + running = False + + # Settings ----- + + def set_twitch_irc_settings(self, nickname, password, channel): + self.twitch_irc.join(nickname, password, channel) + + def add_service(self, service): + service.set_message_queue(self.queue) + self.services.append(service) + + def clear_services(self): + self.services = [] + + def set_help_message(self, message): + self.msg_help = message + + def set_unknown_command_message(self, message): + self.msg_unknown_cmd = message + + def set_welcome_message(self, message): + self.msg_hi = message + + def set_farewell_message(self, message): + self.msg_bye = message + + # Commands ----- + def parse_response(self, response): + command = response["message"].split(" ", 1)[0][1:] + if command == "help": + self.display_help() + return + elif command == "stop": + self.stop_command(response["tags"]["badges"]) + return + result = None + for service in self.services: + if not service.knows(command): + continue + result = service.eval(command, response, self.users) + return + self.queue.append(f"{self.msg_unknown_cmd}, @{response['username']}") + + def display_help(self): + self.queue.append(f"{self.msg_help}{self.list_commands()}") + + def list_commands(self): + commands = ["help"] + for service in self.services: + commands.extend(service.list_commands()) + return f" !{' !'.join(commands)}" + + def stop_command(self, badges): + for badge in badges: + if badge["name"] == "broadcaster": + self.stop() + return + + # Listening ----- + + def start(self): + for service in self.services: + service.start() + self.twitch_irc.request_tags() + self.twitch_irc.start() + if self.msg_hi != "": + self.queue.append(self.msg_hi) + self.running = True + while self.running: + response = self.twitch_irc.receive() + if response and response["message"][:1] == "!": + self.parse_response(response) + while self.queue.pending(): + self.twitch_irc.send(self.queue.shift()) + + def stop(self): + if self.msg_bye != "": + self.queue.append(self.msg_bye) + while self.queue.pending(): + continue + self.running = False + self.twitch_irc.stop() + for service in self.services: + service.stop() diff --git a/helpers/TwitchIrc.py b/helpers/TwitchIrc.py new file mode 100644 index 0000000..baee688 --- /dev/null +++ b/helpers/TwitchIrc.py @@ -0,0 +1,83 @@ +from utils.Irc import Irc + +SERVER = "irc.twitch.tv" +PORT = 6667 +CAP_URI = "twitch.tv" + +class TwitchIrc: + + irc = Irc() + nickname = "" + password = "" + channel = "" + capabilities = {"membership": False, "tags": False, "commands": False} + + def join(self, nickname, password, channel): + self.nickname = nickname + self.password = password + self.channel = f"#{channel}" + + def request_membership(self): + self.capabilities["membership"] = True + + def request_tags(self): + self.capabilities["tags"] = True + + def request_commands(self): + self.capabilities["commands"] = True + + def start(self): + self.irc.connect(SERVER, PORT) + self.irc.identify(self.nickname, self.password) + self.irc.join(self.channel) + for cap, req in self.capabilities.items(): + if req: + self.irc.cap_req(f"{CAP_URI}/{cap}") + + def receive(self): + response = self.irc.receive() + if not response: + return + if "PRIVMSG" in response and self.channel in response: + return self.parse(response) + + def parse(self, response_str): + response = {} + components = response_str.split(" ", 4 if self.capabilities["tags"] else 3) + response["username"] = components[1 if self.capabilities["tags"] else 0].split("!")[0][1:] + response["message"] = components[4 if self.capabilities["tags"] else 3][1:].strip() + if self.capabilities["tags"]: + response["tags"] = self.parse_tags(components[0][1:]) + return response + + def parse_tags(self, tags_str): + tags = {} + for tag in tags_str.split(";"): + name, value = tag.split("=") + if name == "badge-info": + if value != "": + value = self.parse_badge(value) + else: + value = None + elif name in ["badges", "emotes"]: + value = value.split(",") + if name == "badges": + badges = [] + if value != [""]: + for badge in value: + badges.append(self.parse_badge(badge)) + value = badges + else: + value = [] + tags[name] = value + return tags + + def parse_badge(self, badge_str): + name, value = badge_str.split("/") + return {"name": name, "value": value} + + def send(self, msg): + self.irc.send(self.channel, msg) + + def stop(self): + self.irc.disconnect() diff --git a/obschatbot.py b/obschatbot.py new file mode 100644 index 0000000..4487101 --- /dev/null +++ b/obschatbot.py @@ -0,0 +1,97 @@ +from ChatBot import ChatBot +from obspython import * +from services.CommandService import CommandService +from services.DiceService import DiceService +from services.SpotifyService import SpotifyService +from services.TimedMessageService import TimedMessageService +from threading import Thread + +DESCRIPTION = "A Twitch chat bot that interacts with OBS and other services.\n\nby kholo" + +chatbot = ChatBot() + +services = { + "command": CommandService(), + "dice": DiceService(), + "spotify": SpotifyService(), + "tmsg": TimedMessageService() +} + +def set_services(): + chatbot.clear_services() + for name, service in services.items(): + if service.enabled: + print(f"- Enabling service {name}") + chatbot.add_service(service) + +# Callbacks ----- + +def start(props, prop): + print("[ STARTING THE BOT ]") + set_services() + thread = Thread(target=chatbot.start) + thread.daemon = True + thread.start() + print("Bot started") + +def stop(props, prop): + print("[ STOPPING THE BOT ]") + chatbot.stop() + print("Bot stopped") + +# Helpers ----- + +def create_messages_properties(): + props = obs_properties_create() + obs_properties_add_text(props, "msg_help", "Help message", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "msg_unknown_cmd", "Unknown command message", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "msg_hi", "Welcome message", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "msg_bye", "Farewell message", OBS_TEXT_DEFAULT) + return props + +def create_twitch_irc_properties(): + props = obs_properties_create() + obs_properties_add_text(props, "irc_nickname", "Nickname", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "irc_password", "OAuth password", OBS_TEXT_PASSWORD) + obs_properties_add_text(props, "irc_channel", "Channel", OBS_TEXT_DEFAULT) + return props + +# OBS functions ----- + +def script_description(): + return DESCRIPTION + +def script_load(settings): + for service in services.values(): + service.load(settings) + +def script_properties(): + props = obs_properties_create() + obs_properties_add_button(props, "start", "Start bot", start) + obs_properties_add_button(props, "stop", "Stop bot", stop) + obs_properties_add_group(props, "messages", "Default messages", OBS_GROUP_NORMAL, create_messages_properties()) + obs_properties_add_group(props, "irc", "Twitch IRC", OBS_GROUP_NORMAL, create_twitch_irc_properties()) + for name, service in services.items(): + obs_properties_add_group(props, name, service.title, OBS_GROUP_CHECKABLE, service.create_properties()) + return props + +def script_update(settings): + chatbot.set_help_message(obs_data_get_string(settings, "msg_help")) + chatbot.set_unknown_command_message(obs_data_get_string(settings, "msg_unknown_cmd")) + chatbot.set_welcome_message(obs_data_get_string(settings, "msg_hi")) + chatbot.set_farewell_message(obs_data_get_string(settings, "msg_bye")) + chatbot.set_twitch_irc_settings( + obs_data_get_string(settings, "irc_nickname"), + obs_data_get_string(settings, "irc_password"), + obs_data_get_string(settings, "irc_channel") + ) + for name, service in services.items(): + if obs_data_get_bool(settings, name): + service.enabled = True + service.update(settings) + else: + service.enabled = False + +def script_save(settings): + for service in services.values(): + service.save(settings) diff --git a/services/CommandService.py b/services/CommandService.py new file mode 100644 index 0000000..1f33303 --- /dev/null +++ b/services/CommandService.py @@ -0,0 +1,118 @@ +from obspython import * +from services.Service import Service +import os.path +import random +import re + +KNOWN_COMMANDS = ["cmd"] + +SEPARATOR = ";" +SUBST_SEP = ".," +PATTERN_ARGS = "\{[0-9]+\}" + +class CommandService(Service): + + title = "Command Service" + enabled = False + + msg_cant_manage = "" + msg_unknown = "" + msg_wrong_arg_count = "" + + file = "" + commands = {} + + # Service ----- + + def knows(self, command): + return command.lower() in self.list_commands() + + def eval(self, command, response, users): + if command == "cmd": + self.queue.append(self.manage(response)) + else: + self.queue.append(self.custom(response["message"])) + + def list_commands(self): + return KNOWN_COMMANDS + list(self.commands.keys()) + + def start(self): + if self.file == "" or not os.path.isfile(self.file): + return + with open(self.file, "r", encoding="utf-8") as file: + content = file.read().splitlines() + for line in content: + self.load_command(line) + + def load_command(self, line): + components = line.split(SEPARATOR) + self.commands[components[0].lower()] = components[1].replace(SUBST_SEP, SEPARATOR).split("||") + + # OBS subset ----- + + def create_properties(self): + props = obs_properties_create() + obs_properties_add_path(props, "cmd_file", "Commands list", OBS_PATH_FILE, "Text file (*.txt)", None) + obs_properties_add_text(props, "cmd_msg_cant_manage", "Message: Unauthorized", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "cmd_msg_unknown", "Message: Unknown command", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "cmd_msg_wrong_arg_count", "Message: Wrong argument count", OBS_TEXT_DEFAULT) + return props + + def update(self, settings): + self.file = obs_data_get_string(settings, "cmd_file") + + # Commands ----- + + def manage(self, response): + can_manage = False + for badge in response["tags"]["badges"]: + if badge["name"] in ["broadcaster", "moderator"]: + can_manage = True + if not can_manage: + return f"{self.msg_cant_manage}, @{response['username']}" + components = response["message"].split(" ", 3) + if len(components) < 3: + return f"{self.msg_wrong_arg_count}, @{response['username']}" + command, action, target_command = components[0], components[1], components[2].lower() + if len(components) == 4: + args = components[3] + if action in ["add", "edit", "update"]: + return self.add_command(target_command, args) + f", @{response['username']}" + elif action in ["del", "delete", "remove"]: + return self.del_command(target_command) + f", @{response['username']}" + else: + return f"{self.msg_unknown}, @{response['username']}" + + def add_command(self, command, args): + self.commands[command.lstrip("!")] = args.split("||") + self.write() + return f"La commande '!{command}' a été ajoutée" + + def del_command(self, command): + if command not in self.commands: + return f"La commande '!{command}' n'existe pas" + self.commands.pop(command) + self.write() + return f"La commande '!{command}' a été supprimée" + + def write(self): + with open(self.file, "w", encoding="utf-8") as file: + for name in self.commands: + file.write(f"{name}{SEPARATOR}{'||'.join(self.commands[name]).replace(SEPARATOR, SUBST_SEP)}\n") + + def custom(self, message): + components = message.split(" ", 1) + command_name = components[0][1:].lower() + command = self.commands[command_name][random.randint(0, len(self.commands[command_name]) - 1)] + args = list(dict.fromkeys(re.findall(PATTERN_ARGS, command))) + args.sort() + count = len(args) + if count >= 1: + msg_components = [] + if len(components) > 1: + msg_components = components[1].split(" ") + if len(msg_components) != count: + return self.msg_wrong_arg_count + for i in range(0, count): + command = command.replace(args[i], msg_components[i].lstrip("@")) + return command diff --git a/services/DiceService.py b/services/DiceService.py new file mode 100644 index 0000000..bab1521 --- /dev/null +++ b/services/DiceService.py @@ -0,0 +1,71 @@ +from obspython import * +from services.Service import Service +import random +import re + +KNOWN_COMMANDS = ["lastrolls", "clearrolls"] +USER_KEY = "last_dice_roll" + +PATTERN_DICE = "^([0-9]+)d([0-9]+)$" + +class DiceService(Service): + + title = "Dice Service" + enabled = False + + msg_cleared = "" + + # Service ----- + + def knows(self, command): + return command in KNOWN_COMMANDS or re.match(PATTERN_DICE, command) + + def eval(self, command, response, users): + if command == "lastrolls": + self.queue.append(self.get_last_rolls(users)) + elif command == "clearrolls": + self.queue.append(self.clear_rolls(users) + f", @{response['username']}") + else: + m = re.match(PATTERN_DICE, command) + dices = self.roll_dices(int(m.group(1)), int(m.group(2))) + self.save_last_roll(users, response["username"], dices) + self.queue.append(f"🎲 {response['username']}: {' + '.join(dices)} 🎲") + + def list_commands(self): + return ["d (n=nombre de dés, f=nombre de faces)"] + KNOWN_COMMANDS + + # OBS subset ----- + + def create_properties(self): + props = obs_properties_create() + obs_properties_add_text(props, "dice_msg_cleared", "Message: rolls cleared", OBS_TEXT_DEFAULT) + return props + + def update(self, settings): + self.msg_cleared = obs_data_get_string(settings, "dice_msg_cleared") + + # Commands ----- + + def get_last_rolls(self, users): + results = [] + for user in users: + if USER_KEY in users[user]: + results.append(f"{user}: {' + '.join(users[user][USER_KEY])}") + return " 🎲".join(results) + + def clear_rolls(self, users): + for username in users: + if USER_KEY in users[username]: + users[username].pop(USER_KEY) + return self.msg_cleared + + def roll_dices(self, number, faces): + dices = [] + for i in range(0, number): + dices.append(str(random.randint(1, faces))) + return (dices) + + def save_last_roll(self, users, username, dices): + if username not in users: + users[username] = {} + users[username][USER_KEY] = dices diff --git a/services/Service.py b/services/Service.py new file mode 100644 index 0000000..51c7400 --- /dev/null +++ b/services/Service.py @@ -0,0 +1,42 @@ +from abc import ABC, abstractmethod +from obspython import * + +class Service(ABC): + + queue = None + + def set_message_queue(self, queue): + self.queue = queue + + @abstractmethod + def knows(self, command): + pass + + @abstractmethod + def eval(self, command, response, user): + pass + + @abstractmethod + def list_commands(self): + pass + + def start(self): + pass + + def stop(self): + pass + + # OBS subset ----- + + def load(self, settings): + pass + + def create_properties(self): + props = obs_properties_create() + return props + + def update(self, settings): + pass + + def save(self, settings): + pass diff --git a/services/SpotifyService.py b/services/SpotifyService.py new file mode 100644 index 0000000..cddaa03 --- /dev/null +++ b/services/SpotifyService.py @@ -0,0 +1,163 @@ +from obspython import * +from services.Service import Service +from utils.OAuth2 import OAuth2 +import base64 +import requests + +KNOWN_COMMANDS = ["addsong", "dellast", "playlist"] +USER_KEY = "last_song_added" + +AUTH_URL = "https://accounts.spotify.com" +API_URL = "https://api.spotify.com/v1" +SPOTIFY_SCOPES = "playlist-modify-public playlist-modify-private user-read-private" + +class SpotifyService(OAuth2, Service): + + title = "Spotify Service" + enabled = False + + msg_missing_args = "" + msg_not_found = "" + msg_not_added = "" + msg_no_song = "" + msg_deleted = "" + + code_url = f"{AUTH_URL}/authorize" + authorization_url = f"{AUTH_URL}/api/token" + refresh_url = f"{AUTH_URL}/api/token" + scope = SPOTIFY_SCOPES + playlist = "" + msg_not_deleted = "" + + # OAuth ----- + + def get_client_headers(self): + return base64.b64encode(bytes(f"{self.client_id}:{self.client_secret}", "ascii")).decode("ascii") + + # Service ----- + + def knows(self, command): + return command in KNOWN_COMMANDS + + def eval(self, command, response, users): + if command == "addsong": + user = response["username"] + components = response["message"].split(" ", 1) + if len(components) != 2: + self.queue.append(f"🎵 {self.msg_missing_args}, @{user}🎵") + return + query = components[1] + self.queue.append(self.add_song(user, query, users)) + elif command == "dellast": + self.queue.append(self.del_last(response["username"], users)) + elif command == "playlist": + self.queue.append(self.get_playlist_url()) + + def list_commands(self): + return KNOWN_COMMANDS + + def start(self): + if self.refresh_token == "": + self.get_authorization() + + # OBS subset ----- + + def load(self, settings): + self.refresh_token = obs_data_get_string(settings, "spotify_token") + + def create_properties(self): + props = obs_properties_create() + obs_properties_add_text(props, "spotify_playlist_id", "Playlist ID", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_client_id", "Client ID", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_client_secret", "Client secret", OBS_TEXT_PASSWORD) + token = obs_properties_add_text(props, "spotify_token", "Refresh token", OBS_TEXT_DEFAULT) + obs_property_set_visible(token, False) + obs_properties_add_text(props, "spotify_msg_missing_args", "Message: Missing arguments", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_msg_not_found", "Message: Song not found", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_msg_not_added", "Message: Song not added", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_msg_no_song", "Message: No song added", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_msg_deleted", "Message: Song deleted", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "spotify_msg_not_deleted", "Message: Song not deleted", OBS_TEXT_DEFAULT) + return props + + def update(self, settings): + self.playlist = obs_data_get_string(settings, "spotify_playlist_id") + self.set_credentials( + obs_data_get_string(settings, "spotify_client_id"), + obs_data_get_string(settings, "spotify_client_secret") + ) + if obs_data_get_string(settings, "spotify_token") != "": + self.set_refresh_token(obs_data_get_string(settings, "spotify_token")) + self.msg_missing_args = obs_data_get_string(settings, "spotify_msg_missing_args") + self.msg_not_found = obs_data_get_string(settings, "spotify_msg_not_found") + self.msg_not_added = obs_data_get_string(settings, "spotify_msg_not_added") + self.msg_no_song = obs_data_get_string(settings, "spotify_msg_no_song") + self.msg_deleted = obs_data_get_string(settings, "spotify_msg_deleted") + self.msg_not_deleted = obs_data_get_string(settings, "spotify_msg_not_deleted") + + def save(self, settings): + obs_data_set_string(settings, "spotify_token", self.refresh_token) + + # Commands ----- + + def add_song(self, user, query, users): + if self.has_expired(): + self.refresh() + track = self.search(query) + if track is None: + return f"🎵 {self.msg_not_found} @{user} 🎵" + if self.add_to_playlist(track): + artist = track["artists"][0]["name"] + title = track["name"] + if user not in users: + users[user] = {} + users[user][USER_KEY] = track["uri"] + return f"🎵 Le morceau '{title}' de {artist} a été ajouté 🎵" + else: + return f"🎵 {self.msg_not_added}, @{user} 🎵" + + def search(self, query): + track = None + response = requests.get( + f"{API_URL}/search", + params={ + "q": query, + "type": "track", + "limit": 1 + }, + headers={"Authorization": f"Bearer {self.access_token}"} + ) + if response.ok and response.json()["tracks"]["total"] > 0: + track = response.json()["tracks"]["items"][0] + return track + + def add_to_playlist(self, track): + response = requests.post( + f"{API_URL}/playlists/{self.playlist}/tracks", + json={"uris": [track["uri"]]}, + headers={ + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json" + } + ) + return response.ok + + def del_last(self, user, users): + if user not in users or USER_KEY not in users[user]: + return f"🎵 {self.msg_no_song}, @{user} 🎵" + if self.has_expired(): + self.refresh() + response = requests.delete( + f"{API_URL}/playlists/{self.playlist}/tracks", + json={"tracks": [{"uri": users[user][USER_KEY]}]}, + headers={ + "Authorization": f"Bearer {self.access_token}", + "Content-Type": "application/json" + } + ) + if response.ok: + users[user].pop(USER_KEY) + return f"🎵 {self.msg_deleted if response.ok else self.msg_not_deleted}, @{user} 🎵" + + def get_playlist_url(self): + return f"🎵 https://open.spotify.com/playlist/{self.playlist} 🎵" diff --git a/services/TimedMessageService.py b/services/TimedMessageService.py new file mode 100644 index 0000000..9116f20 --- /dev/null +++ b/services/TimedMessageService.py @@ -0,0 +1,148 @@ +from datetime import datetime, timedelta +from obspython import * +from services.Service import Service +from threading import Thread +import os.path +import time + +KNOWN_COMMANDS = ["addmsg", "delmsg", "listmsg"] + +SEPARATOR = ";" +SUBST_SEP = ".," + +class TimedMessageService(Service): + + title = "Timed Message Service" + enabled = False + + msg_added = "" + msg_deleted = "" + msg_not_deleted = "" + + file = "" + messages = {} + interval = 60 + running = False + + def knows(self, command): + return command in KNOWN_COMMANDS + + def eval(self, command, response, users): + can_manage = False + for badge in response["tags"]["badges"]: + if badge["name"] in ["broadcaster", "moderator"]: + can_manage = True + if not can_manage: + return + if command == "addmsg": + self.queue.append(self.add(response)) + elif command == "delmsg": + self.queue.append(self.delete(response)) + elif command == "listmsg": + self.queue.append(self.list(response)) + + def list_commands(self): + return KNOWN_COMMANDS + + def start(self): + self.load_file() + self.init_time() + self.running = True + thread = Thread(target=self.dispatch) + thread.daemon = True + thread.start() + + def load_file(self): + if self.file == "" or not os.path.isfile(self.file): + return + with open(self.file, "r", encoding="utf-8") as file: + content = file.read().splitlines() + for line in content: + self.load_message(line) + + def load_message(self, line): + components = line.split(SEPARATOR) + starting_time = datetime.now() + self.messages[components[0]] = { + "interval": int(components[1]), + "next_time": "", + "message": components[2].replace(SUBST_SEP, SEPARATOR) + } + + def init_time(self): + starting_time = datetime.now() + times = [] + for msg_id in self.messages: + new_time = starting_time + timedelta(minutes = self.messages[msg_id]["interval"]) + formatted = new_time.strftime("%H:%M") + while formatted in times: + new_time += timedelta(minutes = 3) + formatted = new_time.strftime("%H:%M") + times.append(formatted) + self.messages[msg_id]["next_time"] = formatted + + def stop(self): + self.running = False + + # OBS subset ----- + + def create_properties(self): + props = obs_properties_create() + obs_properties_add_path(props, "tmsg_file", "Messages list", OBS_PATH_FILE, "Text file (*.txt)", None) + obs_properties_add_text(props, "tmsg_msg_added", "Message: Message added", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "tmsg_msg_deleted", "Message: Message deleted", OBS_TEXT_DEFAULT) + obs_properties_add_text(props, "tmsg_msg_not_deleted", "Message: Message not deleted", OBS_TEXT_DEFAULT) + return props + + def update(self, settings): + self.file = obs_data_get_string(settings, "tmsg_file") + self.msg_added = obs_data_get_string(settings, "tmsg_msg_added") + self.msg_deleted = obs_data_get_string(settings, "tmsg_msg_deleted") + self.msg_not_deleted = obs_data_get_string(settings, "tmsg_msg_not_deleted") + + # Commands ----- + + def add(self, response): + current_time = datetime.now() + components = response["message"].split(" ", 3) + print(components) + self.messages[components[1]] = { + "interval": int(components[2]), + "next_time": "", + "message": components[3], + } + next_time = current_time + timedelta(minutes = self.messages[components[1]]["interval"]) + self.messages[components[1]]["next_time"] = next_time.strftime("%H:%M") + self.write() + return f"{self.msg_added}" + + def delete(self, response): + msg_id = response["message"].split(" ", 2)[1] + if msg_id not in self.messages: + return f"{self.msg_not_deleted}" + self.messages.pop(msg_id) + self.write() + return f"{self.msg_deleted}" + + def list(self, response): + return " ".join(self.messages) + + def write(self): + with open(self.file, "w", encoding="utf-8") as file: + for msg_id, message in self.messages.items(): + file.write(f"{msg_id}{SEPARATOR}{message['interval']}{SEPARATOR}{message['message'].replace(SEPARATOR, SUBST_SEP)}\n"); + + # Main function ----- + + def dispatch(self): + while self.running: + current_time = datetime.now() + formatted = current_time.strftime("%H:%M") + for msg_id in self.messages: + if self.messages[msg_id]["next_time"] != formatted: + continue + self.queue.append(self.messages[msg_id]["message"]) + new_time = current_time + timedelta(minutes = self.messages[msg_id]["interval"]) + self.messages[msg_id]["next_time"] = new_time.strftime("%H:%M") + break + time.sleep(self.interval) diff --git a/utils/Irc.py b/utils/Irc.py new file mode 100644 index 0000000..a65902e --- /dev/null +++ b/utils/Irc.py @@ -0,0 +1,47 @@ +import select +import socket + +DEFAULT_PORT = 6667 +TIMEOUT = 1 +BUFFER_SIZE = 2048 +ENCODING = "utf-8" + +class Irc: + + socket = None + + def connect(self, server, port = DEFAULT_PORT): + self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.socket.connect((server, port)) + + def identify(self, nickname, password): + self.send_raw(f"PASS {password}") + self.send_raw(f"NICK {nickname}") + + def join(self, channel): + self.send_raw(f"JOIN {channel}") + + def cap_req(self, cap): + self.send_raw(f"CAP REQ :{cap}") + + def receive(self): + rlist, wlist, elist = select.select([self.socket], [], [], TIMEOUT) + response = None + if rlist: + try: + response = self.socket.recv(BUFFER_SIZE).decode(ENCODING) + if response.find("PING") != -1: + self.send_raw(f"PONG {response.split()[1]}") + except: + pass + return response + + def send(self, channel, msg): + self.send_raw(f"PRIVMSG {channel} :{msg}") + + def send_raw(self, msg): + self.socket.send(f"{msg}\n".encode(ENCODING)) + + def disconnect(self): + self.socket.shutdown(socket.SHUT_RDWR) + self.socket.close() diff --git a/utils/MessageQueue.py b/utils/MessageQueue.py new file mode 100644 index 0000000..b750041 --- /dev/null +++ b/utils/MessageQueue.py @@ -0,0 +1,12 @@ +class MessageQueue: + + messages = [] + + def append(self, message): + self.messages.append(message) + + def shift(self): + return self.messages.pop(0) + + def pending(self): + return len(self.messages) > 0 diff --git a/utils/OAuth2.py b/utils/OAuth2.py new file mode 100644 index 0000000..dc70408 --- /dev/null +++ b/utils/OAuth2.py @@ -0,0 +1,121 @@ +import requests +import socket +import time +import webbrowser + +BUFFER_SIZE = 2048 +ENCODING = "utf-8" + +HTML_RESPONSE = """ + + + + Authorization granted + + + +

Authorization granted

+

You can close this window.

+ + +""" + +REDIRECT_HOST = "localhost" +REDIRECT_PORT = 8000 +REDIRECT_URI = f"http://{REDIRECT_HOST}:{str(REDIRECT_PORT)}" + +class OAuth2: + + code_url = "" + authorization_url = "" + refresh_url = "" + client_id = "" + client_secret = "" + scope = "" + access_token = "" + refresh_token = "" + grant_type = "" + expiration_date = 0 + + def set_credentials(self, client_id, client_secret): + self.client_id = client_id + self.client_secret = client_secret + + def set_refresh_token(self, refresh_token): + self.refresh_token = refresh_token + + def get_authorization(self): + params = { + "client_id": self.client_id, + "redirect_uri": REDIRECT_URI, + "response_type": "code", + "scope": self.scope.replace(" ", "%20") + } + url = f"{self.code_url}?{'&'.join([f'{key}={value}' for key, value in params.items()])}" + webbrowser.open(url, 2) + response = self.wait_for_code() + return self.authorize(response["code"]) + + def wait_for_code(self): + httpd = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + httpd.bind((REDIRECT_HOST, REDIRECT_PORT)) + httpd.listen() + waiting = True + while waiting: + client, addr = httpd.accept() + response = client.recv(BUFFER_SIZE).decode(ENCODING) + if "GET" in response and "code=" in response: + client.send(bytes("HTTP/1.0 200 OK\n", ENCODING)) + client.send(bytes("Content-Type: text/html\n", ENCODING)) + client.send(bytes("\n", ENCODING)) + client.send(bytes(HTML_RESPONSE, ENCODING)) + client.close() + waiting = False + httpd.close() + return self.parse_code_response(response) + + def parse_code_response(self, response_str): + response = {} + for param in response_str.split("\n")[0].split()[1][2:].split("&"): + name, value = param.split("=") + response[name] = value + return response + + def authorize(self, code): + response = requests.post( + self.authorization_url, + data={ + "grant_type": "authorization_code", + "code": code, + "redirect_uri": REDIRECT_URI + }, + headers={"Authorization": f"Basic {self.get_client_headers()}"} + ) + if not response.ok: + return + self.access_token = response.json()["access_token"] + self.refresh_token = response.json()["refresh_token"] + self.token_type = response.json()["token_type"] + self.expiration_date = time.time() + response.json()["expires_in"] + return self.refresh_token + + def refresh(self): + response = requests.post( + self.refresh_url, + data={ + "grant_type": "refresh_token", + "refresh_token": self.refresh_token + }, + headers={"Authorization": f"Basic {self.get_client_headers()}"} + ) + if not response.ok: + return + self.access_token = response.json()["access_token"] + self.token_type = response.json()["token_type"] + self.expiration_date = time.time() + response.json()["expires_in"] + + def get_client_headers(self): + pass + + def has_expired(self): + return time.time() > self.expiration_date