Compare commits

...

154 Commits
master ... dev

Author SHA1 Message Date
deathbybandaid
c861fdfe71 test 2023-01-24 11:28:50 -05:00
deathbybandaid
3c141f9cf2 test 2023-01-24 11:28:17 -05:00
deathbybandaid
d65a9ac8f8 test 2023-01-24 11:27:17 -05:00
deathbybandaid
520b1d7988 test 2023-01-24 11:20:40 -05:00
deathbybandaid
37ed83057c test 2023-01-24 11:19:59 -05:00
deathbybandaid
3000c16966 test 2023-01-24 11:19:15 -05:00
deathbybandaid
e382ab5740 test 2023-01-24 11:17:31 -05:00
deathbybandaid
ce9ef93c5a test 2023-01-24 11:12:50 -05:00
deathbybandaid
32526e7487 test 2023-01-24 11:10:35 -05:00
deathbybandaid
74b368bc1c test 2023-01-24 11:09:31 -05:00
deathbybandaid
c9a91790f3 test 2023-01-24 11:07:13 -05:00
deathbybandaid
b454231cfb test 2023-01-24 11:05:29 -05:00
deathbybandaid
0e84e4ced4 test 2023-01-24 11:04:39 -05:00
deathbybandaid
6d4e59f589 test 2023-01-24 11:00:19 -05:00
deathbybandaid
2560bb42d6 test 2023-01-24 10:59:49 -05:00
deathbybandaid
485aa6a2c8 test 2023-01-24 10:58:54 -05:00
deathbybandaid
0c21b2ce92 testt 2023-01-24 10:52:13 -05:00
deathbybandaid
1772db3ef1 test 2022-07-06 11:49:42 -04:00
deathbybandaid
72b93df3f4 test 2022-07-06 11:47:12 -04:00
deathbybandaid
8792ff352d test 2022-07-05 13:05:53 -04:00
deathbybandaid
9b3ae72030 test 2022-06-30 10:51:38 -04:00
deathbybandaid
cb29505f08 test 2022-06-30 09:43:23 -04:00
deathbybandaid
e4d3cd325f test 2022-06-29 12:58:32 -04:00
deathbybandaid
e84b887e9e test 2022-06-29 12:45:06 -04:00
deathbybandaid
4db23124d2 test 2022-06-29 12:34:32 -04:00
deathbybandaid
fb5c0a88ed test 2022-06-29 12:32:48 -04:00
deathbybandaid
d2205d01c2 test 2022-05-13 08:10:55 -04:00
deathbybandaid
41f4fb18c1 test 2022-05-13 08:09:57 -04:00
deathbybandaid
6b4c5a940a test 2022-05-06 14:04:35 -04:00
deathbybandaid
a839e3e361 test 2022-05-06 14:01:53 -04:00
deathbybandaid
aae8db0bbd test 2022-05-06 14:01:24 -04:00
deathbybandaid
fb68567ba0 test 2022-05-06 14:00:49 -04:00
deathbybandaid
57aa0cfa92 test 2022-05-06 13:52:42 -04:00
deathbybandaid
0a46b1c356 test 2022-05-06 13:47:54 -04:00
deathbybandaid
ec579b51f9 test 2022-05-06 13:45:00 -04:00
deathbybandaid
2b714411c7 test 2022-05-06 13:44:10 -04:00
deathbybandaid
3ff6f08c17 test 2022-05-06 13:42:44 -04:00
deathbybandaid
022472f414 test 2022-05-06 13:19:38 -04:00
deathbybandaid
f72fd42743 test 2022-05-06 13:18:51 -04:00
deathbybandaid
18fc938107 test 2022-05-06 13:17:55 -04:00
deathbybandaid
48f7e75463 test 2022-05-06 10:28:56 -04:00
deathbybandaid
42e626828a test 2022-05-06 10:28:06 -04:00
deathbybandaid
6d45b84243 test 2022-05-06 10:23:19 -04:00
deathbybandaid
e2c6150d4e test 2022-05-06 10:18:59 -04:00
deathbybandaid
c588eb3091 test 2022-05-06 10:17:38 -04:00
deathbybandaid
d6de6b2643 test 2022-05-06 10:12:38 -04:00
deathbybandaid
e63cb5999c test 2022-05-06 10:11:26 -04:00
deathbybandaid
d78289cb56 test 2022-05-06 10:10:09 -04:00
deathbybandaid
d81b4b13d2 test 2022-05-06 10:09:04 -04:00
deathbybandaid
39e6e1ad6b test 2022-05-06 10:04:59 -04:00
deathbybandaid
2a1f2ea051 test 2022-05-06 10:03:29 -04:00
deathbybandaid
1ec53ce1a6 test 2022-05-06 10:03:13 -04:00
deathbybandaid
68263628f6 test 2022-05-06 10:01:13 -04:00
deathbybandaid
968cbc5aea test 2022-05-06 09:50:21 -04:00
deathbybandaid
b745b835fc test 2022-05-06 09:45:19 -04:00
deathbybandaid
356805abaf test 2022-05-06 09:43:51 -04:00
deathbybandaid
a0740c9289 test 2022-05-06 09:37:46 -04:00
deathbybandaid
3710517186 test 2022-05-06 09:34:49 -04:00
deathbybandaid
21504e5ff0 test 2022-05-06 09:32:01 -04:00
deathbybandaid
ccf74a689f test 2022-05-06 09:30:21 -04:00
deathbybandaid
7ae9daa98d test 2022-05-06 09:15:21 -04:00
deathbybandaid
f0617e06fe test 2022-05-06 09:00:36 -04:00
deathbybandaid
184eb097a9 test 2022-05-06 08:58:56 -04:00
deathbybandaid
e6c22da6c5 test 2022-05-05 16:47:34 -04:00
deathbybandaid
e3f6c6e369 test 2022-05-05 16:44:42 -04:00
deathbybandaid
bacf4f22a5 test 2022-05-05 16:37:00 -04:00
deathbybandaid
b39559d841 test 2022-05-05 16:36:50 -04:00
deathbybandaid
b5cbf333bd test 2022-05-05 16:35:39 -04:00
deathbybandaid
65b22a2975 test 2022-05-05 16:11:25 -04:00
deathbybandaid
d624800839 test 2022-05-05 16:06:49 -04:00
deathbybandaid
303db8f650 test 2022-05-05 16:03:34 -04:00
deathbybandaid
4cc65de30b test 2022-05-05 16:02:24 -04:00
deathbybandaid
b9d66c4ed8 test 2022-05-05 15:59:43 -04:00
deathbybandaid
b7b4f1499f test 2022-05-05 15:50:18 -04:00
deathbybandaid
6723d309e2 test 2022-05-05 15:49:23 -04:00
deathbybandaid
ffd7a0ebdb test 2022-05-05 15:48:09 -04:00
deathbybandaid
3f372fe494 test 2022-05-05 15:19:50 -04:00
deathbybandaid
27ca97999c test 2022-05-05 15:18:37 -04:00
deathbybandaid
fbfe20b1da test 2022-05-05 15:13:22 -04:00
deathbybandaid
6102214173 test 2022-05-05 15:03:01 -04:00
deathbybandaid
9986f5188f test 2022-05-05 14:58:54 -04:00
deathbybandaid
18e08f9643 test 2022-05-05 14:58:06 -04:00
deathbybandaid
62b6532aba test 2022-05-05 14:52:55 -04:00
deathbybandaid
a4c1e113e0 test 2022-05-05 14:51:06 -04:00
deathbybandaid
e9df0721a2 test 2022-05-05 14:45:59 -04:00
deathbybandaid
0166950b7a test 2022-05-05 14:30:06 -04:00
deathbybandaid
7e308082e3 test 2022-05-05 14:29:46 -04:00
deathbybandaid
a0edd673a8 test 2022-05-05 14:14:49 -04:00
deathbybandaid
c0f6ada898 test 2022-05-05 14:12:58 -04:00
deathbybandaid
beaab95605 test 2022-05-05 14:09:03 -04:00
deathbybandaid
2b14a88f19 test 2022-05-05 14:07:24 -04:00
deathbybandaid
87a5c85390 test 2022-05-05 14:06:33 -04:00
deathbybandaid
858e137c99 test 2022-05-05 14:03:59 -04:00
deathbybandaid
23d67e238c test 2022-05-05 13:59:30 -04:00
deathbybandaid
6fa90e700b test 2022-05-05 13:53:06 -04:00
deathbybandaid
ef8f50ce82 test 2022-05-05 13:49:47 -04:00
deathbybandaid
06955aac8f test 2022-05-05 13:49:27 -04:00
deathbybandaid
ab17ba2c4e test 2022-05-05 13:46:50 -04:00
deathbybandaid
e4ccb7d4cb test 2022-05-05 13:41:00 -04:00
deathbybandaid
5ee3526ac9 test 2022-05-05 13:38:08 -04:00
deathbybandaid
b53bcac337 test 2022-05-05 13:37:32 -04:00
deathbybandaid
2591322f7c test 2022-05-05 13:33:30 -04:00
deathbybandaid
5102d13617 test 2022-05-05 13:20:09 -04:00
deathbybandaid
8ae5cc68cd test 2022-05-05 13:12:03 -04:00
deathbybandaid
c03cbdc018 test 2022-05-05 13:03:20 -04:00
deathbybandaid
6f62f880ae test 2022-05-05 13:01:37 -04:00
deathbybandaid
e4abb21e8f test 2022-05-05 13:00:02 -04:00
deathbybandaid
1069f87333 test 2022-05-05 12:36:41 -04:00
deathbybandaid
c5bc9d0839 test 2022-05-05 12:30:17 -04:00
deathbybandaid
0eaecc1c7d test 2022-05-05 12:24:09 -04:00
deathbybandaid
a075f17a85 test 2022-05-05 12:13:08 -04:00
deathbybandaid
a85ed1dabb test 2022-05-05 12:12:36 -04:00
deathbybandaid
85b9a8888d test 2022-05-05 12:12:01 -04:00
deathbybandaid
6031d823a2 test 2022-05-05 12:08:03 -04:00
deathbybandaid
a835829c40 test 2022-05-05 12:04:55 -04:00
deathbybandaid
e4dd478aab test 2022-05-05 11:46:45 -04:00
deathbybandaid
cf10e756ad test 2022-05-05 11:40:40 -04:00
deathbybandaid
9cd2990a3b test 2022-02-24 16:18:07 -05:00
deathbybandaid
2d250c80ba test 2022-02-24 16:17:15 -05:00
deathbybandaid
182818be40 test 2022-02-24 16:14:00 -05:00
deathbybandaid
cf46c0e3bb test 2022-02-24 16:12:06 -05:00
deathbybandaid
c809332507 test 2022-02-24 15:16:48 -05:00
deathbybandaid
dcca51f6e4 test 2022-02-24 15:15:52 -05:00
deathbybandaid
a9ea8bccab test 2022-02-24 15:13:51 -05:00
deathbybandaid
cffd77525d test 2022-02-24 15:12:12 -05:00
deathbybandaid
c25d6a32da test 2022-02-24 15:00:31 -05:00
deathbybandaid
c02dfb656d test 2022-02-24 14:50:12 -05:00
deathbybandaid
2ddc6cf0e9 test 2022-02-24 14:47:05 -05:00
deathbybandaid
d18c0b74c6 test 2022-02-24 14:32:40 -05:00
deathbybandaid
d5eb53e599 test 2022-02-24 14:16:36 -05:00
deathbybandaid
7305a75fe7 test 2022-02-24 14:09:44 -05:00
deathbybandaid
52ceb5eb64 test 2022-02-24 14:01:38 -05:00
deathbybandaid
4081921ccc test 2022-02-24 14:00:12 -05:00
deathbybandaid
8cff6fe135 test 2022-02-24 13:50:15 -05:00
deathbybandaid
59de20f694 test 2022-02-24 11:52:56 -05:00
deathbybandaid
637c77bd55 test 2022-02-24 11:52:36 -05:00
deathbybandaid
c2bc6273b3 test 2022-02-24 11:50:19 -05:00
deathbybandaid
4846c8f1d8 test 2022-02-24 11:46:45 -05:00
deathbybandaid
5e050dc012 test 2022-02-24 11:42:17 -05:00
deathbybandaid
181aa32dd3 test 2022-02-24 11:35:34 -05:00
deathbybandaid
1e50902504 test 2022-02-24 11:33:27 -05:00
deathbybandaid
8662bcc864 test 2022-02-24 11:28:52 -05:00
deathbybandaid
418e85da00 test 2022-02-24 11:27:14 -05:00
deathbybandaid
95f0aee41e test 2022-02-24 11:24:34 -05:00
deathbybandaid
9d86ab9fe8 test 2022-02-24 11:24:10 -05:00
deathbybandaid
3ad69fb64f test 2022-02-24 11:17:02 -05:00
deathbybandaid
9be0fb3698 test 2022-02-24 11:12:56 -05:00
deathbybandaid
6c249f5c95 test 2022-02-24 11:09:38 -05:00
deathbybandaid
aef147df2b tewst 2022-02-24 11:07:20 -05:00
deathbybandaid
4f07cdeddc test 2022-02-24 11:06:15 -05:00
deathbybandaid
0ef9dc154b test 2022-02-24 11:02:37 -05:00
deathbybandaid
59041d6a9c test 2022-02-24 11:01:56 -05:00
deathbybandaid
3375af0eb0 test 2022-02-24 11:00:27 -05:00
deathbybandaid
46f6966eda Core Bot 2022-02-24 10:58:48 -05:00
34 changed files with 2741 additions and 0 deletions

