This commit is contained in:
deathbybandaid 2022-02-24 10:58:48 -05:00
parent e9a1e07bb9
commit 46f6966eda
27 changed files with 2129 additions and 0 deletions

57
.gitignore vendored Normal file
View File

@ -0,0 +1,57 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/

23
COPYING Normal file
View File

@ -0,0 +1,23 @@
Eiffel Forum License, version 2
1. Permission is hereby granted to use, copy, modify and/or
distribute this package, provided that:
* copyright notices are retained unchanged,
* any distribution of this package, whether modified or not,
includes this license text.
2. Permission is hereby also granted to distribute binary programs
which depend on this package. If the binary program depends on a
modified version of this package, you are encouraged to publicly
release the modified version of this package.
***********************
THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT WARRANTY. ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE TO ANY PARTY FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THIS PACKAGE.
***********************

9
MANIFEST.in Normal file
View File

@ -0,0 +1,9 @@
include NEWS
include COPYING
include README.md
recursive-exclude * __pycache__
recursive-exclude * *.py[co]
recursive-include *.py
recursive-include * *.py
recursive-include * * *.py

0
NEWS Normal file
View File

View File

@ -1,2 +1,3 @@
# SpiceBot
A Niche Wrapper around Sopel

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
schedule

34
setup.cfg Normal file
View File

@ -0,0 +1,34 @@
[metadata]
name = SpiceBot
version = 0.1.0
description = A Niche Wrapper around Sopel
author = deathbybandaid
author_email = sam@deathbybandaid.net
url = https://git.deathbybandaid.net/deathbybandaid/SpiceBot.git
license = Eiffel Forum License, version 2
classifiers =
Intended Audience :: Developers
Intended Audience :: System Administrators
License :: Eiffel Forum License (EFL)
License :: OSI Approved :: Eiffel Forum License
Topic :: Communications :: Chat :: Internet Relay Chat
[options]
packages = find:
zip_safe = false
include_package_data = true
install_requires =
sopel>=7.0,<8
[options.entry_points]
sopel.plugins =
sopel_SpiceBot_Core_1 = sopel_SpiceBot_Core_1
sopel_SpiceBot_Core_Prerun = sopel_SpiceBot_Core_Prerun
sopel_SpiceBot_Core_Startup = sopel_SpiceBot_Core_Startup
sopel_SpiceBot_Runtime_Commands = sopel_SpiceBot_Runtime_Commands
sopel_SpiceBot_Runtime_Nickname_Commands = sopel_SpiceBot_Runtime_Nickname_Commands
sopel_SpiceBot_Runtime_Action_Commands = sopel_SpiceBot_Runtime_Action_Commands
sopel_SpiceBot_Runtime_Unmatched_Commands = sopel_SpiceBot_Runtime_Unmatched_Commands
spicemanip = spicemanip
spicebot_command_lower = spicebot_command_lower
spicebot_command_upper = spicebot_command_upper

25
setup.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import sys
from setuptools import setup, find_packages
if __name__ == '__main__':
print('Sopel does not correctly load plugins installed with setup.py '
'directly. Please use "pip install .", or add '
'{}/sopel_SpiceBot to core.extra in your config.'
.format(os.path.dirname(os.path.abspath(__file__))),
file=sys.stderr)
with open('README.md') as readme_file:
readme = readme_file.read()
with open('NEWS') as history_file:
history = history_file.read()
setup(
long_description=readme + '\n\n' + history,
long_description_content_type='text/markdown',
)

View File

@ -0,0 +1,60 @@
from .config import Config
from .versions import Versions
from .logger import Logger
from .database import Database
from .comms import Comms
from .events import Events
from .commands import Commands
class SpiceBotCore_OBJ():
def __init__(self, script_dir):
# Set directory for the plugin
self.script_dir = script_dir
# Allow SpiceBot to interact with Sopel Logger
self.logger = Logger()
self.logger.info("SpiceBot Logging Interface Setup Complete.")
# Allow Spicebot to mimic Sopel Config
self.config = Config(script_dir)
self.logger.info("SpiceBot Config Interface Setup Complete.")
# Parse Version Information for the ENV
self.versions = Versions(self.config, self.logger)
self.logger.info("SpiceBot Versions Interface Setup Complete.")
# Mimic Sopel DB, with enhancements
self.database = Database(self.config)
self.logger.info("SpiceBot Database Interface Setup Complete.")
# SpiceBots manual event system
self.events = Events(self.logger)
self.logger.info("SpiceBot Events Interface Setup Complete.")
# Bypass Sopel's method for writing to IRC
self.comms = Comms(self.config)
self.logger.info("SpiceBot Comms Interface Setup Complete.")
# SpiceBots access to Sopel Command listing
self.commands = Commands(self.config)
self.logger.info("SpiceBot Commands Interface Setup Complete.")
def setup(self, bot):
"""This runs with the plugin setup routine"""
# store an access interface to sopel.bot
self.bot = bot
# Re-initialize the bot config properly during plugin setup routine
self.config.config = bot.config
# Give Spicebot access to bot commands
self.commands.bot = bot
# OSD shortcut
def osd(self, messages, recipients=None, text_method='PRIVMSG', max_messages=-1):
return self.comms.osd(messages, recipients, text_method, max_messages)

View File

