From 1e2cf95bee43137980d2ed8074fb42007858649b Mon Sep 17 00:00:00 2001 From: Elijah Lazkani Date: Sat, 9 Sep 2017 00:29:55 -0400 Subject: [PATCH] Changes of behaviour of how the Admin plugin handles the events This is a big commit: - Now the Admin class will not take any action only triggers an ADMIN event that can be consumed later by AdminCmd - AdminCmd now handles the commands allowed to the admins in a sane and clean way by consuming ADMIN events - Fixed the way the code exits, now it exits the loop cleanly everytime - Added a lot of functions to the Admin class which allows adding and removing administrators on the fly - Added a list of functions that will reply back a help page to the user --- README.md | 6 +- boots/admin.py | 276 +++++++++++++++++++++++++++++++--------- boots/admin_commands.py | 68 ++++++++++ boots/boots.py | 45 ++++--- boots/eightball.py | 2 +- boots/robot.py | 9 +- 6 files changed, 327 insertions(+), 79 deletions(-) create mode 100644 boots/admin_commands.py diff --git a/README.md b/README.md index 6454326..c639b40 100644 --- a/README.md +++ b/README.md @@ -15,4 +15,8 @@ It will only answer to the `keyword` provided to the `Eightball` object. # Admin -The `Admin` module provides the ability to add bot administrators who can request some destructive actions that might put the bot itself being abused like `join`, `part` and `quit`. \ No newline at end of file +The `Admin` module provides the ability to add bot administrators who can request some destructive actions that might put the bot itself being abused like `join`, `part` and `quit`. + +# TODO + +- A better way to handle help pages \ No newline at end of file diff --git a/boots/admin.py b/boots/admin.py index 888bc54..eea7dfc 100644 --- a/boots/admin.py +++ b/boots/admin.py @@ -1,8 +1,5 @@ -import types import logging import re -import asyncio -import functools import robot @@ -10,7 +7,13 @@ class Admin: """ Admin class that will keep user access to the bot """ - LOGIN = re.compile(r'^LOGIN\s+([^\s]+)\s+([^\s]+)\s*$') + HELP = re.compile(r'^help.*$') + HELP_CMD = re.compile(r'^help\s+([a-zA-Z]+)?$') + LOGIN = re.compile(r'^login\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s*$') + LOGOUT = re.compile(r'^logout.*$') + ADD = re.compile(r'^add\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s*$') + RM = re.compile(r'^rm\s+([a-zA-Z][a-zA-Z0-9]+)\s*$') + LIST = re.compile(r'^list.*$') def __init__(self, client: robot.Bot): self.client = client @@ -18,7 +21,7 @@ class Admin: self.services = {} self.logger = logging.getLogger() self.logger.debug("Initializing...") - client.on("privmsg")(self._handle) + client.on("PRIVMSG")(self._handle) def add_admin(self, user: str, password: str) -> None: """ @@ -33,90 +36,243 @@ class Admin: self.admins[user]['logged_in'] = False self.admins[user]['logged_in_hostname'] = None self.admins[user]['logged_in_nick'] = None - self.admins[user]['LOGIN'] = re.compile(r'^LOGIN\s+({})\s+({})\s*$'.format(user, password)) + self.admins[user]['LOGIN'] = re.compile(r'^login\s+({})\s+({})\s*$'.format(user, password)) + + def rm_admin(self, user: str) -> None: + """ + This method will delete a `user` key from the dictionary + which reprisents the list of admins + + :param user str: the key user to delete from the list of admins + :return: None + """ + del self.admins[user] def _handle(self, nick: str, target: str, message: str, **kwargs: dict) -> None: """ - client callback on event trigger + Admin handler, this will check if the user is asking actions from the bot + and triggers an ADMIN event that can be consumed later :param nick str: nickname of the caller - :param target str: nickname to reply to - :param message str: event hooks + :param target str: location the message was sent to + :param message str: messages coming from the `nick` :param kwargs dict: for API compatibility :return: None """ self.logger.debug("nick={} target={} message={}".format(nick, target, message)) self.logger.debug("{}".format(kwargs)) - _user = kwargs['user'] - _host = kwargs['host'] - match = self.LOGIN.match(message) - self.logger.debug("match={}".format(match)) - if match and target == self.client.nick: + + # Set the admin flag + if self.is_admin(nick, kwargs['host']): + kwargs['is_admin'] = True + else: + kwargs['is_admin'] = False + + if self.HELP.match(message): + self.admin_help(nick, target, message, **kwargs) + elif self.LOGIN.match(message): + self.log_in(nick, target, message, **kwargs) + elif self.LOGOUT.match(message): + self.log_out(nick, target, **kwargs) + elif self.ADD.match(message): + self.admin_add(nick, target, message, **kwargs) + elif self.RM.match(message): + self.admin_rm(nick, target, message, **kwargs) + elif self.LIST.match(message): + self.admin_list(nick, target, **kwargs) + + kwargs['nick'] = nick + kwargs['target'] = target + kwargs['message'] = message + + self.client.trigger('ADMIN', **kwargs) + + def admin_help(self, nick: str, target: str, message: str, **kwargs: dict) -> None: + """ + This method will reply back to the user a help manual of the available commands + > help + >> help [command] + >> commands: login logout add rm list + > + + and + + > help login + >> login - Login as an admin with your account + > + + :param nick str: the nickname of the caller + :param target str: the target where the message was sent to + :param message str: the message sent to the target + :param kwargs dict: for API compatibility + :return: None + """ + if target == self.client.nick: + if kwargs.get("is_admin", None) is True: + match_help = self.HELP.match(message) + match_help_cmd = self.HELP_CMD.match(message) + kwargs['target'] = nick + if match_help_cmd: + if len(match_help_cmd.groups()) == 1: + if match_help_cmd.group(1) == 'login': + kwargs['message'] = "login - Login as an admin with your account" + self.client.send("PRIVMSG", **kwargs) + if match_help_cmd.group(1) == 'logout': + kwargs['message'] = "logout - Log out from your account" + self.client.send("PRIVMSG", **kwargs) + if match_help_cmd.group(1) == 'add': + kwargs['message'] = "add - adds an admin account to the list of admins" + self.client.send("PRIVMSG", **kwargs) + if match_help_cmd.group(1) == 'rm': + kwargs['message'] = "rm - removes an admin from the list of admins" + self.client.send("PRIVMSG", **kwargs) + if match_help_cmd.group(1) == 'list': + kwargs['message'] = "list - lists all the admins" + self.client.send("PRIVMSG", **kwargs) + elif match_help: + kwargs['message'] = "help [command]" + self.client.send("PRIVMSG", **kwargs) + kwargs['message'] = "commands: login logout add rm list" + self.client.send("PRIVMSG", **kwargs) + + def log_in(self, nick: str, target: str, message:str , **kwargs: dict) -> None: + """ + This method is called when a user attempts to login to the bot. + It will mark the user as logged on on successful authentication + and save the information for later use. + + Replies back to the user that they have logged in successfully + only on successful login + + :param nick str: the nickname of the caller + :param target str: location the message was sent to + :param message str: message coming from `nick` + :param kwargs dict: for API compatibility + :return: None + """ + if target == self.client.nick: + match = self.LOGIN.match(message) user = self.admins.get(match.group(1), None) if user: - _login = user.get('LOGIN', None) - if _login: - login_match = _login.match(message) - if login_match: - self.admins[match.group(1)]['logged_in'] = True - self.admins[match.group(1)]['logged_in_nick'] = str(_user) - self.admins[match.group(1)]['logged_in_hostname'] = str(_host) - kwargs['target'] = nick - kwargs['message'] = "{}, you are logged in successfully".format(nick) - self.logger.debug("kwargs={}".format(kwargs)) - self.client.send("PRIVMSG", **kwargs) - return + login = user['LOGIN'] + if login.match(message): + self.admins[match.group(1)]['logged_in'] = True + self.admins[match.group(1)]['logged_in_nick'] = str(nick) + self.admins[match.group(1)]['logged_in_hostname'] = str(kwargs['host']) + kwargs['is_admin'] = True + kwargs['target'] = nick + kwargs['message'] = "{}, you are logged in successfully".format(nick) + self.logger.debug("LOGIN from kwargs: {}".format(kwargs)) + self.client.send("PRIVMSG", **kwargs) - for regex, (func, command) in self.services.items(): - match = regex.match(message) - if match: - if self.is_admin(_user, _host): - kwargs['channel'] = match.group(1) - self.logger.debug("command={} kwargs={}".format(command, kwargs)) - self.client.loop.create_task( - func(command, **kwargs)) - - def on_prvmsg(self, - special_char: chr, - command: str, - func: types.FunctionType = None, - **kwargs): + def log_out(self, nick: str, target: str, **kwargs: dict) -> None: """ - Decorator function for the admin plugin + This method is called when a user attempts to logout to the bot. + It will mark the user as logged out if they are the user logged + in. - :param command str: the command to answer to - :param func function: the function being decorated + Replies back to the user that they have been logged out. + + :param nick str: the nickname of the caller + :param target str: location where the message was sent to :param kwargs dict: for API compatibility - :return: function the function that called it + :return: None """ - self.logger.debug("sepcial_char={} command={} kwargs={}".format(special_char, command, kwargs)) - if func is None: - return functools.partial(self.on_prvmsg, special_char, command) + if target == self.client.nick: + if kwargs.get('is_admin', None) is True: + admin = self.is_admin(nick, kwargs['host']) + if admin: + self.admins[admin]['logged_in'] = False + self.admins[admin]['logged_in_nick'] = None + self.admins[admin]['logged_in_hostname'] = None + kwargs['is_admin'] = False + kwargs['target'] = nick + kwargs['message'] = "{}, you are logged out successfully".format(nick) + self.client.send("PRIVMSG", **kwargs) - wrapped = func - if not asyncio.iscoroutinefunction(wrapped): - wrapped = asyncio.coroutine(wrapped) + def admin_add(self, nick: str, target: str, message: str, **kwargs: dict) -> None: + """ + This method will add an administrator to the list of administrators only + if the administrator user exists - commands = command.split("|") + :param nick str: the nickname of the caller + :param target str: location where the message was sent to + :param message str: the message being sent to the target + :param kwargs dict: for API compatibility + :return: None + """ + if target == self.client.nick: + if kwargs.get('is_admin', None) is True: + match = self.ADD.match(message) + if len(match.groups()) == 2: + if self.admins.get(match.group(1), None) is None: + kwargs['target'] = nick + kwargs['message'] = "{} has been added...".format(match.group(1)) + self.add_admin(match.group(1), match.group(2)) + else: + kwargs['target'] = nick + kwargs['message'] = "{} has already been added...".format(match.group(1)) + self.client.send("PRIVMSG", **kwargs) - for command in commands: - compiled = re.compile(r'^{}{}\s*(.*)$'.format(re.escape(special_char), command)) - self.services[compiled] = (wrapped, command) + def admin_rm(self, nick: str, target: str, message: str, **kwags: dict) -> None: + """ + This method will remove an administrator from the list of administrators only + if the administrator user exists - return func + The caller will be notified either way + + :param nick str: the nickname of the caller + :param target str: location where the message was sent to + :param message str: the message being sent to the target + :param kwags dict: for API compatibility + :return: None + """ + if target == self.client.nick: + if kwags.get('is_admin', None) is True: + match = self.RM.match(message) + if len(match.groups()) == 1: + if self.admins.get(match.group(1), None) is None: + kwags['target'] = nick + kwags['message'] = "{} is not on the list...".format(match.group(1)) + else: + kwags['target'] = nick + kwags['message'] = "{} has been removed...".format(match.group(1)) + self.rm_admin(match.group(1)) + self.client.send("PRIVMSG", **kwags) + + def admin_list(self, nick: str, target: str, **kwargs: dict) -> None: + """ + This method returns to the admin the admin list + + :param nick str: the nickname of the caller + :param target str: location where the message was sent to + :param kwargs dict: for API compatibility + :return: None + """ + if target == self.client.nick: + if kwargs.get('is_admin', None) is True: + admins = "" + for key, _ in self.admins.items(): + admins = "{} {} ".format(admins, key) if admins else key + kwargs['target'] = nick + kwargs['message'] = "List of Administrators:" + self.client.send("PRIVMSG", **kwargs) + kwargs['message'] = admins + self.client.send("PRIVMSG", **kwargs) def is_admin(self, user: str, host: str): """ Method to check if the current user is an admin + and return the admin account :param user str: the user to check against :param host str: the host to check against - :return bool: if the user is an admin + :return str: account if admin in list of admins """ - for nick, value in self.admins.items(): + for admin, value in self.admins.items(): if value.get('logged_in', None) and \ value.get('logged_in_nick', None) == user and \ value.get('logged_in_hostname', None) == host: - self.logger.debug("{} is an admin".format(nick)) - return True - return False + self.logger.debug("{} is an admin".format(admin)) + return admin + return None diff --git a/boots/admin_commands.py b/boots/admin_commands.py new file mode 100644 index 0000000..f03d9dd --- /dev/null +++ b/boots/admin_commands.py @@ -0,0 +1,68 @@ +import types +import re +import functools +import logging +import asyncio +import robot +import admin + +class AdminCmd: + """ + This is AdminCmd class that consumes ADMIN events and parses them + into known actions defined by users + """ + def __init__(self, admin: admin.Admin, modifier: str = "!"): + self.admin = admin + self.client = admin.client + self.modifier = modifier + self.modifier_pattern = r'^{}([^\s]+).*$'.format(modifier) + self.services = {} + self.logger = logging.getLogger() + self.logger.debug("Initializing...") + admin.client.on("ADMIN")(self._handle) + + def _handle(self, target, message, **kwargs) -> None: + """ + client callback on event trigger + + :param nick str: the nickname of the caller + :param target str: the target the message was sent to + :param message str: the message sent to the target + :param kwargs dict: for API compatibility + :return: None + """ + test = kwargs.get('is_admin', None) + self.logger.debug("test {}".format(test)) + if bool(kwargs.get('is_admin', None)): + for regex, (func, pattern) in self.services.items(): + match = regex.match(message) + if match: + self.logger.debug("Calling the function that matched the regex {}".format(regex)) + split_msg = message.split(" ") + message = " ".join(split_msg[1:]) + self.client.loop.create_task( + func(target, message, **kwargs)) + + def on_command(self, + command: str, + func: types.FunctionType = None, + ** kwargs: dict) -> types.FunctionType: + """ + Decorator function for the administrator commands + + :param commant str: the command that the admins are allowed to do + :param func types.FunctionType: the function being decorated + :param kwargs dict: for API compatibility + :return: types.FunctionType the function that called it + """ + if func is None: + return functools.partial(self.on_command, command) + + wrapped = func + if not asyncio.iscoroutinefunction(wrapped): + wrapped = asyncio.coroutine(wrapped) + + compiled = re.compile(r'^{}{}\s*(.*)$'.format(re.escape(self.modifier), command)) + self.services[compiled] = (wrapped, command) + + return func diff --git a/boots/boots.py b/boots/boots.py index 31c9a70..61162fe 100644 --- a/boots/boots.py +++ b/boots/boots.py @@ -1,14 +1,43 @@ import logging +import asyncio import robot import eightball import admin +import admin_commands logging.basicConfig() logger = logging.getLogger() logger.setLevel(logging.DEBUG) +async def plugins(bot: robot.Bot): + magic_ball = eightball.EightBall(bot) + @magic_ball.on_keyword + async def ball(target, message, **kwargs): + bot.send("PRIVMSG", target=target, message=message) + + administrator = admin.Admin(bot) + administrator.add_admin("Armageddon", "12345") + + admin_cmd = admin_commands.AdminCmd(administrator) + + @admin_cmd.on_command("join") + def join(target, message, **kwargs): + bot.send("JOIN", channel=message) + + @admin_cmd.on_command("part") + def part(target, message, **kwargs): + bot.send("PART", channel=message) + + @admin_cmd.on_command("quit") + async def quit(target, message, **kwargs): + bot.send("QUIT", message=message) + await bot.disconnect() + # Exit the event loop cleanly + bot.loop.stop() + def main(): + host = 'eu.undernet.org' port = 6667 ssl = False @@ -17,24 +46,10 @@ def main(): channel = "#msaytbeh" bot = robot.Bot(host=host, port=port, ssl=ssl, nick=nick, channels=[channel]) - eightball = eightball.EightBall(bot) - @eightball.on_keyword - async def ball(target, message, **kwargs): - bot.send("PRIVMSG", target=target, message=message) - - admin = admin.Admin(bot) - admin.add_admin("Armageddon", "12345") - - @_admin.on_prvmsg(".", "join|part|quit") - async def login(command, **kwargs): - logger.debug("command={} kwargs={}".format(command, kwargs)) - if command: - bot.send(command, **kwargs) - - logger.info("Connecting to {}:{} as {}".format(host, port, nick)) bot.loop.create_task(bot.connect()) + asyncio.ensure_future(plugins(bot)) bot.loop.run_forever() bot.loop.close() diff --git a/boots/eightball.py b/boots/eightball.py index 2713038..91ee9ca 100644 --- a/boots/eightball.py +++ b/boots/eightball.py @@ -39,7 +39,7 @@ class EightBall: self.keyword_pattern = r'^{}\s+(.+)$'.format(keyword) self.logger = logging.getLogger() self.logger.debug("Initializing...") - client.on("privmsg")(self._handle) + client.on("PRIVMSG")(self._handle) def _handle(self, nick, target, message, **kwargs) -> None: """ diff --git a/boots/robot.py b/boots/robot.py index fe2e0fb..16675db 100644 --- a/boots/robot.py +++ b/boots/robot.py @@ -7,6 +7,7 @@ class Bot(bottom.Client): """ The Bot class adds features to the bottom.Client class """ + def __init__(self, host: str, port: int, @@ -47,7 +48,7 @@ class Bot(bottom.Client): """ self.send('NICK', nick=self.nick) self.send('USER', user=self.user, - realname=self.realname) + realname=self.realname) done, pending = await asyncio.wait( [self.wait("RPL_ENDOFMOTD"), @@ -60,4 +61,8 @@ class Bot(bottom.Client): for future in pending: future.cancel() for channel in self.channels: - self.send('JOIN', channel=channel) \ No newline at end of file + self.send('JOIN', channel=channel) + + async def on_disconnect(self, **kwargs: dict) -> None: + await self.disconnect() + self.loop.stop()