57
.gitignore vendored Normal file
View File

@ -0,0 +1,57 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
# C extensions
*.so
# Distribution / packaging
.Python
env/
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
*.egg-info/
.installed.cfg
*.egg
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*,cover
# Translations
*.mo
*.pot
# Django stuff:
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/

23
COPYING Normal file
View File

@ -0,0 +1,23 @@
Eiffel Forum License, version 2
1. Permission is hereby granted to use, copy, modify and/or
distribute this package, provided that:
* copyright notices are retained unchanged,
* any distribution of this package, whether modified or not,
includes this license text.
2. Permission is hereby also granted to distribute binary programs
which depend on this package. If the binary program depends on a
modified version of this package, you are encouraged to publicly
release the modified version of this package.
***********************
THIS PACKAGE IS PROVIDED "AS IS" AND WITHOUT WARRANTY. ANY EXPRESS OR
IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL THE AUTHORS BE LIABLE TO ANY PARTY FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
DAMAGES ARISING IN ANY WAY OUT OF THE USE OF THIS PACKAGE.
***********************

9
MANIFEST.in Normal file
View File

@ -0,0 +1,9 @@
include NEWS
include COPYING
include README.md
recursive-exclude * __pycache__
recursive-exclude * *.py[co]
recursive-include *.py
recursive-include * *.py
recursive-include * * *.py

0
NEWS Normal file
View File

View File

@ -1,2 +1,3 @@
# SpiceBot
A Niche Wrapper around Sopel

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
schedule

39
setup.cfg Normal file
View File

@ -0,0 +1,39 @@
[metadata]
name = SpiceBot
version = 0.1.0
description = A Niche Wrapper around Sopel
author = deathbybandaid
author_email = sam@deathbybandaid.net
url = https://git.deathbybandaid.net/deathbybandaid/SpiceBot.git
license = Eiffel Forum License, version 2
classifiers =
Intended Audience :: Developers
Intended Audience :: System Administrators
License :: Eiffel Forum License (EFL)
License :: OSI Approved :: Eiffel Forum License
Topic :: Communications :: Chat :: Internet Relay Chat
[options]
packages = find:
zip_safe = false
include_package_data = true
install_requires =
sopel>=7.0,<8
[options.entry_points]
sopel.plugins =
sopel_SpiceBot_Core_1 = sopel_SpiceBot_Core_1
sopel_SpiceBot_Core_Prerun = sopel_SpiceBot_Core_Prerun
sopel_SpiceBot_Core_Startup = sopel_SpiceBot_Core_Startup
sopel_SpiceBot_Runtime_Commands = sopel_SpiceBot_Runtime_Commands
sopel_SpiceBot_Runtime_Nickname_Commands = sopel_SpiceBot_Runtime_Nickname_Commands
sopel_SpiceBot_Runtime_Action_Commands = sopel_SpiceBot_Runtime_Action_Commands
sopel_SpiceBot_Runtime_Unmatched_Commands = sopel_SpiceBot_Runtime_Unmatched_Commands
spicemanip = spicemanip
spicebot_command_lower = spicebot_command_lower
spicebot_command_upper = spicebot_command_upper
spicebot_command_echo = spicebot_command_echo
spicebot_command_spongemock = spicebot_command_spongemock
spicebot_command_leetspeak = spicebot_command_leetspeak
spicebot_command_dadjoke = spicebot_command_dadjoke
spicebot_command_devexcuse = spicebot_command_devexcuse

25
setup.py Normal file
View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
from __future__ import print_function
import os
import sys
from setuptools import setup, find_packages
if __name__ == '__main__':
print('Sopel does not correctly load plugins installed with setup.py '
'directly. Please use "pip install .", or add '
'{}/sopel_SpiceBot to core.extra in your config.'
.format(os.path.dirname(os.path.abspath(__file__))),
file=sys.stderr)
with open('README.md') as readme_file:
readme = readme_file.read()
with open('NEWS') as history_file:
history = history_file.read()
setup(
long_description=readme + '\n\n' + history,
long_description_content_type='text/markdown',
)

View File

@ -0,0 +1,65 @@
from .config import Config
from .versions import Versions
from .logger import Logger
from .database import Database
from .comms import Comms
from .events import Events
from .commands import Commands
from .users import Users
class SpiceBotCore_OBJ():
def __init__(self, script_dir):
# Setup sopel bot access for later
self.bot = None
# Set directory for the plugin
self.script_dir = script_dir
# Allow SpiceBot to interact with Sopel Logger
self.logger = Logger()
self.logger.info("SpiceBot Logging Interface Setup Complete.")
# Allow Spicebot to mimic Sopel Config
self.config = Config(script_dir)
self.logger.info("SpiceBot Config Interface Setup Complete.")
# Parse Version Information for the ENV
self.versions = Versions(self.config, self.logger)
self.logger.info("SpiceBot Versions Interface Setup Complete.")
# Mimic Sopel DB, with enhancements
self.database = Database(self.config)
self.logger.info("SpiceBot Database Interface Setup Complete.")
# SpiceBots manual event system
self.events = Events(self.logger)
self.logger.info("SpiceBot Events Interface Setup Complete.")
# Bypass Sopel's method for writing to IRC
self.comms = Comms(self.config)
self.logger.info("SpiceBot Comms Interface Setup Complete.")
# SpiceBots access to Sopel Command listing
self.commands = Commands(self.config, self.bot)
self.logger.info("SpiceBot Commands Interface Setup Complete.")
# SpiceBots access to Sopel Command listing
self.users = Users(self.config, self.bot)
self.logger.info("SpiceBot Users Interface Setup Complete.")
def setup(self, bot):
"""This runs with the plugin setup routine"""
# store an access interface to sopel.bot
self.bot = bot
# Re-initialize the bot config properly during plugin setup routine
self.config.config = bot.config
# OSD shortcut
def osd(self, messages, recipients=None, text_method='PRIVMSG', max_messages=-1):
return self.comms.osd(messages, recipients, text_method, max_messages)

View File