@ -0,0 +1,333 @@
from sopel.trigger import PreTrigger
class Commands():
def __init__(self, config):
self.bot = None
self.config = config
@property
def multi_split_key(self):
# TODO config
return "&&"
@property
def pipe_split_key(self):
# TODO config
return "|"
@property
def valid_sopel_commands(self):
found = []
for command_dict in self.sopel_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def valid_sopel_nickname_commands(self):
found = []
for command_dict in self.sopel_nickname_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def valid_sopel_action_commands(self):
found = []
for command_dict in self.sopel_action_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def sopel_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "command"
})
return commands_list
@property
def sopel_nickname_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_nick_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "nickname_command"
})
return commands_list
@property
def sopel_action_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_action_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "action_command"
})
return commands_list
def dispatch(self, trigger_dict):
if trigger_dict["trigger_type"] == "command":
pretrigger = self.generate_pretrigger_command(trigger_dict)
elif trigger_dict["trigger_type"] == "nickname_command":
pretrigger = self.generate_pretrigger_nickname_command(trigger_dict)
elif trigger_dict["trigger_type"] == "action_command":
pretrigger = self.generate_pretrigger_action_command(trigger_dict)
self.bot.dispatch(pretrigger)
def generate_pretrigger_command(self, trigger_dict):
# @time=2022-02-23T15:04:01.447Z :
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s%s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
trigger_dict["trigger_prefix"], trigger_dict["trigger_command"], trigger_dict["trigger_str"])
)
return pretrigger
def generate_pretrigger_nickname_command(self, trigger_dict):
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s %s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
trigger_dict["trigger_prefix"], trigger_dict["trigger_command"], trigger_dict["trigger_str"])
)
return pretrigger
def generate_pretrigger_action_command(self, trigger_dict):
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s%s %s%s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
"\x01", "ACTION", trigger_dict["trigger_command"], trigger_dict["trigger_str"], "\x01")
)
return pretrigger
def get_command_from_trigger(self, trigger):
commstring = trigger.args[1]
if commstring.startswith(tuple(self.config.prefix_list)):
command = commstring[1:].split(" ")[0]
elif commstring.startswith(self.bot.nick):
command = " ".join(commstring.split(" ")[1:]).split(" ")[0]
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
command = commstring.split(" ")[0]
else:
command = ""
return command
def what_command_type(self, trigger):
full_trigger_str = trigger.args[1]
if full_trigger_str.startswith(tuple(self.config.prefix_list)):
return "command"
elif full_trigger_str.startswith(self.bot.nick):
return "nickname_command"
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
return "action_command"
else:
return "rule"
def is_real_command(self, trigger_dict):
if trigger_dict["trigger_type"] == "command":
commands_list = self.valid_sopel_commands
elif trigger_dict["trigger_type"] == "nickname_command":
commands_list = self.valid_sopel_nickname_commands
elif trigger_dict["trigger_type"] == "action_command":
commands_list = self.valid_sopel_action_commands
if trigger_dict["trigger_command"] in commands_list:
return True
else:
return False
def is_rulematch(self, function, command_type):
"""Determine if function could be called with a rule match"""
rulematch = False
if command_type == "command":
if hasattr(function, 'commands'):
command_aliases = function.commands
elif command_type == "nickname_command":
if hasattr(function, 'nickname_commands'):
command_aliases = function.nickname_commands
elif command_type == "action_command":
if hasattr(function, 'action_commands'):
command_aliases = function.action_commands
if '(.*)' in command_aliases:
rulematch = True
return rulematch
def get_commands_nosplit(self, trigger):
commands = []
first_full_trigger_str = trigger.args[1]
if first_full_trigger_str.startswith(tuple(self.config.prefix_list)):
first_trigger_type = "command"
first_trigger_prefix = first_full_trigger_str[0]
first_trigger_noprefix = first_full_trigger_str[1:]
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_full_trigger_str.startswith(self.bot.nick):
first_trigger_type = "nickname_command"
first_trigger_prefix = str(first_full_trigger_str.split(" ")[0])
first_trigger_noprefix = " ".join(first_full_trigger_str.split(" ")[1:])
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
first_trigger_type = "action_command"
first_trigger_prefix = "ACTION"
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
else:
first_trigger_type = "rule"
first_trigger_prefix = None
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
commands.append({
"trigger_type": first_trigger_type,
"trigger_prefix": first_trigger_prefix,
"trigger_str": first_trigger_str,
"trigger_command": first_trigger_command,
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
return commands
def get_commands_split(self, trigger, splitkey=None):
commands = []
# Get split for multiple commands
if splitkey in trigger.args[1]:
triggers = [x.strip() for x in trigger.args[1].split(splitkey)]
else:
triggers = [trigger.args[1]]
first_full_trigger_str = triggers[0]
if first_full_trigger_str.startswith(tuple(self.config.prefix_list)):
first_trigger_type = "command"
first_trigger_prefix = first_full_trigger_str[0]
first_trigger_noprefix = first_full_trigger_str[1:]
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_full_trigger_str.startswith(self.bot.nick):
first_trigger_type = "nickname_command"
first_trigger_prefix = str(first_full_trigger_str.split(" ")[0])
first_trigger_noprefix = " ".join(first_full_trigger_str.split(" ")[1:])
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
first_trigger_type = "action_command"
first_trigger_prefix = "ACTION"
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
else:
first_trigger_type = "rule"
first_trigger_prefix = None
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
commands.append({
"trigger_type": first_trigger_type,
"trigger_prefix": first_trigger_prefix,
"trigger_str": first_trigger_str.replace(splitkey, ""),
"trigger_command": first_trigger_command.replace(splitkey, ""),
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
if not len(triggers) > 1:
return commands
for full_trigger_str in triggers[1:]:
if full_trigger_str.startswith(tuple(self.config.prefix_list)):
trigger_type = "command"
trigger_prefix = full_trigger_str[0]
trigger_noprefix = full_trigger_str[1:]
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
elif full_trigger_str.startswith(self.bot.nick):
trigger_type = "nickname_command"
trigger_prefix = str(full_trigger_str.split(" ")[0])
trigger_noprefix = " ".join(full_trigger_str.split(" ")[1:])
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
elif full_trigger_str.startswith(tuple(["ACTION", "/me"])):
trigger_type = "action_command"
trigger_prefix = "ACTION"
trigger_noprefix = full_trigger_str
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
else:
trigger_command = full_trigger_str.split(" ")[0]
trigger_str = " ".join([x.strip() for x in full_trigger_str.split(" ")[1:]])
command_types = ["command", "nickname_command", "action_command"]
# Assume same command type until proven otherwise
assumed_trigger_type = first_trigger_type
assumed_trigger_prefix = first_trigger_prefix
# Still under the assumption that the command is most likely the same type as first command
command_types.remove(assumed_trigger_type)
command_types.insert(0, assumed_trigger_type)
found = []
for command_type in command_types:
if command_type == "command":
commands_list = self.valid_sopel_commands
elif command_type == "nickname_command":
commands_list = self.valid_sopel_nickname_commands
elif command_type == "action_command":
commands_list = self.valid_sopel_action_commands
if trigger_command in commands_list:
found.append(command_type)
if len(found):
trigger_type = found[0]
if trigger_type == "command":
trigger_prefix = self.config.prefix_list[0]
elif trigger_type == "nickname_command":
trigger_prefix = "%s," % self.bot.nick
elif trigger_type == "action_command":
trigger_prefix = "ACTION"
else:
trigger_type = assumed_trigger_type
trigger_prefix = assumed_trigger_prefix
if (not full_trigger_str.isspace() and full_trigger_str not in ['', None]
and not trigger_command.isspace() and trigger_command not in ['', None]):
commands.append({
"trigger_type": trigger_type,
"trigger_prefix": trigger_prefix,
"trigger_str": trigger_str.replace(splitkey, ""),
"trigger_command": trigger_command.replace(splitkey, ""),
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
return commands

View File

@ -0,0 +1,253 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
from sopel.tools import Identifier
from sopel.irc.utils import safe
import threading
import sys
if sys.version_info.major >= 3:
from collections import abc
if sys.version_info.major >= 3:
unicode = str
class Comms():
def __init__(self, config):
self.config = config
self.backend = None
self.sending = threading.RLock()
self.stack = {}
self.hostmask = None
def ircbackend_initialize(self, bot):
self.backend = bot.backend
self.dispatch = bot.dispatch
def hostmask_set(self, bot):
self.hostmask = bot.hostmask
@property
def botnick(self):
return self.config.core.nick
@property
def bothostmask(self):
return self.hostmask
def write(self, args, text=None):
while not self.backend:
pass
args = [safe(arg) for arg in args]
self.backend.send_command(*args, text=text)
def get_message_recipientgroups(self, recipients, text_method):
"""
Split recipients into groups based on server capabilities.
This defaults to 4
Input can be
* unicode string
* a comma-seperated unicode string
* list
* dict_keys handy for list(bot.channels.keys())
"""
if sys.version_info.major >= 3:
if isinstance(recipients, abc.KeysView):
recipients = [x for x in recipients]
if isinstance(recipients, dict):
recipients = [x for x in recipients]
if not isinstance(recipients, list):
recipients = recipients.split(",")
if not len(recipients):
raise ValueError("Recipients list empty.")
if text_method == 'NOTICE':
maxtargets = 4
elif text_method in ['PRIVMSG', 'ACTION']:
maxtargets = 4
maxtargets = int(maxtargets)
recipientgroups = []
while len(recipients):
recipients_part = ','.join(x for x in recipients[-maxtargets:])
recipientgroups.append(recipients_part)
del recipients[-maxtargets:]
return recipientgroups
def get_available_message_bytes(self, recipientgroups, text_method):
"""
Get total available bytes for sending a message line
Total sendable bytes is 512
* 15 are reserved for basic IRC NOTICE/PRIVMSG and a small buffer.
* The bots hostmask plays a role in this count
Note: if unavailable, we calculate the maximum length of a hostmask
* The recipients we send to also is a factor. Multiple recipients reduces
sendable message length
"""
if text_method == 'ACTION':
text_method_bytes = (len('PRIVMSG')
+ len("\x01ACTION \x01")
)
else:
text_method_bytes = len(text_method)
if self.bothostmask:
hostmaskbytes = len((self.bothostmask).encode('utf-8'))
else:
hostmaskbytes = (len((self.botnick).encode('utf-8')) # Bot's NICKLEN
+ 1 # (! separator)
+ len('~') # (for the optional ~ in user)
+ 9 # max username length
+ 1 # (@ separator)
+ 63 # <hostname> has a maximum length of 63 characters.
)
# find the maximum target group length, and use the max
groupbytes = []
for recipients_part in recipientgroups:
groupbytes.append(len((recipients_part).encode('utf-8')))
max_recipients_bytes = max(groupbytes)
allowedLength = (512
- len(':') - hostmaskbytes
- len(' ') - text_method_bytes - len(' ')
- max_recipients_bytes
- len(' :')
- len('\r\n')
)
return allowedLength
def get_sendable_message_list(self, messages, max_length=400):
"""Get a sendable ``text`` message list.
:param str txt: unicode string of text to send
:param int max_length: maximum length of the message to be sendable
:return: a tuple of two values, the sendable text and its excess text
We're arbitrarily saying that the max is 400 bytes of text when
messages will be split. Otherwise, we'd have to account for the bot's
hostmask, which is hard.
The `max_length` is the max length of text in **bytes**, but we take
care of unicode 2-bytes characters, by working on the unicode string,
then making sure the bytes version is smaller than the max length.
"""
if not isinstance(messages, list):
messages = [messages]
messages_list = ['']
message_padding = 4 * " "
for message in messages:
if len((messages_list[-1] + message_padding + message).encode('utf-8')) <= max_length:
if messages_list[-1] == '':
messages_list[-1] = message
else:
messages_list[-1] = messages_list[-1] + message_padding + message
else:
text_list = []
while len(message.encode('utf-8')) > max_length and not message.isspace():
last_space = message.rfind(' ', 0, max_length)
if last_space == -1:
# No last space, just split where it is possible
splitappend = message[:max_length]
if not splitappend.isspace():
text_list.append(splitappend)
message = message[max_length:]
else:
# Split at the last best space found
splitappend = message[:last_space]
if not splitappend.isspace():
text_list.append(splitappend)
message = message[last_space:]
if len(message.encode('utf-8')) and not message.isspace():
text_list.append(message)
messages_list.extend(text_list)
return messages_list
def osd(self, messages, recipients=None, text_method='PRIVMSG', max_messages=-1):
"""Send ``text`` as a PRIVMSG, CTCP ACTION, or NOTICE to ``recipients``.
In the context of a triggered callable, the ``recipient`` defaults to
the channel (or nickname, if a private message) from which the message
was received.
By default, unless specified in the configuration file, there is some
built-in flood protection. Messages displayed over 5 times in 2 minutes
will be displayed as '...'.
The ``recipient`` can be in list format or a comma seperated string,
with the ability to send to multiple recipients simultaneously. The
default recipients that the bot will send to is 4 if the IRC server
doesn't specify a limit for TARGMAX.
Text can be sent to this function in either string or list format.
List format will insert as small buffering space between entries in the
list.
There are 512 bytes available in a single IRC message. This includes
hostmask of the bot as well as around 15 bytes of reserved IRC message
type. This also includes the destinations/recipients of the message.
This will split given strings/lists into a displayable format as close
to the maximum 512 bytes as possible.
If ``max_messages`` is given, the split mesage will display in as many
lines specified by this argument. Specifying ``0`` or a negative number
will display without limitation. By default this is set to ``-1`` when
called directly. When called from the say/msg/reply/notice/action it
will default to ``1``.
"""
if not hasattr(self, 'stack'):
self.stack = {}
text_method = text_method.upper()
if text_method == 'SAY' or text_method not in ['NOTICE', 'ACTION']:
text_method = 'PRIVMSG'
recipientgroups = self.get_message_recipientgroups(recipients, text_method)
available_bytes = self.get_available_message_bytes(recipientgroups, text_method)
messages_list = self.get_sendable_message_list(messages, available_bytes)
if max_messages >= 1:
messages_list = messages_list[:max_messages]
text_method_orig = text_method
for recipientgroup in recipientgroups:
text_method = text_method_orig
recipient_id = Identifier(recipientgroup)
recipient_stack = self.stack.setdefault(recipient_id, {
'messages': [],
'flood_left': 999,
'dots': 0,
})
recipient_stack['dots'] = 0
with self.sending:
for text in messages_list:
if recipient_stack['dots'] <= 3:
if text_method == 'ACTION':
text = '\001ACTION {}\001'.format(text)
self.write(('PRIVMSG', recipientgroup), text)
text_method = 'PRIVMSG'
elif text_method == 'NOTICE':
self.write(('NOTICE', recipientgroup), text)
else:
self.write(('PRIVMSG', recipientgroup), text)
def __getattr__(self, name):
"""
Quick and dirty shortcuts. Will only get called for undefined attributes.
"""
if hasattr(self.bot, name):
return eval("self.bot.%s" % name)

View File

@ -0,0 +1,45 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
import sys
import os
from sopel.cli.run import build_parser, get_configuration
class Config():
def __init__(self, script_dir):
self.script_dir = script_dir
# Load config
self.config = get_configuration(self.get_opts())
@property
def basename(self):
return os.path.basename(self.config.filename).rsplit('.', 1)[0]
@property
def prefix_list(self):
return str(self.config.core.prefix).replace("\\", '').split("|")
def define_section(self, name, cls_, validate=True):
return self.config.define_section(name, cls_, validate)
def get_opts(self):
parser = build_parser()
if not len(sys.argv[1:]):
argv = ['legacy']
else:
argv = sys.argv[1:]
return parser.parse_args(argv)
def __getattr__(self, name):
''' will only get called for undefined attributes '''
"""We will try to find a core value, or return None"""
if hasattr(self.config, name):
return eval("self.config." + name)
else:
return None

View File

@ -0,0 +1,294 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
import json
from sopel.tools import Identifier
from sopel.db import SopelDB, NickValues, ChannelValues, PluginValues
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
BASE = declarative_base()
class SpiceDB(object):
# NICK FUNCTIONS
def adjust_nick_value(self, nick, key, value):
"""Sets the value for a given key to be associated with the nick."""
nick = Identifier(nick)
value = json.dumps(value, ensure_ascii=False)
nick_id = self.get_nick_id(nick)
session = self.ssession()
try:
result = session.query(NickValues) \
.filter(NickValues.nick_id == nick_id) \
.filter(NickValues.key == key) \
.one_or_none()
# NickValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_nickvalue = NickValues(nick_id=nick_id, key=key, value=float(value))
session.add(new_nickvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_nick_list(self, nick, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the nick."""
nick = Identifier(nick)
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
nick_id = self.get_nick_id(nick)
session = self.ssession()
try:
result = session.query(NickValues) \
.filter(NickValues.nick_id == nick_id) \
.filter(NickValues.key == key) \
.one_or_none()
# NickValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_nickvalue = NickValues(nick_id=nick_id, key=key, value=values)
session.add(new_nickvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
# CHANNEL FUNCTIONS
def adjust_channel_value(self, channel, key, value):
"""Sets the value for a given key to be associated with the channel."""
channel = Identifier(channel).lower()
value = json.dumps(value, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(ChannelValues) \
.filter(ChannelValues.channel == channel)\
.filter(ChannelValues.key == key) \
.one_or_none()
# ChannelValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_channelvalue = ChannelValues(channel=channel, key=key, value=float(value))
session.add(new_channelvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_channel_list(self, channel, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the channel."""
channel = Identifier(channel).lower()
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(ChannelValues) \
.filter(ChannelValues.channel == channel)\
.filter(ChannelValues.key == key) \
.one_or_none()
# ChannelValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_channelvalue = ChannelValues(channel=channel, key=key, value=values)
session.add(new_channelvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
# PLUGIN FUNCTIONS
def adjust_plugin_value(self, plugin, key, value):
"""Sets the value for a given key to be associated with the plugin."""
plugin = plugin.lower()
value = json.dumps(value, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(PluginValues) \
.filter(PluginValues.plugin == plugin)\
.filter(PluginValues.key == key) \
.one_or_none()
# PluginValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_pluginvalue = PluginValues(plugin=plugin, key=key, value=float(value))
session.add(new_pluginvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_plugin_list(self, plugin, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the plugin."""
plugin = plugin.lower()
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(PluginValues) \
.filter(PluginValues.plugin == plugin)\
.filter(PluginValues.key == key) \
.one_or_none()
# PluginValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_pluginvalue = PluginValues(plugin=plugin, key=key, value=values)
session.add(new_pluginvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
class Database():
def __init__(self, config):
SopelDB.adjust_nick_value = SpiceDB.adjust_nick_value
SopelDB.adjust_nick_list = SpiceDB.adjust_nick_list
SopelDB.adjust_channel_value = SpiceDB.adjust_channel_value
SopelDB.adjust_channel_list = SpiceDB.adjust_channel_list
SopelDB.adjust_plugin_value = SpiceDB.adjust_plugin_value
SopelDB.adjust_plugin_list = SpiceDB.adjust_plugin_list
self.db = SopelDB(config)
BASE.metadata.create_all(self.db.engine)
@property
def botnick(self):
return self.config.core.nick
def __getattr__(self, name):
''' will only get called for undefined attributes '''
if hasattr(self.db, name):
return eval("self.db." + name)
else:
return None
"""Nick"""
def adjust_nick_value(self, nick, key, value):
return self.db.adjust_nick_value(nick, key, value)
def adjust_nick_list(self, nick, key, entries, adjustmentdirection):
return self.db.adjust_nick_list(nick, key, entries, adjustmentdirection)
"""Bot"""
def get_bot_value(self, key):
return self.db.get_nick_value(self.botnick, key)
def set_bot_value(self, key, value):
return self.db.set_nick_value(self.botnick, key, value)
def delete_bot_value(self, key):
return self.db.delete_nick_value(self.botnick, key)
def adjust_bot_value(self, key, value):
return self.db.adjust_nick_value(self.botnick, key, value)
def adjust_bot_list(self, key, entries, adjustmentdirection):
return self.db.adjust_nick_list(self.botnick, key, entries, adjustmentdirection)
"""Channels"""
def adjust_channel_value(self, channel, key, value):
return self.db.adjust_channel_value(channel, key, value)
def adjust_channel_list(self, nick, key, entries, adjustmentdirection):
return self.db.adjust_channel_list(nick, key, entries, adjustmentdirection)
"""Plugins"""
def adjust_plugin_value(self, plugin, key, value):
return self.db.adjust_plugin_value(plugin, key, value)
def adjust_plugin_list(self, plugin, key, entries, adjustmentdirection):
return self.db.adjust_plugin_list(plugin, key, entries, adjustmentdirection)

View File

@ -0,0 +1,147 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
"""
This is the SpiceBot events system.
We utilize the Sopel code for event numbers and
self-trigger the bot into performing actions
"""
from sopel.trigger import PreTrigger
import functools
import threading
import time
class Events(object):
"""A dynamic listing of all the notable Bot numeric events.
Events will be assigned a 4-digit number above 1000.
This allows you to do, ``@plugin.event(events.BOT_WELCOME)````
Triggers handled by this plugin will be processed immediately.
Others will be placed into a queue.
Triggers will be logged by ID and content
"""
def __init__(self, logger):
self.logger = logger
self.lock = threading.Lock()
# This is a defined IRC event
self.RPL_WELCOME = '001'
self.RPL_MYINFO = '004'
self.RPL_ISUPPORT = '005'
self.RPL_WHOREPLY = '352'
self.RPL_NAMREPLY = '353'
# this is an unrealircd event
self.RPL_WHOISREGNICK = '307'
# These Are Speicebot generated events
self.BOT_UPTIME = time.time()
self.BOT_WELCOME = '1001'
self.BOT_READY = '1002'
self.BOT_CONNECTED = '1003'
self.BOT_LOADED = '1004'
self.BOT_RECONNECTED = '1005'
self.defaultevents = [self.BOT_WELCOME, self.BOT_READY, self.BOT_CONNECTED, self.BOT_LOADED, self.BOT_RECONNECTED]
self.dict = {
"assigned_IDs": {1000: "BOT_UPTIME", 1001: "BOT_WELCOME", 1002: "BOT_READY", 1003: "BOT_CONNECTED", 1004: "BOT_LOADED", 1005: "BOT_RECONNECTED"},
"triggers_recieved": {},
"trigger_queue": [],
"startup_required": [self.BOT_WELCOME, self.BOT_READY, self.BOT_CONNECTED],
"RPL_WELCOME_Count": 0
}
def __getattr__(self, name):
''' will only get called for undefined attributes '''
self.lock.acquire()
eventnumber = max(list(self.dict["assigned_IDs"].keys())) + 1
self.dict["assigned_IDs"][eventnumber] = str(name)
setattr(self, name, str(eventnumber))
self.lock.release()
return str(eventnumber)
def trigger(self, bot, number, message="SpiceBot_Events"):
pretriggerdict = {"number": str(number), "message": message}
if number in self.defaultevents:
self.dispatch(bot, pretriggerdict)
else:
self.dict["trigger_queue"].append(pretriggerdict)
def dispatch(self, bot, pretriggerdict):
number = pretriggerdict["number"]
message = pretriggerdict["message"]
pretrigger = PreTrigger(
bot.nick,
":SpiceBot_Events %s %s :%s" % (number, bot.nick, message)
)
bot.dispatch(pretrigger)
self.recieved({"number": number, "message": message})
def recieved(self, trigger):
self.lock.acquire()
if isinstance(trigger, dict):
eventnumber = str(trigger["number"])
message = str(trigger["message"])
else:
eventnumber = str(trigger.event)
message = trigger.args[1]
self.logger.info('SpiceBot_Events: %s %s' % (eventnumber, message))
if eventnumber not in self.dict["triggers_recieved"]:
self.dict["triggers_recieved"][eventnumber] = []
self.dict["triggers_recieved"][eventnumber].append(message)
self.lock.release()
def check(self, checklist):
if not isinstance(checklist, list):
checklist = [str(checklist)]
for number in checklist:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
return False
return True
def startup_add(self, startlist):
self.lock.acquire()
if not isinstance(startlist, list):
startlist = [str(startlist)]
for eventitem in startlist:
if eventitem not in self.dict["startup_required"]:
self.dict["startup_required"].append(eventitem)
self.lock.release()
def startup_check(self):
for number in self.dict["startup_required"]:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
return False
return True
def startup_debug(self):
not_done = []
for number in self.dict["startup_required"]:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
not_done.append(int(number))
reference_not_done = []
for item in not_done:
reference_not_done.append(str(self.dict["assigned_IDs"][item]))
return reference_not_done
def check_ready(self, checklist):
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
while not self.check(checklist):
pass
return function(*args, **kwargs)
return _nop
return actual_decorator
def startup_check_ready(self):
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
while not self.startup_check():
pass
return function(*args, **kwargs)
return _nop
return actual_decorator

View File

@ -0,0 +1,18 @@
from sopel import tools
class Logger():
def __init__(self):
self.logger = tools.get_logger('SpiceBot')
def __getattr__(self, name):
"""
Quick and dirty shortcuts. Will only get called for undefined attributes.
"""
if hasattr(self.logger, name):
return eval("self.logger.%s" % name)
elif hasattr(self.logger, name.lower()):
return eval("self.logger.%s" % name.lower())

View File

@ -0,0 +1,111 @@
import os
import sys
import platform
import pathlib
import json
import re
class Versions():
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.dict = {}
self.register_spicebot()
self.register_env()
def get_core_versions(self):
returndict = {}
for item in list(self.dict.keys()):
if self.dict[item]["type"] == "SpiceBot":
returndict[item] = self.dict[item].copy()
return returndict
def register_version(self, item_name, item_version, item_type):
"""
Register a version item.
"""
self.logger.debug("Registering %s item: %s %s" % (item_type, item_name, item_version))
self.dict[item_name] = {
"name": item_name,
"version": item_version,
"type": item_type
}
def register_spicebot(self):
"""
Register core version items.
"""
version_file = pathlib.Path(self.config.script_dir).joinpath("version.json")
with open(version_file, 'r') as jsonversion:
versions = json.load(jsonversion)
for key in list(versions.keys()):
self.register_version(key, versions[key], "SpiceBot")
def is_docker(self):
path = "/proc/self/cgroup"
if not os.path.isfile(path):
return False
with open(path) as f:
for line in f:
if re.match("\d+:[\w=]+:/docker(-[ce]e)?/\w+", line):
return True
return False
def is_virtualenv(self):
# return True if started from within a virtualenv or venv
base_prefix = getattr(sys, "base_prefix", None)
# real_prefix will return None if not in a virtualenv enviroment or the default python path
real_prefix = getattr(sys, "real_prefix", None) or sys.prefix
return base_prefix != real_prefix
def register_env(self):
"""
Register env version items.
"""
self.register_version("Python", sys.version, "env")
if sys.version_info.major == 2 or sys.version_info < (3, 7):
self.logger.error('Error: SpiceBot requires python 3.7+. Do NOT expect support for older versions of python.')
opersystem = platform.system()
self.register_version("Operating System", opersystem, "env")
system_alias = platform.release()
self.register_version("OS Release", system_alias, "env")
if opersystem in ["Linux", "Darwin"]:
# Linux/Mac
if os.getuid() == 0 or os.geteuid() == 0:
self.logger.warning('Do not run SpiceBot with root privileges.')
elif opersystem in ["Windows"]:
# Windows
if os.environ.get("USERNAME") == "Administrator":
self.logger.warning('Do not run SpiceBot as Administrator.')
else:
# ['Java']
if not len(opersystem):
os_string = "."
else:
os_string = ": %s" % opersystem
self.logger.warning("Uncommon Operating System, use at your own risk%s" % os_string)
cpu_type = platform.machine()
self.register_version("CPU Type", cpu_type, "env")
isvirtualenv = self.is_virtualenv()
self.register_version("Virtualenv", isvirtualenv, "env")
isdocker = self.is_docker()
self.register_version("Docker", isdocker, "env")

View File

@ -0,0 +1,18 @@
# coding=utf8
"""SpiceBot
A Niche Wrapper around Sopel
"""
from __future__ import unicode_literals, absolute_import, division, print_function
import os
import pathlib
from .SBCore import SpiceBotCore_OBJ
SCRIPT_DIR = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
sb = SpiceBotCore_OBJ(SCRIPT_DIR)
def setup(bot):
sb.setup(bot)

View File

@ -0,0 +1,3 @@
{
"SpiceBot": "v0.9.3-beta"
}

View File

@ -0,0 +1,148 @@
import functools
from sopel_SpiceBot_Core_1 import sb
def prerun():
"""This decorator is the hub of handling for all SpiceBot Commands"""
def actual_decorator(function):
@functools.wraps(function)
def internal_prerun(bot, trigger, *args, **kwargs):
comrun = ComRun(function, trigger)
# Since there was more than one command,
# we are going to redispatch commands
# This will give sopel the appearance of recieving individual commands
if comrun.is_multi_command:
if len(comrun.commands) > 1:
for trigger_dict in comrun.commands:
sb.commands.dispatch(trigger_dict)
return
# If the original trigger is not the same after splits
# so we will now redispatch to help get the correct function passed
if comrun.has_command_been_sanitized:
if comrun.is_pipe_command:
trigger_dict = rebuild_pipes(comrun.commands)
else:
trigger_dict = comrun.command
sb.commands.dispatch(trigger_dict)
return
if comrun.is_rulematch and comrun.is_real_command:
return
# Run function
function(bot, trigger, comrun, *args, **kwargs)
# if not comrun.is_pipe_command:
# bot.say(comrun.say)
return internal_prerun
return actual_decorator
def rebuild_pipes(commands):
repipe_trigger_dict = commands[0]
for trigger_dict in commands[1:]:
if trigger_dict["trigger_type"] == "command":
repipe_trigger_dict["trigger_str"] += " %s %s%s %s" % (sb.commands.pipe_split_key,
trigger_dict["trigger_prefix"],
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
elif trigger_dict["trigger_type"] == "nickname_command":
repipe_trigger_dict["trigger_str"] += " %s %s %s %s" % (sb.commands.pipe_split_key,
trigger_dict["trigger_prefix"],
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
elif trigger_dict["trigger_type"] == "action_command":
repipe_trigger_dict["trigger_str"] += " %s %s %s %s" % (sb.commands.pipe_split_key,
"/me",
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
return repipe_trigger_dict
class ComRun():
def __init__(self, function, trigger):
self.function = function
self.trigger = trigger
@property
def is_real_command(self):
return sb.commands.is_real_command(self.command)
@property
def command_type(self):
return sb.commands.what_command_type(self.trigger)
@property
def is_rulematch(self):
return sb.commands.is_rulematch(self.function, self.command_type)
@property
def is_multi_command(self):
if sb.commands.multi_split_key in self.trigger.args[1]:
return True
return False
@property
def is_pipe_command(self):
if sb.commands.pipe_split_key in self.trigger.args[1]:
return True
return False
@property
def multi_split_count(self):
if self.is_multi_command:
return len([x.strip() for x in self.trigger.args[1].split(sb.commands.multi_split_key)])
return "N/A"
@property
def pipe_split_count(self):
if self.is_pipe_command:
return len([x.strip() for x in self.trigger.args[1].split(sb.commands.pipe_split_key)])
return "N/A"
@property
def commands(self):
if self.is_multi_command:
return sb.commands.get_commands_split(self.trigger, sb.commands.multi_split_key)
elif self.is_pipe_command:
return sb.commands.get_commands_split(self.trigger, sb.commands.pipe_split_key)
else:
return sb.commands.get_commands_nosplit(self.trigger)
@property
def command(self):
return self.commands[0]
@property
def has_command_been_sanitized(self):
trigger_command = sb.commands.get_command_from_trigger(self.trigger)
multi_split_count = self.multi_split_count
pipe_split_count = self.pipe_split_count
if trigger_command != self.command["trigger_command"]:
return True
elif multi_split_count != "N/A":
if multi_split_count != len(self.commands):
return True
elif pipe_split_count != "N/A":
if pipe_split_count != len(self.commands):
return True
return False

View File

@ -0,0 +1,81 @@
# coding=utf8
"""SpiceBot
A Niche Wrapper around Sopel
"""
from __future__ import unicode_literals, absolute_import, division, print_function
from threading import Thread
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
"""
Events
"""
@plugin.event("001")
@plugin.rule('.*')
def welcome_setup_start(bot, trigger):
sb.comms.ircbackend_initialize(bot)
@plugin.event(sb.events.BOT_CONNECTED)
@plugin.rule('.*')
def bot_events_start_set_hostmask(bot, trigger):
sb.comms.hostmask_set(bot)
@plugin.event(sb.events.BOT_WELCOME, sb.events.BOT_READY, sb.events.BOT_CONNECTED, sb.events.BOT_LOADED)
@plugin.rule('.*')
def bot_events_complete(bot, trigger):
"""This is here simply to log to stderr that this was recieved."""
sb.logger.info('SpiceBot_Events: %s' % trigger.args[1])
@plugin.event(sb.events.RPL_WELCOME)
@plugin.rule('.*')
def bot_events_connected(bot, trigger):
# Handling for connection count
sb.events.dict["RPL_WELCOME_Count"] += 1
if sb.events.dict["RPL_WELCOME_Count"] > 1:
sb.events.trigger(bot, sb.events.BOT_RECONNECTED, "Bot ReConnected to IRC")
else:
sb.events.trigger(bot, sb.events.BOT_WELCOME, "Welcome to the SpiceBot Events System")
"""For items tossed in a queue, this will trigger them accordingly"""
Thread(target=events_thread, args=(bot,)).start()
def events_thread(bot):
while True:
if len(sb.events.dict["trigger_queue"]):
pretriggerdict = sb.events.dict["trigger_queue"][0]
sb.events.dispatch(bot, pretriggerdict)
try:
del sb.events.dict["trigger_queue"][0]
except IndexError:
pass
@plugin.event(sb.events.BOT_WELCOME)
@plugin.rule('.*')
def bot_events_start(bot, trigger):
"""This stage is redundant, but shows the system is working."""
sb.events.trigger(bot, sb.events.BOT_READY, "Ready To Process plugin setup procedures")
"""Here, we wait until we are in at least one channel"""
while not len(list(bot.channels.keys())) > 0:
pass
sb.events.trigger(bot, sb.events.BOT_CONNECTED, "Bot Connected to IRC")
@sb.events.startup_check_ready()
@plugin.event(sb.events.BOT_READY)
@plugin.rule('.*')
def bot_events_startup_complete(bot, trigger):
"""All events registered as required for startup have completed"""
sb.events.trigger(bot, sb.events.BOT_LOADED, "All registered plugins setup procedures have completed")

View File

@ -0,0 +1,13 @@
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.action_command('test')
def sb_test_commands(bot, trigger, comrun):
bot.say("%s" % trigger.raw)

View File

@ -0,0 +1,32 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('test', "testnew")
def commands_test(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@prerun()
@plugin.command('testa')
def commands_test_a(bot, trigger, comrun):
bot.say("test a")
bot.say("%s" % trigger.raw)
@prerun()
@plugin.command('testb')
def commands_test_b(bot, trigger, comrun):
bot.say("test b")
bot.say("%s" % trigger.raw)
@plugin.command('testc')
def commands_test_c(bot, trigger, comrun):
bot.say("test c")
bot.say("test c: %s" % trigger.raw)

View File

@ -0,0 +1,44 @@
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('test')
def sb_test_commands(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@plugin.nickname_command('plugins')
def sb_test_command_groups(bot, trigger, comrun):
for bplugin in list(bot._plugins.keys()):
sb.osd(str(bot._plugins[bplugin].get_meta_description()), trigger.sender)
@prerun()
@plugin.nickname_command('commands')
def sopel_commands(bot, trigger, comrun):
bot.say("testing commands")
sb.osd("%s" % sb.commands.sopel_commands, trigger.sender)
@prerun()
@plugin.nickname_command('nickname_commands')
def sopel_nickname_commands(bot, trigger, comrun):
bot.say("testing nickname_commands")
sb.osd("%s" % sb.commands.sopel_nickname_commands, trigger.sender)
@prerun()
@plugin.nickname_command('action_commands')
def sopel_action_commands(bot, trigger, comrun):
bot.say("testing action_commands")
sb.osd("%s" % sb.commands.sopel_action_commands, trigger.sender)

View File

@ -0,0 +1,22 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('(.*)')
def rule_command(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@prerun()
@plugin.nickname_command('(.*)')
def rule_nickname_command(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@prerun()
@plugin.action_command('(.*)')
def rule_action_command(bot, trigger, comrun):
bot.say("%s" % trigger.raw)

View File

@ -0,0 +1,12 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('lower')
@plugin.command('lower')
def lower(bot, trigger, comrun):
comrun.say = str(comrun.trigger_dict["trigger_str"]).lower()

View File

@ -0,0 +1,11 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('upper')
def upper(bot, trigger, comrun):
comrun.say = str(comrun.trigger_dict["trigger_str"]).upper()

334
spicemanip/__init__.py Normal file
View File

@ -0,0 +1,334 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Top-level package for spicemanip."""
__author__ = """Sam Zick"""
__email__ = 'sam@deathbybandaid.net'
__version__ = '0.1.8'
import random
import collections
# TODO 'this*that' or '1*that' replace either all strings matching, or an index value
# TODO reverse sort z.sort(reverse = True)
# list.extend adds lists to eachother
class Spicemanip():
def __init__(self):
pass
def __call__(self, inputs, outputtask, output_type='default'):
mainoutputtask, suboutputtask = None, None
# Input needs to be a list, but don't split a word into letters
if not inputs:
inputs = []
if isinstance(inputs, collections.abc.KeysView):
inputs = list(inputs)
elif isinstance(inputs, dict):
inputs = list(inputs.keys())
if not isinstance(inputs, list):
inputs = list(inputs.split(" "))
inputs = [x for x in inputs if x and x not in ['', ' ']]
inputs = [inputspart.strip() for inputspart in inputs]
# Create return
if outputtask == 'create':
return inputs
# Make temparray to preserve original order
temparray = []
for inputpart in inputs:
temparray.append(inputpart)
inputs = temparray
# Convert outputtask to standard
if outputtask in [0, 'complete']:
outputtask = 'string'
elif outputtask == 'index':
mainoutputtask = inputs[1]
suboutputtask = inputs[2]
inputs = inputs[0]
elif str(outputtask).isdigit():
mainoutputtask, outputtask = int(outputtask), 'number'
elif "^" in str(outputtask):
mainoutputtask = str(outputtask).split("^", 1)[0]
suboutputtask = str(outputtask).split("^", 1)[1]
outputtask = 'rangebetween'
if int(suboutputtask) < int(mainoutputtask):
mainoutputtask, suboutputtask = suboutputtask, mainoutputtask
elif str(outputtask).startswith("split_"):
mainoutputtask = str(outputtask).replace("split_", "")
outputtask = 'split'
elif str(outputtask).endswith(tuple(["!", "+", "-", "<", ">"])):
mainoutputtask = str(outputtask)
if str(outputtask).endswith("!"):
outputtask = 'exclude'
if str(outputtask).endswith("+"):
outputtask = 'incrange_plus'
if str(outputtask).endswith("-"):
outputtask = 'incrange_minus'
if str(outputtask).endswith(">"):
outputtask = 'excrange_plus'
if str(outputtask).endswith("<"):
outputtask = 'excrange_minus'
for r in (("!", ""), ("+", ""), ("-", ""), ("<", ""), (">", "")):
mainoutputtask = mainoutputtask.replace(*r)
if mainoutputtask == 'last':
mainoutputtask = len(inputs)
if outputtask == 'string':
returnvalue = inputs
else:
returnvalue = eval(
'self.' + outputtask +
'(inputs, outputtask, mainoutputtask, suboutputtask)')
# default return if not specified
if output_type == 'default':
if outputtask in [
'string', 'number', 'rangebetween', 'exclude', 'random',
'incrange_plus', 'incrange_minus', 'excrange_plus',
'excrange_minus'
]:
output_type = 'string'
elif outputtask in ['count']:
output_type = 'dict'
# verify output is correct
if output_type == 'return':
return returnvalue
if output_type == 'string':
if isinstance(returnvalue, list):
returnvalue = ' '.join(returnvalue)
elif output_type in ['list', 'array']:
if not isinstance(returnvalue, list):
returnvalue = list(returnvalue.split(" "))
returnvalue = [x for x in returnvalue if x and x not in ['', ' ']]
returnvalue = [inputspart.strip() for inputspart in returnvalue]
return returnvalue
# compare 2 lists, based on the location of an index item, passthrough needs to be [indexitem, arraytoindex, arraytocompare]
def index(self, indexitem, outputtask, arraytoindex, arraytocompare):
item = ''
for x, y in zip(arraytoindex, arraytocompare):
if x == indexitem:
item = y
return item
# split list by string
def split(self, inputs, outputtask, mainoutputtask, suboutputtask):
split_array = []
restring = ' '.join(inputs)
if mainoutputtask not in inputs:
split_array = [restring]
else:
split_array = restring.split(mainoutputtask)
split_array = [x for x in split_array if x and x not in ['', ' ']]
split_array = [inputspart.strip() for inputspart in split_array]
if split_array == []:
split_array = [[]]
return split_array
# dedupe list
def dedupe(self, inputs, outputtask, mainoutputtask, suboutputtask):
newlist = []
for inputspart in inputs:
if inputspart not in newlist:
newlist.append(inputspart)
return newlist
# Sort list
def sort(self, inputs, outputtask, mainoutputtask, suboutputtask):
return sorted(inputs)
# reverse sort list
def rsort(self, inputs, outputtask, mainoutputtask, suboutputtask):
return sorted(inputs)[::-1]
# count items in list, return dictionary
def count(self, inputs, outputtask, mainoutputtask, suboutputtask):
returndict = dict()
if not len(inputs):
return returndict
uniqueinputitems, uniquecount = [], []
for inputspart in inputs:
if inputspart not in uniqueinputitems:
uniqueinputitems.append(inputspart)
for uniqueinputspart in uniqueinputitems:
count = 0
for ele in inputs:
if (ele == uniqueinputspart):
count += 1
uniquecount.append(count)
for inputsitem, unumber in zip(uniqueinputitems, uniquecount):
returndict[inputsitem] = unumber
return returndict
# random item from list
def random(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
randomselectlist = []
for temppart in inputs:
randomselectlist.append(temppart)
while len(randomselectlist) > 1:
random.shuffle(randomselectlist)
randomselect = randomselectlist[random.randint(
0,
len(randomselectlist) - 1)]
randomselectlist.remove(randomselect)
randomselect = randomselectlist[0]
return randomselect
# remove random item from list
def exrandom(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return []
randremove = self.random(inputs, outputtask, mainoutputtask, suboutputtask)
inputs.remove(randremove)
return inputs
# Convert list into lowercase
def lower(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.lower() for inputspart in inputs]
# Convert list to uppercase
def upper(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.upper() for inputspart in inputs]
# Convert list to uppercase
def title(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.title() for inputspart in inputs]
# Reverse List Order
def reverse(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return []
return inputs[::-1]
# comma seperated list
def list(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ', '.join(str(x) for x in inputs)
def list_nospace(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ','.join(str(x) for x in inputs)
# comma seperated list with and
def andlist(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if len(inputs) < 2:
return ' '.join(inputs)
lastentry = str("and " + str(inputs[len(inputs) - 1]))
del inputs[-1]
inputs.append(lastentry)
if len(inputs) == 2:
return ' '.join(inputs)
return ', '.join(str(x) for x in inputs)
# comma seperated list with or
def orlist(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if len(inputs) < 2:
return ' '.join(inputs)
lastentry = str("or " + str(inputs[len(inputs) - 1]))
del inputs[-1]
inputs.append(lastentry)
if len(inputs) == 2:
return ' '.join(inputs)
return ', '.join(str(x) for x in inputs)
# exclude number
def exclude(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
del inputs[int(mainoutputtask) - 1]
return ' '.join(inputs)
# Convert list to string
def string(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ' '.join(inputs)
# Get number item from list
def number(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
elif int(mainoutputtask) > len(inputs) or int(mainoutputtask) < 0:
return ''
else:
return inputs[int(mainoutputtask) - 1]
# Get Last item from list
def last(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return inputs[len(inputs) - 1]
# range between items in list
def rangebetween(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if not str(mainoutputtask).isdigit() or not str(
suboutputtask).isdigit():
return ''
mainoutputtask, suboutputtask = int(mainoutputtask), int(suboutputtask)
if suboutputtask == mainoutputtask:
return self.number(inputs, outputtask, mainoutputtask, suboutputtask)
if suboutputtask < mainoutputtask:
return []
if mainoutputtask < 0:
mainoutputtask = 1
if suboutputtask > len(inputs):
suboutputtask = len(inputs)
newlist = []
for i in range(mainoutputtask, suboutputtask + 1):
newlist.append(
str(
self.number(inputs, outputtask, i, suboutputtask)))
if newlist == []:
return ''
return ' '.join(newlist)
# Forward Range includes index number
def incrange_plus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, int(mainoutputtask), len(inputs))
# Reverse Range includes index number
def incrange_minus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, 1, int(mainoutputtask))
# Forward Range excludes index number
def excrange_plus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, int(mainoutputtask) + 1, len(inputs))
# Reverse Range excludes index number
def excrange_minus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, 1, int(mainoutputtask) - 1)
spicemanip = Spicemanip()