Skip to content

Commit

Permalink
Add base of group commands.
Browse files Browse the repository at this point in the history
  • Loading branch information
EvieePy committed Sep 20, 2024
1 parent dea8e9f commit 6e4e798
Show file tree
Hide file tree
Showing 3 changed files with 130 additions and 20 deletions.
5 changes: 3 additions & 2 deletions twitchio/ext/commands/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class _MetaComponent:
__all_checks__: list[Callable[..., Coroutine[Any, Any, None]]]

@classmethod
def _component_special(cls, obj: Any) -> ...:
def _component_special(cls, obj: Any) -> Any:
setattr(obj, "__component_special__", True)
cls.__component_specials__.append(obj.__name__)

Expand Down Expand Up @@ -89,7 +89,8 @@ def __new__(cls, *args: Any, **Kwargs: Any) -> Self:
if not member.extras:
member._extras = self.__component_extras__

commands[name] = member
if not member.parent: # type: ignore
commands[name] = member

elif hasattr(member, "__listener_name__"):
if name.startswith("component_"):
Expand Down
24 changes: 17 additions & 7 deletions twitchio/ext/commands/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
from collections.abc import Iterable
from typing import TYPE_CHECKING, Any

from .core import Command, Group
from .exceptions import *
from .view import StringView

Expand All @@ -41,6 +40,7 @@
from user import PartialUser

from .bot import Bot
from .core import Command


class Context:
Expand All @@ -51,7 +51,9 @@ def __init__(self, message: ChatMessage, bot: Bot) -> None:

self._raw_content: str = self._message.text
self._command: Command[Any, ...] | None = None
self._invoked_subcommand: Command[Any, ...] | None = None
self._invoked_with: str | None = None
self._subcommand_trigger: str | None = None
self._command_failed: bool = False
self._error_dispatched: bool = False

Expand All @@ -65,6 +67,18 @@ def message(self) -> ChatMessage:
def command(self) -> Command[Any, ...] | None:
return self._command

@property
def invoked_subcommand(self) -> Command[Any, ...] | None:
return self._invoked_subcommand

@property
def subcommand_trigger(self) -> str | None:
return self._subcommand_trigger

@property
def invoked_with(self) -> str | None:
return self._invoked_with

@property
def chatter(self) -> PartialUser:
return self._message.chatter
Expand Down Expand Up @@ -147,12 +161,8 @@ def _get_command(self) -> None:
if not command:
return

if isinstance(command, Group):
...

else:
self._command = command
return
self._command = command
return

async def _prepare(self) -> None:
await self._get_prefix()
Expand Down
121 changes: 110 additions & 11 deletions twitchio/ext/commands/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
"Mixin",
"Group",
"command",
"group",
)