@ -0,0 +1,374 @@
from operator import itemgetter
from difflib import SequenceMatcher
from sopel.trigger import PreTrigger
class Commands():
def __init__(self, config, bot):
self.bot = bot
self.config = config
@property
def prefix_list(self):
return str(self.config.core.prefix).replace("\\", '').split("|")
@property
def botnick(self):
return self.config.core.nick
@property
def multi_split_key(self):
# TODO config
return self.config.spicebot.multi_split_key
return "&&"
@property
def pipe_split_key(self):
# TODO config
return self.config.spicebot.pipe_split_key
return "|"
@property
def query_command_key(self):
# TODO config
return self.config.spicebot.query_command_key
return "?"
@property
def valid_sopel_commands(self):
found = []
for command_dict in self.sopel_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def valid_sopel_nickname_commands(self):
found = []
for command_dict in self.sopel_nickname_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def valid_sopel_action_commands(self):
found = []
for command_dict in self.sopel_action_commands:
found.append(command_dict["name"])
found.extend(command_dict["aliases"])
return found
@property
def sopel_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "command",
"plugin_name": plugin_name
})
return commands_list
@property
def sopel_nickname_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_nick_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "nickname_command",
"plugin_name": plugin_name
})
return commands_list
@property
def sopel_action_commands(self):
commands_list = []
for plugin_name, commands in self.bot.rules.get_all_action_commands():
for command in commands.values():
commands_list.append({
"name": command.name,
"aliases": command.aliases,
"type": "action_command",
"plugin_name": plugin_name
})
return commands_list
def dispatch(self, trigger_dict):
pretrigger = self.get_pretrigger(trigger_dict)
self.bot.dispatch(pretrigger)
def get_pretrigger(self, trigger_dict):
if trigger_dict["trigger_type"] == "command":
pretrigger = self.generate_pretrigger_command(trigger_dict)
elif trigger_dict["trigger_type"] == "nickname_command":
pretrigger = self.generate_pretrigger_nickname_command(trigger_dict)
elif trigger_dict["trigger_type"] == "action_command":
pretrigger = self.generate_pretrigger_action_command(trigger_dict)
return pretrigger
def generate_pretrigger_command(self, trigger_dict):
# @time=2022-02-23T15:04:01.447Z :
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s%s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
trigger_dict["trigger_prefix"], trigger_dict["trigger_command"], trigger_dict["trigger_str"])
)
return pretrigger
def generate_pretrigger_nickname_command(self, trigger_dict):
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s %s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
trigger_dict["trigger_prefix"], trigger_dict["trigger_command"], trigger_dict["trigger_str"])
)
return pretrigger
def generate_pretrigger_action_command(self, trigger_dict):
pretrigger = PreTrigger(
self.bot.nick,
":%s %s %s :%s%s %s%s %s" % (trigger_dict["trigger_hostmask"], "PRIVMSG", trigger_dict["trigger_sender"],
"\x01", "ACTION", trigger_dict["trigger_command"], trigger_dict["trigger_str"], "\x01")
)
return pretrigger
def get_command_from_trigger(self, trigger):
commstring = trigger.args[1]
if commstring.startswith(tuple(self.config.prefix_list)):
command = commstring[1:].split(" ")[0]
elif commstring.startswith(self.bot.nick):
command = " ".join(commstring.split(" ")[1:]).split(" ")[0]
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
command = commstring.split(" ")[0]
else:
command = ""
return command
def what_command_type(self, trigger):
full_trigger_str = trigger.args[1]
if full_trigger_str.startswith(tuple(self.config.prefix_list)):
return "command"
elif full_trigger_str.startswith(self.bot.nick):
return "nickname_command"
elif "intent" in trigger.tags and trigger.tags["intent"] == "ACTION":
return "action_command"
else:
return "rule"
def search_similar_commands(self, searchitem):
sim_listitems, sim_num = [], []
command_types = ["command", "nickname_command", "action_command"]
searchlists = [self.valid_sopel_commands, self.valid_sopel_nickname_commands, self.valid_sopel_action_commands]
for command_type, searchlist in zip(command_types, searchlists):
for listitem in searchlist:
similarlevel = SequenceMatcher(None, searchitem.lower(), listitem.lower()).ratio()
if similarlevel >= .75:
if command_type == "command":
prefix = "%s" % self.prefix_list[0]
elif command_type == "nickname_command":
prefix = "%s " % self.botnick
elif command_type == "action_command":
prefix = "%s " % "/me"
sim_listitems.append("%s%s" % (prefix, listitem))
sim_num.append(similarlevel)
if len(sim_listitems) and len(sim_num):
sim_num, sim_listitems = (list(x) for x in zip(*sorted(zip(sim_num, sim_listitems), key=itemgetter(0))))
return sim_listitems
def is_real_command(self, trigger_dict):
if trigger_dict["trigger_type"] == "command":
commands_list = self.valid_sopel_commands
elif trigger_dict["trigger_type"] == "nickname_command":
commands_list = self.valid_sopel_nickname_commands
elif trigger_dict["trigger_type"] == "action_command":
commands_list = self.valid_sopel_action_commands
return (trigger_dict["trigger_command"].lower() in [x.lower() for x in commands_list])
def is_catchall(self, function, command_type):
"""Determine if function could be called with a rule match"""
command_aliases = self.get_command_aliases(function, command_type)
return ('(.*)' in command_aliases)
def get_command_aliases(self, function, command_type):
if command_type == "command":
if hasattr(function, 'commands'):
command_aliases = function.commands
elif command_type == "nickname_command":
if hasattr(function, 'nickname_commands'):
command_aliases = function.nickname_commands
elif command_type == "action_command":
if hasattr(function, 'action_commands'):
command_aliases = function.action_commands
return command_aliases
def get_commands_nosplit(self, trigger):
commands = []
first_full_trigger_str = trigger.args[1]
first_trigger_type = self.what_command_type(trigger)
if first_trigger_type == "command":
first_trigger_prefix = first_full_trigger_str[0]
first_trigger_noprefix = first_full_trigger_str[1:]
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_trigger_type == "nickname_command":
first_trigger_prefix = str(first_full_trigger_str.split(" ")[0])
first_trigger_noprefix = " ".join(first_full_trigger_str.split(" ")[1:])
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_trigger_type == "action_command":
first_trigger_prefix = "ACTION"
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
else:
first_trigger_type = "rule"
first_trigger_prefix = None
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
commands.append({
"trigger_type": first_trigger_type,
"trigger_prefix": first_trigger_prefix,
"trigger_str": first_trigger_str,
"trigger_command": first_trigger_command,
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
return commands
def get_commands_split(self, trigger, splitkey=None):
commands = []
# Get split for multiple commands
if splitkey in trigger.args[1]:
triggers = [x.strip() for x in trigger.args[1].split(splitkey)]
else:
triggers = [trigger.args[1]]
first_full_trigger_str = triggers[0]
first_trigger_type = self.what_command_type(trigger)
if first_trigger_type == "command":
first_trigger_prefix = first_full_trigger_str[0]
first_trigger_noprefix = first_full_trigger_str[1:]
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_trigger_type == "nickname_command":
first_trigger_prefix = str(first_full_trigger_str.split(" ")[0])
first_trigger_noprefix = " ".join(first_full_trigger_str.split(" ")[1:])
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
elif first_trigger_type == "action_command":
first_trigger_prefix = "ACTION"
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
else:
first_trigger_type = "rule"
first_trigger_prefix = None
first_trigger_noprefix = first_full_trigger_str
first_trigger_command = first_trigger_noprefix.split(" ")[0]
first_trigger_str = " ".join([x.strip() for x in first_trigger_noprefix.split(" ")[1:]])
commands.append({
"trigger_type": first_trigger_type,
"trigger_prefix": first_trigger_prefix,
"trigger_str": first_trigger_str,
"trigger_command": first_trigger_command,
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
if not len(triggers) > 1:
return commands
for full_trigger_str in triggers[1:]:
if full_trigger_str.startswith(tuple(self.config.prefix_list)):
trigger_type = "command"
trigger_prefix = full_trigger_str[0]
trigger_noprefix = full_trigger_str[1:]
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
elif full_trigger_str.startswith(self.bot.nick):
trigger_type = "nickname_command"
trigger_prefix = str(full_trigger_str.split(" ")[0])
trigger_noprefix = " ".join(full_trigger_str.split(" ")[1:])
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
elif full_trigger_str.startswith(tuple(["ACTION", "/me"])):
trigger_type = "action_command"
trigger_prefix = "ACTION"
trigger_noprefix = full_trigger_str
trigger_command = trigger_noprefix.split(" ")[0]
trigger_str = " ".join([x.strip() for x in trigger_noprefix.split(" ")[1:]])
else:
trigger_command = full_trigger_str.split(" ")[0]
trigger_str = " ".join([x.strip() for x in full_trigger_str.split(" ")[1:]])
command_types = ["command", "nickname_command", "action_command"]
# Assume same command type until proven otherwise
assumed_trigger_type = first_trigger_type
assumed_trigger_prefix = first_trigger_prefix
# Still under the assumption that the command is most likely the same type as first command
command_types.remove(assumed_trigger_type)
command_types.insert(0, assumed_trigger_type)
found = []
for command_type in command_types:
if command_type == "command":
commands_list = self.valid_sopel_commands
elif command_type == "nickname_command":
commands_list = self.valid_sopel_nickname_commands
elif command_type == "action_command":
commands_list = self.valid_sopel_action_commands
if trigger_command in commands_list:
found.append(command_type)
if len(found):
trigger_type = found[0]
if trigger_type == "command":
trigger_prefix = self.config.prefix_list[0]
elif trigger_type == "nickname_command":
trigger_prefix = "%s," % self.bot.nick
elif trigger_type == "action_command":
trigger_prefix = "ACTION"
else:
trigger_type = assumed_trigger_type
trigger_prefix = assumed_trigger_prefix
if (not full_trigger_str.isspace() and full_trigger_str not in ['', None]
and not trigger_command.isspace() and trigger_command not in ['', None]):
commands.append({
"trigger_type": trigger_type,
"trigger_prefix": trigger_prefix,
"trigger_str": trigger_str.replace(splitkey, ""),
"trigger_command": trigger_command.replace(splitkey, ""),
"trigger_hostmask": trigger.hostmask,
"trigger_sender": trigger.sender,
"trigger_time": str(trigger.time)
})
return commands

View File

@ -0,0 +1,257 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
from sopel.tools import Identifier
from sopel.irc.utils import safe
import threading
import sys
if sys.version_info.major >= 3:
from collections import abc
if sys.version_info.major >= 3:
unicode = str
class Comms():
def __init__(self, config):
self.config = config
self.backend = None
self.sending = threading.RLock()
self.stack = {}
self.hostmask = None
def ircbackend_initialize(self, bot):
self.backend = bot.backend
self.dispatch = bot.dispatch
def hostmask_set(self, bot):
self.hostmask = bot.hostmask
@property
def botnick(self):
return self.config.core.nick
@property
def bothostmask(self):
return self.hostmask
def write(self, args, text=None):
while not self.backend:
pass
args = [safe(arg) for arg in args]
self.backend.send_command(*args, text=text)
def get_message_recipientgroups(self, recipients, text_method):
"""
Split recipients into groups based on server capabilities.
This defaults to 4
Input can be
* unicode string
* a comma-seperated unicode string
* list
* dict_keys handy for list(bot.channels.keys())
"""
if sys.version_info.major >= 3:
if isinstance(recipients, abc.KeysView):
recipients = [x for x in recipients]
if isinstance(recipients, dict):
recipients = [x for x in recipients]
if not isinstance(recipients, list):
recipients = recipients.split(",")
if not len(recipients):
raise ValueError("Recipients list empty.")
if text_method == 'NOTICE':
maxtargets = 4
elif text_method in ['PRIVMSG', 'ACTION']:
maxtargets = 4
maxtargets = int(maxtargets)
recipientgroups = []
while len(recipients):
recipients_part = ','.join(x for x in recipients[-maxtargets:])
recipientgroups.append(recipients_part)
del recipients[-maxtargets:]
return recipientgroups
def get_available_message_bytes(self, recipientgroups, text_method):
"""
Get total available bytes for sending a message line
Total sendable bytes is 512
* 15 are reserved for basic IRC NOTICE/PRIVMSG and a small buffer.
* The bots hostmask plays a role in this count
Note: if unavailable, we calculate the maximum length of a hostmask
* The recipients we send to also is a factor. Multiple recipients reduces
sendable message length
"""
if text_method == 'ACTION':
text_method_bytes = (len('PRIVMSG')
+ len("\x01ACTION \x01")
)
else:
text_method_bytes = len(text_method)
if self.bothostmask:
hostmaskbytes = len((self.bothostmask).encode('utf-8'))
else:
hostmaskbytes = (len((self.botnick).encode('utf-8')) # Bot's NICKLEN
+ 1 # (! separator)
+ len('~') # (for the optional ~ in user)
+ 9 # max username length
+ 1 # (@ separator)
+ 63 # <hostname> has a maximum length of 63 characters.
)
# find the maximum target group length, and use the max
groupbytes = []
for recipients_part in recipientgroups:
groupbytes.append(len((recipients_part).encode('utf-8')))
max_recipients_bytes = max(groupbytes)
allowedLength = (512
- len(':') - hostmaskbytes
- len(' ') - text_method_bytes - len(' ')
- max_recipients_bytes
- len(' :')
- len('\r\n')
)
return allowedLength
def get_sendable_message_list(self, messages, max_length=400):
"""Get a sendable ``text`` message list.
:param str txt: unicode string of text to send
:param int max_length: maximum length of the message to be sendable
:return: a tuple of two values, the sendable text and its excess text
We're arbitrarily saying that the max is 400 bytes of text when
messages will be split. Otherwise, we'd have to account for the bot's
hostmask, which is hard.
The `max_length` is the max length of text in **bytes**, but we take
care of unicode 2-bytes characters, by working on the unicode string,
then making sure the bytes version is smaller than the max length.
"""
if not isinstance(messages, list):
messages = [messages]
messages_list = ['']
message_padding = 4 * " "
for message in messages:
if isinstance(message, (bytes, bytearray)):
message = str(message, 'UTF-8')
if len((messages_list[-1] + message_padding + message).encode('utf-8')) <= max_length:
if messages_list[-1] == '':
messages_list[-1] = message
else:
messages_list[-1] = messages_list[-1] + message_padding + message
else:
text_list = []
while len(message.encode('utf-8')) > max_length and not message.isspace():
last_space = message.rfind(' ', 0, max_length)
if last_space == -1:
# No last space, just split where it is possible
splitappend = message[:max_length]
if not splitappend.isspace():
text_list.append(splitappend)
message = message[max_length:]
else:
# Split at the last best space found
splitappend = message[:last_space]
if not splitappend.isspace():
text_list.append(splitappend)
message = message[last_space:]
if len(message.encode('utf-8')) and not message.isspace():
text_list.append(message)
messages_list.extend(text_list)
return messages_list
def osd(self, messages, recipients=None, text_method='PRIVMSG', max_messages=-1):
"""Send ``text`` as a PRIVMSG, CTCP ACTION, or NOTICE to ``recipients``.
In the context of a triggered callable, the ``recipient`` defaults to
the channel (or nickname, if a private message) from which the message
was received.
By default, unless specified in the configuration file, there is some
built-in flood protection. Messages displayed over 5 times in 2 minutes
will be displayed as '...'.
The ``recipient`` can be in list format or a comma seperated string,
with the ability to send to multiple recipients simultaneously. The
default recipients that the bot will send to is 4 if the IRC server
doesn't specify a limit for TARGMAX.
Text can be sent to this function in either string or list format.
List format will insert as small buffering space between entries in the
list.
There are 512 bytes available in a single IRC message. This includes
hostmask of the bot as well as around 15 bytes of reserved IRC message
type. This also includes the destinations/recipients of the message.
This will split given strings/lists into a displayable format as close
to the maximum 512 bytes as possible.
If ``max_messages`` is given, the split mesage will display in as many
lines specified by this argument. Specifying ``0`` or a negative number
will display without limitation. By default this is set to ``-1`` when
called directly. When called from the say/msg/reply/notice/action it
will default to ``1``.
"""
if not hasattr(self, 'stack'):
self.stack = {}
text_method = text_method.upper()
if text_method == 'SAY' or text_method not in ['NOTICE', 'ACTION']:
text_method = 'PRIVMSG'
recipientgroups = self.get_message_recipientgroups(recipients, text_method)
available_bytes = self.get_available_message_bytes(recipientgroups, text_method)
messages_list = self.get_sendable_message_list(messages, available_bytes)
if max_messages >= 1:
messages_list = messages_list[:max_messages]
text_method_orig = text_method
for recipientgroup in recipientgroups:
text_method = text_method_orig
recipient_id = Identifier(recipientgroup)
recipient_stack = self.stack.setdefault(recipient_id, {
'messages': [],
'flood_left': 999,
'dots': 0,
})
recipient_stack['dots'] = 0
with self.sending:
for text in messages_list:
if recipient_stack['dots'] <= 3:
if text_method == 'ACTION':
text = '\001ACTION {}\001'.format(text)
self.write(('PRIVMSG', recipientgroup), text)
text_method = 'PRIVMSG'
elif text_method == 'NOTICE':
self.write(('NOTICE', recipientgroup), text)
else:
self.write(('PRIVMSG', recipientgroup), text)
def __getattr__(self, name):
"""
Quick and dirty shortcuts. Will only get called for undefined attributes.
"""
if hasattr(self.bot, name):
return eval("self.bot.%s" % name)

View File

@ -0,0 +1,54 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
import sys
import os
from sopel.cli.run import build_parser, get_configuration
from sopel.config.types import StaticSection, ValidatedAttribute
class Config():
def __init__(self, script_dir):
self.script_dir = script_dir
# Load config
self.config = get_configuration(self.get_opts())
@property
def basename(self):
return os.path.basename(self.config.filename).rsplit('.', 1)[0]
@property
def prefix_list(self):
return str(self.config.core.prefix).replace("\\", '').split("|")
def define_section(self, name, cls_, validate=True):
return self.config.define_section(name, cls_, validate)
def get_opts(self):
parser = build_parser()
if not len(sys.argv[1:]):
argv = ['legacy']
else:
argv = sys.argv[1:]
return parser.parse_args(argv)
def __getattr__(self, name):
''' will only get called for undefined attributes '''
"""We will try to find a core value, or return None"""
if hasattr(self.config, name):
return eval("self.config." + name)
else:
return None
class SpiceBot_Conf(StaticSection):
multi_split_key = ValidatedAttribute('multi_split_key', default="&&")
pipe_split_key = ValidatedAttribute('pipe_split_key', default="|")
query_command_key = ValidatedAttribute('query_command_key', default="?")

View File

@ -0,0 +1,294 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
import json
from sopel.tools import Identifier
from sopel.db import SopelDB, NickValues, ChannelValues, PluginValues
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.exc import SQLAlchemyError
BASE = declarative_base()
class SpiceDB(object):
# NICK FUNCTIONS
def adjust_nick_value(self, nick, key, value):
"""Sets the value for a given key to be associated with the nick."""
nick = Identifier(nick)
value = json.dumps(value, ensure_ascii=False)
nick_id = self.get_nick_id(nick)
session = self.ssession()
try:
result = session.query(NickValues) \
.filter(NickValues.nick_id == nick_id) \
.filter(NickValues.key == key) \
.one_or_none()
# NickValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_nickvalue = NickValues(nick_id=nick_id, key=key, value=float(value))
session.add(new_nickvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_nick_list(self, nick, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the nick."""
nick = Identifier(nick)
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
nick_id = self.get_nick_id(nick)
session = self.ssession()
try:
result = session.query(NickValues) \
.filter(NickValues.nick_id == nick_id) \
.filter(NickValues.key == key) \
.one_or_none()
# NickValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_nickvalue = NickValues(nick_id=nick_id, key=key, value=values)
session.add(new_nickvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
# CHANNEL FUNCTIONS
def adjust_channel_value(self, channel, key, value):
"""Sets the value for a given key to be associated with the channel."""
channel = Identifier(channel).lower()
value = json.dumps(value, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(ChannelValues) \
.filter(ChannelValues.channel == channel)\
.filter(ChannelValues.key == key) \
.one_or_none()
# ChannelValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_channelvalue = ChannelValues(channel=channel, key=key, value=float(value))
session.add(new_channelvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_channel_list(self, channel, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the channel."""
channel = Identifier(channel).lower()
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(ChannelValues) \
.filter(ChannelValues.channel == channel)\
.filter(ChannelValues.key == key) \
.one_or_none()
# ChannelValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_channelvalue = ChannelValues(channel=channel, key=key, value=values)
session.add(new_channelvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
# PLUGIN FUNCTIONS
def adjust_plugin_value(self, plugin, key, value):
"""Sets the value for a given key to be associated with the plugin."""
plugin = plugin.lower()
value = json.dumps(value, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(PluginValues) \
.filter(PluginValues.plugin == plugin)\
.filter(PluginValues.key == key) \
.one_or_none()
# PluginValue exists, update
if result:
result.value = float(result.value) + float(value)
session.commit()
# DNE - Insert
else:
new_pluginvalue = PluginValues(plugin=plugin, key=key, value=float(value))
session.add(new_pluginvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
def adjust_plugin_list(self, plugin, key, entries, adjustmentdirection):
"""Sets the value for a given key to be associated with the plugin."""
plugin = plugin.lower()
if not isinstance(entries, list):
entries = [entries]
entries = json.dumps(entries, ensure_ascii=False)
session = self.ssession()
try:
result = session.query(PluginValues) \
.filter(PluginValues.plugin == plugin)\
.filter(PluginValues.key == key) \
.one_or_none()
# PluginValue exists, update
if result:
if adjustmentdirection == 'add':
for entry in entries:
if entry not in result.value:
result.value.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in result.value:
result.value.remove(entry)
session.commit()
# DNE - Insert
else:
values = []
if adjustmentdirection == 'add':
for entry in entries:
if entry not in values:
values.append(entry)
elif adjustmentdirection == 'del':
for entry in entries:
while entry in values:
values.remove(entry)
new_pluginvalue = PluginValues(plugin=plugin, key=key, value=values)
session.add(new_pluginvalue)
session.commit()
except SQLAlchemyError:
session.rollback()
raise
finally:
session.close()
class Database():
def __init__(self, config):
SopelDB.adjust_nick_value = SpiceDB.adjust_nick_value
SopelDB.adjust_nick_list = SpiceDB.adjust_nick_list
SopelDB.adjust_channel_value = SpiceDB.adjust_channel_value
SopelDB.adjust_channel_list = SpiceDB.adjust_channel_list
SopelDB.adjust_plugin_value = SpiceDB.adjust_plugin_value
SopelDB.adjust_plugin_list = SpiceDB.adjust_plugin_list
self.db = SopelDB(config)
BASE.metadata.create_all(self.db.engine)
@property
def botnick(self):
return self.config.core.nick
def __getattr__(self, name):
''' will only get called for undefined attributes '''
if hasattr(self.db, name):
return eval("self.db." + name)
else:
return None
"""Nick"""
def adjust_nick_value(self, nick, key, value):
return self.db.adjust_nick_value(nick, key, value)
def adjust_nick_list(self, nick, key, entries, adjustmentdirection):
return self.db.adjust_nick_list(nick, key, entries, adjustmentdirection)
"""Bot"""
def get_bot_value(self, key):
return self.db.get_nick_value(self.botnick, key)
def set_bot_value(self, key, value):
return self.db.set_nick_value(self.botnick, key, value)
def delete_bot_value(self, key):
return self.db.delete_nick_value(self.botnick, key)
def adjust_bot_value(self, key, value):
return self.db.adjust_nick_value(self.botnick, key, value)
def adjust_bot_list(self, key, entries, adjustmentdirection):
return self.db.adjust_nick_list(self.botnick, key, entries, adjustmentdirection)
"""Channels"""
def adjust_channel_value(self, channel, key, value):
return self.db.adjust_channel_value(channel, key, value)
def adjust_channel_list(self, nick, key, entries, adjustmentdirection):
return self.db.adjust_channel_list(nick, key, entries, adjustmentdirection)
"""Plugins"""
def adjust_plugin_value(self, plugin, key, value):
return self.db.adjust_plugin_value(plugin, key, value)
def adjust_plugin_list(self, plugin, key, entries, adjustmentdirection):
return self.db.adjust_plugin_list(plugin, key, entries, adjustmentdirection)

View File

@ -0,0 +1,147 @@
# coding=utf8
from __future__ import unicode_literals, absolute_import, division, print_function
"""
This is the SpiceBot events system.
We utilize the Sopel code for event numbers and
self-trigger the bot into performing actions
"""
from sopel.trigger import PreTrigger
import functools
import threading
import time
class Events(object):
"""A dynamic listing of all the notable Bot numeric events.
Events will be assigned a 4-digit number above 1000.
This allows you to do, ``@plugin.event(events.BOT_WELCOME)````
Triggers handled by this plugin will be processed immediately.
Others will be placed into a queue.
Triggers will be logged by ID and content
"""
def __init__(self, logger):
self.logger = logger
self.lock = threading.Lock()
# This is a defined IRC event
self.RPL_WELCOME = '001'
self.RPL_MYINFO = '004'
self.RPL_ISUPPORT = '005'
self.RPL_WHOREPLY = '352'
self.RPL_NAMREPLY = '353'
# this is an unrealircd event
self.RPL_WHOISREGNICK = '307'
# These Are Speicebot generated events
self.BOT_UPTIME = time.time()
self.BOT_WELCOME = '1001'
self.BOT_READY = '1002'
self.BOT_CONNECTED = '1003'
self.BOT_LOADED = '1004'
self.BOT_RECONNECTED = '1005'
self.defaultevents = [self.BOT_WELCOME, self.BOT_READY, self.BOT_CONNECTED, self.BOT_LOADED, self.BOT_RECONNECTED]
self.dict = {
"assigned_IDs": {1000: "BOT_UPTIME", 1001: "BOT_WELCOME", 1002: "BOT_READY", 1003: "BOT_CONNECTED", 1004: "BOT_LOADED", 1005: "BOT_RECONNECTED"},
"triggers_recieved": {},
"trigger_queue": [],
"startup_required": [self.BOT_WELCOME, self.BOT_READY, self.BOT_CONNECTED],
"RPL_WELCOME_Count": 0
}
def __getattr__(self, name):
''' will only get called for undefined attributes '''
self.lock.acquire()
eventnumber = max(list(self.dict["assigned_IDs"].keys())) + 1
self.dict["assigned_IDs"][eventnumber] = str(name)
setattr(self, name, str(eventnumber))
self.lock.release()
return str(eventnumber)
def trigger(self, bot, number, message="SpiceBot_Events"):
pretriggerdict = {"number": str(number), "message": message}
if number in self.defaultevents:
self.dispatch(bot, pretriggerdict)
else:
self.dict["trigger_queue"].append(pretriggerdict)
def dispatch(self, bot, pretriggerdict):
number = pretriggerdict["number"]
message = pretriggerdict["message"]
pretrigger = PreTrigger(
bot.nick,
":SpiceBot_Events %s %s :%s" % (number, bot.nick, message)
)
bot.dispatch(pretrigger)
self.recieved({"number": number, "message": message})
def recieved(self, trigger):
self.lock.acquire()
if isinstance(trigger, dict):
eventnumber = str(trigger["number"])
message = str(trigger["message"])
else:
eventnumber = str(trigger.event)
message = trigger.args[1]
self.logger.info('SpiceBot_Events: %s %s' % (eventnumber, message))
if eventnumber not in self.dict["triggers_recieved"]:
self.dict["triggers_recieved"][eventnumber] = []
self.dict["triggers_recieved"][eventnumber].append(message)
self.lock.release()
def check(self, checklist):
if not isinstance(checklist, list):
checklist = [str(checklist)]
for number in checklist:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
return False
return True
def startup_add(self, startlist):
self.lock.acquire()
if not isinstance(startlist, list):
startlist = [str(startlist)]
for eventitem in startlist:
if eventitem not in self.dict["startup_required"]:
self.dict["startup_required"].append(eventitem)
self.lock.release()
def startup_check(self):
for number in self.dict["startup_required"]:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
return False
return True
def startup_debug(self):
not_done = []
for number in self.dict["startup_required"]:
if str(number) not in list(self.dict["triggers_recieved"].keys()):
not_done.append(int(number))
reference_not_done = []
for item in not_done:
reference_not_done.append(str(self.dict["assigned_IDs"][item]))
return reference_not_done
def check_ready(self, checklist):
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
while not self.check(checklist):
pass
return function(*args, **kwargs)
return _nop
return actual_decorator
def startup_check_ready(self):
def actual_decorator(function):
@functools.wraps(function)
def _nop(*args, **kwargs):
while not self.startup_check():
pass
return function(*args, **kwargs)
return _nop
return actual_decorator

View File

@ -0,0 +1,18 @@
from sopel import tools
class Logger():
def __init__(self):
self.logger = tools.get_logger('SpiceBot')
def __getattr__(self, name):
"""
Quick and dirty shortcuts. Will only get called for undefined attributes.
"""
if hasattr(self.logger, name):
return eval("self.logger.%s" % name)
elif hasattr(self.logger, name.lower()):
return eval("self.logger.%s" % name.lower())

View File

@ -0,0 +1,195 @@
from sopel import plugin
class Users():
def __init__(self, config, bot):
self.bot = bot
self.config = config
self.bot_priv_dict = {
"OWNER": 10,
"ADMIN": 5
}
self.channel_privilege_dict = {
"OPER": plugin.OPER,
"OWNER": plugin.OWNER,
"ADMIN": plugin.ADMIN,
"OP": plugin.OP,
"HALFOP": plugin.HALFOP,
"VOICE": plugin.VOICE,
}
"""Bot Priviledges"""
def get_bot_privilege_name(self, privilege):
priv_list = []
for priv_name in list(self.bot_priv_dict.keys()):
if self.bot_priv_dict[priv_name] >= privilege:
priv_list.append(priv_name)
return priv_list[-1]
def has_bot_privilege(self, nick, privilege):
if not isinstance(privilege, int):
if privilege.upper() in list(self.bot_priv_dict.keys()):
privilege = self.bot_priv_dict[privilege.upper()]
else:
privilege = 0
return self.get_nick_bot_privilege_level(nick) >= privilege
def get_nick_bot_privilege_level(self, nick):
if self.is_bot_owner(nick):
return self.bot_priv_dict["OWNER"]
elif self.is_bot_admin(nick):
return self.bot_priv_dict["ADMIN"]
return 0
"""Bot Owner"""
def list_bot_owner(self):
list_owner = self.bot.config.owner
if not isinstance(list_owner, list):
list_owner = [list_owner]
return list_owner
def is_bot_owner(self, nick):
if nick in self.list_bot_owner():
return True
return False
"""Bot Admins"""
def list_bot_admin(self):
list_admins = self.bot.config.admins
if not isinstance(list_admins, list):
list_admins = [list_admins]
return list_admins
def is_bot_admin(self, user):
if user in self.list_bot_admin():
return True
return False
"""Channel Users"""
def list_channel_users(self, channelstr):
channel = self.bot.channels[channelstr]
return list(channel.users.items())
def is_channel_user(self, channelstr, user):
if user in self.list_channel_users(channelstr):
return True
return False
"""Channel Privileges"""
def get_channel_privilege_name(self, privilege):
priv_list = []
for priv_name in list(self.channel_privilege_dict.keys()):
if self.channel_privilege_dict[priv_name] >= privilege:
priv_list.append(priv_name)
return priv_list[-1]
def has_channel_privilege(self, nick, channelstr, privilege):
if not isinstance(privilege, int):
if privilege.upper() == "OPER":
privilege = plugin.OPER
elif privilege.upper() == "OWNER":
privilege = plugin.OWNER
elif privilege == "ADMIN":
privilege = plugin.ADMIN
elif privilege.upper() == "OP":
privilege = plugin.OP
elif privilege.upper() in ["HALFOP", "HOP"]:
privilege = plugin.HALFOP
elif privilege.upper() == "VOICE":
privilege = plugin.VOICE
else:
return False
channel = self.bot.channels[channelstr]
if nick in [user for nick, user in channel.users if channel.has_privileges(nick, privilege)]:
return True
return False
"""Channel OPER"""
def list_channel_users_oper(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_oper(nick, plugin.OPER)]
def is_channel_oper(self, channelstr, user):
if user in self.list_channel_users_oper(channelstr):
return True
return False
"""Channel OWNER Users"""
def list_channel_users_owner(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_owner(nick, plugin.OWNER)]
def is_channel_owner(self, channelstr, user):
if user in self.list_channel_users_owner(channelstr):
return True
return False
"""Channel ADMIN Users"""
def list_channel_users_admin(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_admin(nick, plugin.ADMIN)]
def is_channel_admin(self, channelstr, user):
if user in self.list_channel_users_admin(channelstr):
return True
return False
"""Channel OP Users"""
def list_channel_users_op(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_op(nick, plugin.OP)]
def is_channel_op(self, channelstr, user):
if user in self.list_channel_users_op(channelstr):
return True
return False
"""Channel HALFOP Users"""
def list_channel_users_halfop(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_halfop(nick, plugin.HALFOP)]
def is_channel_halfop(self, channelstr, user):
if user in self.list_channel_users_halfop(channelstr):
return True
return False
"""Channel VOICE Users"""
def list_channel_users_voice(self, channelstr):
channel = self.bot.channels[channelstr]
return [user for nick, user in channel.users if channel.is_voice(nick, plugin.VOICE)]
def is_channel_voice(self, channelstr, user):
if user in self.list_channel_users_voice(channelstr):
return True
return False
"""Nick Channels"""
def list_nick_channels(self, nick):
user_channels = []
for channel in self.bot.channels:
if nick in list(channel.users.items()):
user_channels.append(channel)
return user_channels

View File

@ -0,0 +1,111 @@
import os
import sys
import platform
import pathlib
import json
import re
class Versions():
def __init__(self, config, logger):
self.config = config
self.logger = logger
self.dict = {}
self.register_spicebot()
self.register_env()
def get_core_versions(self):
returndict = {}
for item in list(self.dict.keys()):
if self.dict[item]["type"] == "SpiceBot":
returndict[item] = self.dict[item].copy()
return returndict
def register_version(self, item_name, item_version, item_type):
"""
Register a version item.
"""
self.logger.debug("Registering %s item: %s %s" % (item_type, item_name, item_version))
self.dict[item_name] = {
"name": item_name,
"version": item_version,
"type": item_type
}
def register_spicebot(self):
"""
Register core version items.
"""
version_file = pathlib.Path(self.config.script_dir).joinpath("version.json")
with open(version_file, 'r') as jsonversion:
versions = json.load(jsonversion)
for key in list(versions.keys()):
self.register_version(key, versions[key], "SpiceBot")
def is_docker(self):
path = "/proc/self/cgroup"
if not os.path.isfile(path):
return False
with open(path) as f:
for line in f:
if re.match("\d+:[\w=]+:/docker(-[ce]e)?/\w+", line):
return True
return False
def is_virtualenv(self):
# return True if started from within a virtualenv or venv
base_prefix = getattr(sys, "base_prefix", None)
# real_prefix will return None if not in a virtualenv enviroment or the default python path
real_prefix = getattr(sys, "real_prefix", None) or sys.prefix
return base_prefix != real_prefix
def register_env(self):
"""
Register env version items.
"""
self.register_version("Python", sys.version, "env")
if sys.version_info.major == 2 or sys.version_info < (3, 7):
self.logger.error('Error: SpiceBot requires python 3.7+. Do NOT expect support for older versions of python.')
opersystem = platform.system()
self.register_version("Operating System", opersystem, "env")
system_alias = platform.release()
self.register_version("OS Release", system_alias, "env")
if opersystem in ["Linux", "Darwin"]:
# Linux/Mac
if os.getuid() == 0 or os.geteuid() == 0:
self.logger.warning('Do not run SpiceBot with root privileges.')
elif opersystem in ["Windows"]:
# Windows
if os.environ.get("USERNAME") == "Administrator":
self.logger.warning('Do not run SpiceBot as Administrator.')
else:
# ['Java']
if not len(opersystem):
os_string = "."
else:
os_string = ": %s" % opersystem
self.logger.warning("Uncommon Operating System, use at your own risk%s" % os_string)
cpu_type = platform.machine()
self.register_version("CPU Type", cpu_type, "env")
isvirtualenv = self.is_virtualenv()
self.register_version("Virtualenv", isvirtualenv, "env")
isdocker = self.is_docker()
self.register_version("Docker", isdocker, "env")

View File

@ -0,0 +1,27 @@
# coding=utf8
"""SpiceBot
A Niche Wrapper around Sopel
"""
from __future__ import unicode_literals, absolute_import, division, print_function
import os
import pathlib
from .SBCore import SpiceBotCore_OBJ
from .SBCore.config import SpiceBot_Conf
SCRIPT_DIR = pathlib.Path(os.path.dirname(os.path.abspath(__file__)))
sb = SpiceBotCore_OBJ(SCRIPT_DIR)
def configure(config):
config.define_section("spicebot", SpiceBot_Conf, validate=False)
config.spicebot.configure_setting('multi_split_key', 'key to split multi-commands')
config.spicebot.configure_setting('pipe_split_key', 'key to split multi-commands')
config.spicebot.configure_setting('query_command_key', 'key to split multi-commands')
def setup(bot):
print(bot.config.spicebot)
sb.setup(bot)

View File

@ -0,0 +1,3 @@
{
"SpiceBot": "v0.9.3-beta"
}

View File

@ -0,0 +1,287 @@
import functools
import re
from sopel.trigger import Trigger
from sopel_SpiceBot_Core_1 import sb
def prerun(metadata={}):
"""This decorator is the hub of handling for all SpiceBot Commands"""
def actual_decorator(function):
@functools.wraps(function)
def internal_prerun(bot, trigger, *args, **kwargs):
runfunc = True
comrun = ComRun(bot, trigger, function, metadata)
# Check that nick has the correct bot or channel privileges
# to run the command.
if not comrun.is_nick_privileged():
comrun.osd(comrun.required_privileges_text())
return
# Since there was more than one command,
# we are going to redispatch commands
# This will give sopel the appearance of recieving individual commands
if comrun.is_multi_command:
if not comrun.is_catchall:
sb.commands.dispatch(comrun.command)
elif comrun.is_catchall and comrun.has_command_been_sanitized:
sb.commands.dispatch(comrun.command)
for trigger_dict in comrun.commands[1:]:
if not comrun.is_catchall:
sb.commands.dispatch(trigger_dict)
elif comrun.is_catchall and comrun.has_command_been_sanitized:
sb.commands.dispatch(trigger_dict)
return
# If the original trigger is not the same after splits
# so we will now redispatch to help get the correct function passed
if comrun.has_command_been_sanitized:
if comrun.is_pipe_command:
trigger_dict = rebuild_pipes(comrun.commands)
else:
trigger_dict = comrun.command
sb.commands.dispatch(trigger_dict)
return
# Block the catch-all from running a command twice
if comrun.is_catchall and comrun.is_real_command:
return
# The command is not valid
if comrun.is_catchall and not comrun.is_real_command:
valid_command_results = sb.commands.search_similar_commands(comrun.command["trigger_command"])
# Handling for invalid nickname commands
if comrun.command_type == "nickname_command":
if not len(valid_command_results):
comrun.osd("I'm not sure what you are asking me to do! What do you mean by \"%s%s\" ?" %
(comrun.command["trigger_command"], " %s" % comrun.command["trigger_str"] if comrun.command["trigger_str"] != "" else ""))
else:
comrun.osd("%s does not appear to be a valid command. Possible Matches: %s" % (comrun.command["trigger_command"], valid_command_results))
# normal prefixed command handling
elif comrun.command_type == "command":
# warn that a command is not valid
if not len(valid_command_results):
comrun.osd("%s does not appear to be a valid command." % comrun.command["trigger_command"])
else:
comrun.osd("%s does not appear to be a valid command. Possible Matches: %s" % (comrun.command["trigger_command"], valid_command_results))
# Don't be annoying when /me is conversation and not a command
# if comrun.command_type == "action_command":
return
# Run function
if runfunc:
# At this point, we update the re.match for trigger
trigger = rebuild_trigger(comrun, function)
function(bot, trigger, comrun, *args, **kwargs)
# If not piping the replies into pipe, let'sprint to IRC now
if not comrun.is_pipe_command:
for message_item in comrun._say:
comrun.osd(message_item)
# Pipe text back to bot for next piped command
else:
for say_message in comrun._say:
trigger_dict = rebuild_pipes(comrun.commands, say_message)
sb.commands.dispatch(trigger_dict)
return
return internal_prerun
return actual_decorator
def rebuild_trigger(comrun, function):
fakematch = re.match('.*', comrun.command["trigger_str"])
pretrigger = sb.commands.get_pretrigger(comrun.command)
trigger = Trigger(comrun.bot.config, pretrigger, fakematch)
return trigger
def rebuild_pipes(commands, trigger_str_add=None):
if not trigger_str_add:
repipe_trigger_dict = commands[0]
next_index_value = 1
else:
repipe_trigger_dict = commands[1]
repipe_trigger_dict["trigger_str"] += " %s" % trigger_str_add
next_index_value = 2
try:
for trigger_dict in commands[next_index_value:]:
if trigger_dict["trigger_type"] == "command":
repipe_trigger_dict["trigger_str"] += " %s %s%s %s" % (sb.commands.pipe_split_key,
trigger_dict["trigger_prefix"],
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
elif trigger_dict["trigger_type"] == "nickname_command":
repipe_trigger_dict["trigger_str"] += " %s %s %s %s" % (sb.commands.pipe_split_key,
trigger_dict["trigger_prefix"],
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
elif trigger_dict["trigger_type"] == "action_command":
repipe_trigger_dict["trigger_str"] += " %s %s %s %s" % (sb.commands.pipe_split_key,
"/me",
trigger_dict["trigger_command"],
trigger_dict["trigger_str"])
except IndexError:
repipe_trigger_dict = repipe_trigger_dict
return repipe_trigger_dict
class ComRun():
def __init__(self, bot, trigger, function, metadata):
self.bot = bot
self.orig_trigger = trigger
self.trigger = trigger
self.function = function
self.metadata = metadata
self._say = []
@property
def instigator(self):
return self.trigger.nick
@property
def channel(self):
return self.trigger.sender
@property
def is_in_channel(self):
return self.channel.startswith("#")
def say(self, message):
self._say.append(message)
def osd(self, messages, recipients=None, text_method='PRIVMSG', max_messages=-1):
if not recipients:
recipients = self.trigger.sender
sb.osd(messages, recipients, text_method, max_messages=-1)
# @property
def required_privileges(self):
bot_level = 0
channel_level = 0
if "required_privileges" in list(self.metadata.keys()):
if "bot_level" in list(self.metadata["required_privileges"].keys()):
bot_level = self.metadata["required_privileges"]["bot_level"]
if "channel_level" in list(self.metadata["required_privileges"].keys()):
channel_level = self.metadata["required_privileges"]["channel_level"]
return {
"bot": bot_level,
"channel": channel_level
}
# @property()
def required_privileges_text(self):
ret_text = "This command requries privileges of/above the following"
if self.required_privileges["bot"]:
ret_text += " Bot: %s" % sb.users.get_bot_privilege_name(self.required_privileges["bot"])
if self.required_privileges["channel"] and self.is_in_channel:
ret_text += " or "
ret_text += "Channel: %s" % sb.users.get_channel_privilege_name(self.required_privileges["channel"])
ret_text += "."
return ret_text
# @property
def is_nick_privileged(self):
has_bot_priv = True
has_channel_priv = True
priv_dict = self.required_privileges()
has_bot_priv = sb.users.has_bot_privilege(self.trigger.nick, priv_dict["bot"])
if self.is_in_channel:
has_channel_priv = sb.users.has_channel_privilege(self.trigger.nick, self.trigger.sender, priv_dict["channel"])
if has_bot_priv or has_channel_priv:
return True
return False
@property
def author(self):
author = "deathbybandaid"
if "author" in list(self.metadata.keys()):
author = self.metadata["author"]
return author
@property
def is_real_command(self):
return sb.commands.is_real_command(self.command)
@property
def command_type(self):
return sb.commands.what_command_type(self.trigger)
@property
def is_catchall(self):
return sb.commands.is_catchall(self.function, self.command_type)
@property
def is_multi_command(self):
if sb.commands.multi_split_key in self.orig_trigger.args[1]:
return True
return False
@property
def is_pipe_command(self):
if sb.commands.pipe_split_key in self.orig_trigger.args[1]:
return True
return False
@property
def multi_split_count(self):
if self.is_multi_command:
return len([x.strip() for x in self.orig_trigger.args[1].split(sb.commands.multi_split_key)])
return "N/A"
@property
def pipe_split_count(self):
if self.is_pipe_command:
return len([x.strip() for x in self.orig_trigger.args[1].split(sb.commands.pipe_split_key)])
return "N/A"
@property
def commands(self):
if self.is_multi_command:
return sb.commands.get_commands_split(self.orig_trigger, sb.commands.multi_split_key)
elif self.is_pipe_command:
return sb.commands.get_commands_split(self.orig_trigger, sb.commands.pipe_split_key)
else:
return sb.commands.get_commands_nosplit(self.orig_trigger)
@property
def command(self):
return self.commands[0]
@property
def has_command_been_sanitized(self):
trigger_command = sb.commands.get_command_from_trigger(self.orig_trigger)
if trigger_command != self.command["trigger_command"]:
return True
return False
@property
def re_match(self):
return self.trigger.match

View File

@ -0,0 +1,81 @@
# coding=utf8
"""SpiceBot
A Niche Wrapper around Sopel
"""
from __future__ import unicode_literals, absolute_import, division, print_function
from threading import Thread
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
"""
Events
"""
@plugin.event("001")
@plugin.rule('.*')
def welcome_setup_start(bot, trigger):
sb.comms.ircbackend_initialize(bot)
@plugin.event(sb.events.BOT_CONNECTED)
@plugin.rule('.*')
def bot_events_start_set_hostmask(bot, trigger):
sb.comms.hostmask_set(bot)
# @plugin.event(sb.events.BOT_WELCOME, sb.events.BOT_READY, sb.events.BOT_CONNECTED, sb.events.BOT_LOADED)
# @plugin.rule('.*')
# def bot_events_complete(bot, trigger):
# """This is here simply to log to stderr that this was recieved."""
# sb.logger.info('Recieved SpiceBot_Events: %s' % trigger.args[1])
@plugin.event(sb.events.RPL_WELCOME)
@plugin.rule('.*')
def bot_events_connected(bot, trigger):
# Handling for connection count
sb.events.dict["RPL_WELCOME_Count"] += 1
if sb.events.dict["RPL_WELCOME_Count"] > 1:
sb.events.trigger(bot, sb.events.BOT_RECONNECTED, "Bot ReConnected to IRC")
else:
sb.events.trigger(bot, sb.events.BOT_WELCOME, "Welcome to the SpiceBot Events System")
"""For items tossed in a queue, this will trigger them accordingly"""
Thread(target=events_thread, args=(bot,)).start()
def events_thread(bot):
while True:
if len(sb.events.dict["trigger_queue"]):
pretriggerdict = sb.events.dict["trigger_queue"][0]
sb.events.dispatch(bot, pretriggerdict)
try:
del sb.events.dict["trigger_queue"][0]
except IndexError:
pass
@plugin.event(sb.events.BOT_WELCOME)
@plugin.rule('.*')
def bot_events_start(bot, trigger):
"""This stage is redundant, but shows the system is working."""
sb.events.trigger(bot, sb.events.BOT_READY, "Ready To Process plugin setup procedures")
"""Here, we wait until we are in at least one channel"""
while not len(list(bot.channels.keys())) > 0:
pass
sb.events.trigger(bot, sb.events.BOT_CONNECTED, "Bot Connected to IRC")
@sb.events.startup_check_ready()
@plugin.event(sb.events.BOT_READY)
@plugin.rule('.*')
def bot_events_startup_complete(bot, trigger):
"""All events registered as required for startup have completed"""
sb.events.trigger(bot, sb.events.BOT_LOADED, "All registered plugins setup procedures have completed")

View File

@ -0,0 +1,13 @@
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.action_command('test')
def sb_test_commands(bot, trigger, comrun):
bot.say("%s" % trigger.raw)

View File

@ -0,0 +1,32 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('test', "testnew")
def commands_test(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@prerun()
@plugin.command('testa')
def commands_test_a(bot, trigger, comrun):
bot.say("test a")
bot.say("%s" % trigger.raw)
@prerun()
@plugin.command('testb')
def commands_test_b(bot, trigger, comrun):
bot.say("test b")
bot.say("%s" % trigger.raw)
@plugin.command('testc')
def commands_test_c(bot, trigger, comrun):
bot.say("test c")
bot.say("test c: %s" % trigger.raw)

View File

@ -0,0 +1,44 @@
from sopel import plugin
from sopel_SpiceBot_Core_1 import sb
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('test')
def sb_test_commands(bot, trigger, comrun):
bot.say("%s" % trigger.raw)
@plugin.nickname_command('plugins')
def sb_test_command_groups(bot, trigger, comrun):
for bplugin in list(bot._plugins.keys()):
sb.osd(str(bot._plugins[bplugin].get_meta_description()), trigger.sender)
@prerun()
@plugin.nickname_command('commands')
def sopel_commands(bot, trigger, comrun):
bot.say("testing commands")
sb.osd("%s" % sb.commands.sopel_commands, trigger.sender)
@prerun()
@plugin.nickname_command('nickname_commands')
def sopel_nickname_commands(bot, trigger, comrun):
bot.say("testing nickname_commands")
sb.osd("%s" % sb.commands.sopel_nickname_commands, trigger.sender)
@prerun()
@plugin.nickname_command('action_commands')
def sopel_action_commands(bot, trigger, comrun):
bot.say("testing action_commands")
sb.osd("%s" % sb.commands.sopel_action_commands, trigger.sender)

View File

@ -0,0 +1,22 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('(.*)')
def rule_command(bot, trigger, comrun):
return
@prerun()
@plugin.nickname_command('(.*)')
def rule_nickname_command(bot, trigger, comrun):
return
@prerun()
@plugin.action_command('(.*)')
def rule_action_command(bot, trigger, comrun):
return

View File

@ -0,0 +1,14 @@
def setup(bot, trigger, comrun):
return
"""
`bot.rules.register_command(Command('name', prefix=settings_prefix, plugin="your_plugin_id", ...))`
All you need is to create and register an instance of `sopel.plugins.rules.Command` (or use a subclass) with `bot.rules.register_command(generated_command)`.
do a for loop and register commands with the query prefix set in the configuration
unregister built-in commands
"""

View File

@ -0,0 +1,22 @@
import requests
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('dad', 'dadjoke')
def upper(bot, trigger, comrun):
fetched_str = fetch_string()
if not fetched_str:
fetched_str = 'My humor module is broken.'
comrun.say(fetched_str)
def fetch_string():
content_url = 'https://icanhazdadjoke.com'
content_page = requests.get(content_url, headers={'Accept': 'text/plain'})
fetched_str = content_page.text
return fetched_str

View File

@ -0,0 +1,25 @@
import requests
from lxml import html
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('devexcuse')
def upper(bot, trigger, comrun):
fetched_str = fetch_string()
if not fetched_str:
fetched_str = 'My humor module is broken.'
comrun.say(fetched_str)
def fetch_string():
content_url = 'http://developerexcuses.com'
response = requests.get(content_url)
tree = html.fromstring(response.content)
title_elem = tree.xpath('/html/body/div[1]/center/a')[0].text
fetched_str = str(title_elem)
return fetched_str

View File

@ -0,0 +1,19 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('echo')
@plugin.command('echo')
def echo(bot, trigger, comrun):
echo_count = 3
trigger_str = comrun.command["trigger_str"]
if trigger_str.split(" ")[0].isdigit():
echo_count = int(trigger_str.split(" ")[0])
trigger_str = " ".join(trigger_str.split(" ")[1:])
for x in range(echo_count):
comrun.say(trigger_str)

View File

@ -0,0 +1,71 @@
import random
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.command('leetspeak', 'leet', '1337')
def leatspeak(bot, trigger, comrun):
comrun.say(leet_convert(comrun.command["trigger_str"]))
def leet_convert(message):
message = message.strip()
replacements = (('hacker', 'haxor'), ('elite', 'eleet'), ('a', '4'), ('e', '3'),
('l', '1'), ('o', '0'), ('t', '+'))
char_map = {
"a": ["4", "@", "/-\\", "^"],
"b": ["I3", "8", "13", "|3"],
"c": ["[", "{", "<", "("],
"d": [")", "|)", "[)", "|>"],
"e": ["3", "[-"],
"f": ["|=", "ph", "|#", "/="],
"g": ["&", "6", "(_+]", "9", "C-", "gee"],
"h": ["#", "/-/", "[-]", "]-[", ")-(", "(-)", ":-:", "|-|", "}{"],
"i": ["1", "[]", "!", "|", "eye", "3y3", "]["],
"j": [",_|", "_|", "._|", "._]", ",_]", "]"],
"k": [">|", "|<", "/<", "1<", "|c", "|(", "|{"],
"l": ["1", "7", "|_", "|"],
"m": ["/\\/\\", "/V\\", "JVI", "[V]", "[]V[]", "|\\/|", "^^"],
"n": ["^/", "|\\|", "/\\/", "[\]", "<\\>", "{\\}", "|V", "/V"],
"o": ["0", "Q", "()", "oh", "[]"],
"p": ["|*", "|o", "?", "|^", "[]D"],
"q": ["(_,)", "()_", "2", "O_"],
"r": ["12", "|`", "|~", "|?", "/2", "|^", "Iz", "|9"],
"s": ["$", "5", "z", "ehs", "es"],
"t": ["7", "+", "-|-", "']['", '"|"', "~|~"],
"u": ["|_|", "(_)", "V", "L|"],
"v": ["\\/", "|/", "\\|"],
"w": ["\\/\\/", "VV", "\\N", "'//", "\\\\'", "\\^/", "\\X/"],
"x": ["><", ">|<", "}{", "ecks"],
"y": ["j", "`/", "\\|/", "\\//"],
"z": ["2", "7_", "-/_", "%", ">_", "~/_", "-\_", "-|_"],
}
leetspeak = []
# Split by individual word
for word in message.split(" "):
# Attempt full word replacement
if word.lower() in [old for old, new in replacements]:
leet_word = word.replace([old for old, new in replacements][0], [new for old, new in replacements][0])
leetspeak.append(leet_word)
# Replace indiviual characters
else:
word_chars = []
for char in word:
if char.lower() in char_map and random.random() <= 0.70: # 70% convert
possible_replacements = char_map[char.lower()]
leet_replacement = random.choice(possible_replacements)
word_chars.append(leet_replacement)
else:
word_chars.append(char)
leetspeak.append("".join(word_chars))
return " ".join(leetspeak)

View File

@ -0,0 +1,12 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('lower')
@plugin.command('lower')
def lower(bot, trigger, comrun):
comrun.say(str(comrun.command["trigger_str"]).lower())

View File

@ -0,0 +1,53 @@
import unicodedata
import random
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
# TODO
# allow channel operators to set a mode for the bot to mock everything a person says
@prerun()
@plugin.command('spongemock', 'smock')
def upper(bot, trigger, comrun):
comrun.say(mock_case(comrun.command["trigger_str"]))
def mock_case(text):
text = text.strip()
out = text[0].lower()
lower = True
repeat = 1
for char in text[1:]:
lo = char.lower()
up = char.upper()
if unicodedata.category(char) == 'Zs' or lo == up:
# whitespace shouldn't affect the case-repeat counting
# nor should characters whose case cannot be transformed
out += char
continue
if repeat == 2:
repeat = 1
lower = not lower
out += lo if lower else up
else:
which = random.choice([True, False])
if which:
out += lo
else:
out += up
if lower == which:
repeat += 1
else:
repeat = 1
lower = which
return out

View File

@ -0,0 +1,12 @@
from sopel import plugin
from sopel_SpiceBot_Core_Prerun import prerun
@prerun()
@plugin.nickname_command('upper')
@plugin.command('upper')
def upper(bot, trigger, comrun):
comrun.say(str(comrun.command["trigger_str"]).upper())

334
spicemanip/__init__.py Normal file
View File

@ -0,0 +1,334 @@
#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Top-level package for spicemanip."""
__author__ = """Sam Zick"""
__email__ = 'sam@deathbybandaid.net'
__version__ = '0.1.8'
import random
import collections
# TODO 'this*that' or '1*that' replace either all strings matching, or an index value
# TODO reverse sort z.sort(reverse = True)
# list.extend adds lists to eachother
class Spicemanip():
def __init__(self):
pass
def __call__(self, inputs, outputtask, output_type='default'):
mainoutputtask, suboutputtask = None, None
# Input needs to be a list, but don't split a word into letters
if not inputs:
inputs = []
if isinstance(inputs, collections.abc.KeysView):
inputs = list(inputs)
elif isinstance(inputs, dict):
inputs = list(inputs.keys())
if not isinstance(inputs, list):
inputs = list(inputs.split(" "))
inputs = [x for x in inputs if x and x not in ['', ' ']]
inputs = [inputspart.strip() for inputspart in inputs]
# Create return
if outputtask == 'create':
return inputs
# Make temparray to preserve original order
temparray = []
for inputpart in inputs:
temparray.append(inputpart)
inputs = temparray
# Convert outputtask to standard
if outputtask in [0, 'complete']:
outputtask = 'string'
elif outputtask == 'index':
mainoutputtask = inputs[1]
suboutputtask = inputs[2]
inputs = inputs[0]
elif str(outputtask).isdigit():
mainoutputtask, outputtask = int(outputtask), 'number'
elif "^" in str(outputtask):
mainoutputtask = str(outputtask).split("^", 1)[0]
suboutputtask = str(outputtask).split("^", 1)[1]
outputtask = 'rangebetween'
if int(suboutputtask) < int(mainoutputtask):
mainoutputtask, suboutputtask = suboutputtask, mainoutputtask
elif str(outputtask).startswith("split_"):
mainoutputtask = str(outputtask).replace("split_", "")
outputtask = 'split'
elif str(outputtask).endswith(tuple(["!", "+", "-", "<", ">"])):
mainoutputtask = str(outputtask)
if str(outputtask).endswith("!"):
outputtask = 'exclude'
if str(outputtask).endswith("+"):
outputtask = 'incrange_plus'
if str(outputtask).endswith("-"):
outputtask = 'incrange_minus'
if str(outputtask).endswith(">"):
outputtask = 'excrange_plus'
if str(outputtask).endswith("<"):
outputtask = 'excrange_minus'
for r in (("!", ""), ("+", ""), ("-", ""), ("<", ""), (">", "")):
mainoutputtask = mainoutputtask.replace(*r)
if mainoutputtask == 'last':
mainoutputtask = len(inputs)
if outputtask == 'string':
returnvalue = inputs
else:
returnvalue = eval(
'self.' + outputtask +
'(inputs, outputtask, mainoutputtask, suboutputtask)')
# default return if not specified
if output_type == 'default':
if outputtask in [
'string', 'number', 'rangebetween', 'exclude', 'random',
'incrange_plus', 'incrange_minus', 'excrange_plus',
'excrange_minus'
]:
output_type = 'string'
elif outputtask in ['count']:
output_type = 'dict'
# verify output is correct
if output_type == 'return':
return returnvalue
if output_type == 'string':
if isinstance(returnvalue, list):
returnvalue = ' '.join(returnvalue)
elif output_type in ['list', 'array']:
if not isinstance(returnvalue, list):
returnvalue = list(returnvalue.split(" "))
returnvalue = [x for x in returnvalue if x and x not in ['', ' ']]
returnvalue = [inputspart.strip() for inputspart in returnvalue]
return returnvalue
# compare 2 lists, based on the location of an index item, passthrough needs to be [indexitem, arraytoindex, arraytocompare]
def index(self, indexitem, outputtask, arraytoindex, arraytocompare):
item = ''
for x, y in zip(arraytoindex, arraytocompare):
if x == indexitem:
item = y
return item
# split list by string
def split(self, inputs, outputtask, mainoutputtask, suboutputtask):
split_array = []
restring = ' '.join(inputs)
if mainoutputtask not in inputs:
split_array = [restring]
else:
split_array = restring.split(mainoutputtask)
split_array = [x for x in split_array if x and x not in ['', ' ']]
split_array = [inputspart.strip() for inputspart in split_array]
if split_array == []:
split_array = [[]]
return split_array
# dedupe list
def dedupe(self, inputs, outputtask, mainoutputtask, suboutputtask):
newlist = []
for inputspart in inputs:
if inputspart not in newlist:
newlist.append(inputspart)
return newlist
# Sort list
def sort(self, inputs, outputtask, mainoutputtask, suboutputtask):
return sorted(inputs)
# reverse sort list
def rsort(self, inputs, outputtask, mainoutputtask, suboutputtask):
return sorted(inputs)[::-1]
# count items in list, return dictionary
def count(self, inputs, outputtask, mainoutputtask, suboutputtask):
returndict = dict()
if not len(inputs):
return returndict
uniqueinputitems, uniquecount = [], []
for inputspart in inputs:
if inputspart not in uniqueinputitems:
uniqueinputitems.append(inputspart)
for uniqueinputspart in uniqueinputitems:
count = 0
for ele in inputs:
if (ele == uniqueinputspart):
count += 1
uniquecount.append(count)
for inputsitem, unumber in zip(uniqueinputitems, uniquecount):
returndict[inputsitem] = unumber
return returndict
# random item from list
def random(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
randomselectlist = []
for temppart in inputs:
randomselectlist.append(temppart)
while len(randomselectlist) > 1:
random.shuffle(randomselectlist)
randomselect = randomselectlist[random.randint(
0,
len(randomselectlist) - 1)]
randomselectlist.remove(randomselect)
randomselect = randomselectlist[0]
return randomselect
# remove random item from list
def exrandom(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return []
randremove = self.random(inputs, outputtask, mainoutputtask, suboutputtask)
inputs.remove(randremove)
return inputs
# Convert list into lowercase
def lower(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.lower() for inputspart in inputs]
# Convert list to uppercase
def upper(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.upper() for inputspart in inputs]
# Convert list to uppercase
def title(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return [inputspart.title() for inputspart in inputs]
# Reverse List Order
def reverse(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return []
return inputs[::-1]
# comma seperated list
def list(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ', '.join(str(x) for x in inputs)
def list_nospace(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ','.join(str(x) for x in inputs)
# comma seperated list with and
def andlist(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if len(inputs) < 2:
return ' '.join(inputs)
lastentry = str("and " + str(inputs[len(inputs) - 1]))
del inputs[-1]
inputs.append(lastentry)
if len(inputs) == 2:
return ' '.join(inputs)
return ', '.join(str(x) for x in inputs)
# comma seperated list with or
def orlist(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if len(inputs) < 2:
return ' '.join(inputs)
lastentry = str("or " + str(inputs[len(inputs) - 1]))
del inputs[-1]
inputs.append(lastentry)
if len(inputs) == 2:
return ' '.join(inputs)
return ', '.join(str(x) for x in inputs)
# exclude number
def exclude(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
del inputs[int(mainoutputtask) - 1]
return ' '.join(inputs)
# Convert list to string
def string(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return ' '.join(inputs)
# Get number item from list
def number(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
elif int(mainoutputtask) > len(inputs) or int(mainoutputtask) < 0:
return ''
else:
return inputs[int(mainoutputtask) - 1]
# Get Last item from list
def last(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return inputs[len(inputs) - 1]
# range between items in list
def rangebetween(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
if not str(mainoutputtask).isdigit() or not str(
suboutputtask).isdigit():
return ''
mainoutputtask, suboutputtask = int(mainoutputtask), int(suboutputtask)
if suboutputtask == mainoutputtask:
return self.number(inputs, outputtask, mainoutputtask, suboutputtask)
if suboutputtask < mainoutputtask:
return []
if mainoutputtask < 0:
mainoutputtask = 1
if suboutputtask > len(inputs):
suboutputtask = len(inputs)
newlist = []
for i in range(mainoutputtask, suboutputtask + 1):
newlist.append(
str(
self.number(inputs, outputtask, i, suboutputtask)))
if newlist == []:
return ''
return ' '.join(newlist)
# Forward Range includes index number
def incrange_plus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, int(mainoutputtask), len(inputs))
# Reverse Range includes index number
def incrange_minus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, 1, int(mainoutputtask))
# Forward Range excludes index number
def excrange_plus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, int(mainoutputtask) + 1, len(inputs))
# Reverse Range excludes index number
def excrange_minus(self, inputs, outputtask, mainoutputtask, suboutputtask):
if not len(inputs):
return ''
return self.rangebetween(inputs, outputtask, 1, int(mainoutputtask) - 1)
spicemanip = Spicemanip()