A few changes from the last commit again that add great features

* Now list of administrators is saved to drive for the bot to
  load again on next start
* Administrators now have levels and default behaviours, for more
  info read the `README`
* For security purposes, now the passwords are hashed and handled as
  hashes
* Added the ability for the administrators to change their own
  password
This commit is contained in:
Elijah Lazkani 2017-09-09 20:21:07 -04:00
parent 1e2cf95bee
commit 75993f0d1a
5 changed files with 253 additions and 37 deletions

View file

@ -6,7 +6,9 @@ This project is an attempt to write a fully featured IRC bot. I am learning to w
If you are reading this then you are welcome to contribute code, reviews are a part of learning programming =) If you are reading this then you are welcome to contribute code, reviews are a part of learning programming =)
# Requirements # Requirements
This code only requires one dependency; [bottom](https://github.com/numberoverzero/bottom). This code only requires the following dependencies:
* [bottom](https://github.com/numberoverzero/bottom)
* [aiofiles](https://github.com/Tinche/aiofiles)
## 8ball ## 8ball
@ -17,6 +19,26 @@ It will only answer to the `keyword` provided to the `Eightball` object.
The `Admin` module provides the ability to add bot administrators who can request some destructive actions that might put the bot itself being abused like `join`, `part` and `quit`. The `Admin` module provides the ability to add bot administrators who can request some destructive actions that might put the bot itself being abused like `join`, `part` and `quit`.
## Commands
So far, the `Admin` module by default creates a new `pickle` database if one does not exist yet, otherwise it will attempt to import it.
The `Admin` module supports multiple commands to manage administrators. Currently, it supports `login`, `logout`, `passwd`, `add`, `rm` and `list`.
A rudimentary `help` system was written, this system needs to be replaced in the future with a system wide event driven help compiler of sorts.
## Behaviours
A few default behaviours to note about the `Admin` module.
* If no database exists, it will create one with a default username and password as `Admin` and `Pa$$w0rd`, respectively.
* The default `Admin` user is not a special user and can be deleted by a user of the same `level` only.
* An admin with `level` 1000 is owner and can add or remove any user of the same or lower access `level`.
* If a database exists, it will attempt to import it. If it fails, it will create a new database talked about in the previous point.
* Default behaviour is that an admin can add another admin with an access `level` lower than their own.
* Default behaviour is that an admin cannot remove another admin with equal or higher access `level` than their own.
* Default behaviour when an admin uses the `passwd` command to change their password, they will be immediately logged out on successful password change and will require to log back in to test that they set the right password.
# TODO # TODO
- A better way to handle help pages - A better way to handle help pages

View file

@ -1,5 +1,9 @@
import logging import os
import re import re
import logging
import pickle
import hashlib
import aiofiles
import robot import robot
@ -11,44 +15,161 @@ class Admin:
HELP_CMD = re.compile(r'^help\s+([a-zA-Z]+)?$') HELP_CMD = re.compile(r'^help\s+([a-zA-Z]+)?$')
LOGIN = re.compile(r'^login\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s*$') LOGIN = re.compile(r'^login\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s*$')
LOGOUT = re.compile(r'^logout.*$') LOGOUT = re.compile(r'^logout.*$')
ADD = re.compile(r'^add\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s*$') ADD = re.compile(r'^add\s+([a-zA-Z][a-zA-Z0-9]+)\s+([^\s]+)\s+([\d]+).*$')
RM = re.compile(r'^rm\s+([a-zA-Z][a-zA-Z0-9]+)\s*$') RM = re.compile(r'^rm\s+([a-zA-Z][a-zA-Z0-9]+)\s*$')
PASSWD = re.compile(r'^passwd\s+([^\s]+).*$')
LIST = re.compile(r'^list.*$') LIST = re.compile(r'^list.*$')
def __init__(self, client: robot.Bot): def __init__(self, client: robot.Bot, config: str = None):
self.client = client self.client = client
self.config = config or "./config/admin.pkl"
self.admins = {} self.admins = {}
self.services = {} self.services = {}
self.logger = logging.getLogger() self.logger = logging.getLogger()
self.logger.debug("Initializing...") self.logger.debug("Initializing...")
client.on("PRIVMSG")(self._handle) client.on("PRIVMSG")(self._handle)
def add_admin(self, user: str, password: str) -> None: async def init(self) -> None:
"""
This method is an async initializer for the Admin class
It will attempt to load the pickled database as configuration
:return: None
"""
await self.load_config()
@staticmethod
def hash(string: str) -> str:
"""
This method returns a hexadecimal string representation
of the sha512 hashing algorithm of the string provided to it
:param string str: The string to be hashed
:return: str the hexadecimal sha512
"""
return hashlib.sha512(string.encode()).hexdigest()
@staticmethod
def abspath(path: str) -> str:
"""
This method is essentially a wrapper around
os.path.abspath()
:param path str: the path we require the abspath for
:return: str the absolute path
"""
return os.path.abspath(path)
@staticmethod
def hash_pass(message: str) -> str:
"""
This method will take a string with a password at the end
and return the same string with the password hashed
:param message str: the message that includes the password
:return: str the message with the password hashed
"""
split_message = message.split(" ")
split_message[-1] = Admin.hash(split_message[-1])
return " ".join(split_message)
@staticmethod
def level_up(max: int, num: int) -> int:
"""
The method takes 2 numbers, a max and a num.
It handles level 1000 as a special case as they are the only
level that can do actions on other admins with the same level
It also ensures that no access above 1000 is given and will
always return 1000
Otherwise, if num is larger than max it will return max - 1
Otherwise, if num is negative it will return 1
Lastly, if none of those conditions are met, it will rimply
return num
:param max int: a maximum value num can reach
:param num int: a num provided by a randdom caller
:return: int integer to use as level
"""
if max == 1000 and num >= max:
return 1000
if num >= max:
return max - 1
if num < 0:
return 1
return num
async def load_config(self, config: str = None) -> None:
"""
This method will attempt to load the configuration from a pickled
database, otherwise it will return a new database with a default
configuration
:param config str: the path to the configuration
:return: None
"""
try:
async with aiofiles.open(self.abspath(config or self.config), mode='rb') as f:
_file = await f.read()
try:
self.admins = pickle.loads(_file)
except pickle.PickleError:
self.admins = {}
await self.add_admin("Admin", "Pa$$w0rd", 1000)
f.close()
except FileNotFoundError:
self.admins = {}
await self.add_admin("Admin", "Pa$$w0rd", 1000)
async def save_config(self, config: str = None) -> None:
"""
This method will save the configuration of the admins into a pickled database
:param config str: the path to the configuration
:return: None
"""
async with aiofiles.open(self.abspath(config or self.config), mode='wb+') as f:
_file = pickle.dumps(self.admins)
await f.write(_file)
f.close()
async def add_admin(self, user: str, password: str, level: int = 1) -> None:
""" """
Method to add an admin to the list of admins Method to add an admin to the list of admins
Saves the configuration to database
:param user str: the user of the admin to be added :param user str: the user of the admin to be added
:param password str: the password to be added :param password str: the password to be added
:return: None :return: None
""" """
self.admins[user] = {} self.admins[user] = {}
self.admins[user]['password'] = password self.admins[user]['password'] = self.hash(password)
self.admins[user]['level'] = level
self.admins[user]['logged_in'] = False self.admins[user]['logged_in'] = False
self.admins[user]['logged_in_hostname'] = None self.admins[user]['logged_in_hostname'] = None
self.admins[user]['logged_in_nick'] = None self.admins[user]['logged_in_nick'] = None
self.admins[user]['LOGIN'] = re.compile(r'^login\s+({})\s+({})\s*$'.format(user, password)) self.admins[user]['LOGIN'] = re.compile(
r'^login\s+({})\s+({})\s*$'.format(user, re.escape(self.hash(password))))
await self.save_config()
def rm_admin(self, user: str) -> None: async def rm_admin(self, user: str) -> None:
""" """
This method will delete a `user` key from the dictionary This method will delete a `user` key from the dictionary
which reprisents the list of admins which represents the list of admins
Saves configuration after deletion
:param user str: the key user to delete from the list of admins :param user str: the key user to delete from the list of admins
:return: None :return: None
""" """
del self.admins[user] del self.admins[user]
await self.save_config()
def _handle(self, nick: str, target: str, message: str, **kwargs: dict) -> None: async def _handle(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
""" """
Admin handler, this will check if the user is asking actions from the bot Admin handler, this will check if the user is asking actions from the bot
and triggers an ADMIN event that can be consumed later and triggers an ADMIN event that can be consumed later
@ -63,19 +184,25 @@ class Admin:
self.logger.debug("{}".format(kwargs)) self.logger.debug("{}".format(kwargs))
# Set the admin flag # Set the admin flag
if self.is_admin(nick, kwargs['host']): admin = self.is_admin(nick, kwargs['host'])
if admin:
kwargs['is_admin'] = True kwargs['is_admin'] = True
kwargs['level'] = self.admins[admin]['level']
else: else:
kwargs['is_admin'] = False kwargs['is_admin'] = False
if kwargs.get('level', None) is not None:
del kwargs['level']
if self.HELP.match(message): if self.HELP.match(message):
self.admin_help(nick, target, message, **kwargs) self.admin_help(nick, target, message, **kwargs)
elif self.LOGIN.match(message): elif self.LOGIN.match(message):
self.log_in(nick, target, message, **kwargs) await self.log_in(nick, target, message, **kwargs)
elif self.LOGOUT.match(message): elif self.LOGOUT.match(message):
self.log_out(nick, target, **kwargs) await self.log_out(nick, target, **kwargs)
elif self.PASSWD.match(message):
await self.passwd(nick, target, message, **kwargs)
elif self.ADD.match(message): elif self.ADD.match(message):
self.admin_add(nick, target, message, **kwargs) await self.admin_add(nick, target, message, **kwargs)
elif self.RM.match(message): elif self.RM.match(message):
self.admin_rm(nick, target, message, **kwargs) self.admin_rm(nick, target, message, **kwargs)
elif self.LIST.match(message): elif self.LIST.match(message):
@ -115,11 +242,14 @@ class Admin:
if match_help_cmd: if match_help_cmd:
if len(match_help_cmd.groups()) == 1: if len(match_help_cmd.groups()) == 1:
if match_help_cmd.group(1) == 'login': if match_help_cmd.group(1) == 'login':
kwargs['message'] = "login <user> <password> - Login as an admin with your account" kwargs['message'] = "login <user> <password> <level> - Login as an admin with your account"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'logout': if match_help_cmd.group(1) == 'logout':
kwargs['message'] = "logout <user> - Log out from your account" kwargs['message'] = "logout <user> - Log out from your account"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'passwd':
kwargs['message'] = "passwd <new password> - Change your account\'s password"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'add': if match_help_cmd.group(1) == 'add':
kwargs['message'] = "add <user> <password> - adds an admin account to the list of admins" kwargs['message'] = "add <user> <password> - adds an admin account to the list of admins"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
@ -132,10 +262,10 @@ class Admin:
elif match_help: elif match_help:
kwargs['message'] = "help [command]" kwargs['message'] = "help [command]"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
kwargs['message'] = "commands: login logout add rm list" kwargs['message'] = "commands: login logout passwd add rm list"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
def log_in(self, nick: str, target: str, message:str , **kwargs: dict) -> None: async def log_in(self, nick: str, target: str, message:str , **kwargs: dict) -> None:
""" """
This method is called when a user attempts to login to the bot. This method is called when a user attempts to login to the bot.
It will mark the user as logged on on successful authentication It will mark the user as logged on on successful authentication
@ -144,6 +274,8 @@ class Admin:
Replies back to the user that they have logged in successfully Replies back to the user that they have logged in successfully
only on successful login only on successful login
Saves configuration to database
:param nick str: the nickname of the caller :param nick str: the nickname of the caller
:param target str: location the message was sent to :param target str: location the message was sent to
:param message str: message coming from `nick` :param message str: message coming from `nick`
@ -155,17 +287,26 @@ class Admin:
user = self.admins.get(match.group(1), None) user = self.admins.get(match.group(1), None)
if user: if user:
login = user['LOGIN'] login = user['LOGIN']
if login.match(message): if login.match(self.hash_pass(message)):
if kwargs.get('is_admin', None) is True and \
match.group(1) != self.is_admin(nick, kwargs['host']):
await self.log_out(nick, target, **kwargs)
if match.group(1) == self.is_admin(nick, kwargs['host']):
kwargs['target'] = nick
kwargs['message'] = "{} you are already logged in...".format(nick)
else:
self.admins[match.group(1)]['logged_in'] = True self.admins[match.group(1)]['logged_in'] = True
self.admins[match.group(1)]['logged_in_nick'] = str(nick) self.admins[match.group(1)]['logged_in_nick'] = str(nick)
self.admins[match.group(1)]['logged_in_hostname'] = str(kwargs['host']) self.admins[match.group(1)]['logged_in_hostname'] = str(kwargs['host'])
kwargs['is_admin'] = True kwargs['is_admin'] = True
kwargs['target'] = nick kwargs['target'] = nick
kwargs['message'] = "{}, you are logged in successfully".format(nick) kwargs['message'] = "{}, you are logged in to {} successfully".format(
nick, match.group(1))
self.logger.debug("LOGIN from kwargs: {}".format(kwargs)) self.logger.debug("LOGIN from kwargs: {}".format(kwargs))
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
await self.save_config()
def log_out(self, nick: str, target: str, **kwargs: dict) -> None: async def log_out(self, nick: str, target: str, **kwargs: dict) -> None:
""" """
This method is called when a user attempts to logout to the bot. This method is called when a user attempts to logout to the bot.
It will mark the user as logged out if they are the user logged It will mark the user as logged out if they are the user logged
@ -173,6 +314,8 @@ class Admin:
Replies back to the user that they have been logged out. Replies back to the user that they have been logged out.
Saves the configuration to database
:param nick str: the nickname of the caller :param nick str: the nickname of the caller
:param target str: location where the message was sent to :param target str: location where the message was sent to
:param kwargs dict: for API compatibility :param kwargs dict: for API compatibility
@ -187,14 +330,47 @@ class Admin:
self.admins[admin]['logged_in_hostname'] = None self.admins[admin]['logged_in_hostname'] = None
kwargs['is_admin'] = False kwargs['is_admin'] = False
kwargs['target'] = nick kwargs['target'] = nick
kwargs['message'] = "{}, you are logged out successfully".format(nick) kwargs['message'] = "{}, you are logged out of {} successfully".format(
nick, admin)
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
await self.save_config()
def admin_add(self, nick: str, target: str, message: str, **kwargs: dict) -> None: async def passwd(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
"""
This method will change the password to the administrator currently logged in
to the account.
Saves the configuration to database
:param nick str: the nickname of the caller
:param target str: the target where the message was sent to
:param message str: the message sent to target
:param kwargs dict: for API compatibility
:return: None
"""
if target == self.client.nick:
if kwargs.get('is_admin', None) is True:
match = self.PASSWD.match(message)
if len(match.groups()) == 1:
admin = self.is_admin(nick, kwargs['host'])
self.admins[admin]['password'] = self.hash(match.group(1))
self.admins[admin]['LOGIN'] = re.compile(
r'^login\s+({})\s+({})\s*$'.format(admin, re.escape(self.hash(match.group(1)))))
kwargs['target'] = nick
kwargs['message'] = '{}, password for {} has been successfully changed...'.format(
nick, admin)
self.client.send("PRIVMSG", **kwargs)
await self.save_config()
kwargs['target'] = self.client.nick
await self.log_out(nick, **kwargs)
async def admin_add(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
""" """
This method will add an administrator to the list of administrators only This method will add an administrator to the list of administrators only
if the administrator user exists if the administrator user exists
Saves configuration to database
:param nick str: the nickname of the caller :param nick str: the nickname of the caller
:param target str: location where the message was sent to :param target str: location where the message was sent to
:param message str: the message being sent to the target :param message str: the message being sent to the target
@ -204,23 +380,27 @@ class Admin:
if target == self.client.nick: if target == self.client.nick:
if kwargs.get('is_admin', None) is True: if kwargs.get('is_admin', None) is True:
match = self.ADD.match(message) match = self.ADD.match(message)
if len(match.groups()) == 2: if len(match.groups()) == 3:
if self.admins.get(match.group(1), None) is None: if self.admins.get(match.group(1), None) is None:
kwargs['target'] = nick kwargs['target'] = nick
kwargs['message'] = "{} has been added...".format(match.group(1)) level = self.level_up(kwargs['level'], int(match.group(3)))
self.add_admin(match.group(1), match.group(2)) kwargs['message'] = "{} has been added with level {}...".format(
match.group(1), level)
await self.add_admin(match.group(1), match.group(2), level)
else: else:
kwargs['target'] = nick kwargs['target'] = nick
kwargs['message'] = "{} has already been added...".format(match.group(1)) kwargs['message'] = "{} has already been added...".format(match.group(1))
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)
def admin_rm(self, nick: str, target: str, message: str, **kwags: dict) -> None: async def admin_rm(self, nick: str, target: str, message: str, **kwags: dict) -> None:
""" """
This method will remove an administrator from the list of administrators only This method will remove an administrator from the list of administrators only
if the administrator user exists if the administrator user exists
The caller will be notified either way The caller will be notified either way
Saves configuration to database
:param nick str: the nickname of the caller :param nick str: the nickname of the caller
:param target str: location where the message was sent to :param target str: location where the message was sent to
:param message str: the message being sent to the target :param message str: the message being sent to the target
@ -235,9 +415,16 @@ class Admin:
kwags['target'] = nick kwags['target'] = nick
kwags['message'] = "{} is not on the list...".format(match.group(1)) kwags['message'] = "{} is not on the list...".format(match.group(1))
else: else:
if kwags['level'] > self.admins[match.group(1)]['level'] or \
(kwags['level'] == 1000 and self.admins[match.group(1)]['level'] and
kwags['level'] == self.admins[match.group(1)]['level']):
kwags['target'] = nick kwags['target'] = nick
kwags['message'] = "{} has been removed...".format(match.group(1)) kwags['message'] = "{} has been removed...".format(match.group(1))
self.rm_admin(match.group(1)) await self.rm_admin(match.group(1))
else:
kwags['target'] = nick
kwags['message'] = "{}, you do not have enough access to delete {}".format(
nick, match.group(1))
self.client.send("PRIVMSG", **kwags) self.client.send("PRIVMSG", **kwags)
def admin_list(self, nick: str, target: str, **kwargs: dict) -> None: def admin_list(self, nick: str, target: str, **kwargs: dict) -> None:
@ -253,7 +440,10 @@ class Admin:
if kwargs.get('is_admin', None) is True: if kwargs.get('is_admin', None) is True:
admins = "" admins = ""
for key, _ in self.admins.items(): for key, _ in self.admins.items():
admins = "{} {} ".format(admins, key) if admins else key if admins:
admins = "{} {}({})".format(admins, key, self.admins[key]['level'])
else:
admins = "{}({})".format(key, self.admins[key]['level'])
kwargs['target'] = nick kwargs['target'] = nick
kwargs['message'] = "List of Administrators:" kwargs['message'] = "List of Administrators:"
self.client.send("PRIVMSG", **kwargs) self.client.send("PRIVMSG", **kwargs)

View file

@ -16,7 +16,7 @@ async def plugins(bot: robot.Bot):
bot.send("PRIVMSG", target=target, message=message) bot.send("PRIVMSG", target=target, message=message)
administrator = admin.Admin(bot) administrator = admin.Admin(bot)
administrator.add_admin("Armageddon", "12345") await administrator.init()
admin_cmd = admin_commands.AdminCmd(administrator) admin_cmd = admin_commands.AdminCmd(administrator)

3
config/README.md Normal file
View file

@ -0,0 +1,3 @@
# Config
This directory is a placeholder for the pickled databases written by the modules.

View file

@ -1 +1,2 @@
bottom bottom
aiofiles