From bb7c378c95727fb67155855ecbf9479347382a34 Mon Sep 17 00:00:00 2001
From: Fantix King
Date: Sat, 23 Apr 2022 14:35:17 -0400
Subject: [PATCH] New implementation: nogil for most of the stuff

---
 setup.py | 35 +-
 src/kloop/__init__.py | 1 +
 src/kloop/handle.pxd | 37 ++
 src/kloop/handle.pyx | 152 +++++++
 src/kloop/heapq.pxd | 15 +
 src/kloop/heapq.pyx | 184 ++++++++
 src/kloop/includes/barrier.pxd | 1 +
 src/kloop/includes/libc.pxd | 11 +
 src/kloop/includes/linux.pxd | 1 +
 src/kloop/includes/openssl/__init__.py | 9 +
 src/kloop/includes/openssl/bio.pxd | 88 ++++
 src/kloop/includes/openssl/err.pxd | 14 +
 src/kloop/includes/openssl/ssl.pxd | 17 +
 src/kloop/includes/pyssl.pxd | 34 ++
 src/kloop/includes/ssl.h | 20 -
 src/kloop/includes/ssl.pxd | 39 --
 src/kloop/ktls.pxd | 3 +
 src/kloop/ktls.pyx | 246 +++++------
 src/kloop/loop.pxd | 48 +++
 src/kloop/loop.py | 473 ---------------------
 src/kloop/loop.pyx | 535 +++++++++++++++++++++++
 src/kloop/queue.pxd | 16 +
 src/kloop/queue.pyx | 165 ++++++++
 src/kloop/uring.pxd | 135 ++----
 src/kloop/uring.pyx | 565 +++++++------------------
 25 files changed, 1657 insertions(+), 1187 deletions(-)
 create mode 100644 src/kloop/handle.pxd
 create mode 100644 src/kloop/handle.pyx
 create mode 100644 src/kloop/heapq.pxd
 create mode 100644 src/kloop/heapq.pyx
 create mode 100644 src/kloop/includes/openssl/__init__.py
 create mode 100644 src/kloop/includes/openssl/bio.pxd
 create mode 100644 src/kloop/includes/openssl/err.pxd
 create mode 100644 src/kloop/includes/openssl/ssl.pxd
 create mode 100644 src/kloop/includes/pyssl.pxd
 delete mode 100644 src/kloop/includes/ssl.h
 delete mode 100644 src/kloop/includes/ssl.pxd
 create mode 100644 src/kloop/loop.pxd
 delete mode 100644 src/kloop/loop.py
 create mode 100644 src/kloop/loop.pyx
 create mode 100644 src/kloop/queue.pxd
 create mode 100644 src/kloop/queue.pyx

diff --git a/setup.py b/setup.py
index 94143bb..2c76f65 100644
--- a/setup.py
+++ b/setup.py
@@ -9,18 +9,47 @@
 # See the Mulan PSL v2 for more details.
 
+import sysconfig
+
+from setuptools import setup
 from Cython.Build import cythonize
 from Cython.Distutils import Extension
