Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
161 changes: 161 additions & 0 deletions irc/client_aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,15 @@
import logging
import threading

from jaraco.stream import buffer
from . import connection
from .client import (
Event,
Reactor,
ServerConnection,
ServerNotConnectedError,
SimpleIRCClient,
DCCConnection,
_ping_ponger,
)

Expand Down Expand Up @@ -212,6 +214,148 @@ def disconnect(self, message=""):
self._handle_event(Event("disconnect", self.server, "", [message]))


class DCCProtocol(IrcProtocol):
"""
A protocol for handling DCC connections.

Currently, DCCProtocol uses the same methods as `IrcProtocol`
for handling incoming data. This should be fine for most use
cases, but in the unlikely event that a DCC connection needs to
handle incoming data in a different way than an IRC connection,
this class will need to be overridden.
"""


class AioDCCConnection(DCCConnection):
"""
An asyncio-based DCCConnection.

This class overrides select-based methods with asyncio-based ones.
"""

rector: "AioReactor"
buffer_class = buffer.DecodingLineBuffer

protocol_class = DCCProtocol
protocol: DCCProtocol
socket: None
connected: bool
passive: bool
peeraddress: str
peerport: int

async def connect(
self, address: str, port: int, connect_factory: connection.AioFactory = connection.AioFactory()
) -> "AioDCCConnection":
"""Connect/reconnect to a DCC peer.

Arguments:
address -- Host/IP address of the peer.
port -- The port number to connect to.
connect_factory -- A callable that takes the event loop and the
server address, and returns a connection (with a socket interface)

Returns the DCCConnection object.
"""
self.peeraddress = address
self.peerport = port
self.handlers = {}
self.buffer = self.buffer_class()

self.connect_factory = connect_factory
protocol_instance = self.protocol_class(self, self.reactor.loop)
connection = self.connect_factory(protocol_instance, (self.peeraddress, self.peerport))
transport, protocol = await connection

self.transport = transport
self.protocol = protocol

self.connected = True
self.reactor._on_connect(self.protocol, self.transport)
return self

# TODO: implement listen() in asyncio way
async def listen(self, addr=None) -> "AioDCCConnection":
"""Wait for a connection/reconnection from a DCC peer.

Returns the DCCConnection object.

The local IP address and port are available as
self.peeraddress and self.peerport.
"""

raise NotImplementedError()

def disconnect(self, message: str = "") -> None:
"""Hang up the connection and close the object.

Arguments:

message -- Quit message.
"""
try:
del self.connected
except AttributeError:
return

self.transport.close()

self.reactor._handle_event(
self, Event("dcc_disconnect", self.peeraddress, "", [message])
)
self.reactor._remove_connection(self)

def process_data(self, new_data: bytes) -> None:
"""
handles incoming data from the `DCCProtocol` connection.
"""

if self.passive and not self.connected:
raise NotImplementedError()
# TODO: implement passive DCC connection

if self.dcctype == "chat":
self.buffer.feed(new_data)

chunks = list(self.buffer)

if len(self.buffer) > 2**14:
# Bad peer! Naughty peer!
log.info(
"Received >16k from a peer without a newline; " "disconnecting."
)
self.disconnect()
return
else:
chunks = [new_data]

command = "dccmsg"
prefix = self.peeraddress
target = None
for chunk in chunks:
log.debug("FROM PEER: %s", chunk)
arguments = [chunk]
log.debug(
"command: %s, source: %s, target: %s, arguments: %s",
command,
prefix,
target,
arguments,
)
event = Event(command, prefix, target, arguments)
self.reactor._handle_event(self, event)

def send_bytes(self, bytes: bytes) -> None:
"""
Send data to DCC peer.
"""
try:
self.transport.write(bytes)
log.debug("TO PEER: %r\n", bytes)
except OSError:
self.disconnect("Connection reset by peer.")


class AioReactor(Reactor):
"""
Processes message from on or more asyncio-based IRC server connections.
Expand Down Expand Up @@ -248,6 +392,7 @@ async def my_repeating_message(connection):
"""

connection_class = AioConnection
dcc_connection_class = AioDCCConnection

def __do_nothing(*args, **kwargs):
pass
Expand Down Expand Up @@ -275,6 +420,21 @@ class definied above.
"""
self.loop.run_forever()

def dcc(self, dcctype="chat"):
"""Creates and returns a DCCConnection object.

Arguments:

dcctype -- "chat" for DCC CHAT connections or "raw" for
DCC SEND (or other DCC types). If "chat",
incoming data will be split in newline-separated
chunks. If "raw", incoming data is not touched.
"""
with self.mutex:
conn = self.dcc_connection_class(self, dcctype)
self.connections.append(conn)
return conn


class AioSimpleIRCClient(SimpleIRCClient):
"""A simple single-server IRC client class.
Expand All @@ -288,6 +448,7 @@ class AioSimpleIRCClient(SimpleIRCClient):
"""

reactor_class = AioReactor
reactor: AioReactor

def connect(self, *args, **kwargs):
self.reactor.loop.run_until_complete(self.connection.connect(*args, **kwargs))