Expand Down Expand Up @@ -75,14 +76,26 @@ def __init__(
self._injected: Component_T | None = None
self._error: Callable[[Component_T, CommandErrorPayload], Coro] | Callable[[CommandErrorPayload], Coro] | None = None
self._extras: dict[Any, Any] = kwargs.get("extras", {})
self._parent: Group[Component_T, P] | None = kwargs.get("parent", None)

def __repr__(self) -> str:
return f"Command(name={self._name}, parent={self.parent})"

def __str__(self) -> str:
return self._name

async def __call__(self, context: Context) -> None:
callback = self._callback(self._injected, context) if self._injected else self._callback(context) # type: ignore
await callback

@property
def component(self) -> Component_T | None:
return self._injected

@property
def parent(self) -> Group[Component_T, P] | None:
return self._parent

@property
def name(self) -> str:
return self._name
Expand Down Expand Up @@ -180,23 +193,109 @@ def remove_command(self, name: str, /) -> Command[Any, ...] | None:
return command


class Group(Mixin[Component_T], Command[Component_T, P]):
def walk_commands(self) -> ...: ...


def command(name: str | None = None, aliases: list[str] | None = None, extras: dict[Any, Any] | None = None) -> Any:
def command(
name: str | None = None, aliases: list[str] | None = None, extras: dict[Any, Any] | None = None, **kwargs: Any
) -> Any:
def wrapper(
func: Callable[Concatenate[Component_T, Context, P], Coro]
| Callable[Concatenate[Context, P], Coro]
| Command[Any, ...],
func: Callable[Concatenate[Component_T, Context, P], Coro] | Callable[Concatenate[Context, P], Coro],
) -> Command[Any, ...]:
if isinstance(func, Command):
raise ValueError(f'Callback "{func._callback.__name__}" is already a Command.')
raise ValueError(f'Callback "{func._callback}" is already a Command.') # type: ignore

if not asyncio.iscoroutinefunction(func):
raise TypeError(f'Command callback for "{func.__qualname__}" must be a coroutine function.')

name_ = name or func.__name__
return Command(name=name_, callback=func, aliases=aliases or [], extras=extras or {})
func_name = func.__name__
name_ = func_name if not name else name.strip().replace(" ", "") or func_name

return Command(name=name_, callback=func, aliases=aliases or [], extras=extras or {}, **kwargs)

return wrapper


def group(
name: str | None = None, aliases: list[str] | None = None, extras: dict[Any, Any] | None = None, **kwargs: Any
) -> Any:
def wrapper(
func: Callable[Concatenate[Component_T, Context, P], Coro] | Callable[Concatenate[Context, P], Coro],
) -> Group[Any, ...]:
if isinstance(func, Command):
raise ValueError(f'Callback "{func._callback.__name__}" is already a Command.') # type: ignore

if not asyncio.iscoroutinefunction(func):
raise TypeError(f'Group callback for "{func.__qualname__}" must be a coroutine function.')

func_name = func.__name__
name_ = func_name if not name else name.strip().replace(" ", "") or func_name

return Group(name=name_, callback=func, aliases=aliases or [], extras=extras or {}, **kwargs)

return wrapper


class Group(Mixin[Component_T], Command[Component_T, P]):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self._invoke_fallback: bool = kwargs.get("invoke_fallback", False)

def walk_commands(self) -> ...:
for command in self._commands.values():
yield command

if isinstance(command, Group):
yield from command.walk_commands()

async def _invoke(self, context: Context) -> None:
view = context._view
view.skip_ws()
trigger = view.get_word()

next_ = self._commands.get(trigger, None)
context._command = next_ or self
context._invoked_subcommand = next_
context._invoked_with = f"{context._invoked_with} {trigger}"
context._subcommand_trigger = trigger or None

if not trigger:
await super()._invoke(context=context)

elif trigger and next_:
await next_.invoke(context=context)

elif self._invoke_fallback:
await super()._invoke(context=context)

else:
raise CommandNotFound(f'The sub-command "{trigger}" for group "{self._name}" was not found.')

async def invoke(self, context: Context) -> None:
try:
await self._invoke(context)
except CommandError as e:
await self._dispatch_error(context, e)

def command(
self, name: str | None = None, aliases: list[str] | None = None, extras: dict[Any, Any] | None = None
) -> Any:
def wrapper(
func: Callable[Concatenate[Component_T, Context, P], Coro] | Callable[Concatenate[Context, P], Coro],
) -> Command[Any, ...]:
new = command(name=name, aliases=aliases, extras=extras, parent=self)(func)

self.add_command(new)
return new

return wrapper

def group(
self, name: str | None = None, aliases: list[str] | None = None, extras: dict[Any, Any] | None = None, **kwargs: Any
) -> Any:
def wrapper(
func: Callable[Concatenate[Component_T, Context, P], Coro] | Callable[Concatenate[Context, P], Coro],
) -> Command[Any, ...]:
new = group(name=name, aliases=aliases, extras=extras, parent=self)(func)

self.add_command(new)
return new

return wrapper

0 comments on commit 6e4e798

Please sign in to comment.