From 64b105657afd43c59ea9214d92db1cdebe0432d8 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Mon, 25 Apr 2022 08:02:28 -0400 Subject: [PATCH] Socket connect PoC --- src/kloop/includes/libc.pxd | 12 ++++-- src/kloop/loop.pxd | 2 + src/kloop/loop.pyx | 18 ++++++--- src/kloop/tcp.pxd | 34 +++++++++++++++++ src/kloop/tcp.pyx | 76 +++++++++++++++++++++++++++++++++++++ src/kloop/uring.pxd | 6 +++ src/kloop/uring.pyx | 58 +++++++++++++++++++++++++--- 7 files changed, 193 insertions(+), 13 deletions(-) create mode 100644 src/kloop/tcp.pxd create mode 100644 src/kloop/tcp.pyx diff --git a/src/kloop/includes/libc.pxd b/src/kloop/includes/libc.pxd index d87805e..7a50081 100644 --- a/src/kloop/includes/libc.pxd +++ b/src/kloop/includes/libc.pxd @@ -24,9 +24,6 @@ cdef extern from "sys/socket.h" nogil: ctypedef int socklen_t int SOL_TLS - int setsockopt(int socket, int level, int option_name, - const void *option_value, socklen_t option_len); - struct in_addr: pass @@ -52,6 +49,15 @@ cdef extern from "sys/socket.h" nogil: unsigned char* CMSG_DATA(cmsghdr* cmsg) size_t CMSG_SPACE(size_t length) + int socket(int domain, int type, int protocol) + int setsockopt( + int socket, + int level, + int option_name, + const void* option_value, + socklen_t option_len, + ) + cdef extern from "arpa/inet.h" nogil: int inet_pton(int af, char* src, void* dst) diff --git a/src/kloop/loop.pxd b/src/kloop/loop.pxd index 344a960..86bc8f2 100644 --- a/src/kloop/loop.pxd +++ b/src/kloop/loop.pxd @@ -21,6 +21,7 @@ include "./handle.pxd" include "./queue.pxd" include "./heapq.pxd" include "./uring.pxd" +include "./tcp.pxd" cdef struct Loop: @@ -38,6 +39,7 @@ cdef class KLoopImpl: object thread_id Loop loop + cpdef create_future(self) cdef inline check_closed(self) cdef inline bint _is_running(self) cdef inline check_running(self) diff --git a/src/kloop/loop.pyx b/src/kloop/loop.pyx index 95bd15c..613fc1b 100644 --- a/src/kloop/loop.pyx +++ b/src/kloop/loop.pyx @@ -16,6 +16,7 @@ import functools import inspect import os import reprlib +import socket import threading import traceback @@ -42,6 +43,7 @@ include "./handle.pyx" include "./queue.pyx" include "./heapq.pyx" include "./uring.pyx" +include "./tcp.pyx" cdef long long monotonic_ns() nogil except -1: @@ -169,8 +171,8 @@ cdef inline int loop_run_once( cdef: Callback* callback long long timeout = -1, now - int nready, res - void* data + int nready + RingCallback* cb if scheduled.tail: if not filter_cancelled_calls(loop): @@ -188,7 +190,9 @@ cdef inline int loop_run_once( if nready < 0: return 0 while nready: - res = ring_cq_pop(&ring.cq, &data) + ring_cq_pop(&ring.cq, &cb) + if cb != NULL and not cb.callback(cb): + return 0 nready -= 1 now = monotonic_ns() + 1 @@ -483,7 +487,7 @@ cdef class KLoopImpl: async def shutdown_default_executor(self): pass - def create_future(self): + cpdef create_future(self): return asyncio_Future(loop=self) def _timer_handle_cancelled(self, handle): @@ -506,7 +510,11 @@ cdef class KLoopImpl: happy_eyeballs_delay=None, interleave=None, ): - pass + cdef TCPTransport transport = TCPTransport.new(protocol_factory, self) + r = socket.getaddrinfo(host, port)[0] + host, port = r[-1] + waiter = transport.connect(host, port) + return transport, await waiter class KLoop(KLoopImpl, asyncio.AbstractEventLoop): diff --git a/src/kloop/tcp.pxd b/src/kloop/tcp.pxd new file mode 100644 index 0000000..3d3abe9 --- /dev/null +++ b/src/kloop/tcp.pxd @@ -0,0 +1,34 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +cdef struct TCPConnect: + int fd + libc.sockaddr_in addr + RingCallback ring_cb + Loop* loop + Callback* cb + + +cdef class TCPTransport: + cdef: + KLoopImpl loop + int fd + TCPConnect connector + object waiter + object protocol_factory + object host_bytes + Handle handle + + @staticmethod + cdef TCPTransport new(object protocol_factory, KLoopImpl loop) + + cdef connect(self, host, port) + cdef connect_cb(self) diff --git a/src/kloop/tcp.pyx b/src/kloop/tcp.pyx new file mode 100644 index 0000000..50fb876 --- /dev/null +++ b/src/kloop/tcp.pyx @@ -0,0 +1,76 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +cdef int tcp_connect(TCPConnect* connector) nogil: + return ring_sq_submit_connect( + &connector.loop.ring.sq, + connector.fd, + &connector.addr, + &connector.ring_cb, + ) + + +cdef int tcp_connect_cb(RingCallback* cb) nogil except 0: + cdef TCPConnect* connector = cb.data + return queue_push(&connector.loop.ready, connector.cb) + + +cdef class TCPTransport: + @staticmethod + cdef TCPTransport new(object protocol_factory, KLoopImpl loop): + cdef TCPTransport rv = TCPTransport.__new__(TCPTransport) + rv.protocol_factory = protocol_factory + rv.loop = loop + return rv + + cdef connect(self, host, port): + cdef: + int fd + TCPConnect* c = &self.connector + + fd = libc.socket(socket.AF_INET, socket.SOCK_STREAM, 0) + if fd == -1: + PyErr_SetFromErrno(IOError) + return + self.host_bytes = host.encode() + if not libc.inet_pton( + socket.AF_INET, self.host_bytes, &c.addr.sin_addr + ): + PyErr_SetFromErrno(IOError) + return + c.addr.sin_family = socket.AF_INET + c.addr.sin_port = libc.htons(port) + c.fd = self.fd = fd + c.loop = &self.loop.loop + c.ring_cb.callback = tcp_connect_cb + c.ring_cb.data = c + self.handle = Handle(self.connect_cb, (self,), self.loop, None) + c.cb = &self.handle.cb + if not tcp_connect(c): + raise ValueError("Submission queue is full!") + self.waiter = self.loop.create_future() + return self.waiter + + cdef connect_cb(self): + if self.connector.ring_cb.res != 0: + try: + errno.errno = abs(self.connector.ring_cb.res) + PyErr_SetFromErrno(IOError) + except IOError as e: + self.waiter.set_exception(e) + return + + protocol = self.protocol_factory() + self.waiter.set_result(protocol) + self.loop.call_soon(protocol.connection_made, self) + + def get_extra_info(self, x): + return None diff --git a/src/kloop/uring.pxd b/src/kloop/uring.pxd index 5496aac..e0dfcc9 100644 --- a/src/kloop/uring.pxd +++ b/src/kloop/uring.pxd @@ -50,3 +50,9 @@ cdef struct Ring: linux.__u8 int_flags linux.__u8 pad[3] unsigned pad2 + + +cdef struct RingCallback: + void* data + int res + int (*callback)(RingCallback* cb) nogil except 0 diff --git a/src/kloop/uring.pyx b/src/kloop/uring.pyx index 6799c53..91d75ca 100644 --- a/src/kloop/uring.pyx +++ b/src/kloop/uring.pyx @@ -168,14 +168,62 @@ cdef int ring_select(Ring* ring, long long timeout) nogil except -1: return ready -cdef inline int ring_cq_pop(CompletionQueue* cq, void** data) nogil: +cdef inline void ring_cq_pop(CompletionQueue* cq, RingCallback** callback) nogil: cdef: unsigned head linux.io_uring_cqe* cqe - int res + RingCallback* ret head = cq.khead[0] cqe = cq.cqes + (head & cq.kring_mask[0]) - data[0] = cqe.user_data - res = cqe.res + ret = cqe.user_data + ret.res = cqe.res + callback[0] = ret barrier.io_uring_smp_store_release(cq.khead, head + 1) - return res + + +cdef inline int ring_sq_submit( + SubmissionQueue* sq, + linux.__u8 op, + int fd, + unsigned long addr, + unsigned len, + linux.__u64 offset, + bint link, + RingCallback* callback, +) nogil: + cdef: + unsigned int head, next + linux.io_uring_sqe* sqe + head = barrier.io_uring_smp_load_acquire(sq.khead) + next = sq.sqe_tail + 1 + if next - head <= sq.kring_entries[0]: + sqe = &sq.sqes[sq.sqe_tail & sq.kring_mask[0]] + sq.sqe_tail = next + + string.memset(sqe, 0, sizeof(linux.io_uring_sqe)) + sqe.opcode = op + sqe.fd = fd + sqe.off = offset + sqe.addr = addr + sqe.len = len + if link: + sqe.flags = linux.IOSQE_IO_LINK + sqe.user_data = callback + return 1 + else: + return 0 + + +cdef int ring_sq_submit_connect( + SubmissionQueue* sq, int fd, libc.sockaddr_in* addr, RingCallback* callback +) nogil: + return ring_sq_submit( + sq, + linux.IORING_OP_CONNECT, + fd, + addr, + 0, + sizeof(addr[0]), + 0, + callback, + )