boots/boots/robot.py

278 lines
10 KiB
Python

import logging
import asyncio
import types
import typing
import functools
import datetime
import bottom
class Bot(bottom.Client):
    """
    The Bot class adds features to the bottom.Client class

    On top of the plain client it provides:

    * automatic PONG replies and NICK/USER registration on connect,
    * auto-joining of a configured list of channels once the MOTD
      (or the "no MOTD" error) has been received,
    * ``pre_public`` hooks that run before channels are joined,
    * flood protection for outgoing messages (PRIVMSG/ACTION) and
      actions (JOIN/PART/QUIT), each additionally limited by a
      global buffer.
    """

    def __init__(self,
                 host: str,
                 port: int,
                 ssl: bool,
                 nick: str,
                 user: typing.Optional[str] = None,
                 realname: typing.Optional[str] = None,
                 channels: typing.Optional[list] = None,
                 msg_flood_size_threshold: int = 3,
                 msg_flood_time_threshold: float = 5,
                 action_flood_size_threshold: int = 3,
                 action_flood_time_threshold: float = 5,
                 global_flood_size_threshold: int = 3,
                 global_flood_time_threshold: float = 5,
                 loop: typing.Optional[asyncio.AbstractEventLoop] = None
                 ) -> None:
        """
        Set up the bot state and register the built-in event handlers

        :param host: server host, forwarded to bottom.Client
        :param port: server port, forwarded to bottom.Client
        :param ssl: ssl flag, forwarded to bottom.Client
        :param nick: nickname sent with NICK
        :param user: user name sent with USER, defaults to nick
        :param realname: real name sent with USER, defaults to nick
        :param channels: channels to join automatically after connecting
        :param msg_flood_size_threshold: number of messages per window
        :param msg_flood_time_threshold: message window length in seconds
        :param action_flood_size_threshold: number of actions per window
        :param action_flood_time_threshold: action window length in seconds
        :param global_flood_size_threshold: number of commands per window
        :param global_flood_time_threshold: global window length in seconds
        :param loop: event loop, forwarded to bottom.Client
        :return: None
        """
        super().__init__(host=host, port=port, ssl=ssl, loop=loop)
        self.logger = logging.getLogger(self.__class__.__name__)
        self.logger.debug("Initializing...")

        self.nick = nick
        self.user = user or self.nick
        self.realname = realname or self.nick
        self.channels = channels or []
        # Coroutine functions to run after registration, before joining.
        self.pre_pub = []

        self.msg_flood_size_threshold = msg_flood_size_threshold
        self.msg_flood_time_threshold = msg_flood_time_threshold
        self.action_flood_size_threshold = action_flood_size_threshold
        self.action_flood_time_threshold = action_flood_time_threshold
        self.global_flood_size_threshold = global_flood_size_threshold
        self.global_flood_time_threshold = global_flood_time_threshold

        # Sliding windows of the most recent send timestamps.
        self.msg_timestamp_queue = []
        self.action_timestamp_queue = []
        self.global_timestamp_queue = []

        self.on("ping", self.keepalive)
        self.on("CLIENT_CONNECT", self.on_connect)
        self.on("PRIVMSG", self.privmsg)
        self.on("NOTICE", self.notice)

    async def keepalive(self, message: typing.Optional[str] = None,
                        **kwargs) -> None:
        """
        Essential keepalive method to pong the server back
        automagically everytime it pings

        :param message: the ping parameters (empty string when missing)
        :param kwargs: for API compatibility
        :return: None
        """
        message = message or ""
        self.logger.debug("PONG {}".format(message))
        self.send("pong", message=message)

    async def on_connect(self, **kwargs) -> None:
        """
        Essential user information to send at the
        beginning of the connection with the server

        Sends NICK and USER, waits for either RPL_ENDOFMOTD or
        ERR_NOMOTD, runs every registered ``pre_public`` hook and
        finally joins the configured channels.

        :param kwargs: for API compatibility, forwarded to the hooks
        :return: None
        """
        self.logger.debug("We are being called")
        self.logger.info("Connecting...")
        self.logger.debug(
            "We are sending NICK as {} to server ".format(self.nick))
        self.send('NICK', nick=self.nick)
        self.logger.debug(
            "We are sending USER {} {}".format(self.user, self.realname))
        self.send('USER', user=self.user,
                  realname=self.realname)

        # asyncio.wait() no longer accepts bare coroutines (Python 3.11)
        # nor a ``loop`` argument (Python 3.10), so schedule the waiters
        # as tasks on our own loop first.
        waiters = [
            self.loop.create_task(self.wait("RPL_ENDOFMOTD")),
            self.loop.create_task(self.wait("ERR_NOMOTD")),
        ]
        _done, pending = await asyncio.wait(
            waiters,
            return_when=asyncio.FIRST_COMPLETED
        )
        # Cancel whichever waiter's event didn't come in
        for future in pending:
            future.cancel()

        for func in self.pre_pub:
            self.logger.debug("Running {}".format(func))
            await self.loop.create_task(func(**kwargs))

        self.logger.info(
            "We are auto-joining channels {}".format(self.channels))
        for channel in self.channels:
            self.send('JOIN', channel=channel)

    def pre_public(self,
                   func: typing.Optional[typing.Callable] = None,
                   **kwargs) -> typing.Callable:
        """
        This method will inject a function to be triggered before
        the Bot module joins channels

        Usable both as ``@bot.pre_public`` and ``@bot.pre_public()``.

        :param func: the function being decorated; plain functions are
            wrapped so they can be awaited
        :param kwargs: for API compatibility
        :return: the function itself, or a decorator when func is None
        """
        if func is None:
            return functools.partial(self.pre_public)

        if asyncio.iscoroutinefunction(func):
            wrapped = func
        else:
            # asyncio.coroutine() was removed in Python 3.11; wrap the
            # plain callable in a native coroutine function instead.
            @functools.wraps(func)
            async def wrapped(*call_args, **call_kwargs):
                result = func(*call_args, **call_kwargs)
                if asyncio.iscoroutine(result) or \
                        isinstance(result, asyncio.Future):
                    result = await result
                return result

        self.pre_pub.append(wrapped)
        return func

    @staticmethod
    async def _wait(timestamp_queue: list, flood_size_threshold: int,
                    flood_time_threshold: float,
                    logger: logging.Logger) -> list:
        """
        This method will figure out if the calling function needs to wait.
        If so, it will wait the correct amount of time before returning.

        The queue is modified in place: the current time is appended and
        the oldest entry is dropped once the queue has reached
        flood_size_threshold entries. A non-positive size threshold
        disables throttling and leaves the queue untouched.

        :param timestamp_queue: list of timestamps, at most
            flood_size_threshold long
        :param flood_size_threshold: total size of the flood threshold
        :param flood_time_threshold: total time of the flood threshold,
            in seconds
        :param logger: logger to use
        :return: the timestamp_queue after modifications
        """
        if flood_size_threshold <= 0:
            # Nothing to measure against; popping from the empty queue
            # below would raise IndexError.
            return timestamp_queue

        timestamp = datetime.datetime.now()
        if len(timestamp_queue) >= flood_size_threshold:
            del timestamp_queue[0]
        timestamp_queue.append(timestamp)

        # Time elapsed between the oldest and the newest send in the window.
        elapsed = (timestamp_queue[-1] - timestamp_queue[0]).total_seconds()
        if len(timestamp_queue) == flood_size_threshold and \
                elapsed < flood_time_threshold:
            delay = flood_time_threshold - elapsed
            logger.debug("Waiting {}s".format(delay))
            await asyncio.sleep(delay)
        return timestamp_queue

    async def global_buffer(self, command: str, **kwargs) -> None:
        """
        This method will buffer the communication sent to the server
        to keep the client from getting disconnected
        It uses global_* variables as configuration
        NOTE: this method is to be called *only* by other buffer methods

        :param command: the command to send to the server
        :param kwargs: the information required for the command
        :return: None
        """
        self.global_timestamp_queue = await self._wait(
            self.global_timestamp_queue,
            self.global_flood_size_threshold,
            self.global_flood_time_threshold,
            self.logger)
        self.send(command, **kwargs)

    async def msg_buffer(self, command: str, **kwargs) -> None:
        """
        This method will buffer the messages sent to the server
        to keep the client from getting disconnected
        messages are of type: PRIVMSG which includes ACTION
        It uses msg_* variables as configuration
        NOTE: this method is to be called *only* by bot methods

        :param command: the message command to send to the server
        :param kwargs: the information required for the message command
        :return: None
        """
        self.msg_timestamp_queue = await self._wait(
            self.msg_timestamp_queue,
            self.msg_flood_size_threshold,
            self.msg_flood_time_threshold,
            self.logger)
        await self.global_buffer(command, **kwargs)

    async def action_buffer(self, command: str, **kwargs) -> None:
        """
        This method will buffer the actions sent to the server
        to keep the client from getting disconnected
        actions are of type: JOIN, PART and QUIT
        It uses action_* variables as configuration
        NOTE: this method is to be called *only* by bot methods

        :param command: the action command to send to the server
        :param kwargs: the information required for the action command
        :return: None
        """
        self.action_timestamp_queue = await self._wait(
            self.action_timestamp_queue,
            self.action_flood_size_threshold,
            self.action_flood_time_threshold,
            self.logger)
        await self.global_buffer(command, **kwargs)

    async def msg(self, **kwargs) -> None:
        """
        This method will send a private message to the messages buffer

        :param kwargs: the information required for the command
        :return: None
        """
        await self.msg_buffer("PRIVMSG", **kwargs)

    async def action(self, **kwargs) -> None:
        """
        This method will send an action message to the messages buffer

        The ``message`` keyword is wrapped in the CTCP ACTION markers
        before being sent as a PRIVMSG. A missing or empty message is
        logged as an error and nothing is sent.

        :param kwargs: the information required for the command
        :return: None
        """
        if not kwargs.get('message', None):
            # TODO: better error handling for the future
            self.logger.error(
                "ACTION does not have a message\n{}".format(kwargs))
            return

        kwargs['message'] = \
            "\x01ACTION {}\x01".format(kwargs['message'])
        await self.msg_buffer("PRIVMSG", **kwargs)

    async def join(self, **kwargs) -> None:
        """
        This method will send a join command to the action buffer

        :param kwargs: the information required for the command
        :return: None
        """
        await self.action_buffer("JOIN", **kwargs)

    async def part(self, **kwargs) -> None:
        """
        This method will send a part command to the action buffer

        :param kwargs: the information required for the command
        :return: None
        """
        await self.action_buffer("PART", **kwargs)

    async def quit(self, **kwargs) -> None:
        """
        This method will send a quit command to the action buffer

        :param kwargs: the information required for the command
        :return: None
        """
        await self.action_buffer("QUIT", **kwargs)

    def privmsg(self, **kwargs) -> None:
        """
        Default PRIVMSG handler: log the received event at debug level

        :param kwargs: the event fields
        :return: None
        """
        self.logger.debug("PRIVMSG {}".format(kwargs))

    def notice(self, **kwargs) -> None:
        """
        Default NOTICE handler: log the received event at debug level

        :param kwargs: the event fields
        :return: None
        """
        self.logger.debug("NOTICE {}".format(kwargs))

    async def on_disconnect(self, **kwargs) -> None:
        """
        Disconnect the client and stop the event loop

        NOTE: __init__ does not register this handler for any event;
        it has to be hooked up or called explicitly.

        :param kwargs: for API compatibility
        :return: None
        """
        self.logger.debug("We are being called")
        await self.disconnect()
        self.logger.debug("We are calling loop.stop()")
        self.loop.stop()