-from setuptools import setup
+
+
+def _openssl_flags(name):
+    """Return the whitespace-separated flags stored in config var *name*.
+
+    ``sysconfig.get_config_var`` returns ``None`` when the interpreter's
+    build configuration does not define the variable; that is treated as
+    "no flags" instead of failing on ``None.split()``.
+    """
+    return (sysconfig.get_config_var(name) or "").split()
+
+
+# Read once: both library_dirs and extra_link_args are derived from it.
+_OPENSSL_LDFLAGS = _openssl_flags("OPENSSL_LDFLAGS")
+_OPENSSL_RPATH = sysconfig.get_config_var("OPENSSL_RPATH")
+
 
 setup(
     ext_modules=cythonize(
         [
-            Extension("kloop.uring", ["src/kloop/uring.pyx"]),
+            Extension("kloop.loop", ["src/kloop/loop.pyx"]),
             Extension(
                 "kloop.ktls",
                 ["src/kloop/ktls.pyx"],
-                libraries=["ssl", "crypto"],
+                # Link against the same OpenSSL the interpreter was built
+                # with; fall back to the stock library names when the
+                # interpreter does not record OPENSSL_LIBS.
+                libraries=[
+                    lib.removeprefix("-l")
+                    for lib in _openssl_flags("OPENSSL_LIBS")
+                ]
+                or ["ssl", "crypto"],
+                include_dirs=[
+                    d.removeprefix("-I")
+                    for d in _openssl_flags("OPENSSL_INCLUDES")
+                ],
+                # "-L<dir>" entries become library_dirs; every other linker
+                # flag is passed through untouched.
+                library_dirs=[
+                    d.removeprefix("-L")
+                    for d in _OPENSSL_LDFLAGS
+                    if d.startswith("-L")
+                ],
+                extra_link_args=[
+                    d for d in _OPENSSL_LDFLAGS if not d.startswith("-L")
+                ],
+                runtime_library_dirs=(
+                    [_OPENSSL_RPATH] if _OPENSSL_RPATH else []
+                ),
             ),
         ],
         language_level="3",
diff --git a/src/kloop/__init__.py b/src/kloop/__init__.py
index c385634..89ecbe1 100644
--- a/src/kloop/__init__.py
+++ b/src/kloop/__init__.py
@@ -8,4 +8,5 @@
 # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 # See the Mulan PSL v2 for more details.
 
+
 from .loop import KLoop, KLoopPolicy
diff --git a/src/kloop/handle.pxd b/src/kloop/handle.pxd
new file mode 100644
index 0000000..7cdd893
--- /dev/null
+++ b/src/kloop/handle.pxd
@@ -0,0 +1,37 @@
+# Copyright (c) 2022 Fantix King http://fantix.pro
+# kLoop is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+
+# Bit flags stored in Callback.mask.
+cdef unsigned char CANCELLED_MASK = 1
+cdef unsigned char SCHEDULED_MASK = 1 << 1
+
+
+# Plain C record embedded in every Handle (as Handle.cb).
+cdef struct Callback:
+    unsigned char mask
+    PyObject* handle
+    long long when
+
+
+cdef class Handle:
+    cdef:
+        Callback cb
+        object callback
+        object args
+        KLoopImpl loop
+        object source_traceback
+        object repr
+        object context
+
+    cdef run(self)
+
+
+cdef class TimerHandle(Handle):
+    cdef:
+        bint scheduled
diff --git a/src/kloop/handle.pyx b/src/kloop/handle.pyx
new file mode 100644
index 0000000..8213b68
--- /dev/null
+++ b/src/kloop/handle.pyx
@@ -0,0 +1,152 @@
+# Copyright (c) 2022 Fantix King http://fantix.pro
+# kLoop is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+
+cdef class Handle:
+    """Object returned by callback registration methods.
+
+    Wraps ``callback(*args)`` together with the ``contextvars`` context it
+    runs in.  The embedded C record ``self.cb`` carries the state flags
+    (``mask``) and a pointer back to this object.
+    """
+
+    def __init__(self, callback, args, loop, context=None):
+        # Without an explicit context, run the callback in a copy of the
+        # caller's current context.
+        if context is None:
+            context = contextvars.copy_context()
+        self.context = context
+        self.loop = loop
+        self.callback = callback
+        self.args = args
+        self.repr = None
+        # Stored without Py_INCREF: ``cb`` is a member of this very object.
+        # NOTE(review): the cast was missing in this copy of the patch,
+        # presumably stripped together with other <...> text - confirm.
+        self.cb.handle = <PyObject*>self
+        # if self._loop.get_debug():
+        #     self._source_traceback = format_helpers.extract_stack(
+        #         sys._getframe(1))
+        # else:
+        #     self._source_traceback = None
+
+    def _repr_info(self):
+        """Return the list of strings that ``__repr__`` joins."""
+        info = [self.__class__.__name__]
+        if (self.cb.mask & CANCELLED_MASK) != 0:
+            info.append('cancelled')
+        if self.callback is not None:
+            info.append(_format_callback_source(self.callback, self.args))
+        # if self._source_traceback:
+        #     frame = self._source_traceback[-1]
+        #     info.append(f'created at {frame[0]}:{frame[1]}')
+        return info
+
+    def __repr__(self):
+        # if self._repr is not None:
+        #     return self._repr
+        info = self._repr_info()
+        return '<{}>'.format(' '.join(info))
+
+    def cancel(self):
+        """Mark the handle cancelled and drop the callback and its args.
+
+        Calling it again on an already cancelled handle does nothing.
+        """
+        if (self.cb.mask & CANCELLED_MASK) == 0:
+            self.cb.mask |= CANCELLED_MASK
+            # if self._loop.get_debug():
+            #     # Keep a representation in debug mode to keep callback and
+            #     # parameters. For example, to log the warning
+            #     # "Executing <Handle...> took 2.5 second"
+            #     self._repr = repr(self)
+            self.callback = None
+            self.args = None
+
+    def cancelled(self):
+        """Return True if ``cancel()`` has been called on this handle."""
+        # Test the bit itself instead of comparing with the literal 1, so
+        # the check does not depend on CANCELLED_MASK being bit 0.
+        return (self.cb.mask & CANCELLED_MASK) != 0
+
+    cdef run(self):
+        """Run the callback inside its context.
+
+        ``SystemExit`` and ``KeyboardInterrupt`` propagate; any other
+        exception is handed to ``self.loop.call_exception_handler``.
+        """
+        try:
+            self.context.run(self.callback, *self.args)
+        except (SystemExit, KeyboardInterrupt):
+            raise
+        except BaseException as exc:
+            cb = _format_callback_source(self.callback, self.args)
+            msg = f'Exception in callback {cb}'
+            context = {
+                'message': msg,
+                'exception': exc,
+                'handle': self,
+            }
+            if self.source_traceback:
+                context['source_traceback'] = self.source_traceback
+            self.loop.call_exception_handler(context)
+        self = None  # Needed to break cycles when an exception occurs.
+
+
+cdef class TimerHandle(Handle):
+    """Object returned by timed callback registration methods."""
+
+    def __init__(self, when, callback, args, loop, context=None):
+        assert when is not None
+        super().__init__(callback, args, loop, context)
+        # Drop the innermost recorded frame, when a traceback was recorded.
+        if self.source_traceback:
+            del self.source_traceback[-1]
+        self.cb.when = when
+
+    # def _repr_info(self):
+    #     info = super()._repr_info()
+    #     pos = 2 if self._cancelled else 1
+    #     info.insert(pos, f'when={self._when}')
+    #     return info
+
+    def cancel(self):
+        # Count the cancellation only for a timer that is flagged as
+        # scheduled and is not already cancelled.
+        if self.cb.mask & (CANCELLED_MASK | SCHEDULED_MASK) == SCHEDULED_MASK:
+            self.loop.loop.timer_cancelled_count += 1
+        super().cancel()
+
+    def when(self):
+        """Return the ``when`` value this timer was created with."""
+        return self.cb.when
+
+
+def _get_function_source(func):
+    """Return ``(filename, first_lineno)`` for *func*, or None.
+
+    Wrappers are unwrapped first; ``functools.partial`` and
+    ``functools.partialmethod`` objects are resolved to their ``func``.
+    """
+    func = inspect.unwrap(func)
+    if inspect.isfunction(func):
+        code = func.__code__
+        return (code.co_filename, code.co_firstlineno)
+    if isinstance(func, functools.partial):
+        return _get_function_source(func.func)
+    if isinstance(func, functools.partialmethod):
+        return _get_function_source(func.func)
+    return None
+
+
+def _format_callback_source(func, args):
+    """Format ``func(*args)``, appending `` at file:line`` when known."""
+    func_repr = _format_callback(func, args, None)
+    source = _get_function_source(func)
+    if source:
+        func_repr += f' at {source[0]}:{source[1]}'
+    return func_repr
+
+
+def _format_args_and_kwargs(args, kwargs):
+    """Format function arguments and keyword arguments.
+
+    Special case for a single parameter: ('hello',) is formatted as ('hello').
+    """
+    # use reprlib to limit the length of the output
+    items = []
+    if args:
+        items.extend(reprlib.repr(arg) for arg in args)
+    if kwargs:
+        items.extend(f'{k}={reprlib.repr(v)}' for k, v in kwargs.items())
+    return '({})'.format(', '.join(items))
+
+
+def _format_callback(func, args, kwargs, suffix=''):
+    """Return a printable ``name(args...)`` string for a callback.
+
+    ``functools.partial`` objects are peeled recursively: the arguments of
+    the outer call are formatted into *suffix* and the partial's own
+    ``func``/``args``/``keywords`` are formatted in front of it.
+    """
+    if isinstance(func, functools.partial):
+        suffix = _format_args_and_kwargs(args, kwargs) + suffix
+        return _format_callback(func.func, func.args, func.keywords, suffix)
+
+    # Prefer the qualified name, then the plain name, then repr().
+    if hasattr(func, '__qualname__') and func.__qualname__:
+        func_repr = func.__qualname__
+    elif hasattr(func, '__name__') and func.__name__:
+        func_repr = func.__name__
+    else:
+        func_repr = repr(func)
+
+    func_repr += _format_args_and_kwargs(args, kwargs)
+    if suffix:
+        func_repr += suffix
+    return func_repr
diff --git a/src/kloop/heapq.pxd b/src/kloop/heapq.pxd
new file mode 100644
index 0000000..b7fe594
--- /dev/null
+++ b/src/kloop/heapq.pxd
@@ -0,0 +1,15 @@
+# Copyright (c) 2022 Fantix King http://fantix.pro
+# kLoop is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+
+# array: Callback pointers; size: allocated slots; tail: slots in use.
+cdef struct HeapQueue:
+    Callback** array
+    int size
+    int tail
diff --git a/src/kloop/heapq.pyx b/src/kloop/heapq.pyx
new file mode 100644
index 0000000..78a032a
--- /dev/null
+++ b/src/kloop/heapq.pyx
@@ -0,0 +1,184 @@
+# Copyright (c) 2022 Fantix King http://fantix.pro
+# kLoop is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+
+
+# The array grows and shrinks in steps of this many slots.
+cdef int HEAP_BLOCK_SIZE = 1024
+
+
+cdef void siftdown_from(HeapQueue* heap, int startpos, int pos) nogil:
+    # Move array[pos] towards the root while it is earlier than its parent,
+    # but never above startpos.
+    cdef:
+        int parentpos
+        Callback** array = heap.array
+        long long new_when = array[pos].when
+    while pos > startpos:
+        parentpos = (pos - 1) >> 1
+        if new_when >= array[parentpos].when:
+            break
+        array[pos], array[parentpos] = array[parentpos], array[pos]
+        pos = parentpos
+
+
+cdef void siftup(HeapQueue* heap, int pos, int endpos) nogil:
+    # Restore the heap property of the sub-tree rooted at pos, looking only
+    # at array[:endpos]: walk the item down along the smaller child until it
+    # has no child left, then bubble it back up inside that sub-tree.
+    cdef:
+        int childpos, limit = endpos >> 1
+        int startpos = pos
+        Callback** array = heap.array
+
+    while pos < limit:
+        childpos = 2 * pos + 1
+        if childpos + 1 < endpos:
+            if array[childpos].when >= array[childpos + 1].when:
+                childpos += 1
+        array[childpos], array[pos] = array[pos], array[childpos]
+        pos = childpos
+    # Stop at startpos.  Bubbling up to index 0 lets the item escape into
+    # the part heapq_heapify() has not processed yet and leaves a parent
+    # later than its children, e.g. [10, 3, 2, 4, 7, 5, 6] ended up as
+    # [2, 3, 10, 4, 7, 5, 6].
+    siftdown_from(heap, startpos, pos)
+
+
+cdef void siftdown(HeapQueue* heap, int pos, int size) nogil:
+    # Bubble array[pos] up towards the root.  ``size`` is unused and only
+    # kept so that existing callers keep working.
+    siftdown_from(heap, 0, pos)
+
+
+cdef int heapq_init(HeapQueue* heap) nogil except 0:
+    # Allocate the first block.  Returns 1, or raises MemoryError (0).
+    heap.array = <Callback**>PyMem_RawMalloc(
+        sizeof(Callback*) * HEAP_BLOCK_SIZE
+    )
+    if heap.array == NULL:
+        with gil:
+            raise MemoryError
+    heap.size = HEAP_BLOCK_SIZE
+    heap.tail = 0
+    return 1
+
+
+cdef void heapq_uninit(HeapQueue* heap) nogil:
+    # Drop one reference per entry still queued and free the array.
+    cdef:
+        int i = 0, tail = heap.tail
+        Callback** array = heap.array
+
+    if array == NULL:
+        return
+    if i < tail:
+        with gil:
+            while i < tail:
+                Py_DECREF(<object>array[i].handle)
+                # This increment was missing, so the loop never ended and
+                # kept decref-ing entry 0.
+                i += 1
+    PyMem_RawFree(array)
+    heap.array = NULL
+    heap.size = 0
+    heap.tail = 0
+
+
+cdef heapq_push_py(HeapQueue* heap, Handle handle):
+    # Push a Handle; the heap holds one reference to it while it is queued.
+    cdef Callback* callback = &handle.cb
+    Py_INCREF(handle)
+    try:
+        with nogil:
+            heapq_push(heap, callback, 1)
+    except BaseException:
+        # Nothing was queued, so give the reference back.
+        Py_DECREF(handle)
+        raise
+
+
+cdef int heapq_push(
+    HeapQueue* heap, Callback* callback, int keep
+) nogil except 0:
+    # Append callback, growing the array by one block when it is full.
+    # With keep != 0 the new entry is sifted into heap order; with keep == 0
+    # it is only appended.  Returns 1, or raises MemoryError (0).
+    cdef:
+        int size = heap.size, tail = heap.tail
+        Callback** array = heap.array
+
+    if tail == size:
+        size += HEAP_BLOCK_SIZE
+        array = <Callback**>PyMem_RawRealloc(
+            array, sizeof(Callback*) * size
+        )
+        if array == NULL:
+            # heap.array still points at the old, untouched block.
+            with gil:
+                raise MemoryError
+        # realloc may move the block: publish the new address, otherwise
+        # every later access goes through a dangling pointer.
+        heap.array = array
+        heap.size = size
+    array[tail] = callback
+    size = heap.tail = tail + 1
+    if keep:
+        siftdown(heap, tail, size)
+    return 1
+
+
+cdef Handle heapq_pop_py(HeapQueue* heap):
+    # Pop the earliest entry as a Handle, or return None when empty.
+    cdef:
+        Handle handle
+        Callback* callback
+
+    with nogil:
+        callback = heapq_pop(heap)
+    if callback == NULL:
+        return None
+    else:
+        handle = <Handle>callback.handle
+        # Release the reference taken by heapq_push_py(); ``handle`` holds
+        # its own.
+        Py_DECREF(handle)
+        return handle
+
+
+cdef Callback* heapq_pop(HeapQueue* heap) nogil:
+    # Remove and return the entry with the smallest ``when``, or NULL when
+    # the heap is empty.  The array is shrunk opportunistically; a failed
+    # shrink is ignored.
+    cdef:
+        Callback* rv
+        Callback** array = heap.array
+        Callback** shrunk
+        int size = heap.size, tail = heap.tail
+
+    if tail == 0:
+        return NULL
+
+    tail = heap.tail = tail - 1
+    rv = array[tail]
+
+    if tail == 0:
+        if size > HEAP_BLOCK_SIZE:
+            size = HEAP_BLOCK_SIZE
+            shrunk = <Callback**>PyMem_RawRealloc(
+                array, sizeof(Callback*) * size
+            )
+            if shrunk != NULL:
+                # Keep the pointer realloc returned: the block may have
+                # moved.
+                heap.array = shrunk
+                heap.size = size
+        return rv
+
+    rv, array[0] = array[0], rv
+    if tail > 1:
+        siftup(heap, 0, tail)
+    if size - tail >= HEAP_BLOCK_SIZE * 2:
+        size -= HEAP_BLOCK_SIZE
+        shrunk = <Callback**>PyMem_RawRealloc(
+            array, sizeof(Callback*) * size
+        )
+        if shrunk != NULL:
+            heap.array = shrunk
+            heap.size = size
+    return rv
+
+
+cdef inline int keep_top_bit(int n) nogil:
+    # Return n with every bit except its highest set bit cleared.
+    cdef int i = 0
+    while n > 1:
+        n >>= 1
+        i += 1
+    return n << i
+
+
+cdef inline void heapq_cache_friendly_heapify(HeapQueue* heap, int tail) nogil:
+    # Heapify that, after sifting a node, immediately continues with its
+    # parent for as long as the node index is odd.
+    cdef:
+        int m = tail >> 1, mhalf = m >> 1
+        int leftmost = keep_top_bit(m + 1) - 1
+        int i = leftmost - 1, j
+
+    while i >= mhalf:
+        j = i
+        while True:
+            siftup(heap, j, tail)
+            if not (j & 1):
+                break
+            j >>= 1
+        i -= 1
+
+    i = m - 1
+    while i >= leftmost:
+        j = i
+        while True:
+            siftup(heap, j, tail)
+            if not (j & 1):
+                break
+            j >>= 1
+        i -= 1
+
+
+cdef void heapq_heapify(HeapQueue* heap) nogil:
+    # Put array[:tail] into heap order.  More than 2500 entries take the
+    # cache-friendly path above.
+    cdef int tail = heap.tail, i = (tail >> 1) - 1
+    if tail > 2500:
+        heapq_cache_friendly_heapify(heap, tail)
+    else:
+        while i >= 0:
+            siftup(heap, i, tail)
+            i -= 1
diff --git a/src/kloop/includes/barrier.pxd
b/src/kloop/includes/barrier.pxd index f06fc13..65b0c33 100644 --- a/src/kloop/includes/barrier.pxd +++ b/src/kloop/includes/barrier.pxd @@ -8,6 +8,7 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. + cdef extern from "includes/barrier.h" nogil: unsigned IO_URING_READ_ONCE(unsigned var) void io_uring_smp_store_release(void* p, unsigned v) diff --git a/src/kloop/includes/libc.pxd b/src/kloop/includes/libc.pxd index ae75fea..d87805e 100644 --- a/src/kloop/includes/libc.pxd +++ b/src/kloop/includes/libc.pxd @@ -1,3 +1,14 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + cdef extern from "sys/syscall.h" nogil: int SYS_io_uring_setup int SYS_io_uring_enter diff --git a/src/kloop/includes/linux.pxd b/src/kloop/includes/linux.pxd index 9b9afc0..c0b564b 100644 --- a/src/kloop/includes/linux.pxd +++ b/src/kloop/includes/linux.pxd @@ -8,6 +8,7 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. + cdef extern from "linux/fs.h" nogil: ctypedef int __kernel_rwf_t diff --git a/src/kloop/includes/openssl/__init__.py b/src/kloop/includes/openssl/__init__.py new file mode 100644 index 0000000..e0326fc --- /dev/null +++ b/src/kloop/includes/openssl/__init__.py @@ -0,0 +1,9 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. 
+# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. diff --git a/src/kloop/includes/openssl/bio.pxd b/src/kloop/includes/openssl/bio.pxd new file mode 100644 index 0000000..ccfc315 --- /dev/null +++ b/src/kloop/includes/openssl/bio.pxd @@ -0,0 +1,88 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+ + +cdef extern from "openssl/bio.h" nogil: + enum BIOCtrl: + BIO_CTRL_RESET # 1 opt - rewind/zero etc + BIO_CTRL_EOF # 2 opt - are we at the eof + BIO_CTRL_INFO # 3 opt - extra tit-bits + BIO_CTRL_SET # 4 man - set the 'IO' type + BIO_CTRL_GET # 5 man - get the 'IO' type + BIO_CTRL_PUSH # 6 opt - internal, used to signify change + BIO_CTRL_POP # 7 opt - internal, used to signify change + BIO_CTRL_GET_CLOSE # 8 man - set the 'close' on free + BIO_CTRL_SET_CLOSE # 9 man - set the 'close' on free + BIO_CTRL_PENDING # 10 opt - is their more data buffered + BIO_CTRL_FLUSH # 11 opt - 'flush' buffered output + BIO_CTRL_DUP # 12 man - extra stuff for 'duped' BIO + BIO_CTRL_WPENDING # 13 opt - number of bytes still to write + BIO_CTRL_SET_CALLBACK # 14 opt - set callback function + BIO_CTRL_GET_CALLBACK # 15 opt - set callback function + + ctypedef struct Method "BIO_METHOD": + pass + + ctypedef struct BIO: + pass + + int get_new_index "BIO_get_new_index" () + + Method* meth_new "BIO_meth_new" (int type, const char* name) + + int meth_set_write_ex "BIO_meth_set_write_ex" ( + Method* biom, + int (*bwrite)(BIO*, const char*, size_t, size_t*), + ) + int meth_set_write "BIO_meth_set_write" ( + Method* biom, + int (*write)(BIO*, const char*, int), + ) + + int meth_set_read_ex "BIO_meth_set_read_ex" ( + Method* biom, + int (*bread)(BIO*, char*, size_t, size_t*), + ) + int meth_set_read "BIO_meth_set_read"( + Method* biom, + int (*read)(BIO*, char*, int), + ) + + int meth_set_ctrl "BIO_meth_set_ctrl" ( + Method* biom, long (*ctrl)(BIO*, int, long, void*) + ) + + int meth_set_create "BIO_meth_set_create" ( + Method* biom, + int (*create)(BIO*), + ) + + int meth_set_destroy "BIO_meth_set_destroy" ( + Method* biom, + int (*destroy)(BIO*), + ) + + ctypedef int info_cb "BIO_info_cb" (BIO*, int, int) + int meth_set_callback_ctrl "BIO_meth_set_callback_ctrl" ( + Method* biom, + long (*callback_ctrl)(BIO*, int, info_cb*), + ) + + BIO* new "BIO_new" (const Method* type) + int up_ref 
"BIO_up_ref" (BIO* a) + int free "BIO_free" (BIO* a) + + void set_data "BIO_set_data" (BIO* a, void* ptr) + void* get_data "BIO_get_data" (BIO* a) + void set_init "BIO_set_init" (BIO* a, int init) + void set_shutdown "BIO_set_shutdown" (BIO* a, int shut) + + void set_retry_read "BIO_set_retry_read" (BIO *b) + void set_retry_write "BIO_set_retry_write" (BIO *b) diff --git a/src/kloop/includes/openssl/err.pxd b/src/kloop/includes/openssl/err.pxd new file mode 100644 index 0000000..3870325 --- /dev/null +++ b/src/kloop/includes/openssl/err.pxd @@ -0,0 +1,14 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +cdef extern from "openssl/err.h" nogil: + unsigned long get_error "ERR_get_error" () + const char* reason_error_string "ERR_reason_error_string" (unsigned long e) diff --git a/src/kloop/includes/openssl/ssl.pxd b/src/kloop/includes/openssl/ssl.pxd new file mode 100644 index 0000000..b0afd60 --- /dev/null +++ b/src/kloop/includes/openssl/ssl.pxd @@ -0,0 +1,17 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+ + +cdef extern from "openssl/ssl.h" nogil: + ctypedef struct SSL: + pass + + int OP_ENABLE_KTLS "SSL_OP_ENABLE_KTLS" + int set_options "SSL_set_options" (SSL* ssl, int options) diff --git a/src/kloop/includes/pyssl.pxd b/src/kloop/includes/pyssl.pxd new file mode 100644 index 0000000..de79dc0 --- /dev/null +++ b/src/kloop/includes/pyssl.pxd @@ -0,0 +1,34 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +from .openssl.ssl cimport SSL +from .openssl.bio cimport BIO + +cdef extern from *: + """ + typedef struct { + PyObject_HEAD + PyObject *Socket; /* weakref to socket on which we're layered */ + SSL *ssl; + } PySSLSocket; + + typedef struct { + PyObject_HEAD + BIO *bio; + int eof_written; + } PySSLMemoryBIO; + """ + + ctypedef struct PySSLSocket: + SSL* ssl + + ctypedef struct PySSLMemoryBIO: + BIO* bio diff --git a/src/kloop/includes/ssl.h b/src/kloop/includes/ssl.h deleted file mode 100644 index fc98f7a..0000000 --- a/src/kloop/includes/ssl.h +++ /dev/null @@ -1,20 +0,0 @@ -/* -Copyright (c) 2022 Fantix King http://fantix.pro -kLoop is licensed under Mulan PSL v2. -You can use this software according to the terms and conditions of the Mulan PSL v2. -You may obtain a copy of Mulan PSL v2 at: - http://license.coscl.org.cn/MulanPSL2 -THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -See the Mulan PSL v2 for more details. 
-*/ - -#include "Python.h" -#include "openssl/ssl.h" - -typedef struct { - PyObject_HEAD - PyObject *Socket; /* weakref to socket on which we're layered */ - SSL *ssl; -} PySSLSocket; diff --git a/src/kloop/includes/ssl.pxd b/src/kloop/includes/ssl.pxd deleted file mode 100644 index 32c3af0..0000000 --- a/src/kloop/includes/ssl.pxd +++ /dev/null @@ -1,39 +0,0 @@ -# Copyright (c) 2022 Fantix King http://fantix.pro -# kLoop is licensed under Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -# See the Mulan PSL v2 for more details. - -cdef extern from "openssl/ssl.h" nogil: - int EVP_GCM_TLS_FIXED_IV_LEN - - ctypedef struct SSL: - pass - - ctypedef struct SSL_CTX: - pass - - int SSL_version(const SSL *s) - ctypedef void(*SSL_CTX_keylog_cb_func)(SSL *ssl, char *line) - void SSL_CTX_set_keylog_callback(SSL_CTX* ctx, SSL_CTX_keylog_cb_func cb) - SSL_CTX_keylog_cb_func SSL_CTX_get_keylog_callback(SSL_CTX* ctx) - SSL_CTX* SSL_get_SSL_CTX(SSL* ssl) - - ctypedef enum OSSL_HANDSHAKE_STATE: - pass - - OSSL_HANDSHAKE_STATE SSL_get_state(const SSL *ssl); - - unsigned int SSL3_RT_CHANGE_CIPHER_SPEC - unsigned int SSL3_RT_ALERT - unsigned int SSL3_RT_HANDSHAKE - unsigned int SSL3_RT_APPLICATION_DATA - - -cdef extern from "includes/ssl.h" nogil: - ctypedef struct PySSLSocket: - SSL *ssl diff --git a/src/kloop/ktls.pxd b/src/kloop/ktls.pxd index bf73ec7..71d0651 100644 --- a/src/kloop/ktls.pxd +++ b/src/kloop/ktls.pxd @@ -8,3 +8,6 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. 
+
+# Per-BIO payload attached by bio_create() through BIO_set_data().
+cdef struct BIO:
+    int data
diff --git a/src/kloop/ktls.pyx b/src/kloop/ktls.pyx
index ce3f4d3..4971c93 100644
--- a/src/kloop/ktls.pyx
+++ b/src/kloop/ktls.pyx
@@ -8,157 +8,119 @@
 # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 # See the Mulan PSL v2 for more details.
 
-import socket
-import hmac
-import hashlib
-import struct
-from ssl import SSLWantReadError
-from cpython cimport PyErr_SetFromErrno
+import ssl
+from cpython cimport PyMem_RawMalloc, PyMem_RawFree
 from libc cimport string
 
-from .includes cimport libc, linux, ssl
+from .includes.openssl cimport bio, err, ssl as ssl_h
+from .includes cimport pyssl
 
 
-cdef ssl.SSL_CTX_keylog_cb_func orig_cb
-cdef secrets = {}
-
-
-cdef void _capture_secrets(const ssl.SSL* s, const char* line) nogil:
-    if line != NULL:
-        try:
-            with gil:
-                global secrets
-                parts = line.decode("ISO-8859-1").split()
-                secrets[parts[0]] = bytes.fromhex(parts[-1])
-        finally:
-            if orig_cb != NULL:
-                orig_cb(s, line)
-
-
-def do_handshake_capturing_secrets(sslobj):
+cdef object fromOpenSSLError(object err_type):
+    # Build (not raise) an err_type instance from the next entry of the
+    # OpenSSL error queue; without a reason string it takes no arguments.
     cdef:
-        ssl.SSL* s = ( sslobj._sslobj).ssl
-        ssl.SSL_CTX* ctx = ssl.SSL_get_SSL_CTX(s)
-    global orig_cb
-    orig_cb = ssl.SSL_CTX_get_keylog_callback(ctx)
-    ssl.SSL_CTX_set_keylog_callback(
-        ctx, _capture_secrets
-    )
-    try:
-        try:
-            sslobj.do_handshake()
-        except SSLWantReadError:
-            success = False
-        else:
-            success = True
-        if secrets:
-            rv = dict(secrets)
-            secrets.clear()
-        else:
-            rv = {}
-        return success, rv
-    finally:
-        ssl.SSL_CTX_set_keylog_callback(ctx, orig_cb)
-
-
-def hkdf_expand(pseudo_random_key, label, length, hash_method=hashlib.sha384):
-    '''
-    Expand `pseudo_random_key` and `info` into a key of length `bytes` using
-    HKDF's expand function based on HMAC with the provided hash (default
-    SHA-512). See the HKDF draft RFC and paper for usage notes.
-    '''
-    info = struct.pack("!HB", length, len(label)) + label + b'\0'
-    hash_len = hash_method().digest_size
-    blocks_needed = length // hash_len + (0 if length % hash_len == 0 else 1) # ceil
-    okm = b""
-    output_block = b""
-    for counter in range(blocks_needed):
-        output_block = hmac.new(
-            pseudo_random_key,
-            (output_block + info + bytearray((counter + 1,))),
-            hash_method,
-        ).digest()
-        okm += output_block
-    return okm[:length]
-
-
-def enable_ulp(sock):
-    cdef char *tls = b"tls"
-    if libc.setsockopt(sock.fileno(), socket.SOL_TCP, linux.TCP_ULP, tls, 4):
-        PyErr_SetFromErrno(IOError)
-        return
-
-
-def get_state(sslobj):
-    cdef:
-        ssl.SSL* s = (sslobj._sslobj).ssl
-    print(ssl.SSL_get_state(s))
-
-
-def upgrade_aes_gcm_256(sslobj, sock, secret, sending):
-    cdef:
-        ssl.SSL* s = (sslobj._sslobj).ssl
-        linux.tls12_crypto_info_aes_gcm_256 crypto_info
-        char* seq
-
-    if sending:
-        # s->rlayer->write_sequence
-        seq = ((s) + 6112)
+        unsigned long e = err.get_error()
+        const char* msg = err.reason_error_string(e)
+    if msg == NULL:
+        return err_type()
     else:
-        # s->rlayer->read_sequence
-        seq = ((s) + 6104)
+        return err_type(msg.decode("ISO-8859-1"))
 
-    # print(sslobj.cipher())
 
-    string.memset(&crypto_info, 0, sizeof(crypto_info))
-    crypto_info.info.cipher_type = linux.TLS_CIPHER_AES_GCM_256
-    crypto_info.info.version = ssl.SSL_version(s)
+cdef int bio_write_ex(
+    bio.BIO* b, const char* data, size_t datal, size_t* written
+) nogil:
+    # Placeholder write hook: prints the request, flags "retry write" and
+    # reports zero bytes written.
+    with gil:
+        # NOTE(review): the pointer cast was missing in this copy of the
+        # patch, presumably stripped with other <...> text - confirm.
+        print('bio_write', data[:datal], int(<size_t>data))
+    bio.set_retry_write(b)
+    written[0] = 0
+    return 1
 
-    key = hkdf_expand(
-        secret,
-        b'tls13 key',
-        linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE,
+
+cdef int bio_read_ex(
+    bio.BIO* b, char* data, size_t datal, size_t* readbytes
+) nogil:
+    # Placeholder read hook: prints the request, flags "retry read" and
+    # reports zero bytes read.
+    with gil:
+        print('bio_read', datal, int(<size_t>data))
+    bio.set_retry_read(b)
+    readbytes[0] = 0
+    return 1
+
+
+cdef long bio_ctrl(bio.BIO* b, int cmd, long num, void* ptr) nogil:
+    # Control hook: only BIO_CTRL_FLUSH returns 1; every other command is
+    # printed and answered with 0.
+    cdef long ret = 0
+    with gil:
+        if cmd == bio.BIO_CTRL_EOF:
+            print("BIO_CTRL_EOF", ret)
+        elif cmd == bio.BIO_CTRL_PUSH:
+            print("BIO_CTRL_PUSH", ret)
+        elif cmd == bio.BIO_CTRL_FLUSH:
+            ret = 1
+            print('BIO_CTRL_FLUSH', ret)
+        else:
+            print('bio_ctrl', cmd, num)
+    return ret
+
+
+cdef int bio_create(bio.BIO* b) nogil:
+    # Allocate the zeroed payload struct, attach it to the BIO and mark the
+    # BIO initialised.  Returns 0 when the allocation fails.
+    cdef BIO* obj = <BIO*>PyMem_RawMalloc(sizeof(BIO))
+    if obj == NULL:
+        return 0
+    string.memset(obj, 0, sizeof(BIO))
+    bio.set_data(b, obj)
+    bio.set_init(b, 1)
+    return 1
+
+
+cdef int bio_destroy(bio.BIO* b) nogil:
+    # Free the payload attached by bio_create().
+    cdef void* obj = bio.get_data(b)
+    if obj != NULL:
+        PyMem_RawFree(obj)
+    bio.set_shutdown(b, 1)
+    return 1
+
+
+cdef object wrap_bio(
+    bio.BIO* b,
+    object ssl_context,
+    bint server_side=False,
+    object server_hostname=None,
+    object session=None,
+):
+    # Create an SSL object through ssl_context.wrap_bio() that uses ``b``
+    # for both directions.  A throw-away ssl.MemoryBIO temporarily gets its
+    # BIO pointer replaced by ``b`` for the duration of the call.
+    cdef pyssl.PySSLMemoryBIO* c_bio
+    py_bio = ssl.MemoryBIO()
+    c_bio = <pyssl.PySSLMemoryBIO*>py_bio
+    c_bio.bio, b = b, c_bio.bio
+    try:
+        rv = ssl_context.wrap_bio(
+            py_bio, py_bio, server_side, server_hostname, session
+        )
+    finally:
+        # Always swap back, also when wrap_bio() raises: otherwise py_bio
+        # keeps the caller's BIO and its own BIO is never released.
+        c_bio.bio, b = b, c_bio.bio
+    ssl_h.set_options(
+        (<pyssl.PySSLSocket*>rv._sslobj).ssl, ssl_h.OP_ENABLE_KTLS
     )
-    string.memcpy(
-        crypto_info.key,
-        key,
-        linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE,
-    )
-    string.memcpy(
-        crypto_info.rec_seq,
-        seq,
-        linux.TLS_CIPHER_AES_GCM_256_REC_SEQ_SIZE,
-    )
-    iv = hkdf_expand(
-        secret,
-        b'tls13 iv',
-        linux.TLS_CIPHER_AES_GCM_256_IV_SIZE +
-        linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE,
-    )
-    string.memcpy(
-        crypto_info.iv,
-        iv+ ssl.EVP_GCM_TLS_FIXED_IV_LEN,
-        linux.TLS_CIPHER_AES_GCM_256_IV_SIZE,
-    )
-    string.memcpy(
-        crypto_info.salt,
-        iv,
-        linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE,
-    )
-    if libc.setsockopt(
-        sock.fileno(),
-        libc.SOL_TLS,
-        linux.TLS_TX if sending else linux.TLS_RX,
-        &crypto_info,
-        sizeof(crypto_info),
-    ):
-        PyErr_SetFromErrno(IOError)
-        return
-    # print(
-    #     sending,
-    #     "iv", crypto_info.iv[:linux.TLS_CIPHER_AES_GCM_256_IV_SIZE].hex(),
-    #     "key", crypto_info.key[:linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE].hex(),
-    #     "salt", crypto_info.salt[:linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE].hex(),
-    #     "rec_seq", crypto_info.rec_seq[:linux.TLS_CIPHER_AES_GCM_256_REC_SEQ_SIZE].hex(),
-    # )
+    return rv
+
+
+def test():
+    """Create a kTLS BIO and wrap it with a default SSL context."""
+    # This must be the OpenSSL BIO type; the bare name ``BIO`` is the
+    # payload struct declared in ktls.pxd.
+    cdef bio.BIO* b
+    with nogil:
+        b = bio.new(KTLS_BIO_METHOD)
+    if b == NULL:
+        raise fromOpenSSLError(RuntimeError)
+    ctx = ssl.create_default_context()
+    return wrap_bio(b, ctx)
+
+
+# Register the custom BIO method once, at import time.
+cdef bio.Method* KTLS_BIO_METHOD = bio.meth_new(
+    bio.get_new_index(), "kTLS BIO"
+)
+if KTLS_BIO_METHOD == NULL:
+    # Do not hand a NULL method to the BIO_meth_set_* calls below.
+    raise fromOpenSSLError(ImportError)
+if not bio.meth_set_write_ex(KTLS_BIO_METHOD, bio_write_ex):
+    raise fromOpenSSLError(ImportError)
+if not bio.meth_set_read_ex(KTLS_BIO_METHOD, bio_read_ex):
+    raise fromOpenSSLError(ImportError)
+if not bio.meth_set_ctrl(KTLS_BIO_METHOD, bio_ctrl):
+    raise fromOpenSSLError(ImportError)
+if not bio.meth_set_create(KTLS_BIO_METHOD, bio_create):
+    raise fromOpenSSLError(ImportError)
+if not bio.meth_set_destroy(KTLS_BIO_METHOD, bio_destroy):
+    raise fromOpenSSLError(ImportError)
diff --git a/src/kloop/loop.pxd b/src/kloop/loop.pxd
new file mode 100644
index 0000000..344a960
--- /dev/null
+++ b/src/kloop/loop.pxd
@@ -0,0 +1,48 @@
+# Copyright (c) 2022 Fantix King http://fantix.pro
+# kLoop is licensed under Mulan PSL v2.
+# You can use this software according to the terms and conditions of the Mulan PSL v2.
+# You may obtain a copy of Mulan PSL v2 at:
+# http://license.coscl.org.cn/MulanPSL2
+# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
+# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
+# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
+# See the Mulan PSL v2 for more details.
+ + +from cpython cimport PyErr_SetFromErrno +from cpython cimport PyMem_RawMalloc, PyMem_RawFree, PyMem_RawRealloc +from cpython cimport PyObject, Py_INCREF, Py_DECREF +from libc cimport errno, string +from posix cimport mman, unistd, time + +from .includes cimport libc, linux, barrier + +include "./handle.pxd" +include "./queue.pxd" +include "./heapq.pxd" +include "./uring.pxd" + + +cdef struct Loop: + bint stopping + Ring ring + HeapQueue scheduled + Queue ready + int timer_cancelled_count + PyObject* loop + + +cdef class KLoopImpl: + cdef: + bint closed + object thread_id + Loop loop + + cdef inline check_closed(self) + cdef inline bint _is_running(self) + cdef inline check_running(self) + cdef inline Handle _call_soon(self, callback, args, context) + cdef inline _add_callback(self, Handle handle) + cdef inline TimerHandle _call_at( + self, long long when, callback, args, context + ) diff --git a/src/kloop/loop.py b/src/kloop/loop.py deleted file mode 100644 index d2941db..0000000 --- a/src/kloop/loop.py +++ /dev/null @@ -1,473 +0,0 @@ -# Copyright (c) 2022 Fantix King http://fantix.pro -# kLoop is licensed under Mulan PSL v2. -# You can use this software according to the terms and conditions of the Mulan PSL v2. -# You may obtain a copy of Mulan PSL v2 at: -# http://license.coscl.org.cn/MulanPSL2 -# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, -# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, -# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. -# See the Mulan PSL v2 for more details. - -import asyncio.events -import asyncio.futures -import asyncio.trsock -import asyncio.transports -import contextvars -import errno -import socket -import ssl - -from . 
import uring, ktls - - -class Callback: - __slots__ = ("_callback", "_context", "_args", "_kwargs") - - def __init__(self, callback, context=None, args=None, kwargs=None): - if context is None: - context = contextvars.copy_context() - self._callback = callback - self._context = context - self._args = args or () - self._kwargs = kwargs or {} - - def __call__(self): - self._context.run(self._callback, *self._args, **self._kwargs) - - def __repr__(self): - return f"{self._callback} {self._args} {self._kwargs} {self._context}" - - -class KLoopSocketTransport( - asyncio.transports._FlowControlMixin, asyncio.Transport -): - __slots__ = ( - "_waiter", - "_sock", - "_protocol", - "_closing", - "_recv_buffer", - "_recv_buffer_factory", - "_read_ready_cb", - "_buffers", - "_buffer_size", - "_current_work", - "_write_waiter", - "_read_paused", - ) - - def __init__( - self, loop, sock, protocol, waiter=None, extra=None, server=None - ): - super().__init__(extra, loop) - self._extra["socket"] = asyncio.trsock.TransportSocket(sock) - try: - self._extra["sockname"] = sock.getsockname() - except OSError: - self._extra["sockname"] = None - if "peername" not in self._extra: - try: - self._extra["peername"] = sock.getpeername() - except socket.error: - self._extra["peername"] = None - - self._buffers = [] - self._buffer_size = 0 - self._current_work = None - self._sock = sock - self._closing = False - self._write_waiter = None - self._read_paused = False - - self.set_protocol(protocol) - self._waiter = waiter - - self._loop.call_soon(self._protocol.connection_made, self) - self._loop.call_soon(self._read) - - if self._waiter is not None: - self._loop.call_soon( - asyncio.futures._set_result_unless_cancelled, - self._waiter, - None, - ) - - def set_protocol(self, protocol): - if isinstance(protocol, asyncio.BufferedProtocol): - self._read_ready_cb = self._read_ready__buffer_updated - self._recv_buffer = None - self._recv_buffer_factory = protocol.get_buffer - else: - 
self._read_ready_cb = self._read_ready__data_received - self._recv_buffer = bytearray(64 * 1024 * 1024) - self._recv_buffer_factory = lambda _hint: self._recv_buffer - self._protocol = protocol - - def _read(self): - # print("RecvMsgWork") - if self._read_paused: - return - self._loop._selector.submit( - uring.RecvMsgWork( - self._sock.fileno(), - [self._recv_buffer_factory(-1)], - self._read_ready_cb, - ) - ) - - def _read_ready__buffer_updated(self, res, app_data): - if res < 0: - raise IOError - elif res == 0: - self._protocol.eof_received() - else: - try: - # print(f"buffer updated: {res}") - self._protocol.buffer_updated(res) - finally: - if not self._closing: - self._read() - - def _read_ready__data_received(self, res, app_data): - print("_read_ready__data_received", res) - if res < 0: - if abs(res) == errno.EAGAIN: - print('EAGAIN') - self._read() - else: - raise IOError(f"{res}") - elif res == 0: - self._protocol.eof_received() - else: - try: - print(f"data received: {res}") - data = bytes(self._recv_buffer[:res]) - # print(f"data received: {data}") - if app_data: - self._protocol.data_received(data) - finally: - if not self._closing: - self._read() - - def _write_done(self, res): - # print("_write_done") - self._current_work = None - if res < 0: - # TODO: force close transport - raise IOError() - self._buffer_size -= res - if self._buffers: - if len(self._buffers) == 1: - self._current_work = uring.SendWork( - self._sock.fileno(), self._buffers[0], self._write_done - ) - else: - self._current_work = uring.SendMsgWork( - self._sock.fileno(), self._buffers, self._write_done - ) - self._loop._selector.submit(self._current_work) - # print("more SendWork") - self._buffers = [] - elif self._closing: - self._loop.call_soon(self._call_connection_lost, None) - elif self._write_waiter is not None: - self._write_waiter() - self._write_waiter = None - self._maybe_resume_protocol() - - def write(self, data): - self._buffer_size += len(data) - if self._current_work is 
None: - self._current_work = uring.SendWork( - self._sock.fileno(), data, self._write_done - ) - self._loop._selector.submit(self._current_work) - # print("SendWork") - else: - self._buffers.append(data) - self._maybe_pause_protocol() - - def close(self): - if self._closing: - return - self._closing = True - if self._current_work is None: - self._loop.call_soon(self._call_connection_lost, None) - - def _call_connection_lost(self, exc): - try: - if self._protocol is not None: - self._protocol.connection_lost(exc) - finally: - self._sock.close() - self._sock = None - self._protocol = None - self._loop = None - - def get_write_buffer_size(self): - return self._buffer_size - - def pause_reading(self): - self._read_paused = True - - def resume_reading(self): - if self._read_paused: - self._read_paused = False - self._read() - - -class KLoopSSLHandshakeProtocol(asyncio.Protocol): - __slots__ = ( - "_incoming", - "_outgoing", - "_handshaking", - "_secrets", - "_app_protocol", - "_transport", - "_sslobj", - ) - - def __init__(self, sslcontext, server_hostname): - self._handshaking = True - self._secrets = {} - self._incoming = ssl.MemoryBIO() - self._outgoing = ssl.MemoryBIO() - self._sslobj = sslcontext.wrap_bio( - self._incoming, - self._outgoing, - server_side=False, - server_hostname=server_hostname, - ) - - def connection_made(self, transport): - self._transport = transport - self._handshake() - - def data_received(self, data): - self._incoming.write(data) - self._handshake() - - def _handshake(self): - ktls.get_state(self._sslobj) - success, secrets = ktls.do_handshake_capturing_secrets(self._sslobj) - self._secrets.update(secrets) - if success: - # print("handshake done") - if self._handshaking: - print(self._sslobj.cipher()) - ktls.get_state(self._sslobj) - self._handshaking = False - try: - data = self._sslobj.read(64 * 1024) - except ssl.SSLWantReadError: - data = None - if data: - while True: - try: - data += self._sslobj.read(64 * 1024) - except 
ssl.SSLWantReadError: - break - print("try read", data) - self._transport._upgrade_ktls_read( - self._sslobj, - self._secrets["SERVER_TRAFFIC_SECRET_0"], - data, - ) - if data := self._outgoing.read(): - print("last message") - self._transport.write(data) - self._transport._write_waiter = self._after_last_write - # self._transport._write_waiter = lambda: self._transport._loop.call_later(1, self._after_last_write) - self._transport.pause_reading() - else: - self._after_last_write() - else: - assert False - # try: - # data = self._sslobj.read(64 * 1024) - # except ssl.SSLWantReadError: - # data = None - # if data: - # while True: - # try: - # data += self._sslobj.read(64 * 1024) - # except ssl.SSLWantReadError: - # break - # print("try read", data) - # # ktls.get_state(self._sslobj) - # self._after_last_write() - # self._transport._upgrade_ktls_read( - # self._sslobj, - # self._secrets["SERVER_TRAFFIC_SECRET_0"], - # data, - # ) - else: - # print("SSLWantReadError") - if data := self._outgoing.read(): - self._transport.write(data) - - def _after_last_write(self): - print("_after_last_write") - ktls.get_state(self._sslobj) - # try: - # data = self._sslobj.read(64 * 1024) - # except ssl.SSLWantReadError: - # data = None - # if data: - # while True: - # try: - # data += self._sslobj.read(64 * 1024) - # except ssl.SSLWantReadError: - # break - # print("try read", data) - self._transport._upgrade_ktls_write( - self._sslobj, - self._secrets["CLIENT_TRAFFIC_SECRET_0"], - ) - # self._transport._upgrade_ktls_read( - # self._sslobj, - # self._secrets["SERVER_TRAFFIC_SECRET_0"], - # data, - # ) - self._transport.resume_reading() - - -class KLoopSSLTransport(KLoopSocketTransport): - __slots__ = ("_app_protocol",) - - def __init__( - self, - loop, - sock, - protocol, - waiter=None, - extra=None, - server=None, - *, - sslcontext, - server_hostname, - ): - ktls.enable_ulp(sock) - self._app_protocol = protocol - super().__init__( - loop, - sock, - 
KLoopSSLHandshakeProtocol(sslcontext, server_hostname), - None, - extra, - server, - ) - self._waiter = waiter - - def _upgrade_ktls_write(self, sslobj, secret): - print("_upgrade_ktls_write") - ktls.upgrade_aes_gcm_256(sslobj, self._sock, secret, True) - self.set_protocol(self._app_protocol) - self._loop.call_soon(self._app_protocol.connection_made, self) - if self._waiter is not None: - self._loop.call_soon( - asyncio.futures._set_result_unless_cancelled, - self._waiter, - None, - ) - - def _upgrade_ktls_read(self, sslobj, secret, data): - print("_upgrade_ktls_read") - ktls.upgrade_aes_gcm_256(sslobj, self._sock, secret, False) - # self.set_protocol(self._app_protocol) - # if data is not None: - # if data: - # self._app_protocol.data_received(data) - # else: - # self._app_protocol.eof_received() - - -class KLoop(asyncio.BaseEventLoop): - def __init__(self, args): - super().__init__() - self._selector = uring.Ring(*args) - - def _process_events(self, works): - for work in works: - work.complete() - - async def sock_connect(self, sock, address): - fut = self.create_future() - self._selector.submit(uring.ConnectWork(sock.fileno(), address, fut)) - return await fut - - async def getaddrinfo( - self, host, port, *, family=0, type=0, proto=0, flags=0 - ): - return socket.getaddrinfo(host, port, family, type, proto, flags) - - def _make_socket_transport( - self, sock, protocol, waiter=None, *, extra=None, server=None - ): - sock.setblocking(True) - return KLoopSocketTransport( - self, sock, protocol, waiter, extra, server - ) - - def _make_ssl_transport( - self, - rawsock, - protocol, - sslcontext, - waiter=None, - *, - server_side=False, - server_hostname=None, - extra=None, - server=None, - ssl_handshake_timeout=None, - call_connection_made=True, - ): - if sslcontext is None: - sslcontext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH) - return KLoopSSLTransport( - self, - rawsock, - protocol, - waiter, - extra, - server, - sslcontext=sslcontext, - 
server_hostname=server_hostname, - ) - - -class KLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy): - __slots__ = ("_selector_args",) - - def __init__( - self, queue_depth=128, sq_thread_idle=2000, sq_thread_cpu=None - ): - super().__init__() - assert queue_depth in { - 1, - 2, - 4, - 8, - 16, - 32, - 64, - 128, - 256, - 512, - 1024, - 2048, - 4096, - } - self._selector_args = (queue_depth, sq_thread_idle, sq_thread_cpu) - - def _loop_factory(self): - return KLoop(self._selector_args) - - # Child processes handling (Unix only). - - def get_child_watcher(self): - raise NotImplementedError - - def set_child_watcher(self, watcher): - raise NotImplementedError diff --git a/src/kloop/loop.pyx b/src/kloop/loop.pyx new file mode 100644 index 0000000..95bd15c --- /dev/null +++ b/src/kloop/loop.pyx @@ -0,0 +1,535 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+ + +import time as py_time +import asyncio +import contextvars +import functools +import inspect +import os +import reprlib +import threading +import traceback + +cdef asyncio_isfuture = asyncio.isfuture +cdef asyncio_ensure_future = asyncio.ensure_future +cdef asyncio_set_running_loop = asyncio._set_running_loop +cdef asyncio_get_running_loop = asyncio._get_running_loop +cdef asyncio_Task = asyncio.Task +cdef asyncio_Future = asyncio.Future +cdef logger = asyncio.log.logger + +cdef long long SECOND_NS = 1_000_000_000 +cdef long long MAX_SELECT_TIMEOUT = 24 * 3600 * SECOND_NS + +# Minimum number of scheduled timer handles before cleanup of +# cancelled handles is performed. +cdef int MIN_SCHEDULED_TIMER_HANDLES = 100 + +# Maximum ratio of cancelled handles is performed of scheduled timer handles +# that are cancelled before cleanup +cdef int MAX_CANCELLED_TIMER_HANDLES_RATIO = 2 + +include "./handle.pyx" +include "./queue.pyx" +include "./heapq.pyx" +include "./uring.pyx" + + +cdef long long monotonic_ns() nogil except -1: + cdef: + long long rv + time.timespec ts + if time.clock_gettime(time.CLOCK_MONOTONIC, &ts): + with gil: + PyErr_SetFromErrno(OSError) + return -1 + rv = ts.tv_sec * SECOND_NS + return rv + ts.tv_nsec + + +cdef int loop_init( + Loop* loop, linux.__u32 depth, linux.io_uring_params* params +) nogil except 0: + if not queue_init(&loop.ready): + return 0 + if not heapq_init(&loop.scheduled): + queue_uninit(&loop.ready) + return 0 + if not ring_init(&loop.ring, depth, params): + queue_uninit(&loop.ready) + heapq_uninit(&loop.scheduled) + return 0 + return 1 + + +cdef int loop_uninit(Loop* loop) nogil except 0: + heapq_uninit(&loop.scheduled) + queue_uninit(&loop.ready) + return ring_uninit(&loop.ring) + + +cdef int loop_run_forever(Loop* loop) nogil except 0: + cdef: + Ring* ring = &loop.ring + Queue* ready = &loop.ready + HeapQueue* scheduled = &loop.scheduled + + while True: + if not loop_run_once(loop, ring, ready, scheduled): + return 0 + if 
loop.stopping: + break + return 1 + + +cdef inline int filter_cancelled_calls(Loop* loop) nogil except 0: + cdef: + HeapQueue* scheduled = &loop.scheduled + HeapQueue heap, drop + Callback** array = scheduled.array + Callback* callback + int i = 0, size = scheduled.tail + + if ( + MIN_SCHEDULED_TIMER_HANDLES < size < + loop.timer_cancelled_count * MAX_CANCELLED_TIMER_HANDLES_RATIO + ): + # Remove delayed calls that were cancelled if their number + # is too high + if not heapq_init(&drop): + return 0 + if not heapq_init(&heap): + heapq_uninit(&drop) + return 0 + while i < size: + callback = array[i] + if callback.mask & CANCELLED_MASK: + callback.mask &= ~SCHEDULED_MASK + if not heapq_push(&drop, callback, 0): + heap.tail = 0 + heapq_uninit(&heap) + drop.tail = 0 + heapq_uninit(&drop) + return 0 + elif not heapq_push(&heap, callback, 0): + heap.tail = 0 + heapq_uninit(&heap) + drop.tail = 0 + heapq_uninit(&drop) + return 0 + heapq_heapify(&heap) + heap, scheduled[0] = scheduled[0], heap + heap.tail = 0 + heapq_uninit(&heap) + heapq_uninit(&drop) + elif array[0].mask & CANCELLED_MASK: + if not heapq_init(&drop): + return 0 + while size: + callback = heapq_pop(scheduled) + if callback.mask & CANCELLED_MASK: + loop.timer_cancelled_count -= 1 + callback.mask &= ~SCHEDULED_MASK + if not heapq_push(&drop, callback, 0): + with gil: + Py_DECREF(callback.handle) + heapq_uninit(&drop) + return 0 + if not array[0].mask & CANCELLED_MASK: + break + size -= 1 + heapq_uninit(&drop) + + return 1 + + +cdef loop_run_ready(Queue* ready, int ntodo): + cdef Handle handle + + while ntodo: + handle = queue_pop_py(ready) + if not handle.cb.mask & CANCELLED_MASK: + handle.run() + ntodo -= 1 + handle = None + + +cdef inline int loop_run_once( + Loop* loop, Ring* ring, Queue* ready, HeapQueue* scheduled +) nogil except 0: + cdef: + Callback* callback + long long timeout = -1, now + int nready, res + void* data + + if scheduled.tail: + if not filter_cancelled_calls(loop): + return 0 + + if 
ready.head >= 0 or loop.stopping: + timeout = 0 + elif scheduled.tail: + timeout = min( + max(0, scheduled.array[0].when - monotonic_ns()), + MAX_SELECT_TIMEOUT, + ) + + nready = ring_select(ring, timeout) + if nready < 0: + return 0 + while nready: + res = ring_cq_pop(&ring.cq, &data) + nready -= 1 + + now = monotonic_ns() + 1 + while scheduled.tail and scheduled.array[0].when < now: + callback = heapq_pop(scheduled) + callback.mask &= ~SCHEDULED_MASK + if not queue_push(ready, callback): + if not heapq_push(scheduled, callback, 1): + with gil: + Py_DECREF(callback.handle) + return 0 + + if ready.head >= 0: + with gil: + loop_run_ready(ready, queue_size(ready)) + return 1 + + +class KLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy): + __slots__ = ("_selector_args",) + + def __init__( + self, queue_depth=128, sq_thread_idle=2000, sq_thread_cpu=None + ): + super().__init__() + self._selector_args = (queue_depth, sq_thread_idle, sq_thread_cpu) + + def _loop_factory(self): + return KLoop(*self._selector_args) + + # Child processes handling (Unix only). 
+ + def get_child_watcher(self): + raise NotImplementedError + + def set_child_watcher(self, watcher): + raise NotImplementedError + + +cdef class KLoopImpl: + def __init__(self, queue_depth, sq_thread_idle, sq_thread_cpu): + cdef: + linux.io_uring_params params + linux.__u32 depth + string.memset(¶ms, 0, sizeof(params)) + params.flags = linux.IORING_SETUP_SQPOLL + params.sq_thread_idle = sq_thread_idle + if sq_thread_cpu is not None: + params.sq_thread_cpu = sq_thread_cpu + params.flags |= linux.IORING_SETUP_SQ_AFF + depth = queue_depth + self.loop.loop = self + with nogil: + loop_init(&self.loop, depth, ¶ms) + + self.closed = False + self.thread_id = None + + def __dealloc__(self): + with nogil: + loop_uninit(&self.loop) + + cdef inline check_closed(self): + if self.closed: + raise RuntimeError('Event loop is closed') + + cdef inline bint _is_running(self): + return self.thread_id is not None + + cdef inline check_running(self): + if self._is_running(): + raise RuntimeError('This event loop is already running') + if asyncio_get_running_loop() is not None: + raise RuntimeError( + 'Cannot run the event loop while another loop is running') + + def run_forever(self): + """Run until stop() is called.""" + self.check_closed() + self.check_running() + # self._set_coroutine_origin_tracking(self._debug) + self.thread_id = threading.get_ident() + + # old_agen_hooks = sys.get_asyncgen_hooks() + # sys.set_asyncgen_hooks(firstiter=self._asyncgen_firstiter_hook, + # finalizer=self._asyncgen_finalizer_hook) + try: + asyncio_set_running_loop(self) + with nogil: + loop_run_forever(&self.loop) + finally: + self.loop.stopping = 0 + self.thread_id = None + asyncio_set_running_loop(None) + # self._set_coroutine_origin_tracking(False) + # sys.set_asyncgen_hooks(*old_agen_hooks) + + def run_until_complete(self, future): + self.check_closed() + self.check_running() + + new_task = not asyncio_isfuture(future) + future = asyncio_ensure_future(future, loop=self) + if new_task: + # An 
exception is raised if the future didn't complete, so there + # is no need to log the "destroy pending task" message + future._log_destroy_pending = False + + future.add_done_callback(_run_until_complete_cb) + try: + self.run_forever() + except: + if new_task and future.done() and not future.cancelled(): + # The coroutine raised a BaseException. Consume the exception + # to not log a warning, the caller doesn't have access to the + # local task. + future.exception() + raise + finally: + future.remove_done_callback(_run_until_complete_cb) + if not future.done(): + raise RuntimeError('Event loop stopped before Future completed.') + + return future.result() + + def create_task(self, coro, *, name=None): + self.check_closed() + # if self._task_factory is None: + task = asyncio_Task(coro, loop=self, name=name) + if task._source_traceback: + del task._source_traceback[-1] + # else: + # task = self._task_factory(self, coro) + # tasks._set_task_name(task, name) + + return task + + def stop(self): + self.loop.stopping = 1 + + def close(self): + if self.is_running(): + raise RuntimeError("Cannot close a running event loop") + if self.closed: + return + # if self._debug: + # logger.debug("Close %r", self) + self.closed = True + # self.ready.clear() + # self._scheduled.clear() + # self._executor_shutdown_called = True + # executor = self._default_executor + # if executor is not None: + # self._default_executor = None + # executor.shutdown(wait=False) + + def fileno(self): + return self.loop.ring.ring_fd + + def is_running(self): + return self._is_running() + + def get_debug(self): + return False + + def call_soon(self, callback, *args, context=None): + cdef Handle handle + self.check_closed() + # if self._debug: + # self._check_thread() + # self._check_callback(callback, 'call_soon') + handle = self._call_soon(callback, args, context) + if handle.source_traceback: + del handle.source_traceback[-1] + return handle + + def time(self): + return (monotonic_ns()) / SECOND_NS + + 
def call_later(self, delay, callback, *args, context=None): + cdef long long when = monotonic_ns() + when += delay * SECOND_NS + timer = self._call_at(when, callback, args, context) + if timer.source_traceback: + del timer.source_traceback[-1] + return timer + + def call_at(self, when, callback, *args, context=None): + timer = self._call_at(when * SECOND_NS, callback, args, context) + if timer.source_traceback: + del timer.source_traceback[-1] + return timer + + cdef inline TimerHandle _call_at( + self, long long when, callback, args, context + ): + cdef TimerHandle timer + self.check_closed() + # if self._debug: + # self._check_thread() + # self._check_callback(callback, 'call_at') + timer = TimerHandle(when, callback, args, self, context) + heapq_push_py(&self.loop.scheduled, timer) + # else: + # heapq_heappush(self.heapq) + timer.cb.mask |= SCHEDULED_MASK + return timer + + cdef inline Handle _call_soon(self, callback, args, context): + cdef Handle handle = Handle(callback, args, self, context) + self._add_callback(handle) + return handle + + cdef inline _add_callback(self, Handle handle): + queue_push_py(&self.loop.ready, handle) + + def default_exception_handler(self, context): + message = context.get('message') + if not message: + message = 'Unhandled exception in event loop' + + exception = context.get('exception') + if exception is not None: + exc_info = (type(exception), exception, exception.__traceback__) + else: + exc_info = False + + # if ('source_traceback' not in context and + # self._current_handle is not None and + # self._current_handle._source_traceback): + # context['handle_traceback'] = \ + # self._current_handle._source_traceback + + log_lines = [message] + for key in sorted(context): + if key in {'message', 'exception'}: + continue + value = context[key] + if key == 'source_traceback': + tb = ''.join(traceback.format_list(value)) + value = 'Object created at (most recent call last):\n' + value += tb.rstrip() + elif key == 'handle_traceback': + 
tb = ''.join(traceback.format_list(value)) + value = 'Handle created at (most recent call last):\n' + value += tb.rstrip() + else: + value = repr(value) + log_lines.append(f'{key}: {value}') + + logger.error('\n'.join(log_lines), exc_info=exc_info) + + def call_exception_handler(self, context): + # if self._exception_handler is None: + try: + self.default_exception_handler(context) + except (SystemExit, KeyboardInterrupt): + raise + except BaseException: + # Second protection layer for unexpected errors + # in the default implementation, as well as for subclassed + # event loops with overloaded "default_exception_handler". + logger.error('Exception in default exception handler', + exc_info=True) + # else: + # try: + # self._exception_handler(self, context) + # except (SystemExit, KeyboardInterrupt): + # raise + # except BaseException as exc: + # # Exception in the user set custom exception handler. + # try: + # # Let's try default handler. + # self.default_exception_handler({ + # 'message': 'Unhandled error in exception handler', + # 'exception': exc, + # 'context': context, + # }) + # except (SystemExit, KeyboardInterrupt): + # raise + # except BaseException: + # # Guard 'default_exception_handler' in case it is + # # overloaded. 
+ # logger.error('Exception in default exception handler ' + # 'while handling an unexpected error ' + # 'in custom exception handler', + # exc_info=True) + + async def shutdown_asyncgens(self): + pass + + async def shutdown_default_executor(self): + pass + + def create_future(self): + return asyncio_Future(loop=self) + + def _timer_handle_cancelled(self, handle): + pass + + async def create_connection( + self, + protocol_factory, + host=None, + port=None, + *, + ssl=None, + family=0, + proto=0, + flags=0, + sock=None, + local_addr=None, + server_hostname=None, + ssl_handshake_timeout=None, + happy_eyeballs_delay=None, + interleave=None, + ): + pass + + +class KLoop(KLoopImpl, asyncio.AbstractEventLoop): + pass + + +def _run_until_complete_cb(fut): + if not fut.cancelled(): + exc = fut.exception() + if isinstance(exc, (SystemExit, KeyboardInterrupt)): + # Issue #22429: run_forever() already finished, no need to + # stop it. + return + _get_loop(fut).stop() + + +def _get_loop(fut): + # Tries to call Future.get_loop() if it's available. + # Otherwise fallbacks to using the old '_loop' property. + try: + get_loop = fut.get_loop + except AttributeError: + pass + else: + return get_loop() + return fut._loop diff --git a/src/kloop/queue.pxd b/src/kloop/queue.pxd new file mode 100644 index 0000000..63fb923 --- /dev/null +++ b/src/kloop/queue.pxd @@ -0,0 +1,16 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. 
+ + +cdef struct Queue: + Callback** array + int size + int head + int tail diff --git a/src/kloop/queue.pyx b/src/kloop/queue.pyx new file mode 100644 index 0000000..0c4a553 --- /dev/null +++ b/src/kloop/queue.pyx @@ -0,0 +1,165 @@ +# Copyright (c) 2022 Fantix King http://fantix.pro +# kLoop is licensed under Mulan PSL v2. +# You can use this software according to the terms and conditions of the Mulan PSL v2. +# You may obtain a copy of Mulan PSL v2 at: +# http://license.coscl.org.cn/MulanPSL2 +# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, +# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, +# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. +# See the Mulan PSL v2 for more details. + + +cdef int QUEUE_BLOCK_SIZE = 1024 + + +cdef int queue_init(Queue* queue) nogil except 0: + queue.array = PyMem_RawMalloc( + sizeof(Callback*) * QUEUE_BLOCK_SIZE + ) + if queue.array == NULL: + with gil: + raise MemoryError + queue.head = -1 + queue.tail = 0 + queue.size = QUEUE_BLOCK_SIZE + return 1 + + +cdef void queue_uninit(Queue* queue) nogil: + cdef: + int i = queue.head, size = queue.size, tail = queue.tail + Callback** array = queue.array + + if array == NULL: + return + if i >= 0: + with gil: + while True: + Py_DECREF(array[i].handle) + i = (i + 1) % size + if i == tail: + break + PyMem_RawFree(array) + queue.array = NULL + + +cdef queue_push_py(Queue* queue, Handle handle): + cdef Callback* callback = &handle.cb + Py_INCREF(handle) + with nogil: + queue_push(queue, callback) + + +cdef int queue_push(Queue* queue, Callback* callback) nogil except 0: + cdef: + Callback** orig = queue.array + Callback** array = orig + int size = queue.size, head = queue.head, tail = queue.tail + + if head == tail: + if head == 0: + tail = size + size += QUEUE_BLOCK_SIZE + array = PyMem_RawRealloc( + orig, sizeof(Callback*) * size + ) + if array == NULL: + with gil: + raise MemoryError + else: + tail = size + QUEUE_BLOCK_SIZE + array 
= PyMem_RawMalloc(sizeof(Callback*) * tail) + if array == NULL: + with gil: + raise MemoryError + queue.array = array + string.memcpy( + array, orig + head, sizeof(Callback*) * (size - head) + ) + string.memcpy(array + size - head, orig, sizeof(Callback*) * head) + size, tail = tail, size + queue.head = head = 0 + PyMem_RawFree(orig) + queue.size = size + elif head < 0: + queue.head = head = 0 + array[tail] = callback + queue.tail = (tail + 1) % size + return 1 + + +cdef Handle queue_pop_py(Queue* queue): + cdef: + Handle handle + Callback* callback + + with nogil: + callback = queue_pop(queue) + if callback == NULL: + return None + else: + handle = callback.handle + Py_DECREF(handle) + return handle + + +cdef Callback* queue_pop(Queue* queue) nogil: + cdef: + int size = queue.size, head = queue.head, tail = queue.tail + Callback* rv + Callback** orig = queue.array + Callback** array = orig + + if head < 0: + return NULL + rv = array[head] + queue.head = head = (head + 1) % size + if head == tail: + queue.head = -1 + queue.tail = 0 + if size > QUEUE_BLOCK_SIZE: + size = QUEUE_BLOCK_SIZE + if PyMem_RawRealloc( + array, sizeof(Callback*) * size + ) != NULL: + queue.size = size + elif (head - tail) % size >= QUEUE_BLOCK_SIZE * 2: + if head < tail: + size -= QUEUE_BLOCK_SIZE + if tail > size: + tail -= head + string.memmove(array, array + head, sizeof(Callback*) * tail) + queue.tail = tail + queue.head = 0 + if PyMem_RawRealloc( + array, sizeof(Callback*) * size + ) != NULL: + queue.size = size + queue.tail = tail % size + else: + array = PyMem_RawMalloc( + sizeof(Callback*) * (size - QUEUE_BLOCK_SIZE) + ) + if array != NULL: + string.memcpy( + array, orig + head, sizeof(Callback*) * (size - head) + ) + string.memcpy( + array + size - head, orig, sizeof(Callback*) * tail + ) + queue.array = array + queue.head = 0 + queue.tail = (tail - head) % size + queue.size = size - QUEUE_BLOCK_SIZE + PyMem_RawFree(orig) + return rv + + +cdef int queue_size(Queue* queue) nogil: + 
cdef int size = queue.size, head = queue.head, tail = queue.tail + if head < 0: + return 0 + elif head == tail: + return size + else: + return (tail - head) % size diff --git a/src/kloop/uring.pxd b/src/kloop/uring.pxd index 0cd9bd6..5496aac 100644 --- a/src/kloop/uring.pxd +++ b/src/kloop/uring.pxd @@ -8,108 +8,45 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -from .includes cimport linux, libc + +cdef struct SubmissionQueue: + unsigned* khead + unsigned* ktail + unsigned* kring_mask + unsigned* kring_entries + unsigned* kflags + unsigned* kdropped + unsigned* array + linux.io_uring_sqe* sqes + + unsigned sqe_head + unsigned sqe_tail + + size_t ring_size + void* ring_ptr -cdef class RingQueue: - cdef: - unsigned* head - unsigned* tail - unsigned* ring_mask - unsigned* ring_entries - unsigned* flags +cdef struct CompletionQueue: + unsigned* khead + unsigned* ktail + unsigned* kring_mask + unsigned* kring_entries + unsigned* kflags + unsigned* koverflow + linux.io_uring_cqe* cqes - size_t ring_size - void* ring_ptr + size_t ring_size + void* ring_ptr -cdef class SubmissionQueue(RingQueue): - cdef: - unsigned* dropped - unsigned* array - linux.io_uring_sqe* sqes - unsigned sqe_head - unsigned sqe_tail +cdef struct Ring: + SubmissionQueue sq + CompletionQueue cq + unsigned flags + int ring_fd - cdef init(self, linux.io_sqring_offsets sq_off) - cdef linux.io_uring_sqe * next_sqe(self) - cdef unsigned flush(self) - - -cdef class CompletionQueue(RingQueue): - cdef: - unsigned* overflow - linux.io_uring_cqe* cqes - - cdef init(self, linux.io_cqring_offsets cq_off) - cdef unsigned ready(self) - cdef inline object pop_works(self, unsigned ready) - - -cdef class Ring: - cdef: - SubmissionQueue sq - CompletionQueue cq - unsigned features - int fd - int enter_fd - - -cdef class Work: - cdef: - readonly object fut - public bint link - int res - - cdef void submit(self, linux.io_uring_sqe* sqe) - - cdef inline void _submit( - 
self, - int op, - linux.io_uring_sqe * sqe, - int fd, - void * addr, - unsigned len, - linux.__u64 offset, - ) - - -cdef class ConnectWork(Work): - cdef: - int fd - libc.sockaddr_in addr - object host_bytes - - -cdef class SendWork(Work): - cdef: - int fd - object data - char* data_ptr - linux.__u32 size - object callback - - -cdef class SendMsgWork(Work): - cdef: - int fd - list buffers - libc.msghdr msg - object callback - - -cdef class RecvWork(Work): - cdef: - int fd - object buffer - object callback - char* buffer_ptr - - -cdef class RecvMsgWork(Work): - cdef: - int fd - list buffers - libc.msghdr msg - object callback - object control_msg + unsigned features + int enter_ring_fd + linux.__u8 int_flags + linux.__u8 pad[3] + unsigned pad2 diff --git a/src/kloop/uring.pyx b/src/kloop/uring.pyx index 7a73190..6799c53 100644 --- a/src/kloop/uring.pyx +++ b/src/kloop/uring.pyx @@ -8,431 +8,174 @@ # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # See the Mulan PSL v2 for more details. -import os -import socket - -from cpython cimport Py_INCREF, Py_DECREF, PyErr_SetFromErrno -from cpython cimport PyMem_RawMalloc, PyMem_RawFree -from libc cimport errno, string -from posix cimport mman - -from .includes cimport barrier, libc, linux, ssl cdef linux.__u32 SIG_SIZE = libc._NSIG // 8 -class SubmissionQueueFull(Exception): - pass - - -cdef class RingQueue: - def __cinit__(self, size_t ring_size): - self.ring_size = ring_size - - -cdef class SubmissionQueue(RingQueue): - cdef init(self, linux.io_sqring_offsets sq_off): - self.head = (self.ring_ptr + sq_off.head) - self.tail = (self.ring_ptr + sq_off.tail) - self.ring_mask = (self.ring_ptr + sq_off.ring_mask) - self.ring_entries = (self.ring_ptr + sq_off.ring_entries) - self.flags = (self.ring_ptr + sq_off.flags) - self.dropped = (self.ring_ptr + sq_off.dropped) - self.array = (self.ring_ptr + sq_off.array) - - cdef linux.io_uring_sqe* next_sqe(self): - cdef: - unsigned int head, next - linux.io_uring_sqe* rv - head = 
barrier.io_uring_smp_load_acquire(self.head) - next = self.sqe_tail + 1 - if next - head <= self.ring_entries[0]: - rv = &self.sqes[self.sqe_tail & self.ring_mask[0]] - self.sqe_tail = next - return rv - else: - # TODO: IORING_ENTER_SQ_WAIT and retry - raise SubmissionQueueFull() - - cdef unsigned flush(self): - cdef: - unsigned mask = self.ring_mask[0] - unsigned tail = self.tail[0] - unsigned to_submit = self.sqe_tail - self.sqe_head - - if to_submit: - while to_submit: - self.array[tail & mask] = self.sqe_head & mask - tail += 1 - self.sqe_head += 1 - to_submit -= 1 - barrier.io_uring_smp_store_release(self.tail, tail) - return tail - self.head[0] - - -cdef class CompletionQueue(RingQueue): - cdef init(self, linux.io_cqring_offsets cq_off): - self.head = (self.ring_ptr + cq_off.head) - self.tail = (self.ring_ptr + cq_off.tail) - self.ring_mask = (self.ring_ptr + cq_off.ring_mask) - self.ring_entries = (self.ring_ptr + cq_off.ring_entries) - self.overflow = (self.ring_ptr + cq_off.overflow) - self.cqes = (self.ring_ptr + cq_off.cqes) - if cq_off.flags: - self.flags = (self.ring_ptr + cq_off.flags) - - cdef unsigned ready(self): - return barrier.io_uring_smp_load_acquire(self.tail) - self.head[0] - - cdef inline object pop_works(self, unsigned ready): - cdef: - object rv = [] - Work work - unsigned head, mask, last - linux.io_uring_cqe* cqe - head = self.head[0] - mask = self.ring_mask[0] - last = head + ready - while head != last: - cqe = self.cqes + (head & mask) - work = cqe.user_data - work.res = cqe.res - rv.append(work) - Py_DECREF(work) - head += 1 - barrier.io_uring_smp_store_release(self.head, self.head[0] + ready) - return rv - - -cdef class Ring: - def __cinit__( - self, - linux.__u32 queue_depth, - linux.__u32 sq_thread_idle, - object sq_thread_cpu, - ): - cdef: - linux.io_uring_params params - int fd - size_t size - void* ptr - - # Prepare io_uring_params - string.memset(¶ms, 0, sizeof(params)) - params.flags = linux.IORING_SETUP_SQPOLL - if 
sq_thread_cpu is not None: - params.flags |= linux.IORING_SETUP_SQ_AFF - params.sq_thread_cpu = sq_thread_cpu - params.sq_thread_idle = sq_thread_idle - - # SYSCALL: SYS_io_uring_setup - fd = libc.syscall(libc.SYS_io_uring_setup, queue_depth, ¶ms) - if fd < 0: +cdef int ring_init( + Ring* ring, + linux.__u32 queue_depth, + linux.io_uring_params* params +) nogil except 0: + # SYSCALL: SYS_io_uring_setup + ring.ring_fd = ring.enter_ring_fd = libc.syscall( + libc.SYS_io_uring_setup, queue_depth, params + ) + if ring.ring_fd < 0: + with gil: PyErr_SetFromErrno(IOError) - return - self.fd = self.enter_fd = fd + return 0 - # Initialize 2 RingQueue and mmap the ring_ptr - size = max( - params.sq_off.array + params.sq_entries * sizeof(unsigned), - params.cq_off.cqes + params.cq_entries * sizeof(linux.io_uring_cqe) + # mmap the ring_ptr + ring.sq.ring_size = ring.cq.ring_size = max( + params.sq_off.array + params.sq_entries * sizeof(unsigned), + params.cq_off.cqes + params.cq_entries * sizeof(linux.io_uring_cqe) + ) + ring.sq.ring_ptr = ring.cq.ring_ptr = mman.mmap( + NULL, + ring.sq.ring_size, + mman.PROT_READ | mman.PROT_WRITE, + mman.MAP_SHARED | mman.MAP_POPULATE, + ring.ring_fd, + linux.IORING_OFF_SQ_RING, + ) + if ring.sq.ring_ptr == mman.MAP_FAILED: + with gil: + PyErr_SetFromErrno(IOError) + return 0 + + # Initialize the SubmissionQueue + ring.sq.khead = (ring.sq.ring_ptr + params.sq_off.head) + ring.sq.ktail = (ring.sq.ring_ptr + params.sq_off.tail) + ring.sq.kring_mask = (ring.sq.ring_ptr + params.sq_off.ring_mask) + ring.sq.kring_entries = (ring.sq.ring_ptr + params.sq_off.ring_entries) + ring.sq.kflags = (ring.sq.ring_ptr + params.sq_off.flags) + ring.sq.kdropped = (ring.sq.ring_ptr + params.sq_off.dropped) + ring.sq.array = (ring.sq.ring_ptr + params.sq_off.array) + ring.sq.sqes = mman.mmap( + NULL, + params.sq_entries * sizeof(linux.io_uring_sqe), + mman.PROT_READ | mman.PROT_WRITE, + mman.MAP_SHARED | mman.MAP_POPULATE, + ring.ring_fd, + 
linux.IORING_OFF_SQES, + ) + if ring.sq.sqes == mman.MAP_FAILED: + mman.munmap(ring.sq.ring_ptr, ring.sq.ring_size) + with gil: + PyErr_SetFromErrno(IOError) + return 0 + + # Initialize the CompletionQueue + ring.cq.khead = (ring.cq.ring_ptr + params.cq_off.head) + ring.cq.ktail = (ring.cq.ring_ptr + params.cq_off.tail) + ring.cq.kring_mask = (ring.cq.ring_ptr + params.cq_off.ring_mask) + ring.cq.kring_entries = (ring.cq.ring_ptr + params.cq_off.ring_entries) + ring.cq.koverflow = (ring.cq.ring_ptr + params.cq_off.overflow) + ring.cq.cqes = (ring.cq.ring_ptr + params.cq_off.cqes) + if params.cq_off.flags: + ring.cq.kflags = (ring.cq.ring_ptr + params.cq_off.flags) + + return 1 + + +cdef int ring_uninit(Ring* ring) nogil except 0: + if ring.sq.sqes != NULL: + mman.munmap( + ring.sq.sqes, + ring.sq.kring_entries[0] * sizeof(linux.io_uring_sqe), ) - self.sq = SubmissionQueue(size) - self.cq = CompletionQueue(size) - ptr = mman.mmap( - NULL, - size, - mman.PROT_READ | mman.PROT_WRITE, - mman.MAP_SHARED | mman.MAP_POPULATE, - fd, - linux.IORING_OFF_SQ_RING, - ) - if ptr == mman.MAP_FAILED: - PyErr_SetFromErrno(IOError) - return - self.sq.ring_ptr = self.cq.ring_ptr = ptr - - # Initialize the SubmissionQueue - self.sq.init(params.sq_off) - size = params.sq_entries * sizeof(linux.io_uring_sqe) - ptr = mman.mmap( - NULL, - size, - mman.PROT_READ | mman.PROT_WRITE, - mman.MAP_SHARED | mman.MAP_POPULATE, - fd, - linux.IORING_OFF_SQES, - ) - if ptr == mman.MAP_FAILED: - mman.munmap(self.sq.ring_ptr, self.sq.ring_size) - PyErr_SetFromErrno(IOError) - return - self.sq.sqes = ptr - - # Initialize the CompletionQueue - self.cq.init(params.cq_off) - - self.features = params.features - - def __dealloc__(self): - if self.sq is not None: - if self.sq.sqes != NULL: - mman.munmap( - self.sq.sqes, self.sq.ring_entries[0] * sizeof(linux.io_uring_sqe) - ) - if self.sq.ring_ptr != NULL: - mman.munmap(self.sq.ring_ptr, self.sq.ring_size) - if self.fd: - os.close(self.fd) - - def 
submit(self, Work work): - cdef linux.io_uring_sqe* sqe = self.sq.next_sqe() - # print(f"submit: {work}") - work.submit(sqe) - - def select(self, timeout): - cdef: - int flags = linux.IORING_ENTER_EXT_ARG, ret - bint need_enter = False - unsigned submit, ready - unsigned wait_nr = 0 - linux.io_uring_getevents_arg arg - linux.__kernel_timespec ts - - # Call enter if we have no CQE ready and timeout is not 0, or else we - # handle the ready CQEs first. - ready = self.cq.ready() - if not ready and timeout is not 0: - flags |= linux.IORING_ENTER_GETEVENTS - if timeout is not None: - ts.tv_sec = int(timeout) - ts.tv_nsec = int(round((timeout - ts.tv_sec) * 1_000_000_000)) - arg.ts = &ts - wait_nr = 1 - need_enter = True - - # Flush the submission queue, and only wakeup the SQ polling thread if - # there is something for the kernel to handle. - submit = self.sq.flush() - if submit: - barrier.io_uring_smp_mb() - if barrier.IO_URING_READ_ONCE( - self.sq.flags[0] - ) & linux.IORING_SQ_NEED_WAKEUP: - arg.ts = 0 - flags |= linux.IORING_ENTER_SQ_WAKEUP - need_enter = True - - if need_enter: - arg.sigmask = 0 - arg.sigmask_sz = SIG_SIZE - # print(f"SYS_io_uring_enter(submit={submit}, wait_nr={wait_nr}, " - # f"flags={flags:b}, timeout={timeout})") - with nogil: - ret = libc.syscall( - libc.SYS_io_uring_enter, - self.enter_fd, - submit, - wait_nr, - flags, - &arg, - sizeof(arg), - ) - if ret < 0: - if errno.errno != errno.ETIME: - print(f"SYS_io_uring_enter(submit={submit}, wait_nr={wait_nr}, " - f"flags={flags:b}, timeout={timeout})") - PyErr_SetFromErrno(IOError) - return - - ready = self.cq.ready() - - if ready: - return self.cq.pop_works(ready) - else: - return [] - - -cdef class Work: - def __init__(self, fut): - self.fut = fut - self.link = False - self.res = -1 - - cdef void submit(self, linux.io_uring_sqe* sqe): - raise NotImplementedError - - cdef inline void _submit( - self, - int op, - linux.io_uring_sqe * sqe, - int fd, - void* addr, - unsigned len, - linux.__u64 
offset, - ): - string.memset(sqe, 0, sizeof(linux.io_uring_sqe)) - sqe.opcode = op - sqe.fd = fd - sqe.off = offset - sqe.addr = addr - sqe.len = len - if self.link: - sqe.flags = linux.IOSQE_IO_LINK - else: - sqe.flags = 0 - sqe.user_data = self - Py_INCREF(self) - - def complete(self): - if self.res == 0: - self.fut.set_result(None) - else: - def _raise(): - errno.errno = abs(self.res) + if ring.sq.ring_ptr != NULL: + mman.munmap(ring.sq.ring_ptr, ring.sq.ring_size) + if ring.ring_fd: + if unistd.close(ring.ring_fd) != 0: + with gil: PyErr_SetFromErrno(IOError) - try: - _raise() - except IOError as ex: - self.fut.set_exception(ex) + return 0 + return 1 -cdef class ConnectWork(Work): - def __init__(self, int fd, sockaddr, fut): - cdef char* host - super().__init__(fut) - self.fd = fd - host_str, port = sockaddr - self.host_bytes = host_str.encode() - host = self.host_bytes - string.memset(&self.addr, 0, sizeof(self.addr)) - self.addr.sin_family = socket.AF_INET - if not libc.inet_pton(socket.AF_INET, host, &self.addr.sin_addr): - PyErr_SetFromErrno(IOError) - return - self.addr.sin_port = libc.htons(port) +cdef inline unsigned ring_sq_flush(SubmissionQueue* sq) nogil: + cdef: + unsigned mask = sq.kring_mask[0] + unsigned tail = sq.ktail[0] + unsigned to_submit = sq.sqe_tail - sq.sqe_head - cdef void submit(self, linux.io_uring_sqe* sqe): - self._submit( - linux.IORING_OP_CONNECT, - sqe, - self.fd, - &self.addr, - 0, - sizeof(self.addr), - ) + if to_submit: + while to_submit: + sq.array[tail & mask] = sq.sqe_head & mask + tail += 1 + sq.sqe_head += 1 + to_submit -= 1 + barrier.io_uring_smp_store_release(sq.ktail, tail) + return tail - sq.khead[0] -cdef class SendWork(Work): - def __init__(self, int fd, data, callback): - self.fd = fd - self.data = data - self.data_ptr = data - self.size = len(data) - self.callback = callback +cdef int ring_select(Ring* ring, long long timeout) nogil except -1: + cdef: + int flags = linux.IORING_ENTER_EXT_ARG + bint need_enter = 0 + 
unsigned submit, ready + unsigned wait_nr = 0 + linux.io_uring_getevents_arg arg + linux.__kernel_timespec ts + CompletionQueue* cq = &ring.cq + SubmissionQueue* sq = &ring.sq - cdef void submit(self, linux.io_uring_sqe* sqe): - self._submit(linux.IORING_OP_SEND, sqe, self.fd, self.data_ptr, self.size, 0) + # Call enter if we have no CQE ready and timeout is not 0, or else we + # handle the ready CQEs first. + ready = barrier.io_uring_smp_load_acquire(cq.ktail) - cq.khead[0] + if not ready and timeout != 0: + flags |= linux.IORING_ENTER_GETEVENTS + if timeout > 0: + ts.tv_sec = timeout // SECOND_NS + ts.tv_nsec = timeout % SECOND_NS + arg.ts = &ts + wait_nr = 1 + need_enter = 1 - def complete(self): - self.callback(self.res) + # Flush the submission queue, and only wakeup the SQ polling thread if + # there is something for the kernel to handle. + submit = ring_sq_flush(sq) + if submit: + barrier.io_uring_smp_mb() + if barrier.IO_URING_READ_ONCE( + sq.kflags[0] + ) & linux.IORING_SQ_NEED_WAKEUP: + arg.ts = 0 + flags |= linux.IORING_ENTER_SQ_WAKEUP + need_enter = 1 + + if need_enter: + arg.sigmask = 0 + arg.sigmask_sz = SIG_SIZE + if libc.syscall( + libc.SYS_io_uring_enter, + ring.enter_ring_fd, + submit, + wait_nr, + flags, + &arg, + sizeof(arg), + ) < 0: + if errno.errno != errno.ETIME: + with gil: + PyErr_SetFromErrno(IOError) + return -1 + + ready = barrier.io_uring_smp_load_acquire(cq.ktail) - cq.khead[0] + + return ready -cdef class SendMsgWork(Work): - def __init__(self, int fd, buffers, callback): - self.fd = fd - self.buffers = buffers - self.callback = callback - self.msg.msg_iov = PyMem_RawMalloc( - sizeof(libc.iovec) * len(buffers) - ) - if self.msg.msg_iov == NULL: - raise MemoryError - self.msg.msg_iovlen = len(buffers) - for i, buf in enumerate(buffers): - self.msg.msg_iov[i].iov_base = buf - self.msg.msg_iov[i].iov_len = len(buf) - - def __dealloc__(self): - if self.msg.msg_iov != NULL: - PyMem_RawFree(self.msg.msg_iov) - - cdef void submit(self, 
linux.io_uring_sqe* sqe): - self._submit(linux.IORING_OP_SENDMSG, sqe, self.fd, &self.msg, 1, 0) - - def complete(self): - if self.res < 0: - errno.errno = abs(self.res) - PyErr_SetFromErrno(IOError) - return - self.callback(self.res) - - -cdef class RecvWork(Work): - def __init__(self, int fd, buffer, callback): - self.fd = fd - self.buffer = buffer - self.callback = callback - self.buffer_ptr = buffer - - cdef void submit(self, linux.io_uring_sqe* sqe): - self._submit( - linux.IORING_OP_RECV, sqe, self.fd, self.buffer_ptr, len(self.buffer), 0 - ) - - def complete(self): - if self.res < 0: - errno.errno = abs(self.res) - PyErr_SetFromErrno(IOError) - return - self.callback(self.res) - - -cdef class RecvMsgWork(Work): - def __init__(self, int fd, buffers, callback): - cdef size_t size = libc.CMSG_SPACE(sizeof(unsigned char)) - self.fd = fd - self.buffers = buffers - self.callback = callback - self.msg.msg_iov = PyMem_RawMalloc( - sizeof(libc.iovec) * len(buffers) - ) - if self.msg.msg_iov == NULL: - raise MemoryError - self.msg.msg_iovlen = len(buffers) - for i, buf in enumerate(buffers): - self.msg.msg_iov[i].iov_base = buf - self.msg.msg_iov[i].iov_len = len(buf) - self.control_msg = bytearray(size) - self.msg.msg_control = self.control_msg - self.msg.msg_controllen = size - - def __dealloc__(self): - if self.msg.msg_iov != NULL: - PyMem_RawFree(self.msg.msg_iov) - - cdef void submit(self, linux.io_uring_sqe* sqe): - self._submit(linux.IORING_OP_RECVMSG, sqe, self.fd, &self.msg, 1, 0) - - def complete(self): - cdef: - libc.cmsghdr* cmsg - unsigned char* cmsg_data - unsigned char record_type - # if self.res < 0: - # errno.errno = abs(self.res) - # PyErr_SetFromErrno(IOError) - # return - app_data = True - if self.msg.msg_controllen: - print('msg_controllen:', self.msg.msg_controllen) - cmsg = libc.CMSG_FIRSTHDR(&self.msg) - if cmsg.cmsg_level == libc.SOL_TLS and cmsg.cmsg_type == linux.TLS_GET_RECORD_TYPE: - cmsg_data = libc.CMSG_DATA(cmsg) - record_type = 
(cmsg_data)[0] - if record_type != ssl.SSL3_RT_APPLICATION_DATA: - app_data = False - print(f'cmsg.len={cmsg.cmsg_len}, cmsg.level={cmsg.cmsg_level}, cmsg.type={cmsg.cmsg_type}') - print(f'record type: {record_type}') - print('flags:', self.msg.msg_flags) - self.callback(self.res, app_data) +cdef inline int ring_cq_pop(CompletionQueue* cq, void** data) nogil: + cdef: + unsigned head + linux.io_uring_cqe* cqe + int res + head = cq.khead[0] + cqe = cq.cqes + (head & cq.kring_mask[0]) + data[0] = cqe.user_data + res = cqe.res + barrier.io_uring_smp_store_release(cq.khead, head + 1) + return res