Adding the ability to inject functions to be ran prior to the Bot joining channels

* PEP8 compliance changes in formatting
* multiple bugfixes
* fixing documentation
This commit is contained in:
Elijah Lazkani 2017-09-22 23:35:28 -04:00
parent c416ac14b8
commit 7da5650153
7 changed files with 286 additions and 135 deletions

View file

@ -42,4 +42,7 @@ A few default behaviours to note about the `Admin` module.
# TODO
- Test suites
- Module to handle service authentication
- A better way to handle help pages
- `Sphinx` documentation generation

View file

@ -21,7 +21,7 @@ class Admin:
LIST = re.compile(r'^list.*$')
def __init__(self, client: robot.Bot, config: str = None):
self.logger = logging.getLogger(__class__.__name__)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug("Initializing...")
self.client = client
self.config = config or "./config/admin.pkl"
@ -115,34 +115,43 @@ class Admin:
self.logger.info("Loading configuration...")
self.logger.debug("Attempting to load configuration {}".format(config))
try:
async with aiofiles.open(self.abspath(config or self.config), mode='rb') as f:
async with aiofiles.open(
self.abspath(config or self.config), mode='rb') as f:
_file = await f.read()
try:
self.admins = pickle.loads(_file)
except pickle.PickleError:
self.logger.warn("We failed to load configuration, creating a new one")
self.logger.warn(
"We failed to load configuration, creating a new one")
self.admins = {}
await self.add_admin("Admin", "Pa$$w0rd", 1000)
f.close()
except FileNotFoundError:
self.logger.warn("We failed to read the configuration file, creating a new configuration")
self.logger.warn(
"We failed to read the configuration file, "
"creating a new configuration")
self.admins = {}
await self.add_admin("Admin", "Pa$$w0rd", 1000)
async def save_config(self, config: str = None) -> None:
"""
This method will save the configuration of the admins into a pickled database
This method will save the configuration of the admins
into a pickled database
:param config str: the path to the configuration
:return: None
"""
self.logger.debug("We are attempting to write to drive...")
async with aiofiles.open(self.abspath(config or self.config), mode='wb+') as f:
async with aiofiles.open(
self.abspath(config or self.config), mode='wb+') as f:
_file = pickle.dumps(self.admins)
await f.write(_file)
f.close()
async def add_admin(self, user: str, password: str, level: int = 1) -> None:
async def add_admin(self,
user: str,
password: str,
level: int = 1) -> None:
"""
Method to add an admin to the list of admins
@ -160,7 +169,8 @@ class Admin:
self.admins[user]['logged_in_hostname'] = None
self.admins[user]['logged_in_nick'] = None
self.admins[user]['LOGIN'] = re.compile(
r'^login\s+({})\s+({})\s*$'.format(user, re.escape(self.hash(password))))
r'^login\s+({})\s+({})\s*$'.format(
user, re.escape(self.hash(password))))
self.logger.debug("We are calling save_config()")
await self.save_config()
@ -179,23 +189,30 @@ class Admin:
self.logger.debug("We are calling save_config()")
await self.save_config()
async def _handle(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
async def _handle(self,
nick: str,
target: str,
message: str,
**kwargs) -> None:
"""
Admin handler, this will check if the user is asking actions from the bot
and triggers an ADMIN event that can be consumed later
Admin handler, this will check if the user
is asking actions from the bot and triggers
an ADMIN event that can be consumed later
:param nick str: nickname of the caller
:param target str: location the message was sent to
:param message str: messages coming from the `nick`
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
self.logger.debug("We have received nick={} target={} message={} kwargs={}".format(
self.logger.debug("We have received nick={} target={} "
"message={} kwargs={}".format(
nick, target, message, kwargs))
# Set the admin flag
admin = self.is_admin(nick, kwargs['host'])
self.logger.debug("We are checking to see if {} is an admin".format(nick))
self.logger.debug(
"We are checking to see if {} is an admin".format(nick))
if admin:
kwargs['is_admin'] = True
kwargs['level'] = self.admins[admin]['level']
@ -224,12 +241,19 @@ class Admin:
kwargs['nick'] = nick
kwargs['target'] = target
kwargs['message'] = message
self.logger.debug("We are triggering a new event called ADMIN with {}".format(kwargs))
self.logger.debug(
"We are triggering a new event called ADMIN with {}".format(
kwargs))
self.client.trigger('ADMIN', **kwargs)
def admin_help(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
def admin_help(self,
nick: str,
target: str,
message: str,
**kwargs) -> None:
"""
This method will reply back to the user a help manual of the available commands
This method will reply back to the user a help manual of the
available commands
> help
>> help [command]
>> commands: login logout add rm list
@ -244,11 +268,12 @@ class Admin:
:param nick str: the nickname of the caller
:param target str: the target where the message was sent to
:param message str: the message sent to the target
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
self.logger.debug("We have received a help request from {}".format(nick))
self.logger.debug(
"We have received a help request from {}".format(nick))
if kwargs.get("is_admin", None) is True:
match_help = self.HELP.match(message)
match_help_cmd = self.HELP_CMD.match(message)
@ -256,21 +281,29 @@ class Admin:
if match_help_cmd:
if len(match_help_cmd.groups()) == 1:
if match_help_cmd.group(1) == 'login':
kwargs['message'] = "login <user> <password> <level> - Login as an admin with your account"
kwargs['message'] = \
"login <user> <password> <level> - Login as " \
"an admin with your account"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'logout':
kwargs['message'] = "logout <user> - Log out from your account"
kwargs['message'] = \
"logout <user> - Log out from your account"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'passwd':
kwargs['message'] = "passwd <new password> - Change your account\'s password"
kwargs['message'] = \
"passwd <new password> - Change your" \
" account\'s password"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'add':
kwargs['message'] = \
"add <user> <password> <level> - adds an admin account to the list of" \
"admins with provided level"
"add <user> <password> <level> - adds" \
" an admin account to the list of admins" \
" with provided level"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'rm':
kwargs['message'] = "rm <user> - removes an admin from the list of admins"
kwargs['message'] = \
"rm <user> - removes an admin from the list" \
" of admins"
self.client.send("PRIVMSG", **kwargs)
if match_help_cmd.group(1) == 'list':
kwargs['message'] = "list - lists all the admins"
@ -278,10 +311,15 @@ class Admin:
elif match_help:
kwargs['message'] = "help [command]"
self.client.send("PRIVMSG", **kwargs)
kwargs['message'] = "commands: login logout passwd add rm list"
kwargs['message'] = \
"commands: login logout passwd add rm list"
self.client.send("PRIVMSG", **kwargs)
async def log_in(self, nick: str, target: str, message:str , **kwargs: dict) -> None:
async def log_in(self,
nick: str,
target: str,
message: str,
**kwargs) -> None:
"""
This method is called when a user attempts to login to the bot.
It will mark the user as logged on on successful authentication
@ -295,7 +333,7 @@ class Admin:
:param nick str: the nickname of the caller
:param target str: location the message was sent to
:param message str: message coming from `nick`
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -306,28 +344,36 @@ class Admin:
login = user['LOGIN']
if login.match(self.hash_pass(message)):
if kwargs.get('is_admin', None) is True and \
match.group(1) != self.is_admin(nick, kwargs['host']):
match.group(1) != self.is_admin(nick,
kwargs['host']):
self.logger.warn(
"We detected that {} is already logged in as different user, logging him out".format(nick))
"We detected that {} is already logged in as"
" different user, logging him out".format(nick))
await self.log_out(nick, target, **kwargs)
if match.group(1) == self.is_admin(nick, kwargs['host']):
kwargs['target'] = nick
kwargs['message'] = "{} you are already logged in...".format(nick)
self.logger.warn("We detected that {} is already logged in, notifying".format(nick))
kwargs['message'] = "{} you are already logged in" \
"...".format(nick)
self.logger.warn("We detected that {} is already "
"logged in, notifying".format(nick))
else:
self.admins[match.group(1)]['logged_in'] = True
self.admins[match.group(1)]['logged_in_nick'] = str(nick)
self.admins[match.group(1)]['logged_in_hostname'] = str(kwargs['host'])
self.admins[match.group(1)]['logged_in_nick'] =\
str(nick)
self.admins[match.group(1)]['logged_in_hostname'] = \
str(kwargs['host'])
kwargs['is_admin'] = True
kwargs['target'] = nick
kwargs['message'] = "{}, you are logged in to {} successfully".format(
kwargs['message'] = "{}, you are logged in to {}" \
" successfully".format(
nick, match.group(1))
self.logger.debug("We have logged in {} successfully, notifying".format(nick))
self.logger.debug("We have logged in {} successfully, "
"notifying".format(nick))
self.client.send("PRIVMSG", **kwargs)
self.logger.debug("We are calling save_config()")
await self.save_config()
async def log_out(self, nick: str, target: str, **kwargs: dict) -> None:
async def log_out(self, nick: str, target: str, **kwargs) -> None:
"""
This method is called when a user attempts to logout to the bot.
It will mark the user as logged out if they are the user logged
@ -339,7 +385,7 @@ class Admin:
:param nick str: the nickname of the caller
:param target str: location where the message was sent to
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -352,24 +398,29 @@ class Admin:
self.admins[admin]['logged_in_hostname'] = None
kwargs['is_admin'] = False
kwargs['target'] = nick
kwargs['message'] = "{}, you are logged out of {} successfully".format(
nick, admin)
self.logger.debug("We have successfully logged {} out, notifying".format(nick))
kwargs['message'] = "{}, you are logged out of" \
" successfully".format(nick, admin)
self.logger.debug("We have successfully logged {}"
" out, notifying".format(nick))
self.client.send("PRIVMSG", **kwargs)
self.logger.debug("We are calling save_config")
await self.save_config()
async def passwd(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
async def passwd(self,
nick: str,
target: str,
message: str,
**kwargs) -> None:
"""
This method will change the password to the administrator currently logged in
to the account.
This method will change the password to the administrator currently
logged in to the account.
Saves the configuration to database
:param nick str: the nickname of the caller
:param target str: the target where the message was sent to
:param message str: the message sent to target
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -380,11 +431,15 @@ class Admin:
admin = self.is_admin(nick, kwargs['host'])
self.admins[admin]['password'] = self.hash(match.group(1))
self.admins[admin]['LOGIN'] = re.compile(
r'^login\s+({})\s+({})\s*$'.format(admin, re.escape(self.hash(match.group(1)))))
r'^login\s+({})\s+({})\s*$'.format(
admin, re.escape(self.hash(match.group(1)))))
kwargs['target'] = nick
kwargs['message'] = '{}, password for {} has been successfully changed...'.format(
nick, admin)
self.logger.debug("We have successfully changed {}'s password, notifying".format(nick))
kwargs['message'] = \
'{}, password for {} has been successfully' \
' changed...'.format(nick, admin)
self.logger.debug(
"We have successfully changed {}'s password,"
" notifying".format(nick))
self.client.send("PRIVMSG", **kwargs)
self.logger.debug("We are calling save_config()")
await self.save_config()
@ -392,17 +447,21 @@ class Admin:
self.logger.debug("We are logging {} out".format(nick))
await self.log_out(nick, **kwargs)
async def admin_add(self, nick: str, target: str, message: str, **kwargs: dict) -> None:
async def admin_add(self,
nick: str,
target: str,
message: str,
**kwargs) -> None:
"""
This method will add an administrator to the list of administrators only
if the administrator user exists
This method will add an administrator to the list of administrators
only if the administrator user exists
Saves configuration to database
:param nick str: the nickname of the caller
:param target str: location where the message was sent to
:param message str: the message being sent to the target
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -412,24 +471,33 @@ class Admin:
if len(match.groups()) == 3:
if self.admins.get(match.group(1), None) is None:
kwargs['target'] = nick
level = self.level_up(kwargs['level'], int(match.group(3)))
kwargs['message'] = "{} has been added with level {}...".format(
level = self.level_up(kwargs['level'],
int(match.group(3)))
kwargs['message'] = "{} has been added with " \
"level {}...".format(
match.group(1), level)
self.logger.debug(
"We have added {} with level {}, notifying {}".format(
match.group(1), level, nick))
await self.add_admin(match.group(1), match.group(2), level)
"We have added {} with level {}, notifying"
" {}".format(match.group(1), level, nick))
await self.add_admin(
match.group(1), match.group(2), level)
else:
kwargs['target'] = nick
kwargs['message'] = "{} has already been added...".format(match.group(1))
kwargs['message'] = "{} has already been added" \
"...".format(match.group(1))
self.logger.warn(
"We detected that {} has already been added, notifying {}".format(match.group(1), nick))
"We detected that {} has already been added,"
" notifying {}".format(match.group(1), nick))
self.client.send("PRIVMSG", **kwargs)
async def admin_rm(self, nick: str, target: str, message: str, **kwags: dict) -> None:
async def admin_rm(self,
nick: str,
target: str,
message: str,
**kwags) -> None:
"""
This method will remove an administrator from the list of administrators only
if the administrator user exists
This method will remove an administrator from the list of
administrators only if the administrator user exists
The caller will be notified either way
@ -438,7 +506,7 @@ class Admin:
:param nick str: the nickname of the caller
:param target str: location where the message was sent to
:param message str: the message being sent to the target
:param kwags dict: for API compatibility
:param kwags: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -448,33 +516,41 @@ class Admin:
if len(match.groups()) == 1:
if self.admins.get(match.group(1), None) is None:
kwags['target'] = nick
kwags['message'] = "{} is not on the list...".format(match.group(1))
kwags['message'] = "{} is not on the list" \
"...".format(match.group(1))
self.logger.warn("admin_rm() ")
else:
if kwags['level'] > self.admins[match.group(1)]['level'] or \
(kwags['level'] == 1000 and self.admins[match.group(1)]['level'] and
kwags['level'] == self.admins[match.group(1)]['level']):
if kwags['level'] > \
self.admins[match.group(1)]['level'] \
or (kwags['level'] == 1000
and self.admins[match.group(1)]['level']
and kwags['level'] ==
self.admins[match.group(1)]['level']):
kwags['target'] = nick
kwags['message'] = "{} has been removed...".format(match.group(1))
self.logger.debug("We removed {} successfully, notifying {}".format(
kwags['message'] = "{} has been removed" \
"...".format(match.group(1))
self.logger.debug("We removed {} successfully,"
" notifying {}".format(
match.group(1), nick))
await self.rm_admin(match.group(1))
else:
kwags['target'] = nick
kwags['message'] = "{}, you do not have enough access to delete {}".format(
kwags['message'] = "{}, you do not have enough" \
" access to delete {}".format(
nick, match.group(1))
self.logger.warn(
"We detected that {0} does not have enough access to delete {1}, notifying {0}".format(
"We detected that {0} does not have enough"
" access to delete {1}, notifying {0}".format(
nick, match.group(1)))
self.client.send("PRIVMSG", **kwags)
def admin_list(self, nick: str, target: str, **kwargs: dict) -> None:
def admin_list(self, nick: str, target: str, **kwargs) -> None:
"""
This method returns to the admin the admin list
:param nick str: the nickname of the caller
:param target str: location where the message was sent to
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if target == self.client.nick:
@ -483,15 +559,17 @@ class Admin:
admins = ""
for key, _ in self.admins.items():
if admins:
admins = "{} {}({})".format(admins, key, self.admins[key]['level'])
admins = "{} {}({})".format(
admins, key, self.admins[key]['level'])
else:
admins = "{}({})".format(key, self.admins[key]['level'])
admins = "{}({})".format(
key, self.admins[key]['level'])
kwargs['target'] = nick
kwargs['message'] = "List of Administrators:"
self.client.send("PRIVMSG", **kwargs)
kwargs['message'] = admins
self.logger.debug("We are returning admin list page to {}".format(
kwargs))
self.logger.debug("We are returning admin list page to"
" {}".format(kwargs))
self.client.send("PRIVMSG", **kwargs)
def is_admin(self, user: str, host: str):
@ -508,7 +586,9 @@ class Admin:
if value.get('logged_in', None) and \
value.get('logged_in_nick', None) == user and \
value.get('logged_in_hostname', None) == host:
self.logger.debug("We are returning {} as an admin".format(user))
self.logger.debug("We are returning {} as an"
" admin".format(user))
return admin
self.logger.debug("We are returning nothing, {} is not an admin".format(user))
self.logger.debug("We are returning nothing, {} is"
" not an admin".format(user))
return None

View file

@ -5,13 +5,14 @@ import logging
import asyncio
import admin
class AdminCmd:
"""
This is AdminCmd class that consumes ADMIN events and parses them
into known actions defined by users
"""
def __init__(self, admin: admin.Admin, modifier: str = "!"):
self.logger = logging.getLogger(__class__.__name__)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug("Initializing...")
self.admin = admin
self.client = admin.client
@ -20,21 +21,24 @@ class AdminCmd:
self.services = {}
admin.client.on("ADMIN")(self._handle)
def _handle(self, target: str, message: str, **kwargs: dict) -> None:
def _handle(self, target: str, message: str, **kwargs) -> None:
"""
client callback on event trigger
:param target str: the target the message was sent to
:param message str: the message sent to the target
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
if bool(kwargs.get('is_admin', None)):
self.logger.debug("We are being called by {}".format(kwargs['nick']))
self.logger.debug(
"We are being called by {}".format(kwargs['nick']))
for regex, (func, pattern) in self.services.items():
match = regex.match(message)
if match:
self.logger.debug("We are calling the function that matched the regex {}".format(regex))
self.logger.debug(
"We are calling the function that matched the regex"
" {}".format(regex))
split_msg = message.split(" ")
message = " ".join(split_msg[1:])
self.client.loop.create_task(
@ -43,13 +47,13 @@ class AdminCmd:
def on_command(self,
command: str,
func: types.FunctionType = None,
** kwargs: dict) -> types.FunctionType:
** kwargs) -> types.FunctionType:
"""
Decorator function for the administrator commands
:param commant str: the command that the admins are allowed to do
:param func types.FunctionType: the function being decorated
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: types.FunctionType the function that called it
"""
if func is None:
@ -59,7 +63,8 @@ class AdminCmd:
if not asyncio.iscoroutinefunction(wrapped):
wrapped = asyncio.coroutine(wrapped)
compiled = re.compile(r'^{}{}\s*(.*)$'.format(re.escape(self.modifier), command))
compiled = re.compile(r'^{}{}\s*(.*)$'.format(
re.escape(self.modifier), command))
self.services[compiled] = (wrapped, command)
return func

View file

@ -11,7 +11,28 @@ logger = logging.getLogger(__name__)
async def plugins(bot: robot.Bot):
@bot.pre_public
async def auth(**kwargs):
logger.debug("Asking Help")
bot.send("USERMODE", nick=bot.nick, modes="+x")
target = "E@channels.evilnet.org"
message = "login <nick> <password>"
bot.send("PRIVMSG", target=target, message=message)
await bot.wait("NOTICE")
# Code below will not work, it is awaiting a PR upstream
# @bot.on("USERMODE")
# def umode(**kwargs):
# logger.debug("USERMODE {}".format(kwargs))
# @bot.on("CHANNELMODE")
# def cmode(**kwargs):
# logger.debug("CHANNELMODE {}".format(kwargs))
# 8Ball
magic_ball = eightball.EightBall(bot)
@magic_ball.on_keyword
async def ball(target, message, **kwargs):
bot.send("PRIVMSG", target=target, message=message)
@ -38,7 +59,8 @@ async def plugins(bot: robot.Bot):
@admin_cmd.on_command("action")
def action(target, message, **kwargs):
kwargs['target'] = message.split(' ')[0]
kwargs['message'] = "\x01ACTION {}\x01".format(" ".join(message.split(' ')[1:]))
kwargs['message'] = \
"\x01ACTION {}\x01".format(" ".join(message.split(' ')[1:]))
bot.send("PRIVMSG", **kwargs)
@admin_cmd.on_command("quit")
@ -48,16 +70,19 @@ async def plugins(bot: robot.Bot):
# Exit the event loop cleanly
bot.loop.stop()
def main():
host = 'eu.undernet.org'
host = 'irc.evilnet.org'
port = 6667
ssl = False
nick = "LuckyBoots"
channel = "#msaytbeh"
channel = "#boots"
bot = robot.Bot(host=host, port=port, ssl=ssl, nick=nick, channels=[channel])
bot = robot.Bot(host=host, port=port,
ssl=ssl, nick=nick,
channels=[channel])
bot.loop.create_task(bot.connect())

View file

@ -6,6 +6,7 @@ import functools
import asyncio
import robot
class EightBall:
"""
EightBall Game for the IRC bot
@ -34,7 +35,7 @@ class EightBall:
]
def __init__(self, client: robot.Bot, keyword: str = ".8ball"):
self.logger = logging.getLogger(__class__.__name__)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug("Initializing...")
self.client = client
self.questions = {}
@ -48,7 +49,7 @@ class EightBall:
:param nick str: nickname of the caller
:param target str: nickname to reply to
:param message str: the question asked
:param kwargs dict:
:param kwargs: for API compatibility
:return: None
"""
self.logger.debug("We are being called by {}".format(nick))
@ -61,14 +62,17 @@ class EightBall:
for regex, (func, pattern) in self.questions.items():
match = regex.match(message)
if match:
if is_question(match.group(1)):
if EightBall.is_question(match.group(1)):
answer = self._get_answer()
message = "{}, {}".format(nick, answer)
self.logger.debug("We found '{}', notifying {}".format(answer, nick))
self.logger.debug("We found '{}', notifying"
" {}".format(answer, nick))
else:
message = \
"{}, I did not detect a question for me to answer".format(nick)
self.logger.debug("We did not detect a question, notifying {}".format(nick))
"{}, I did not detect a question for me" \
" to answer".format(nick)
self.logger.debug("We did not detect a question,"
" notifying {}".format(nick))
self.client.loop.create_task(
func(target, message, **kwargs))
@ -100,13 +104,13 @@ class EightBall:
def on_keyword(self,
func: types.FunctionType = None,
**kwargs: dict):
**kwargs):
"""
Decorator function for the 8ball game
:param keyword str: the keyword to activate the 8ball game
:param func types.FunctionType: the function being decorated
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: function the function that called it
"""
if func is None:
@ -120,10 +124,7 @@ class EightBall:
self.questions[compiled] = (wrapped, self.keyword_pattern)
return func
logger = logging.getLogger(__name__)
@staticmethod
def is_question(phrase: str) -> bool:
"""
Method to check if a string is a question
@ -131,10 +132,7 @@ def is_question(phrase: str) -> bool:
:param phrase str: the phrase to check
:return: bool if the phrase is a question or not
"""
logger.debug("We are being called for '{}'".format(phrase))
compiled = re.compile(r'^[^\?]+\?\s*$')
if compiled.match(phrase):
logger.debug("Returning True")
return True
logger.debug("Returning False")
return False

View file

@ -6,8 +6,7 @@ import yaml
def setup_logging(
default_path: str = None,
default_level: int = logging.INFO,
env_key: str = 'LOG_CFG'
):
env_key: str = 'LOG_CFG'):
if default_path:
path = default_path
value = os.getenv(env_key, None)
@ -18,5 +17,6 @@ def setup_logging(
config = yaml.safe_load(f.read())
logging.config.dictConfig(config)
else:
_format = '%(asctime)s - %(levelname)s - %(filename)s:%(name)s.%(funcName)s:%(lineno)d - %(message)s'
_format = '%(asctime)s - %(levelname)s - %(filename)s:' \
'%(name)s.%(funcName)s:%(lineno)d - %(message)s'
logging.basicConfig(level=default_level, format=_format)

View file

@ -1,5 +1,7 @@
import logging
import asyncio
import types
import functools
import bottom
@ -17,41 +19,46 @@ class Bot(bottom.Client):
realname: str = None,
channels: list = None) -> None:
super().__init__(host=host, port=port, ssl=ssl)
self.logger = logging.getLogger(__class__.__name__)
self.logger = logging.getLogger(self.__class__.__name__)
self.logger.debug("Initializing...")
self.nick = nick
self.user = user or self.nick
self.realname = realname or self.nick
self.channels = channels or []
self.pre_pub = []
self.on("ping", self.keepalive)
self.on("CLIENT_CONNECT", self.on_connect)
self.on("PRIVMSG", self.privmsg)
self.on("NOTICE", self.notice)
async def keepalive(self, message: str = None, **kwargs: dict) -> None:
async def keepalive(self, message: str = None, **kwargs) -> None:
"""
Essential keepalive method to pong the server back
automagically everytime it pings
:param message str: the ping parameters
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
message = message or ""
self.logger.debug("PONG {}".format(message))
self.send("pong", message=message)
async def on_connect(self, **kwargs: dict) -> None:
async def on_connect(self, **kwargs) -> None:
"""
Essential user information to send at the
beginning of the connection with the server
:param kwargs dict: for API compatibility
:param kwargs: for API compatibility
:return: None
"""
self.logger.debug("We are being called")
self.logger.info("Connecting...")
self.logger.debug("We are sending NICK as {} to server ".format(self.nick))
self.logger.debug(
"We are sending NICK as {} to server ".format(self.nick))
self.send('NICK', nick=self.nick)
self.logger.debug("We are sending USER {} {}".format(self.user, self.realname))
self.logger.debug(
"We are sending USER {} {}".format(self.user, self.realname))
self.send('USER', user=self.user,
realname=self.realname)
@ -65,11 +72,44 @@ class Bot(bottom.Client):
# Cancel whichever waiter's event didn't come in
for future in pending:
future.cancel()
self.logger.info("We are auto-joining channels {}".format(self.channels))
for func in self.pre_pub:
self.logger.debug("Running {}".format(func))
await self.loop.create_task(func(**kwargs))
self.logger.info(
"We are auto-joining channels {}".format(self.channels))
for channel in self.channels:
self.send('JOIN', channel=channel)
async def on_disconnect(self, **kwargs: dict) -> None:
def pre_public(self,
func: types.FunctionType = None,
**kwargs) -> types.FunctionType:
"""
This method will inject a function to be trigger before
the Bot module joins channels
:param func types.FunctionType: the method being decorated
:param kwargs: for API compatibility
:return func types.FunctionType: always return the function itself
"""
if func is None:
return functools.partial(self.pre_public)
wrapped = func
if not asyncio.iscoroutinefunction(wrapped):
wrapped = asyncio.coroutine(wrapped)
self.pre_pub.extend([wrapped])
return func
def privmsg(self, **kwargs) -> None:
self.logger.debug("PRIVMSG {}".format(kwargs))
def notice(self, **kwargs) -> None:
self.logger.debug("NOTICE {}".format(kwargs))
async def on_disconnect(self, **kwargs) -> None:
self.logger.debug("We are being called")
await self.disconnect()
self.logger.debug("We are calling loop.stop()")