1
0
Fork 0
mirror of https://gitee.com/fantix/kloop.git synced 2024-06-02 13:29:37 +00:00

New implementation: nogil for most of the stuff

This commit is contained in:
Fantix King 2022-04-23 14:35:17 -04:00
parent 67c2c9aa2a
commit bb7c378c95
No known key found for this signature in database
GPG key ID: 95304B04071CCDB4
25 changed files with 1657 additions and 1187 deletions

View file

@ -9,18 +9,47 @@
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
import sysconfig
from setuptools import setup
from Cython.Build import cythonize from Cython.Build import cythonize
from Cython.Distutils import Extension from Cython.Distutils import Extension
from setuptools import setup
setup( setup(
ext_modules=cythonize( ext_modules=cythonize(
[ [
Extension("kloop.uring", ["src/kloop/uring.pyx"]), Extension("kloop.loop", ["src/kloop/loop.pyx"]),
Extension( Extension(
"kloop.ktls", "kloop.ktls",
["src/kloop/ktls.pyx"], ["src/kloop/ktls.pyx"],
libraries=["ssl", "crypto"], libraries=[
lib.strip().removeprefix("-l")
for lib in sysconfig.get_config_var("OPENSSL_LIBS").split()
],
include_dirs=[
d.strip().removeprefix("-I")
for d in sysconfig.get_config_var(
"OPENSSL_INCLUDES"
).split()
],
library_dirs=[
d.strip().removeprefix("-L")
for d in sysconfig.get_config_var(
"OPENSSL_LDFLAGS"
).split()
if d.strip().startswith("-L")
],
extra_link_args=[
d.strip()
for d in sysconfig.get_config_var(
"OPENSSL_LDFLAGS"
).split()
if not d.strip().startswith("-L")
],
runtime_library_dirs=(lambda x: [x] if x else [])(
sysconfig.get_config_var("OPENSSL_RPATH")
),
), ),
], ],
language_level="3", language_level="3",

View file

@ -8,4 +8,5 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
from .loop import KLoop, KLoopPolicy from .loop import KLoop, KLoopPolicy

37
src/kloop/handle.pxd Normal file
View file

@ -0,0 +1,37 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Bit flags stored in Callback.mask.
# Bit 0: set by Handle.cancel() once the handle has been cancelled.
cdef unsigned char CANCELLED_MASK = 1
# Bit 1: tested by TimerHandle.cancel() to tell whether the timer is currently
# scheduled (the code that sets this bit is not part of this file).
cdef unsigned char SCHEDULED_MASK = 1 << 1
# Plain C record embedded in every Handle (see Handle.cb); the heap queue
# stores pointers to these records.
cdef struct Callback:
# Combination of the *_MASK bit flags above.
unsigned char mask
# Raw pointer back to the owning Handle object (assigned in Handle.__init__).
PyObject* handle
# Timer deadline; the heap in heapq.pyx orders entries by this value.
long long when
# C-level attribute layout of Handle; the implementation lives in handle.pyx.
cdef class Handle:
cdef:
# Embedded C record (flag bits, back-pointer, deadline) for this handle.
Callback cb
# Python callable to run and its positional arguments; cancel() resets both
# to None.
object callback
object args
# Loop that owns the handle; run() reports callback exceptions through it.
KLoopImpl loop
# Read by run() and TimerHandle.__init__; the code that would populate it is
# still commented out in handle.pyx.
object source_traceback
# Set to None in __init__; the cached-repr logic that would use it is
# commented out in handle.pyx.
object repr
# contextvars context in which the callback is executed.
object context
# Execute the callback inside `context`, routing errors to the loop.
cdef run(self)
# C-level attribute layout of TimerHandle (a Handle with a deadline in cb.when).
cdef class TimerHandle(Handle):
cdef:
# NOTE(review): the TimerHandle code in handle.pyx tracks scheduling through
# SCHEDULED_MASK in cb.mask and does not reference this field — confirm it
# is still needed.
bint scheduled

152
src/kloop/handle.pyx Normal file
View file

@ -0,0 +1,152 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
cdef class Handle:
    """Wrapper around a callback scheduled on a loop.

    The embedded ``cb`` record carries the state bits (``CANCELLED_MASK``,
    ``SCHEDULED_MASK``) so that C code holding ``&self.cb`` can inspect the
    handle and find its way back to this object.
    """

    def __init__(self, callback, args, loop, context=None):
        """Store the callback, its positional args, the loop and the context.

        When *context* is None a copy of the current context is used.
        """
        if context is None:
            context = contextvars.copy_context()
        self.context = context
        self.loop = loop
        self.callback = callback
        self.args = args
        self.repr = None
        # Raw back-pointer: the cast takes no reference on purpose.
        self.cb.handle = <PyObject*>self
        # TODO: debug-mode capture of the creation traceback is not ported yet:
        # if self._loop.get_debug():
        #     self._source_traceback = format_helpers.extract_stack(
        #         sys._getframe(1))
        # else:
        #     self._source_traceback = None

    def _repr_info(self):
        """Return the list of text fragments that make up ``repr(self)``."""
        info = [self.__class__.__name__]
        if self.cb.mask & CANCELLED_MASK:
            info.append('cancelled')
        if self.callback is not None:
            info.append(_format_callback_source(self.callback, self.args))
        # TODO: append "created at file:line" once source_traceback is filled:
        # if self._source_traceback:
        #     frame = self._source_traceback[-1]
        #     info.append(f'created at {frame[0]}:{frame[1]}')
        return info

    def __repr__(self):
        # TODO: return the cached self.repr once debug mode is ported.
        info = self._repr_info()
        return '<{}>'.format(' '.join(info))

    def cancel(self):
        """Mark the handle as cancelled and drop the callback and its args.

        Calling it again on an already cancelled handle is a no-op.
        """
        if (self.cb.mask & CANCELLED_MASK) == 0:
            self.cb.mask |= CANCELLED_MASK
            # TODO: in debug mode keep repr(self) in self.repr before the
            # callback and args are dropped (used for "Executing <Handle...>
            # took 2.5 second" style warnings).
            self.callback = None
            self.args = None

    def cancelled(self):
        """Return True if cancel() has been called on this handle."""
        # Compare against zero rather than the literal 1 so the test does not
        # silently depend on CANCELLED_MASK being bit 0.
        return (self.cb.mask & CANCELLED_MASK) != 0

    cdef run(self):
        """Run the callback inside the stored context.

        SystemExit and KeyboardInterrupt propagate; any other exception is
        reported through the loop's ``call_exception_handler``.
        """
        try:
            self.context.run(self.callback, *self.args)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException as exc:
            cb = _format_callback_source(self.callback, self.args)
            msg = f'Exception in callback {cb}'
            context = {
                'message': msg,
                'exception': exc,
                'handle': self,
            }
            if self.source_traceback:
                context['source_traceback'] = self.source_traceback
            self.loop.call_exception_handler(context)
        self = None  # Needed to break cycles when an exception occurs.
cdef class TimerHandle(Handle):
    """Handle for a callback registered to run at a given time."""

    def __init__(self, when, callback, args, loop, context=None):
        """Create the handle and record its deadline in ``cb.when``."""
        assert when is not None
        super().__init__(callback, args, loop, context)
        if self.source_traceback:
            # Drop the frame that belongs to this constructor.
            del self.source_traceback[-1]
        self.cb.when = when

    # TODO: port the repr that includes the deadline:
    # def _repr_info(self):
    #     info = super()._repr_info()
    #     pos = 2 if self._cancelled else 1
    #     info.insert(pos, f'when={self._when}')
    #     return info

    def cancel(self):
        """Cancel the timer, counting it if it was scheduled and still live."""
        cdef unsigned char state = self.cb.mask & (
            CANCELLED_MASK | SCHEDULED_MASK
        )
        # Scheduled and not yet cancelled: bump the loop's cancelled-timer
        # counter before flagging the handle.
        if state == SCHEDULED_MASK:
            self.loop.loop.timer_cancelled_count += 1
        super().cancel()

    def when(self):
        """Return the deadline stored for this timer."""
        return self.cb.when
def _get_function_source(func):
    """Return ``(filename, first_line)`` of *func*, or None when unknown.

    Decorator wrappers are unwrapped first; ``functools.partial`` and
    ``functools.partialmethod`` objects are resolved to the function they
    wrap.
    """
    target = inspect.unwrap(func)
    if inspect.isfunction(target):
        code = target.__code__
        return (code.co_filename, code.co_firstlineno)
    if isinstance(target, (functools.partial, functools.partialmethod)):
        return _get_function_source(target.func)
    return None
def _format_callback_source(func, args):
    """Describe a callback call, appending its source location if known."""
    description = _format_callback(func, args, None)
    location = _get_function_source(func)
    if not location:
        return description
    return description + f' at {location[0]}:{location[1]}'
def _format_args_and_kwargs(args, kwargs):
    """Render positional and keyword arguments as ``(a, b, k=v)``.

    A single positional argument is shown without the trailing comma:
    ('hello',) becomes ('hello').
    """
    # reprlib keeps each rendered value short.
    rendered = [reprlib.repr(value) for value in args] if args else []
    if kwargs:
        rendered += [
            f'{name}={reprlib.repr(value)}' for name, value in kwargs.items()
        ]
    return '(' + ', '.join(rendered) + ')'
def _format_callback(func, args, kwargs, suffix=''):
    """Return a readable ``name(args)`` description of a callback call.

    ``functools.partial`` objects are peeled off one layer at a time; the
    arguments given to each outer layer are accumulated in *suffix* so they
    follow the arguments bound inside the partial.
    """
    while isinstance(func, functools.partial):
        suffix = _format_args_and_kwargs(args, kwargs) + suffix
        func, args, kwargs = func.func, func.args, func.keywords

    # Prefer the qualified name, then the plain name, then repr().
    label = (
        getattr(func, '__qualname__', None)
        or getattr(func, '__name__', None)
        or repr(func)
    )
    label += _format_args_and_kwargs(args, kwargs)
    if suffix:
        label += suffix
    return label

15
src/kloop/heapq.pxd Normal file
View file

@ -0,0 +1,15 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Binary min-heap of Callback pointers ordered by Callback.when
# (see heapq.pyx).
cdef struct HeapQueue:
# Heap storage allocated with PyMem_RawMalloc / PyMem_RawRealloc.
Callback** array
# Number of slots currently allocated in `array`.
int size
# Number of slots in use; also the index where the next push is written.
int tail

184
src/kloop/heapq.pyx Normal file
View file

@ -0,0 +1,184 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Initial capacity of a heap, and the step (in Callback* slots) by which the
# array grows in heapq_push() and shrinks in heapq_pop().
cdef int HEAP_BLOCK_SIZE = 1024
# Restore the heap invariant for the subtree rooted at `pos`, considering only
# the first `endpos` entries of heap.array.
#
# The entry at `pos` is first pushed all the way down to a leaf by repeatedly
# pulling up the smaller child, then bubbled back up to its final place.
# The bubble-up phase must stop at the original `pos`: during heapify the
# entries above it have not been processed yet, and moving one of them into
# this subtree would break the subtree that was just repaired.
cdef void siftup(HeapQueue* heap, int pos, int endpos) nogil:
    cdef:
        int startpos = pos
        int childpos, parentpos
        int limit = endpos >> 1  # smallest index that has no child
        Callback** array = heap.array
        Callback* item

    # Phase 1: sink to a leaf, always following the smaller child.
    while pos < limit:
        childpos = 2 * pos + 1
        if childpos + 1 < endpos:
            if array[childpos].when >= array[childpos + 1].when:
                childpos += 1
        array[childpos], array[pos] = array[pos], array[childpos]
        pos = childpos

    # Phase 2: bubble the entry back up, but never above `startpos`.
    item = array[pos]
    while pos > startpos:
        parentpos = (pos - 1) >> 1
        if item.when >= array[parentpos].when:
            break
        array[pos] = array[parentpos]
        pos = parentpos
    array[pos] = item
# Bubble the entry at `pos` towards the root until its parent's deadline is
# not later than its own. `size` is accepted for symmetry with siftup() but
# is not used.
cdef void siftdown(HeapQueue* heap, int pos, int size) nogil:
    cdef:
        Callback** slots = heap.array
        Callback* moving = slots[pos]
        long long deadline = moving.when
        int parent

    # Shift later parents down into the hole instead of swapping each step;
    # the resulting array is the same.
    while pos > 0:
        parent = (pos - 1) >> 1
        if deadline >= slots[parent].when:
            break
        slots[pos] = slots[parent]
        pos = parent
    slots[pos] = moving
# Allocate the initial HEAP_BLOCK_SIZE slots of an empty heap.
# Returns 1 on success; raises MemoryError (return value 0) on failure.
cdef int heapq_init(HeapQueue* heap) nogil except 0:
    cdef size_t nbytes = sizeof(Callback*) * HEAP_BLOCK_SIZE

    heap.array = <Callback**>PyMem_RawMalloc(nbytes)
    if heap.array != NULL:
        heap.size = HEAP_BLOCK_SIZE
        heap.tail = 0
        return 1
    with gil:
        raise MemoryError
# Release a heap: drop the reference held on the handle of every queued entry
# and free the backing array. Safe to call on a heap whose array is already
# NULL.
cdef void heapq_uninit(HeapQueue* heap) nogil:
    cdef:
        int i, tail = heap.tail
        Callback** array = heap.array

    if array == NULL:
        return
    if tail > 0:
        with gil:
            # The original loop never advanced `i`, so it spun forever and
            # decref'ed the first entry over and over.
            for i in range(tail):
                Py_DECREF(<object>array[i].handle)
    PyMem_RawFree(array)
    heap.array = NULL
    # Leave the struct describing an empty heap so a stray heapq_pop()
    # returns NULL instead of reading through the freed array.
    heap.size = 0
    heap.tail = 0
# Push a Handle onto the heap from Python-level code. The heap keeps one
# strong reference to the handle for as long as its Callback is queued.
cdef heapq_push_py(HeapQueue* heap, Handle handle):
    cdef Callback* callback = &handle.cb

    Py_INCREF(handle)
    try:
        with nogil:
            heapq_push(heap, callback, 1)
    except:
        # The entry was not queued (heapq_push raised MemoryError): give
        # back the reference taken above so the handle is not leaked.
        Py_DECREF(handle)
        raise
# Append `callback` to the heap, growing the array by HEAP_BLOCK_SIZE slots
# when it is full. When `keep` is non-zero the heap invariant is restored
# immediately; otherwise the entry is only appended.
# Returns 1 on success; raises MemoryError (return value 0) if growing fails,
# in which case the heap is left unchanged.
cdef int heapq_push(
    HeapQueue* heap, Callback* callback, int keep
) nogil except 0:
    cdef:
        int size = heap.size, tail = heap.tail
        Callback** array = heap.array

    if tail == size:
        size += HEAP_BLOCK_SIZE
        array = <Callback**>PyMem_RawRealloc(array, sizeof(Callback*) * size)
        if array == NULL:
            with gil:
                raise MemoryError
        # realloc may move the block: publish the new pointer, otherwise
        # heap.array keeps pointing at freed memory.
        heap.array = array
        heap.size = size
    array[tail] = callback
    size = heap.tail = tail + 1
    if keep:
        siftdown(heap, tail, size)
    return 1
# Pop the entry with the smallest deadline and return its Handle, or None
# when the heap is empty. The reference the heap held on the handle is
# released here; the returned object carries its own reference.
cdef Handle heapq_pop_py(HeapQueue* heap):
    cdef:
        Callback* entry
        Handle popped

    with nogil:
        entry = heapq_pop(heap)
    if entry == NULL:
        return None
    popped = <Handle>entry.handle
    Py_DECREF(popped)
    return popped
# Try to shrink the heap array to `size` slots. A failed realloc is ignored:
# the old, larger block stays valid and heap.size keeps describing it.
cdef inline void heapq_shrink_to(HeapQueue* heap, int size) nogil:
    cdef Callback** shrunk = <Callback**>PyMem_RawRealloc(
        heap.array, sizeof(Callback*) * size
    )
    if shrunk != NULL:
        # realloc may move the block even when shrinking, so the returned
        # pointer has to be stored (the original discarded it).
        heap.array = shrunk
        heap.size = size


# Remove and return the entry with the smallest `when`, or NULL when the heap
# is empty. Unused capacity is handed back in HEAP_BLOCK_SIZE steps.
cdef Callback* heapq_pop(HeapQueue* heap) nogil:
    cdef:
        Callback* rv
        Callback** array = heap.array
        int size = heap.size, tail = heap.tail

    if tail == 0:
        return NULL
    tail = heap.tail = tail - 1
    rv = array[tail]
    if tail == 0:
        # The heap is now empty: fall back to the initial capacity.
        if size > HEAP_BLOCK_SIZE:
            heapq_shrink_to(heap, HEAP_BLOCK_SIZE)
        return rv
    # Move the last entry to the root and let it sink to its place.
    rv, array[0] = array[0], rv
    if tail > 1:
        siftup(heap, 0, tail)
    if size - tail >= HEAP_BLOCK_SIZE * 2:
        heapq_shrink_to(heap, size - HEAP_BLOCK_SIZE)
    return rv
# Return `n` with every bit below its highest set bit cleared
# (values <= 1 are returned unchanged).
cdef inline int keep_top_bit(int n) nogil:
    cdef int shifts = 0, rest = n

    while rest > 1:
        rest >>= 1
        shifts += 1
    return rest << shifts
# Heapify the first `tail` entries, visiting nodes in the order used by
# CPython's cache_friendly_heapify: after sifting a node, its ancestors are
# sifted right away for as long as the current node is a left child (odd
# index).
cdef inline void heapq_cache_friendly_heapify(HeapQueue* heap, int tail) nogil:
    cdef:
        int first_leaf = tail >> 1            # index of first childless node
        int leaf_parent = first_leaf >> 1     # parent of first childless node
        int row_start = keep_top_bit(first_leaf + 1) - 1  # leftmost in that row
        int node, cursor

    # Nodes left of `row_start`, down to the parent of the first leaf.
    for node in range(row_start - 1, leaf_parent - 1, -1):
        cursor = node
        siftup(heap, cursor, tail)
        while cursor & 1:
            cursor >>= 1
            siftup(heap, cursor, tail)

    # Remaining internal nodes in the row of the first leaf.
    for node in range(first_leaf - 1, row_start - 1, -1):
        cursor = node
        siftup(heap, cursor, tail)
        while cursor & 1:
            cursor >>= 1
            siftup(heap, cursor, tail)
# Turn the first heap.tail entries of the array into a valid heap. Large heaps
# (more than 2500 entries) use the cache-friendly visiting order.
cdef void heapq_heapify(HeapQueue* heap) nogil:
    cdef int count = heap.tail, node

    if count > 2500:
        heapq_cache_friendly_heapify(heap, count)
        return
    # Sift every internal node, from the last one back to the root.
    for node in range((count >> 1) - 1, -1, -1):
        siftup(heap, node, count)

View file

@ -8,6 +8,7 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
cdef extern from "includes/barrier.h" nogil: cdef extern from "includes/barrier.h" nogil:
unsigned IO_URING_READ_ONCE(unsigned var) unsigned IO_URING_READ_ONCE(unsigned var)
void io_uring_smp_store_release(void* p, unsigned v) void io_uring_smp_store_release(void* p, unsigned v)

View file

@ -1,3 +1,14 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
cdef extern from "sys/syscall.h" nogil: cdef extern from "sys/syscall.h" nogil:
int SYS_io_uring_setup int SYS_io_uring_setup
int SYS_io_uring_enter int SYS_io_uring_enter

View file

@ -8,6 +8,7 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
cdef extern from "linux/fs.h" nogil: cdef extern from "linux/fs.h" nogil:
ctypedef int __kernel_rwf_t ctypedef int __kernel_rwf_t

View file

@ -0,0 +1,9 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.

View file

@ -0,0 +1,88 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Cython declarations for the parts of OpenSSL's <openssl/bio.h> needed to
# define a custom BIO method. Each short Cython name is mapped to the real C
# symbol given in the quoted string.
cdef extern from "openssl/bio.h" nogil:
# Command codes passed as `cmd` to a BIO ctrl callback. The values come from
# the C header; the numbers in the comments are for reference only.
enum BIOCtrl:
BIO_CTRL_RESET # 1 opt - rewind/zero etc
BIO_CTRL_EOF # 2 opt - are we at the eof
BIO_CTRL_INFO # 3 opt - extra tidbits
BIO_CTRL_SET # 4 man - set the 'IO' type
BIO_CTRL_GET # 5 man - get the 'IO' type
BIO_CTRL_PUSH # 6 opt - internal, used to signify change
BIO_CTRL_POP # 7 opt - internal, used to signify change
BIO_CTRL_GET_CLOSE # 8 man - get the 'close' on free
BIO_CTRL_SET_CLOSE # 9 man - set the 'close' on free
BIO_CTRL_PENDING # 10 opt - is there more data buffered
BIO_CTRL_FLUSH # 11 opt - 'flush' buffered output
BIO_CTRL_DUP # 12 man - extra stuff for 'duped' BIO
BIO_CTRL_WPENDING # 13 opt - number of bytes still to write
BIO_CTRL_SET_CALLBACK # 14 opt - set callback function
BIO_CTRL_GET_CALLBACK # 15 opt - get callback function
# Opaque handle types; only pointers to them are used.
ctypedef struct Method "BIO_METHOD":
pass
ctypedef struct BIO:
pass
# Creation of a custom BIO method and registration of its callbacks.
int get_new_index "BIO_get_new_index" ()
Method* meth_new "BIO_meth_new" (int type, const char* name)
int meth_set_write_ex "BIO_meth_set_write_ex" (
Method* biom,
int (*bwrite)(BIO*, const char*, size_t, size_t*),
)
int meth_set_write "BIO_meth_set_write" (
Method* biom,
int (*write)(BIO*, const char*, int),
)
int meth_set_read_ex "BIO_meth_set_read_ex" (
Method* biom,
int (*bread)(BIO*, char*, size_t, size_t*),
)
int meth_set_read "BIO_meth_set_read"(
Method* biom,
int (*read)(BIO*, char*, int),
)
int meth_set_ctrl "BIO_meth_set_ctrl" (
Method* biom, long (*ctrl)(BIO*, int, long, void*)
)
int meth_set_create "BIO_meth_set_create" (
Method* biom,
int (*create)(BIO*),
)
int meth_set_destroy "BIO_meth_set_destroy" (
Method* biom,
int (*destroy)(BIO*),
)
ctypedef int info_cb "BIO_info_cb" (BIO*, int, int)
int meth_set_callback_ctrl "BIO_meth_set_callback_ctrl" (
Method* biom,
long (*callback_ctrl)(BIO*, int, info_cb*),
)
# BIO instance lifetime and per-instance state.
BIO* new "BIO_new" (const Method* type)
int up_ref "BIO_up_ref" (BIO* a)
int free "BIO_free" (BIO* a)
void set_data "BIO_set_data" (BIO* a, void* ptr)
void* get_data "BIO_get_data" (BIO* a)
void set_init "BIO_set_init" (BIO* a, int init)
void set_shutdown "BIO_set_shutdown" (BIO* a, int shut)
# Retry flags a read/write callback sets when it could not make progress.
void set_retry_read "BIO_set_retry_read" (BIO *b)
void set_retry_write "BIO_set_retry_write" (BIO *b)

View file

@ -0,0 +1,14 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Cython declarations for OpenSSL's error queue (<openssl/err.h>).
cdef extern from "openssl/err.h" nogil:
# ERR_get_error: fetch an error code from the OpenSSL error queue.
unsigned long get_error "ERR_get_error" ()
# ERR_reason_error_string: reason text for an error code; callers in this
# project check the result for NULL before decoding it.
const char* reason_error_string "ERR_reason_error_string" (unsigned long e)

View file

@ -0,0 +1,17 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Cython declarations for the few <openssl/ssl.h> symbols needed to switch on
# kernel TLS for an SSL object.
cdef extern from "openssl/ssl.h" nogil:
# Opaque SSL connection handle; only pointers to it are used.
ctypedef struct SSL:
pass
# Option flag passed to set_options() to request kernel TLS.
int OP_ENABLE_KTLS "SSL_OP_ENABLE_KTLS"
# NOTE(review): OpenSSL declares the option mask of SSL_set_options with a
# wider unsigned type than `int` — confirm against the targeted OpenSSL
# version that `int` here cannot truncate the flags in use.
int set_options "SSL_set_options" (SSL* ssl, int options)

View file

@ -0,0 +1,34 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
from .openssl.ssl cimport SSL
from .openssl.bio cimport BIO
# Local re-declaration of two object structs so that their OpenSSL pointers
# can be read from Python-level ssl objects. The C text below is emitted
# verbatim into the generated module.
# NOTE(review): these look like copies of the private layouts in CPython's
# _ssl module; they must match the running interpreter exactly — confirm
# for every supported Python version.
cdef extern from *:
"""
typedef struct {
PyObject_HEAD
PyObject *Socket; /* weakref to socket on which we're layered */
SSL *ssl;
} PySSLSocket;
typedef struct {
PyObject_HEAD
BIO *bio;
int eof_written;
} PySSLMemoryBIO;
"""
# Only the fields accessed from Cython are declared here.
ctypedef struct PySSLSocket:
SSL* ssl
ctypedef struct PySSLMemoryBIO:
BIO* bio

View file

@ -1,20 +0,0 @@
/*
Copyright (c) 2022 Fantix King http://fantix.pro
kLoop is licensed under Mulan PSL v2.
You can use this software according to the terms and conditions of the Mulan PSL v2.
You may obtain a copy of Mulan PSL v2 at:
http://license.coscl.org.cn/MulanPSL2
THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
See the Mulan PSL v2 for more details.
*/
#include "Python.h"
#include "openssl/ssl.h"
/*
 * Local declaration of an object struct exposing the OpenSSL `SSL *` that
 * sits behind a Python-level SSL object.
 * NOTE(review): presumably mirrors the private PySSLSocket layout in
 * CPython's _ssl module; it has to match the running interpreter - confirm.
 */
typedef struct {
PyObject_HEAD
PyObject *Socket; /* weakref to socket on which we're layered */
SSL *ssl;
} PySSLSocket;

View file

@ -1,39 +0,0 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Cython declarations for the <openssl/ssl.h> symbols used by the key-log
# capturing handshake code in ktls.pyx.
cdef extern from "openssl/ssl.h" nogil:
int EVP_GCM_TLS_FIXED_IV_LEN
# Opaque handle types; only pointers to them are used.
ctypedef struct SSL:
pass
ctypedef struct SSL_CTX:
pass
int SSL_version(const SSL *s)
# Key-log callback type and its setter/getter on an SSL_CTX.
ctypedef void(*SSL_CTX_keylog_cb_func)(SSL *ssl, char *line)
void SSL_CTX_set_keylog_callback(SSL_CTX* ctx, SSL_CTX_keylog_cb_func cb)
SSL_CTX_keylog_cb_func SSL_CTX_get_keylog_callback(SSL_CTX* ctx)
SSL_CTX* SSL_get_SSL_CTX(SSL* ssl)
# Handshake state enum; its members are not declared here.
ctypedef enum OSSL_HANDSHAKE_STATE:
pass
OSSL_HANDSHAKE_STATE SSL_get_state(const SSL *ssl);
# TLS record type constants taken from the C header.
unsigned int SSL3_RT_CHANGE_CIPHER_SPEC
unsigned int SSL3_RT_ALERT
unsigned int SSL3_RT_HANDSHAKE
unsigned int SSL3_RT_APPLICATION_DATA
# Struct declared in the project's own includes/ssl.h; only the `ssl` field is
# exposed to Cython.
cdef extern from "includes/ssl.h" nogil:
ctypedef struct PySSLSocket:
SSL *ssl

View file

@ -8,3 +8,6 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
# Per-BIO state record. NOTE(review): presumably the struct that ktls.pyx's
# bio_create() allocates and attaches with BIO_set_data — confirm.
cdef struct BIO:
# Single integer slot; nothing in the visible code reads or writes it beyond
# zero-initialising the struct.
int data

View file

@ -8,157 +8,119 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
import socket
import hmac
import hashlib
import struct
from ssl import SSLWantReadError
from cpython cimport PyErr_SetFromErrno import ssl
from cpython cimport PyMem_RawMalloc, PyMem_RawFree
from libc cimport string from libc cimport string
from .includes cimport libc, linux, ssl from .includes.openssl cimport bio, err, ssl as ssl_h
from .includes cimport pyssl
cdef ssl.SSL_CTX_keylog_cb_func orig_cb cdef object fromOpenSSLError(object err_type):
cdef secrets = {} cdef:
unsigned long e = err.get_error()
const char* msg = err.reason_error_string(e)
if msg == NULL:
return err_type()
else:
return err_type(msg.decode("ISO-8859-1"))
cdef void _capture_secrets(const ssl.SSL* s, const char* line) nogil: cdef int bio_write_ex(
if line != NULL: bio.BIO* b, const char* data, size_t datal, size_t* written
try: ) nogil:
with gil: with gil:
global secrets print('bio_write', data[:datal], int(<int>data))
parts = line.decode("ISO-8859-1").split() bio.set_retry_write(b)
secrets[parts[0]] = bytes.fromhex(parts[-1]) written[0] = 0
finally: return 1
if orig_cb != NULL:
orig_cb(s, line)
def do_handshake_capturing_secrets(sslobj): cdef int bio_read_ex(
cdef: bio.BIO* b, char* data, size_t datal, size_t* readbytes
ssl.SSL* s = (<ssl.PySSLSocket *> sslobj._sslobj).ssl ) nogil:
ssl.SSL_CTX* ctx = ssl.SSL_get_SSL_CTX(s) with gil:
global orig_cb print('bio_read', datal, int(<int>data))
orig_cb = ssl.SSL_CTX_get_keylog_callback(ctx) bio.set_retry_read(b)
ssl.SSL_CTX_set_keylog_callback( readbytes[0] = 0
ctx, <ssl.SSL_CTX_keylog_cb_func>_capture_secrets return 1
)
try:
try: cdef long bio_ctrl(bio.BIO* b, int cmd, long num, void* ptr) nogil:
sslobj.do_handshake() cdef long ret = 0
except SSLWantReadError: with gil:
success = False if cmd == bio.BIO_CTRL_EOF:
print("BIO_CTRL_EOF", ret)
elif cmd == bio.BIO_CTRL_PUSH:
print("BIO_CTRL_PUSH", ret)
elif cmd == bio.BIO_CTRL_FLUSH:
ret = 1
print('BIO_CTRL_FLUSH', ret)
else: else:
success = True print('bio_ctrl', cmd, num)
if secrets: return ret
rv = dict(secrets)
secrets.clear()
else:
rv = {}
return success, rv
finally:
ssl.SSL_CTX_set_keylog_callback(ctx, orig_cb)
def hkdf_expand(pseudo_random_key, label, length, hash_method=hashlib.sha384): cdef int bio_create(bio.BIO* b) nogil:
''' cdef BIO* obj = <BIO*>PyMem_RawMalloc(sizeof(BIO))
Expand `pseudo_random_key` and `info` into a key of length `bytes` using if obj == NULL:
HKDF's expand function based on HMAC with the provided hash (default return 0
SHA-512). See the HKDF draft RFC and paper for usage notes. string.memset(obj, 0, sizeof(BIO))
''' bio.set_data(b, <void*>obj)
info = struct.pack("!HB", length, len(label)) + label + b'\0' bio.set_init(b, 1)
hash_len = hash_method().digest_size return 1
blocks_needed = length // hash_len + (0 if length % hash_len == 0 else 1) # ceil
okm = b""
output_block = b""
for counter in range(blocks_needed):
output_block = hmac.new(
pseudo_random_key,
(output_block + info + bytearray((counter + 1,))),
hash_method,
).digest()
okm += output_block
return okm[:length]
def enable_ulp(sock): cdef int bio_destroy(bio.BIO* b) nogil:
cdef char *tls = b"tls" cdef void* obj = bio.get_data(b)
if libc.setsockopt(sock.fileno(), socket.SOL_TCP, linux.TCP_ULP, tls, 4): if obj != NULL:
PyErr_SetFromErrno(IOError) PyMem_RawFree(obj)
return bio.set_shutdown(b, 1)
return 1
def get_state(sslobj): cdef object wrap_bio(
cdef: bio.BIO* b,
ssl.SSL* s = (<ssl.PySSLSocket*>sslobj._sslobj).ssl object ssl_context,
print(ssl.SSL_get_state(s)) bint server_side=False,
object server_hostname=None,
object session=None,
def upgrade_aes_gcm_256(sslobj, sock, secret, sending): ):
cdef: cdef pyssl.PySSLMemoryBIO* c_bio
ssl.SSL* s = (<ssl.PySSLSocket*>sslobj._sslobj).ssl py_bio = ssl.MemoryBIO()
linux.tls12_crypto_info_aes_gcm_256 crypto_info c_bio = <pyssl.PySSLMemoryBIO*>py_bio
char* seq c_bio.bio, b = b, c_bio.bio
rv = ssl_context.wrap_bio(
if sending: py_bio, py_bio, server_side, server_hostname, session
# s->rlayer->write_sequence
seq = <char*>((<void*>s) + 6112)
else:
# s->rlayer->read_sequence
seq = <char*>((<void*>s) + 6104)
# print(sslobj.cipher())
string.memset(&crypto_info, 0, sizeof(crypto_info))
crypto_info.info.cipher_type = linux.TLS_CIPHER_AES_GCM_256
crypto_info.info.version = ssl.SSL_version(s)
key = hkdf_expand(
secret,
b'tls13 key',
linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE,
) )
string.memcpy( c_bio.bio, b = b, c_bio.bio
crypto_info.key, ssl_h.set_options(
<char*>key, (<pyssl.PySSLSocket*>rv._sslobj).ssl, ssl_h.OP_ENABLE_KTLS
linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE,
) )
string.memcpy( return rv
crypto_info.rec_seq,
seq,
linux.TLS_CIPHER_AES_GCM_256_REC_SEQ_SIZE, def test():
) cdef BIO* b
iv = hkdf_expand( with nogil:
secret, b = bio.new(KTLS_BIO_METHOD)
b'tls13 iv', if b == NULL:
linux.TLS_CIPHER_AES_GCM_256_IV_SIZE + raise fromOpenSSLError(RuntimeError)
linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE, ctx = ssl.create_default_context()
) return wrap_bio(b, ctx)
string.memcpy(
crypto_info.iv,
<char*>iv+ ssl.EVP_GCM_TLS_FIXED_IV_LEN, cdef bio.Method* KTLS_BIO_METHOD = bio.meth_new(
linux.TLS_CIPHER_AES_GCM_256_IV_SIZE, bio.get_new_index(), "kTLS BIO"
) )
string.memcpy( if not bio.meth_set_write_ex(KTLS_BIO_METHOD, bio_write_ex):
crypto_info.salt, raise fromOpenSSLError(ImportError)
<char*>iv, if not bio.meth_set_read_ex(KTLS_BIO_METHOD, bio_read_ex):
linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE, raise fromOpenSSLError(ImportError)
) if not bio.meth_set_ctrl(KTLS_BIO_METHOD, bio_ctrl):
if libc.setsockopt( raise fromOpenSSLError(ImportError)
sock.fileno(), if not bio.meth_set_create(KTLS_BIO_METHOD, bio_create):
libc.SOL_TLS, raise fromOpenSSLError(ImportError)
linux.TLS_TX if sending else linux.TLS_RX, if not bio.meth_set_destroy(KTLS_BIO_METHOD, bio_destroy):
&crypto_info, raise fromOpenSSLError(ImportError)
sizeof(crypto_info),
):
PyErr_SetFromErrno(IOError)
return
# print(
# sending,
# "iv", crypto_info.iv[:linux.TLS_CIPHER_AES_GCM_256_IV_SIZE].hex(),
# "key", crypto_info.key[:linux.TLS_CIPHER_AES_GCM_256_KEY_SIZE].hex(),
# "salt", crypto_info.salt[:linux.TLS_CIPHER_AES_GCM_256_SALT_SIZE].hex(),
# "rec_seq", crypto_info.rec_seq[:linux.TLS_CIPHER_AES_GCM_256_REC_SEQ_SIZE].hex(),
# )

48
src/kloop/loop.pxd Normal file
View file

@ -0,0 +1,48 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
from cpython cimport PyErr_SetFromErrno
from cpython cimport PyMem_RawMalloc, PyMem_RawFree, PyMem_RawRealloc
from cpython cimport PyObject, Py_INCREF, Py_DECREF
from libc cimport errno, string
from posix cimport mman, unistd, time
from .includes cimport libc, linux, barrier
include "./handle.pxd"
include "./queue.pxd"
include "./heapq.pxd"
include "./uring.pxd"
# C-level state of an event loop, embedded in KLoopImpl as `loop`.
cdef struct Loop:
# Stop flag (its users are not part of this file).
bint stopping
# Ring state; the Ring type comes from the included uring.pxd.
Ring ring
# Heap of timer callbacks ordered by deadline (see heapq.pxd).
HeapQueue scheduled
# Queue of callbacks; the Queue type comes from the included queue.pxd.
Queue ready
# Incremented by TimerHandle.cancel() when a scheduled timer is cancelled.
int timer_cancelled_count
# Raw pointer to a Python object — presumably the owning KLoopImpl; confirm
# against loop.pyx.
PyObject* loop
# C-level declaration of the loop implementation class; the method bodies are
# defined in the matching .pyx file.
cdef class KLoopImpl:
cdef:
bint closed
object thread_id
# Embedded C state (ring, timer heap, ready queue, counters).
Loop loop
cdef inline check_closed(self)
cdef inline bint _is_running(self)
cdef inline check_running(self)
# Returns the Handle created for `callback`.
cdef inline Handle _call_soon(self, callback, args, context)
cdef inline _add_callback(self, Handle handle)
# Returns the TimerHandle created for `callback`; `when` is the deadline
# stored in the handle's Callback record.
cdef inline TimerHandle _call_at(
self, long long when, callback, args, context
)

View file

@ -1,473 +0,0 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
import asyncio.events
import asyncio.futures
import asyncio.trsock
import asyncio.transports
import contextvars
import errno
import socket
import ssl
from . import uring, ktls
class Callback:
    """A callable bundled with its arguments and a contextvars context.

    Calling the instance runs the callable inside the stored context; the
    callable's return value is discarded.
    """

    __slots__ = ("_callback", "_context", "_args", "_kwargs")

    def __init__(self, callback, context=None, args=None, kwargs=None):
        self._callback = callback
        # Without an explicit context, snapshot the current one.
        self._context = (
            contextvars.copy_context() if context is None else context
        )
        self._args = args if args else ()
        self._kwargs = kwargs if kwargs else {}

    def __call__(self):
        self._context.run(self._callback, *self._args, **self._kwargs)

    def __repr__(self):
        return "{} {} {} {}".format(
            self._callback, self._args, self._kwargs, self._context
        )
# asyncio Transport for a connected socket. Reads and writes are issued as
# work items from the `uring` module (RecvMsgWork / SendWork / SendMsgWork)
# submitted through `self._loop._selector`; their completion callbacks
# (`_read_ready_cb`, `_write_done`) drive the protocol.
class KLoopSocketTransport(
asyncio.transports._FlowControlMixin, asyncio.Transport
):
__slots__ = (
"_waiter",
"_sock",
"_protocol",
"_closing",
"_recv_buffer",
"_recv_buffer_factory",
"_read_ready_cb",
"_buffers",
"_buffer_size",
"_current_work",
"_write_waiter",
"_read_paused",
)
# Record socket details in `extra`, install the protocol, then schedule
# connection_made(), the first read and (if given) the waiter's result.
def __init__(
self, loop, sock, protocol, waiter=None, extra=None, server=None
):
super().__init__(extra, loop)
self._extra["socket"] = asyncio.trsock.TransportSocket(sock)
try:
self._extra["sockname"] = sock.getsockname()
except OSError:
self._extra["sockname"] = None
if "peername" not in self._extra:
try:
self._extra["peername"] = sock.getpeername()
except socket.error:
self._extra["peername"] = None
# Data queued while a send is in flight, and the total unsent byte count.
self._buffers = []
self._buffer_size = 0
# The send work item currently in flight, or None.
self._current_work = None
self._sock = sock
self._closing = False
self._write_waiter = None
self._read_paused = False
self.set_protocol(protocol)
self._waiter = waiter
self._loop.call_soon(self._protocol.connection_made, self)
self._loop.call_soon(self._read)
if self._waiter is not None:
self._loop.call_soon(
asyncio.futures._set_result_unless_cancelled,
self._waiter,
None,
)
# Select the read-completion callback and receive-buffer source that match
# the protocol type (BufferedProtocol vs. plain data_received protocol).
def set_protocol(self, protocol):
if isinstance(protocol, asyncio.BufferedProtocol):
self._read_ready_cb = self._read_ready__buffer_updated
self._recv_buffer = None
self._recv_buffer_factory = protocol.get_buffer
else:
self._read_ready_cb = self._read_ready__data_received
# NOTE(review): this allocates a 64 MiB receive buffer per transport —
# confirm the size is intended.
self._recv_buffer = bytearray(64 * 1024 * 1024)
self._recv_buffer_factory = lambda _hint: self._recv_buffer
self._protocol = protocol
# Submit one receive work item unless reading is paused.
def _read(self):
# print("RecvMsgWork")
if self._read_paused:
return
self._loop._selector.submit(
uring.RecvMsgWork(
self._sock.fileno(),
[self._recv_buffer_factory(-1)],
self._read_ready_cb,
)
)
# Read completion for BufferedProtocol: `res` is the completion result
# (negative = error, 0 = EOF, otherwise the number of bytes received).
# NOTE(review): unlike _read_ready__data_received, a negative result is
# raised without the error code and EAGAIN is not retried.
def _read_ready__buffer_updated(self, res, app_data):
if res < 0:
raise IOError
elif res == 0:
self._protocol.eof_received()
else:
try:
# print(f"buffer updated: {res}")
self._protocol.buffer_updated(res)
finally:
if not self._closing:
self._read()
# Read completion for plain protocols: retries on EAGAIN, reports EOF, or
# copies `res` bytes out of the receive buffer; the copy is passed to
# data_received() only when `app_data` is truthy.
# NOTE(review): the print() calls look like leftover debugging output.
def _read_ready__data_received(self, res, app_data):
print("_read_ready__data_received", res)
if res < 0:
if abs(res) == errno.EAGAIN:
print('EAGAIN')
self._read()
else:
raise IOError(f"{res}")
elif res == 0:
self._protocol.eof_received()
else:
try:
print(f"data received: {res}")
data = bytes(self._recv_buffer[:res])
# print(f"data received: {data}")
if app_data:
self._protocol.data_received(data)
finally:
if not self._closing:
self._read()
# Send completion: account for the bytes written, then submit whatever was
# queued in the meantime, finish a pending close, or call the write waiter.
def _write_done(self, res):
# print("_write_done")
self._current_work = None
if res < 0:
# TODO: force close transport
raise IOError()
self._buffer_size -= res
if self._buffers:
if len(self._buffers) == 1:
self._current_work = uring.SendWork(
self._sock.fileno(), self._buffers[0], self._write_done
)
else:
self._current_work = uring.SendMsgWork(
self._sock.fileno(), self._buffers, self._write_done
)
self._loop._selector.submit(self._current_work)
# print("more SendWork")
self._buffers = []
elif self._closing:
self._loop.call_soon(self._call_connection_lost, None)
elif self._write_waiter is not None:
self._write_waiter()
self._write_waiter = None
self._maybe_resume_protocol()
# Send `data` immediately when no send is in flight; otherwise queue it for
# _write_done() to submit.
def write(self, data):
self._buffer_size += len(data)
if self._current_work is None:
self._current_work = uring.SendWork(
self._sock.fileno(), data, self._write_done
)
self._loop._selector.submit(self._current_work)
# print("SendWork")
else:
self._buffers.append(data)
self._maybe_pause_protocol()
# Start closing; when a send is still in flight, _write_done() schedules
# connection_lost() once the queued data has been submitted.
def close(self):
if self._closing:
return
self._closing = True
if self._current_work is None:
self._loop.call_soon(self._call_connection_lost, None)
# Notify the protocol, then close the socket and drop references even if
# connection_lost() raises.
def _call_connection_lost(self, exc):
try:
if self._protocol is not None:
self._protocol.connection_lost(exc)
finally:
self._sock.close()
self._sock = None
self._protocol = None
self._loop = None
# Number of bytes accepted by write() that have not been reported as sent.
def get_write_buffer_size(self):
return self._buffer_size
# Stop submitting new reads (a read already submitted is not cancelled here).
def pause_reading(self):
self._read_paused = True
# Resume reading and submit a new read if reading was paused.
def resume_reading(self):
if self._read_paused:
self._read_paused = False
self._read()
class KLoopSSLHandshakeProtocol(asyncio.Protocol):
    """Client-side TLS handshake driver that hands the socket over to kTLS.

    The handshake runs in user space over a pair of ``ssl.MemoryBIO``
    objects while the traffic secrets are captured through ``ktls``.  Once
    it succeeds, the owning transport is asked to switch its receive side
    and then its send side to kernel TLS.
    """

    __slots__ = (
        "_incoming",
        "_outgoing",
        "_handshaking",
        "_secrets",
        "_app_protocol",
        "_transport",
        "_sslobj",
    )

    # Maximum number of plaintext bytes requested per SSLObject.read() call.
    _READ_CHUNK = 64 * 1024

    def __init__(self, sslcontext, server_hostname):
        """Create the in-memory SSL object used for the client handshake."""
        self._handshaking = True
        self._secrets = {}
        self._incoming = ssl.MemoryBIO()
        self._outgoing = ssl.MemoryBIO()
        self._sslobj = sslcontext.wrap_bio(
            self._incoming,
            self._outgoing,
            server_side=False,
            server_hostname=server_hostname,
        )

    def connection_made(self, transport):
        """Remember *transport* and start the handshake."""
        self._transport = transport
        self._handshake()

    def data_received(self, data):
        """Feed TLS records from the peer into the handshake."""
        self._incoming.write(data)
        self._handshake()

    def _drain_plaintext(self):
        """Return plaintext already decrypted by the SSL object.

        Returns ``None`` when the first read wants more input, otherwise the
        concatenation of everything readable right now.
        """
        try:
            data = self._sslobj.read(self._READ_CHUNK)
        except ssl.SSLWantReadError:
            return None
        while data:
            try:
                chunk = self._sslobj.read(self._READ_CHUNK)
            except ssl.SSLWantReadError:
                break
            if not chunk:
                # An empty read cannot make progress; stop instead of
                # spinning forever.
                break
            data += chunk
        return data

    def _handshake(self):
        """Advance the handshake; on success start the kTLS upgrade.

        Raises:
            RuntimeError: if called again after the handshake completed.
        """
        ktls.get_state(self._sslobj)
        success, secrets = ktls.do_handshake_capturing_secrets(self._sslobj)
        self._secrets.update(secrets)
        if not success:
            # More input is needed: flush our handshake flight and wait for
            # the next data_received().
            if data := self._outgoing.read():
                self._transport.write(data)
            return
        if not self._handshaking:
            # Raised explicitly: an ``assert`` here would be stripped under
            # ``python -O`` and the data would be dropped silently.
            raise RuntimeError(
                "handshake protocol invoked after the handshake completed"
            )
        ktls.get_state(self._sslobj)
        self._handshaking = False
        data = self._drain_plaintext()
        self._transport._upgrade_ktls_read(
            self._sslobj,
            self._secrets["SERVER_TRAFFIC_SECRET_0"],
            data,
        )
        if data := self._outgoing.read():
            # The final handshake message must be sent before the send side
            # is switched to kTLS; reading stays paused until then.
            self._transport.write(data)
            self._transport._write_waiter = self._after_last_write
            self._transport.pause_reading()
        else:
            self._after_last_write()

    def _after_last_write(self):
        """Switch the send side to kTLS and resume reading."""
        ktls.get_state(self._sslobj)
        self._transport._upgrade_ktls_write(
            self._sslobj,
            self._secrets["CLIENT_TRAFFIC_SECRET_0"],
        )
        self._transport.resume_reading()
class KLoopSSLTransport(KLoopSocketTransport):
    """Socket transport that performs a TLS handshake and then uses kTLS.

    While the handshake is in progress the transport's protocol is a
    ``KLoopSSLHandshakeProtocol``; the application protocol is installed
    once the send side has been upgraded.
    """

    __slots__ = ("_app_protocol",)

    def __init__(
        self,
        loop,
        sock,
        protocol,
        waiter=None,
        extra=None,
        server=None,
        *,
        sslcontext,
        server_hostname,
    ):
        """Enable the TLS ULP on *sock* and start the handshake.

        *waiter* is withheld from the base class so that it is only
        resolved after the kTLS upgrade, not when the raw connection is up.
        """
        ktls.enable_ulp(sock)
        self._app_protocol = protocol
        super().__init__(
            loop,
            sock,
            KLoopSSLHandshakeProtocol(sslcontext, server_hostname),
            None,
            extra,
            server,
        )
        self._waiter = waiter

    def _upgrade_ktls_write(self, sslobj, secret):
        """Upgrade the send side, then hand over to the app protocol."""
        ktls.upgrade_aes_gcm_256(sslobj, self._sock, secret, True)
        self.set_protocol(self._app_protocol)
        self._loop.call_soon(self._app_protocol.connection_made, self)
        if self._waiter is not None:
            self._loop.call_soon(
                asyncio.futures._set_result_unless_cancelled,
                self._waiter,
                None,
            )

    def _upgrade_ktls_read(self, sslobj, secret, data):
        """Upgrade the receive side of the socket.

        NOTE(review): *data* (plaintext decrypted in user space during the
        handshake) is accepted but not forwarded to the application
        protocol -- confirm whether that early data must be delivered.
        """
        ktls.upgrade_aes_gcm_256(sslobj, self._sock, secret, False)
class KLoop(asyncio.BaseEventLoop):
    """asyncio event loop whose selector is an io_uring ``uring.Ring``."""

    def __init__(self, args):
        """Create the loop; *args* are unpacked into ``uring.Ring``."""
        super().__init__()
        self._selector = uring.Ring(*args)

    def _process_events(self, works):
        """Complete every finished work item handed back by the selector."""
        for work in works:
            work.complete()

    async def sock_connect(self, sock, address):
        """Connect *sock* to *address* through the ring."""
        fut = self.create_future()
        self._selector.submit(uring.ConnectWork(sock.fileno(), address, fut))
        return await fut

    async def getaddrinfo(
        self, host, port, *, family=0, type=0, proto=0, flags=0
    ):
        """Resolve *host*/*port* with ``socket.getaddrinfo``.

        NOTE(review): the lookup runs inline and therefore blocks the loop
        while resolving.
        """
        return socket.getaddrinfo(host, port, family, type, proto, flags)

    def _make_socket_transport(
        self, sock, protocol, waiter=None, *, extra=None, server=None
    ):
        """Build a plain transport for *sock*, switched to blocking mode."""
        sock.setblocking(True)
        return KLoopSocketTransport(
            self, sock, protocol, waiter, extra, server
        )

    def _make_ssl_transport(
        self,
        rawsock,
        protocol,
        sslcontext,
        waiter=None,
        *,
        server_side=False,
        server_hostname=None,
        extra=None,
        server=None,
        ssl_handshake_timeout=None,
        call_connection_made=True,
    ):
        """Build a client-side kTLS transport for *rawsock*.

        Raises:
            NotImplementedError: if *server_side* is true; the handshake
                protocol only implements the client role, so accepting the
                flag silently would run the wrong handshake.
        """
        if server_side:
            raise NotImplementedError(
                "server-side TLS is not supported by KLoopSSLTransport"
            )
        if sslcontext is None:
            sslcontext = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        return KLoopSSLTransport(
            self,
            rawsock,
            protocol,
            waiter,
            extra,
            server,
            sslcontext=sslcontext,
            server_hostname=server_hostname,
        )
class KLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
    """Event loop policy that creates ``KLoop`` instances."""

    __slots__ = ("_selector_args",)

    # Accepted submission-queue depths: the powers of two from 1 to 4096.
    _VALID_QUEUE_DEPTHS = frozenset(1 << shift for shift in range(13))

    def __init__(
        self, queue_depth=128, sq_thread_idle=2000, sq_thread_cpu=None
    ):
        """Store the ring parameters handed to every new loop.

        Raises:
            ValueError: if *queue_depth* is not a power of two in
                ``1..4096``.  Raised explicitly because an ``assert`` would
                be skipped under ``python -O``.
        """
        super().__init__()
        if queue_depth not in self._VALID_QUEUE_DEPTHS:
            raise ValueError(
                f"queue_depth must be a power of two between 1 and 4096, "
                f"got {queue_depth!r}"
            )
        self._selector_args = (queue_depth, sq_thread_idle, sq_thread_cpu)

    def _loop_factory(self):
        """Create a new ``KLoop`` from the stored ring parameters."""
        return KLoop(self._selector_args)

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Child watchers are not supported."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Child watchers are not supported."""
        raise NotImplementedError

535
src/kloop/loop.pyx Normal file
View file

@ -0,0 +1,535 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
import time as py_time
import asyncio
import contextvars
import functools
import inspect
import os
import reprlib
import threading
import traceback
# asyncio callables and the asyncio logger, bound once at module level.
cdef asyncio_isfuture = asyncio.isfuture
cdef asyncio_ensure_future = asyncio.ensure_future
cdef asyncio_set_running_loop = asyncio._set_running_loop
cdef asyncio_get_running_loop = asyncio._get_running_loop
cdef asyncio_Task = asyncio.Task
cdef asyncio_Future = asyncio.Future
cdef logger = asyncio.log.logger
# Nanoseconds per second; loop timestamps are kept in nanoseconds.
cdef long long SECOND_NS = 1_000_000_000
# Upper bound (24 hours, in nanoseconds) on the timeout passed to
# ring_select().
cdef long long MAX_SELECT_TIMEOUT = 24 * 3600 * SECOND_NS
# Minimum number of scheduled timer handles before cleanup of
# cancelled handles is performed.
cdef int MIN_SCHEDULED_TIMER_HANDLES = 100
# Cleanup of cancelled handles is performed once the number of cancelled
# timer handles multiplied by this ratio exceeds the number of scheduled
# timer handles.
cdef int MAX_CANCELLED_TIMER_HANDLES_RATIO = 2
# The helper sources below are textually included into this module.
include "./handle.pyx"
include "./queue.pyx"
include "./heapq.pyx"
include "./uring.pyx"
cdef long long monotonic_ns() nogil except -1:
    # Return the CLOCK_MONOTONIC reading expressed in nanoseconds.
    # If clock_gettime() fails, an OSError built from errno is set and -1
    # is returned.
    cdef time.timespec now

    if time.clock_gettime(time.CLOCK_MONOTONIC, &now) != 0:
        with gil:
            PyErr_SetFromErrno(OSError)
        return -1
    return now.tv_sec * SECOND_NS + now.tv_nsec
cdef int loop_init(
    Loop* loop, linux.__u32 depth, linux.io_uring_params* params
) nogil except 0:
    # Initialise the ready queue, the timer heap and the io_uring ring of
    # `loop`, in that order.  Returns 1 on success and 0 on failure; parts
    # that were already initialised are released again before returning 0.
    #
    # NOTE(review): queue_init() and ring_init() are declared `except 0`,
    # so a 0 return may already be propagated as an exception at the call
    # site -- confirm that the cleanup branches below are reachable.
    if not queue_init(&loop.ready):
        return 0
    if heapq_init(&loop.scheduled):
        if ring_init(&loop.ring, depth, params):
            return 1
        queue_uninit(&loop.ready)
        heapq_uninit(&loop.scheduled)
        return 0
    queue_uninit(&loop.ready)
    return 0
cdef int loop_uninit(Loop* loop) nogil except 0:
    # Release the timer heap and the ready queue, then tear down the ring.
    # The return value is whatever ring_uninit() reports.
    cdef int ring_ok

    heapq_uninit(&loop.scheduled)
    queue_uninit(&loop.ready)
    ring_ok = ring_uninit(&loop.ring)
    return ring_ok
cdef int loop_run_forever(Loop* loop) nogil except 0:
    # Run loop iterations until `loop.stopping` is set (returns 1) or an
    # iteration fails (returns 0).
    cdef:
        Ring* ring = &loop.ring
        Queue* ready = &loop.ready
        HeapQueue* scheduled = &loop.scheduled

    while loop_run_once(loop, ring, ready, scheduled):
        if loop.stopping:
            return 1
    return 0
cdef inline int filter_cancelled_calls(Loop* loop) nogil except 0:
    # Remove cancelled timer callbacks from `loop.scheduled`.
    #
    # Two strategies are used:
    #   * bulk filter -- when the heap holds more than
    #     MIN_SCHEDULED_TIMER_HANDLES entries and the cancelled count times
    #     MAX_CANCELLED_TIMER_HANDLES_RATIO exceeds the heap size, the heap
    #     is rebuilt without the cancelled entries;
    #   * head trim -- otherwise cancelled entries are only popped while
    #     they sit at the head of the heap.
    # Removed callbacks are collected in the temporary `drop` heap, which is
    # released with heapq_uninit() at the end.
    #
    # The caller must ensure the heap is not empty.
    # Returns 1 on success, 0 on failure.
    cdef:
        HeapQueue* scheduled = &loop.scheduled
        HeapQueue heap, drop
        Callback** array = scheduled.array
        Callback* callback
        int i = 0, size = scheduled.tail

    if (
        MIN_SCHEDULED_TIMER_HANDLES < size <
        loop.timer_cancelled_count * MAX_CANCELLED_TIMER_HANDLES_RATIO
    ):
        # Remove delayed calls that were cancelled if their number
        # is too high
        if not heapq_init(&drop):
            return 0
        if not heapq_init(&heap):
            heapq_uninit(&drop)
            return 0
        while i < size:
            callback = array[i]
            if callback.mask & CANCELLED_MASK:
                callback.mask &= ~SCHEDULED_MASK
                if not heapq_push(&drop, callback, 0):
                    # Reset the tails first so that uninit only frees the
                    # arrays and leaves the callbacks untouched.
                    heap.tail = 0
                    heapq_uninit(&heap)
                    drop.tail = 0
                    heapq_uninit(&drop)
                    return 0
                # Keep the cancelled counter in step with the heap, as the
                # head-trim branch below does.
                loop.timer_cancelled_count -= 1
            elif not heapq_push(&heap, callback, 0):
                heap.tail = 0
                heapq_uninit(&heap)
                drop.tail = 0
                heapq_uninit(&drop)
                return 0
            # Advance to the next slot; without this the loop would push
            # the same callback forever.
            i += 1
        heapq_heapify(&heap)
        # Install the filtered heap; `heap` now holds the old array, which
        # is freed without touching its (moved) entries.
        heap, scheduled[0] = scheduled[0], heap
        heap.tail = 0
        heapq_uninit(&heap)
        heapq_uninit(&drop)

    elif array[0].mask & CANCELLED_MASK:
        if not heapq_init(&drop):
            return 0
        # Re-read the heap state on every iteration: stop as soon as the
        # heap is empty or its head is a live callback.
        while scheduled.tail and scheduled.array[0].mask & CANCELLED_MASK:
            callback = heapq_pop(scheduled)
            loop.timer_cancelled_count -= 1
            callback.mask &= ~SCHEDULED_MASK
            if not heapq_push(&drop, callback, 0):
                with gil:
                    Py_DECREF(<object>callback.handle)
                heapq_uninit(&drop)
                return 0
        heapq_uninit(&drop)

    return 1
cdef loop_run_ready(Queue* ready, int ntodo):
    # Pop `ntodo` handles from the ready queue and run every one that has
    # not been cancelled.
    cdef Handle current

    while ntodo:
        ntodo -= 1
        current = queue_pop_py(ready)
        if current.cb.mask & CANCELLED_MASK:
            continue
        current.run()
    # Release the reference to the last handle before returning.
    current = None
# Run a single iteration of the event loop: prune cancelled timers, wait on
# the ring, move due timers to the ready queue and run the ready callbacks.
# Returns 1 on success and 0 on failure.
cdef inline int loop_run_once(
Loop* loop, Ring* ring, Queue* ready, HeapQueue* scheduled
) nogil except 0:
cdef:
Callback* callback
# -1 is passed to ring_select() when there is neither ready work nor a
# scheduled timer -- presumably "no timeout"; confirm in ring_select().
long long timeout = -1, now
int nready, res
void* data
# Prune cancelled timers first so the head of the heap is meaningful.
if scheduled.tail:
if not filter_cancelled_calls(loop):
return 0
# Do not block when callbacks are already ready or the loop is stopping.
if ready.head >= 0 or loop.stopping:
timeout = 0
elif scheduled.tail:
# Wait until the earliest timer is due, clamped to
# [0, MAX_SELECT_TIMEOUT].
timeout = min(
max(0, scheduled.array[0].when - monotonic_ns()),
MAX_SELECT_TIMEOUT,
)
nready = ring_select(ring, timeout)
if nready < 0:
return 0
# Pop every reported completion; the popped result and user data are not
# used further in this function.
while nready:
res = ring_cq_pop(&ring.cq, &data)
nready -= 1
# Timers whose deadline is not later than the current time are due.
now = monotonic_ns() + 1
while scheduled.tail and scheduled.array[0].when < now:
callback = heapq_pop(scheduled)
callback.mask &= ~SCHEDULED_MASK
if not queue_push(ready, callback):
# The ready queue could not take the callback: try to put it back on the
# heap, and drop the handle reference if that fails as well.
if not heapq_push(scheduled, callback, 1):
with gil:
Py_DECREF(<object>callback.handle)
return 0
# Run the callbacks that are ready now; the count is taken before running,
# so callbacks queued while running wait for the next iteration.
if ready.head >= 0:
with gil:
loop_run_ready(ready, queue_size(ready))
return 1
class KLoopPolicy(asyncio.events.BaseDefaultEventLoopPolicy):
    """Event loop policy whose loops are ``KLoop`` instances."""

    __slots__ = ("_selector_args",)

    def __init__(
        self, queue_depth=128, sq_thread_idle=2000, sq_thread_cpu=None
    ):
        """Remember the ring parameters used for every loop created."""
        super().__init__()
        self._selector_args = queue_depth, sq_thread_idle, sq_thread_cpu

    def _loop_factory(self):
        """Build a ``KLoop`` from the stored ring parameters."""
        queue_depth, sq_thread_idle, sq_thread_cpu = self._selector_args
        return KLoop(queue_depth, sq_thread_idle, sq_thread_cpu)

    # Child processes handling (Unix only).

    def get_child_watcher(self):
        """Child watchers are not supported."""
        raise NotImplementedError

    def set_child_watcher(self, watcher):
        """Child watchers are not supported."""
        raise NotImplementedError
cdef class KLoopImpl:
    """Cython core of the kLoop event loop.

    Owns the C-level ``Loop`` structure (ready queue, timer heap and
    io_uring ring) and implements the subset of the asyncio event loop API
    that has been ported so far.
    """

    def __init__(self, queue_depth, sq_thread_idle, sq_thread_cpu):
        """Set up the ring in SQPOLL mode.

        *sq_thread_cpu*, when not None, pins the kernel submission thread
        to that CPU (IORING_SETUP_SQ_AFF).
        """
        cdef:
            linux.io_uring_params params
            linux.__u32 depth

        string.memset(&params, 0, sizeof(params))
        params.flags = linux.IORING_SETUP_SQPOLL
        params.sq_thread_idle = sq_thread_idle
        if sq_thread_cpu is not None:
            params.sq_thread_cpu = sq_thread_cpu
            params.flags |= linux.IORING_SETUP_SQ_AFF
        depth = queue_depth

        self.loop.loop = <PyObject*>self
        with nogil:
            loop_init(&self.loop, depth, &params)

        self.closed = False
        self.thread_id = None

    def __dealloc__(self):
        with nogil:
            loop_uninit(&self.loop)

    cdef inline check_closed(self):
        if self.closed:
            raise RuntimeError('Event loop is closed')

    cdef inline bint _is_running(self):
        # The loop is running while a thread id is recorded.
        return self.thread_id is not None

    cdef inline check_running(self):
        if self._is_running():
            raise RuntimeError('This event loop is already running')
        if asyncio_get_running_loop() is not None:
            raise RuntimeError(
                'Cannot run the event loop while another loop is running')

    def run_forever(self):
        """Run until stop() is called."""
        self.check_closed()
        self.check_running()
        # TODO: coroutine origin tracking and asyncgen hooks are not ported.
        self.thread_id = threading.get_ident()
        try:
            asyncio_set_running_loop(self)
            with nogil:
                loop_run_forever(&self.loop)
        finally:
            self.loop.stopping = 0
            self.thread_id = None
            asyncio_set_running_loop(None)

    def run_until_complete(self, future):
        """Run the loop until *future* is done and return its result.

        Raises RuntimeError if the loop is stopped before *future*
        completes.
        """
        self.check_closed()
        self.check_running()

        new_task = not asyncio_isfuture(future)
        future = asyncio_ensure_future(future, loop=self)
        if new_task:
            # An exception is raised if the future didn't complete, so there
            # is no need to log the "destroy pending task" message
            future._log_destroy_pending = False

        future.add_done_callback(_run_until_complete_cb)
        try:
            self.run_forever()
        except:
            if new_task and future.done() and not future.cancelled():
                # The coroutine raised a BaseException. Consume the exception
                # to not log a warning, the caller doesn't have access to the
                # local task.
                future.exception()
            raise
        finally:
            future.remove_done_callback(_run_until_complete_cb)
        if not future.done():
            raise RuntimeError('Event loop stopped before Future completed.')

        return future.result()

    def create_task(self, coro, *, name=None):
        """Schedule *coro* as an asyncio Task bound to this loop."""
        self.check_closed()
        # TODO: custom task factories are not ported.
        task = asyncio_Task(coro, loop=self, name=name)
        if task._source_traceback:
            del task._source_traceback[-1]
        return task

    def stop(self):
        """Ask the running loop to exit after the current iteration."""
        self.loop.stopping = 1

    def close(self):
        """Mark the loop as closed; it must not be running."""
        if self.is_running():
            raise RuntimeError("Cannot close a running event loop")
        if self.closed:
            return
        # TODO: clearing the queues and shutting down the default executor
        # are not ported.
        self.closed = True

    def fileno(self):
        """Return the file descriptor of the io_uring instance."""
        return self.loop.ring.ring_fd

    def is_running(self):
        return self._is_running()

    def get_debug(self):
        # Debug mode is not implemented.
        return False

    def call_soon(self, callback, *args, context=None):
        """Arrange for *callback* to run on the next loop iteration."""
        cdef Handle handle
        self.check_closed()
        # TODO: debug-mode thread/callback checks are not ported.
        handle = self._call_soon(callback, args, context)
        if handle.source_traceback:
            del handle.source_traceback[-1]
        return handle

    def time(self):
        """Return the monotonic clock reading in seconds."""
        # Cast to C double: a C `float` has a 24-bit mantissa and cannot
        # represent nanosecond timestamps with usable precision.
        return (<double>monotonic_ns()) / SECOND_NS

    def call_later(self, delay, callback, *args, context=None):
        """Schedule *callback* to run *delay* seconds from now."""
        cdef long long when = monotonic_ns()
        # Convert explicitly instead of relying on implicit Python-float to
        # C-integer coercion of `delay * SECOND_NS`.
        when += <long long>(<double>delay * SECOND_NS)
        timer = self._call_at(when, callback, args, context)
        if timer.source_traceback:
            del timer.source_traceback[-1]
        return timer

    def call_at(self, when, callback, *args, context=None):
        """Schedule *callback* at loop time *when* (seconds, see time())."""
        timer = self._call_at(
            <long long>(<double>when * SECOND_NS), callback, args, context
        )
        if timer.source_traceback:
            del timer.source_traceback[-1]
        return timer

    cdef inline TimerHandle _call_at(
        self, long long when, callback, args, context
    ):
        # `when` is an absolute monotonic timestamp in nanoseconds.
        cdef TimerHandle timer
        self.check_closed()
        # TODO: debug-mode thread/callback checks are not ported.
        timer = TimerHandle(when, callback, args, self, context)
        heapq_push_py(&self.loop.scheduled, timer)
        timer.cb.mask |= SCHEDULED_MASK
        return timer

    cdef inline Handle _call_soon(self, callback, args, context):
        cdef Handle handle = Handle(callback, args, self, context)
        self._add_callback(handle)
        return handle

    cdef inline _add_callback(self, Handle handle):
        queue_push_py(&self.loop.ready, handle)

    def default_exception_handler(self, context):
        """Log the exception *context* through the asyncio logger."""
        message = context.get('message')
        if not message:
            message = 'Unhandled exception in event loop'

        exception = context.get('exception')
        if exception is not None:
            exc_info = (type(exception), exception, exception.__traceback__)
        else:
            exc_info = False

        # TODO: adding 'handle_traceback' from the current handle is not
        # ported.

        log_lines = [message]
        for key in sorted(context):
            if key in {'message', 'exception'}:
                continue
            value = context[key]
            if key == 'source_traceback':
                tb = ''.join(traceback.format_list(value))
                value = 'Object created at (most recent call last):\n'
                value += tb.rstrip()
            elif key == 'handle_traceback':
                tb = ''.join(traceback.format_list(value))
                value = 'Handle created at (most recent call last):\n'
                value += tb.rstrip()
            else:
                value = repr(value)
            log_lines.append(f'{key}: {value}')

        logger.error('\n'.join(log_lines), exc_info=exc_info)

    def call_exception_handler(self, context):
        """Report *context* through default_exception_handler()."""
        # TODO: user-installed exception handlers are not ported.
        try:
            self.default_exception_handler(context)
        except (SystemExit, KeyboardInterrupt):
            raise
        except BaseException:
            # Second protection layer for unexpected errors
            # in the default implementation, as well as for subclassed
            # event loops with overloaded "default_exception_handler".
            logger.error('Exception in default exception handler',
                         exc_info=True)

    async def shutdown_asyncgens(self):
        # Asynchronous generator tracking is not implemented.
        pass

    async def shutdown_default_executor(self):
        # There is no default executor to shut down.
        pass

    def create_future(self):
        """Create an asyncio Future attached to this loop."""
        return asyncio_Future(loop=self)

    def _timer_handle_cancelled(self, handle):
        pass

    async def create_connection(
        self,
        protocol_factory,
        host=None,
        port=None,
        *,
        ssl=None,
        family=0,
        proto=0,
        flags=0,
        sock=None,
        local_addr=None,
        server_hostname=None,
        ssl_handshake_timeout=None,
        happy_eyeballs_delay=None,
        interleave=None,
    ):
        """Not implemented yet.

        Raises NotImplementedError rather than returning None, which
        callers would otherwise try to unpack as (transport, protocol).
        """
        raise NotImplementedError
class KLoop(KLoopImpl, asyncio.AbstractEventLoop):
    """Public loop type: ``KLoopImpl`` exposed as an asyncio event loop."""
def _run_until_complete_cb(fut):
if not fut.cancelled():
exc = fut.exception()
if isinstance(exc, (SystemExit, KeyboardInterrupt)):
# Issue #22429: run_forever() already finished, no need to
# stop it.
return
_get_loop(fut).stop()
def _get_loop(fut):
# Tries to call Future.get_loop() if it's available.
# Otherwise fallbacks to using the old '_loop' property.
try:
get_loop = fut.get_loop
except AttributeError:
pass
else:
return get_loop()
return fut._loop

16
src/kloop/queue.pxd Normal file
View file

@ -0,0 +1,16 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Circular buffer of callback pointers used as the loop's ready queue.
cdef struct Queue:
# Backing array of callback pointers.
Callback** array
# Allocated capacity of `array`, in slots.
int size
# Index of the oldest entry; negative while the queue is empty.
int head
# Index of the slot the next pushed entry is stored in.
int tail

165
src/kloop/queue.pyx Normal file
View file

@ -0,0 +1,165 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
# Number of slots a queue starts with, and the step by which it grows and
# shrinks.
cdef int QUEUE_BLOCK_SIZE = 1024
cdef int queue_init(Queue* queue) nogil except 0:
    # Allocate the first block of slots and mark the queue as empty.
    # Raises MemoryError (returning 0) when the allocation fails.
    cdef Callback** block = <Callback**>PyMem_RawMalloc(
        sizeof(Callback*) * QUEUE_BLOCK_SIZE
    )

    queue.array = block
    if block == NULL:
        with gil:
            raise MemoryError
    queue.size = QUEUE_BLOCK_SIZE
    queue.head = -1
    queue.tail = 0
    return 1
cdef void queue_uninit(Queue* queue) nogil:
    # Drop the handle reference of every queued callback and free the
    # backing array.  Safe to call on a queue whose array is already NULL.
    cdef:
        Callback** slots = queue.array
        int pos = queue.head
        int capacity = queue.size
        int end = queue.tail

    if slots == NULL:
        return
    if pos >= 0:
        with gil:
            # The queue is non-empty here; head may equal tail when it is
            # full, so the first entry is released before testing for the
            # end position.
            Py_DECREF(<object>slots[pos].handle)
            pos = (pos + 1) % capacity
            while pos != end:
                Py_DECREF(<object>slots[pos].handle)
                pos = (pos + 1) % capacity
    PyMem_RawFree(slots)
    queue.array = NULL
cdef queue_push_py(Queue* queue, Handle handle):
    # Append `handle` to `queue`.  The queue holds its own reference to the
    # handle for as long as the callback is stored in it.
    cdef Callback* callback = &handle.cb

    Py_INCREF(handle)
    try:
        with nogil:
            queue_push(queue, callback)
    except BaseException:
        # The callback was not stored, so give back the reference taken
        # above instead of leaking the handle.
        Py_DECREF(handle)
        raise
cdef int queue_push(Queue* queue, Callback* callback) nogil except 0:
    # Append `callback` at the tail, growing the buffer by one block when
    # it is full.  Returns 1 on success; raises MemoryError (returning 0)
    # when the buffer cannot be grown.
    cdef:
        Callback** orig = queue.array
        Callback** array = orig
        int size = queue.size, head = queue.head, tail = queue.tail

    if head == tail:
        # head == tail (with head >= 0) means the buffer is full.
        if head == 0:
            # Entries are already contiguous from slot 0: grow in place.
            tail = size
            size += QUEUE_BLOCK_SIZE
            array = <Callback**>PyMem_RawRealloc(
                orig, sizeof(Callback*) * size
            )
            if array == NULL:
                with gil:
                    raise MemoryError
            # realloc may move the block; publish the new address so the
            # queue never keeps a dangling pointer.
            queue.array = array
        else:
            # Entries wrap around: copy them, oldest first, into a new and
            # larger array so they become contiguous from slot 0.
            tail = size + QUEUE_BLOCK_SIZE
            array = <Callback**>PyMem_RawMalloc(sizeof(Callback*) * tail)
            if array == NULL:
                with gil:
                    raise MemoryError
            queue.array = array
            string.memcpy(
                array, orig + head, sizeof(Callback*) * (size - head)
            )
            string.memcpy(array + size - head, orig, sizeof(Callback*) * head)
            # After the swap `size` is the new capacity and `tail` the old
            # capacity, i.e. the number of entries just copied.
            size, tail = tail, size
            queue.head = head = 0
            PyMem_RawFree(orig)
        queue.size = size
    elif head < 0:
        # First entry of an empty queue.
        queue.head = head = 0
    array[tail] = callback
    queue.tail = (tail + 1) % size
    return 1
cdef Handle queue_pop_py(Queue* queue):
    # Pop the oldest callback and return its Handle, or None when the
    # queue is empty.
    cdef:
        Handle popped
        Callback* entry

    with nogil:
        entry = queue_pop(queue)
    if entry == NULL:
        return None
    popped = <Handle>entry.handle
    # Release the reference that was held on behalf of the queue; `popped`
    # keeps the handle alive for the caller.
    Py_DECREF(popped)
    return popped
cdef Callback* queue_pop(Queue* queue) nogil:
    # Remove and return the oldest callback, or NULL when the queue is
    # empty.  The buffer is shrunk opportunistically; a failed shrink is
    # ignored and leaves the queue usable at its previous capacity.
    cdef:
        int size = queue.size, head = queue.head, tail = queue.tail
        Callback* rv
        Callback** orig = queue.array
        Callback** array = orig

    if head < 0:
        return NULL
    rv = array[head]
    queue.head = head = (head + 1) % size
    if head == tail:
        # That was the last entry: reset to the empty state and fall back
        # to a single block.
        queue.head = -1
        queue.tail = 0
        if size > QUEUE_BLOCK_SIZE:
            size = QUEUE_BLOCK_SIZE
            array = <Callback**>PyMem_RawRealloc(
                orig, sizeof(Callback*) * size
            )
            if array != NULL:
                # realloc may move the block; keep queue.array in sync.
                queue.array = array
                queue.size = size
    elif (head - tail) % size >= QUEUE_BLOCK_SIZE * 2:
        # At least two blocks' worth of slots are free: release one block.
        if head < tail:
            # Entries are contiguous in [head, tail).
            size -= QUEUE_BLOCK_SIZE
            if tail > size:
                # They extend past the new end: slide them down to slot 0.
                tail -= head
                string.memmove(orig, orig + head, sizeof(Callback*) * tail)
                queue.tail = tail
                queue.head = 0
            array = <Callback**>PyMem_RawRealloc(
                orig, sizeof(Callback*) * size
            )
            if array != NULL:
                queue.array = array
                queue.size = size
                queue.tail = tail % size
        else:
            # Entries wrap around: copy them, oldest first, into a smaller
            # array.
            array = <Callback**>PyMem_RawMalloc(
                sizeof(Callback*) * (size - QUEUE_BLOCK_SIZE)
            )
            if array != NULL:
                string.memcpy(
                    array, orig + head, sizeof(Callback*) * (size - head)
                )
                string.memcpy(
                    array + size - head, orig, sizeof(Callback*) * tail
                )
                queue.array = array
                queue.head = 0
                queue.tail = (tail - head) % size
                queue.size = size - QUEUE_BLOCK_SIZE
                PyMem_RawFree(orig)
    return rv
cdef int queue_size(Queue* queue) nogil:
    # Return the number of callbacks currently stored in the queue.
    if queue.head < 0:
        # A negative head marks the empty queue.
        return 0
    if queue.head == queue.tail:
        # head == tail on a non-empty queue means every slot is in use.
        return queue.size
    return (queue.tail - queue.head) % queue.size

View file

@ -8,108 +8,45 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
from .includes cimport linux, libc
cdef struct SubmissionQueue:
unsigned* khead
unsigned* ktail
unsigned* kring_mask
unsigned* kring_entries
unsigned* kflags
unsigned* kdropped
unsigned* array
linux.io_uring_sqe* sqes
cdef class RingQueue: unsigned sqe_head
cdef: unsigned sqe_tail
unsigned* head
unsigned* tail
unsigned* ring_mask
unsigned* ring_entries
unsigned* flags
size_t ring_size size_t ring_size
void* ring_ptr void* ring_ptr
cdef class SubmissionQueue(RingQueue): cdef struct CompletionQueue:
cdef: unsigned* khead
unsigned* dropped unsigned* ktail
unsigned* array unsigned* kring_mask
linux.io_uring_sqe* sqes unsigned* kring_entries
unsigned sqe_head unsigned* kflags
unsigned sqe_tail unsigned* koverflow
cdef init(self, linux.io_sqring_offsets sq_off)
cdef linux.io_uring_sqe * next_sqe(self)
cdef unsigned flush(self)
cdef class CompletionQueue(RingQueue):
cdef:
unsigned* overflow
linux.io_uring_cqe* cqes linux.io_uring_cqe* cqes
cdef init(self, linux.io_cqring_offsets cq_off) size_t ring_size
cdef unsigned ready(self) void* ring_ptr
cdef inline object pop_works(self, unsigned ready)
cdef class Ring: cdef struct Ring:
cdef:
SubmissionQueue sq SubmissionQueue sq
CompletionQueue cq CompletionQueue cq
unsigned flags
int ring_fd
unsigned features unsigned features
int fd int enter_ring_fd
int enter_fd linux.__u8 int_flags
linux.__u8 pad[3]
unsigned pad2
# Declaration of the base class of the ring work items.
cdef class Work:
cdef:
# Read-only from Python; presumably the future resolved when the work
# completes -- confirm in the implementation.
readonly object fut
# Readable and writable from Python; presumably requests linking with the
# following submission -- confirm in the implementation.
public bint link
# C-level integer result field.
int res
# Takes the submission queue entry for this work item.
cdef void submit(self, linux.io_uring_sqe* sqe)
# Helper taking an opcode, the entry, a file descriptor, an address, a
# length and an offset.
cdef inline void _submit(
self,
int op,
linux.io_uring_sqe * sqe,
int fd,
void * addr,
unsigned len,
linux.__u64 offset,
)
# Declaration of the connect work item: a file descriptor, an IPv4 socket
# address and a Python object named host_bytes.
cdef class ConnectWork(Work):
cdef:
int fd
libc.sockaddr_in addr
# presumably keeps the encoded host alive while the request is in
# flight -- confirm in the implementation.
object host_bytes
# Declaration of the single-buffer send work item.
cdef class SendWork(Work):
cdef:
int fd
# Python object holding the payload, plus a raw pointer and a size;
# presumably the pointer and size refer to the payload's memory -- confirm
# in the implementation.
object data
char* data_ptr
linux.__u32 size
# Python callable stored with the work item.
object callback
# Declaration of the multi-buffer send work item: a file descriptor, a
# list of buffers, a msghdr structure and a Python callable.
cdef class SendMsgWork(Work):
cdef:
int fd
list buffers
libc.msghdr msg
object callback
# Declaration of the single-buffer receive work item.
cdef class RecvWork(Work):
cdef:
int fd
# Python buffer object and a raw pointer; presumably the pointer refers to
# the buffer's memory -- confirm in the implementation.
object buffer
object callback
char* buffer_ptr
# Declaration of the multi-buffer receive work item: a file descriptor, a
# list of buffers, a msghdr structure, a Python callable and a Python
# object named control_msg.
cdef class RecvMsgWork(Work):
cdef:
int fd
list buffers
libc.msghdr msg
object callback
# presumably storage for ancillary (control) data -- confirm in the
# implementation.
object control_msg

View file

@ -8,431 +8,174 @@
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. # MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
import os
import socket
from cpython cimport Py_INCREF, Py_DECREF, PyErr_SetFromErrno
from cpython cimport PyMem_RawMalloc, PyMem_RawFree
from libc cimport errno, string
from posix cimport mman
from .includes cimport barrier, libc, linux, ssl
cdef linux.__u32 SIG_SIZE = libc._NSIG // 8 cdef linux.__u32 SIG_SIZE = libc._NSIG // 8
class SubmissionQueueFull(Exception): cdef int ring_init(
pass Ring* ring,
# Common base of the submission and completion queues; it only records
# the size of the ring mapping.
cdef class RingQueue:
def __cinit__(self, size_t ring_size):
self.ring_size = ring_size
# Submission side of the ring.
cdef class SubmissionQueue(RingQueue):
# Resolve the submission-queue field pointers by adding the given offsets
# to the base of the ring mapping.
cdef init(self, linux.io_sqring_offsets sq_off):
self.head = <unsigned*>(self.ring_ptr + sq_off.head)
self.tail = <unsigned*>(self.ring_ptr + sq_off.tail)
self.ring_mask = <unsigned*>(self.ring_ptr + sq_off.ring_mask)
self.ring_entries = <unsigned*>(self.ring_ptr + sq_off.ring_entries)
self.flags = <unsigned*>(self.ring_ptr + sq_off.flags)
self.dropped = <unsigned*>(self.ring_ptr + sq_off.dropped)
self.array = <unsigned*>(self.ring_ptr + sq_off.array)
# Reserve and return the next free submission entry; raises
# SubmissionQueueFull when no entry is available.
cdef linux.io_uring_sqe* next_sqe(self):
cdef:
unsigned int head, next
linux.io_uring_sqe* rv
head = barrier.io_uring_smp_load_acquire(self.head)
next = self.sqe_tail + 1
# There is room as long as the reserved range does not exceed the number
# of ring entries.
if next - head <= self.ring_entries[0]:
rv = &self.sqes[self.sqe_tail & self.ring_mask[0]]
self.sqe_tail = next
return rv
else:
# TODO: IORING_ENTER_SQ_WAIT and retry
raise SubmissionQueueFull()
# Write the indices of all reserved-but-unpublished entries into the index
# array and publish the new tail.  Returns the difference between the tail
# and the current head value.
cdef unsigned flush(self):
cdef:
unsigned mask = self.ring_mask[0]
unsigned tail = self.tail[0]
unsigned to_submit = self.sqe_tail - self.sqe_head
if to_submit:
while to_submit:
self.array[tail & mask] = self.sqe_head & mask
tail += 1
self.sqe_head += 1
to_submit -= 1
# The tail is stored with release semantics after the index array has been
# filled.
barrier.io_uring_smp_store_release(self.tail, tail)
return tail - self.head[0]
# Completion side of the ring.
cdef class CompletionQueue(RingQueue):
# Resolve the completion-queue field pointers by adding the given offsets
# to the base of the ring mapping.
cdef init(self, linux.io_cqring_offsets cq_off):
self.head = <unsigned*>(self.ring_ptr + cq_off.head)
self.tail = <unsigned*>(self.ring_ptr + cq_off.tail)
self.ring_mask = <unsigned*>(self.ring_ptr + cq_off.ring_mask)
self.ring_entries = <unsigned*>(self.ring_ptr + cq_off.ring_entries)
self.overflow = <unsigned*>(self.ring_ptr + cq_off.overflow)
self.cqes = <linux.io_uring_cqe*>(self.ring_ptr + cq_off.cqes)
# The flags pointer is only set when a non-zero offset is reported.
if cq_off.flags:
self.flags = <unsigned*>(self.ring_ptr + cq_off.flags)
# Number of completion entries between head and tail; the tail is read with
# acquire semantics.
cdef unsigned ready(self):
return barrier.io_uring_smp_load_acquire(self.tail) - self.head[0]
# Consume `ready` completion entries and return the list of Work objects
# they refer to, each with its `res` field set from the entry.
cdef inline object pop_works(self, unsigned ready):
cdef:
object rv = []
Work work
unsigned head, mask, last
linux.io_uring_cqe* cqe
head = self.head[0]
mask = self.ring_mask[0]
last = head + ready
while head != last:
cqe = self.cqes + (head & mask)
# user_data carries the address of the Work object.
work = <Work><void*>cqe.user_data
work.res = cqe.res
rv.append(work)
# presumably balances a reference taken when the work was submitted --
# confirm at the submission site.
Py_DECREF(work)
head += 1
# Publish the new head with release semantics.
barrier.io_uring_smp_store_release(self.head, self.head[0] + ready)
return rv
cdef class Ring:
def __cinit__(
self,
linux.__u32 queue_depth, linux.__u32 queue_depth,
linux.__u32 sq_thread_idle, linux.io_uring_params* params
object sq_thread_cpu, ) nogil except 0:
):
cdef:
linux.io_uring_params params
int fd
size_t size
void* ptr
# Prepare io_uring_params
string.memset(&params, 0, sizeof(params))
params.flags = linux.IORING_SETUP_SQPOLL
if sq_thread_cpu is not None:
params.flags |= linux.IORING_SETUP_SQ_AFF
params.sq_thread_cpu = sq_thread_cpu
params.sq_thread_idle = sq_thread_idle
# SYSCALL: SYS_io_uring_setup # SYSCALL: SYS_io_uring_setup
fd = libc.syscall(libc.SYS_io_uring_setup, queue_depth, &params) ring.ring_fd = ring.enter_ring_fd = libc.syscall(
if fd < 0: libc.SYS_io_uring_setup, queue_depth, params
)
if ring.ring_fd < 0:
with gil:
PyErr_SetFromErrno(IOError) PyErr_SetFromErrno(IOError)
return return 0
self.fd = self.enter_fd = fd
# Initialize 2 RingQueue and mmap the ring_ptr # mmap the ring_ptr
size = max( ring.sq.ring_size = ring.cq.ring_size = max(
params.sq_off.array + params.sq_entries * sizeof(unsigned), params.sq_off.array + params.sq_entries * sizeof(unsigned),
params.cq_off.cqes + params.cq_entries * sizeof(linux.io_uring_cqe) params.cq_off.cqes + params.cq_entries * sizeof(linux.io_uring_cqe)
) )
self.sq = SubmissionQueue(size) ring.sq.ring_ptr = ring.cq.ring_ptr = mman.mmap(
self.cq = CompletionQueue(size)
ptr = mman.mmap(
NULL, NULL,
size, ring.sq.ring_size,
mman.PROT_READ | mman.PROT_WRITE, mman.PROT_READ | mman.PROT_WRITE,
mman.MAP_SHARED | mman.MAP_POPULATE, mman.MAP_SHARED | mman.MAP_POPULATE,
fd, ring.ring_fd,
linux.IORING_OFF_SQ_RING, linux.IORING_OFF_SQ_RING,
) )
if ptr == mman.MAP_FAILED: if ring.sq.ring_ptr == mman.MAP_FAILED:
with gil:
PyErr_SetFromErrno(IOError) PyErr_SetFromErrno(IOError)
return return 0
self.sq.ring_ptr = self.cq.ring_ptr = ptr
# Initialize the SubmissionQueue # Initialize the SubmissionQueue
self.sq.init(params.sq_off) ring.sq.khead = <unsigned*>(ring.sq.ring_ptr + params.sq_off.head)
size = params.sq_entries * sizeof(linux.io_uring_sqe) ring.sq.ktail = <unsigned*>(ring.sq.ring_ptr + params.sq_off.tail)
ptr = mman.mmap( ring.sq.kring_mask = <unsigned*>(ring.sq.ring_ptr + params.sq_off.ring_mask)
ring.sq.kring_entries = <unsigned*>(ring.sq.ring_ptr + params.sq_off.ring_entries)
ring.sq.kflags = <unsigned*>(ring.sq.ring_ptr + params.sq_off.flags)
ring.sq.kdropped = <unsigned*>(ring.sq.ring_ptr + params.sq_off.dropped)
ring.sq.array = <unsigned*>(ring.sq.ring_ptr + params.sq_off.array)
ring.sq.sqes = <linux.io_uring_sqe*>mman.mmap(
NULL, NULL,
size, params.sq_entries * sizeof(linux.io_uring_sqe),
mman.PROT_READ | mman.PROT_WRITE, mman.PROT_READ | mman.PROT_WRITE,
mman.MAP_SHARED | mman.MAP_POPULATE, mman.MAP_SHARED | mman.MAP_POPULATE,
fd, ring.ring_fd,
linux.IORING_OFF_SQES, linux.IORING_OFF_SQES,
) )
if ptr == mman.MAP_FAILED: if ring.sq.sqes == mman.MAP_FAILED:
mman.munmap(self.sq.ring_ptr, self.sq.ring_size) mman.munmap(ring.sq.ring_ptr, ring.sq.ring_size)
with gil:
PyErr_SetFromErrno(IOError) PyErr_SetFromErrno(IOError)
return return 0
self.sq.sqes = <linux.io_uring_sqe*>ptr
# Initialize the CompletionQueue # Initialize the CompletionQueue
self.cq.init(params.cq_off) ring.cq.khead = <unsigned*>(ring.cq.ring_ptr + params.cq_off.head)
ring.cq.ktail = <unsigned*>(ring.cq.ring_ptr + params.cq_off.tail)
ring.cq.kring_mask = <unsigned*>(ring.cq.ring_ptr + params.cq_off.ring_mask)
ring.cq.kring_entries = <unsigned*>(ring.cq.ring_ptr + params.cq_off.ring_entries)
ring.cq.koverflow = <unsigned*>(ring.cq.ring_ptr + params.cq_off.overflow)
ring.cq.cqes = <linux.io_uring_cqe*>(ring.cq.ring_ptr + params.cq_off.cqes)
if params.cq_off.flags:
ring.cq.kflags = <unsigned*>(ring.cq.ring_ptr + params.cq_off.flags)
self.features = params.features return 1
# NOTE(review): each row of this span appears to hold two revisions of the
# same code fused on one line (removed text first, added text second) with the
# indentation stripped -- confirm against the real source file.
#
# Both revisions tear the ring down in the same order:
#   1. unmap the SQE array (entry count read from the ring * sizeof(io_uring_sqe)),
#   2. unmap the submission ring mapping (ring_ptr, ring_size),
#   3. close the ring file descriptor.
# The old revision is the extension type's __dealloc__ and closes the fd with
# os.close(). The new revision is the nogil function ring_uninit(), which
# returns 1 on success, and on a failing unistd.close() re-acquires the GIL,
# sets IOError from errno and returns 0.
# NOTE(review): both revisions skip the close when the fd is 0 -- confirm that
# 0 can never be a valid ring fd here.
def __dealloc__(self):
if self.sq is not None: cdef int ring_uninit(Ring* ring) nogil except 0:
if self.sq.sqes != NULL: if ring.sq.sqes != NULL:
mman.munmap( mman.munmap(
self.sq.sqes, self.sq.ring_entries[0] * sizeof(linux.io_uring_sqe) ring.sq.sqes,
ring.sq.kring_entries[0] * sizeof(linux.io_uring_sqe),
) )
if self.sq.ring_ptr != NULL: if ring.sq.ring_ptr != NULL:
mman.munmap(self.sq.ring_ptr, self.sq.ring_size) mman.munmap(ring.sq.ring_ptr, ring.sq.ring_size)
if self.fd: if ring.ring_fd:
os.close(self.fd) if unistd.close(ring.ring_fd) != 0:
with gil:
PyErr_SetFromErrno(IOError)
return 0
return 1
def submit(self, Work work not None):
    """Hand the next submission-queue entry to ``work`` so it can fill it in.

    Args:
        work: The operation to queue. ``None`` is rejected with ``TypeError``
            before a queue entry is claimed.
    """
    # ``not None`` is required for safety: ``work.submit`` is a cdef method,
    # and dispatching it on ``None`` would read a vtable that does not exist.
    # NOTE(review): the pointer returned by next_sqe() is used unchecked;
    # confirm it cannot be NULL when the submission queue is full.
    cdef linux.io_uring_sqe* sqe = self.sq.next_sqe()
    work.submit(sqe)
# NOTE(review): each row of this span appears to hold two revisions of the
# same code fused on one line (removed text first, added text second) with the
# indentation stripped -- confirm against the real source file.
#
# Old revision: the method select(self, timeout), which returns
# self.cq.pop_works(ready) when completions are ready and [] otherwise.
#
# New revision, first function: ring_sq_flush(sq). For every SQE staged
# between sq.sqe_head and sq.sqe_tail it writes the masked index into
# sq.array at the masked tail, advances both counters, then publishes the new
# tail with a store-release. It returns tail - sq.khead[0].
def select(self, timeout): cdef inline unsigned ring_sq_flush(SubmissionQueue* sq) nogil:
cdef: cdef:
int flags = linux.IORING_ENTER_EXT_ARG, ret unsigned mask = sq.kring_mask[0]
bint need_enter = False unsigned tail = sq.ktail[0]
unsigned to_submit = sq.sqe_tail - sq.sqe_head
if to_submit:
while to_submit:
sq.array[tail & mask] = sq.sqe_head & mask
tail += 1
sq.sqe_head += 1
to_submit -= 1
barrier.io_uring_smp_store_release(sq.ktail, tail)
return tail - sq.khead[0]
# New revision, second function: ring_select(ring, timeout). It flushes the
# submission queue, calls SYS_io_uring_enter only when it has to wait for a
# completion or wake the SQ polling thread, and returns the number of ready
# completions (cq.ktail - cq.khead[0]). A failing syscall whose errno is not
# ETIME sets IOError under the GIL and returns -1.
# The timeout is split with SECOND_NS into tv_sec / tv_nsec, so it is
# presumably expressed in nanoseconds -- confirm against the caller.
# NOTE(review): arg.ts is assigned only in the `timeout > 0` branch and in the
# NEED_WAKEUP branch. If the syscall can be reached without passing through
# either (for example a negative timeout with nothing to wake up), arg.ts
# would be read uninitialized -- confirm with the real indentation.
cdef int ring_select(Ring* ring, long long timeout) nogil except -1:
cdef:
int flags = linux.IORING_ENTER_EXT_ARG
bint need_enter = 0
unsigned submit, ready unsigned submit, ready
unsigned wait_nr = 0 unsigned wait_nr = 0
linux.io_uring_getevents_arg arg linux.io_uring_getevents_arg arg
linux.__kernel_timespec ts linux.__kernel_timespec ts
CompletionQueue* cq = &ring.cq
SubmissionQueue* sq = &ring.sq
# Call enter if we have no CQE ready and timeout is not 0, or else we # Call enter if we have no CQE ready and timeout is not 0, or else we
# handle the ready CQEs first. # handle the ready CQEs first.
ready = self.cq.ready() ready = barrier.io_uring_smp_load_acquire(cq.ktail) - cq.khead[0]
if not ready and timeout is not 0: if not ready and timeout != 0:
flags |= linux.IORING_ENTER_GETEVENTS flags |= linux.IORING_ENTER_GETEVENTS
if timeout is not None: if timeout > 0:
ts.tv_sec = int(timeout) ts.tv_sec = timeout // SECOND_NS
ts.tv_nsec = int(round((timeout - ts.tv_sec) * 1_000_000_000)) ts.tv_nsec = timeout % SECOND_NS
arg.ts = <linux.__u64>&ts arg.ts = <linux.__u64> &ts
wait_nr = 1 wait_nr = 1
need_enter = True need_enter = 1
# Flush the submission queue, and only wakeup the SQ polling thread if # Flush the submission queue, and only wakeup the SQ polling thread if
# there is something for the kernel to handle. # there is something for the kernel to handle.
submit = self.sq.flush() submit = ring_sq_flush(sq)
if submit: if submit:
barrier.io_uring_smp_mb() barrier.io_uring_smp_mb()
if barrier.IO_URING_READ_ONCE( if barrier.IO_URING_READ_ONCE(
self.sq.flags[0] sq.kflags[0]
) & linux.IORING_SQ_NEED_WAKEUP: ) & linux.IORING_SQ_NEED_WAKEUP:
arg.ts = 0 arg.ts = 0
flags |= linux.IORING_ENTER_SQ_WAKEUP flags |= linux.IORING_ENTER_SQ_WAKEUP
need_enter = True need_enter = 1
if need_enter: if need_enter:
arg.sigmask = 0 arg.sigmask = 0
arg.sigmask_sz = SIG_SIZE arg.sigmask_sz = SIG_SIZE
# print(f"SYS_io_uring_enter(submit={submit}, wait_nr={wait_nr}, " if libc.syscall(
# f"flags={flags:b}, timeout={timeout})")
with nogil:
ret = libc.syscall(
libc.SYS_io_uring_enter, libc.SYS_io_uring_enter,
self.enter_fd, ring.enter_ring_fd,
submit, submit,
wait_nr, wait_nr,
flags, flags,
&arg, &arg,
sizeof(arg), sizeof(arg),
) ) < 0:
if ret < 0:
if errno.errno != errno.ETIME: if errno.errno != errno.ETIME:
print(f"SYS_io_uring_enter(submit={submit}, wait_nr={wait_nr}, " with gil:
f"flags={flags:b}, timeout={timeout})")
PyErr_SetFromErrno(IOError) PyErr_SetFromErrno(IOError)
return return -1
ready = self.cq.ready() ready = barrier.io_uring_smp_load_acquire(cq.ktail) - cq.khead[0]
if ready: return ready
return self.cq.pop_works(ready)
else:
return []
cdef class Work: cdef inline int ring_cq_pop(CompletionQueue* cq, void** data) nogil:
def __init__(self, fut):
    """Bind the work item to ``fut`` and reset its per-submission state.

    ``res`` starts at -1 and ``link`` starts as False; ``link`` is what
    ``_submit`` consults to decide whether to set ``IOSQE_IO_LINK``.
    """
    self.res = -1
    self.link = False
    self.fut = fut
cdef void submit(self, linux.io_uring_sqe* sqe):
    # Abstract hook: the base class describes no operation. The subclasses in
    # this file override it and describe their operation through _submit().
    # NOTE(review): this is a `cdef void` function with no `except` clause, so
    # depending on the Cython version the exception may only be reported and
    # then discarded rather than propagated to the caller -- confirm.
    raise NotImplementedError()
cdef inline void _submit(
    self,
    int op,
    linux.io_uring_sqe * sqe,
    int fd,
    void* addr,
    unsigned len,
    linux.__u64 offset,
):
    # Shared SQE builder used by the concrete submit() implementations.
    # Start from an all-zero entry so every field not set below is 0.
    string.memset(sqe, 0, sizeof(linux.io_uring_sqe))

    # The address of this object travels through the kernel as user_data.
    sqe.user_data = <linux.__u64><void*>self
    # Chain this entry to the following one only when the item asks for it.
    sqe.flags = linux.IOSQE_IO_LINK if self.link else 0

    sqe.len = len
    sqe.addr = <unsigned long> addr
    sqe.off = offset
    sqe.fd = fd
    sqe.opcode = <linux.__u8> op

    # Take an extra reference because a raw pointer to self was just stored in
    # the SQE; presumably it is released by whoever consumes the completion
    # (not visible in this block).
    Py_INCREF(self)
def complete(self):
    """Resolve ``self.fut`` from the completion result stored in ``self.res``.

    A result of 0 resolves the future with ``None``. Any other value is
    treated as an errno (its absolute value) and converted into an
    ``IOError`` that is set on the future.
    """
    if self.res != 0:
        def _fail():
            # Build the exception from errno; the except clause below relies
            # on this call raising.
            errno.errno = abs(self.res)
            PyErr_SetFromErrno(IOError)

        try:
            _fail()
        except IOError as exc:
            self.fut.set_exception(exc)
        return

    self.fut.set_result(None)
cdef class ConnectWork(Work):
    """Work item that connects socket ``fd`` to an IPv4 ``(host, port)`` pair."""

    def __init__(self, int fd, sockaddr, fut):
        """Parse ``sockaddr`` into ``self.addr`` ready for submission.

        Args:
            fd: Socket file descriptor to connect.
            sockaddr: ``(host, port)`` where ``host`` is a dotted-quad string.
            fut: Future resolved by ``Work.complete``.

        Raises:
            IOError: ``host`` is not a valid IPv4 address, or ``inet_pton``
                itself failed.
        """
        cdef:
            char* host
            int rc

        super().__init__(fut)
        self.fd = fd
        host_str, port = sockaddr
        # Keep the encoded bytes on the instance: ``host`` points into them.
        self.host_bytes = host_str.encode()
        host = self.host_bytes

        string.memset(&self.addr, 0, sizeof(self.addr))
        self.addr.sin_family = socket.AF_INET

        # inet_pton() returns 1 on success, 0 when the text is not a valid
        # address (errno is NOT set in that case) and -1 with errno set on a
        # real failure. The two failure modes need different reporting.
        rc = libc.inet_pton(socket.AF_INET, host, &self.addr.sin_addr)
        if rc < 0:
            PyErr_SetFromErrno(IOError)
            return
        if rc == 0:
            raise IOError(
                f"inet_pton: {host_str!r} is not a valid IPv4 address"
            )

        self.addr.sin_port = libc.htons(port)

    cdef void submit(self, linux.io_uring_sqe* sqe):
        # For a connect request the length argument stays 0 and the size of
        # the socket address is passed in the offset slot of _submit().
        self._submit(
            linux.IORING_OP_CONNECT,
            sqe,
            self.fd,
            &self.addr,
            0,
            sizeof(self.addr),
        )
cdef class SendWork(Work):
    """Work item that sends one buffer on ``fd`` and reports via a callback.

    ``Work.__init__`` is not invoked here, so no future is attached; the
    completion result goes to ``callback`` instead.
    """

    def __init__(self, int fd, data, callback):
        self.callback = callback
        self.fd = fd
        # Hold a reference to ``data`` for as long as ``data_ptr`` is in use.
        self.data = data
        self.data_ptr = data
        self.size = len(data)

    cdef void submit(self, linux.io_uring_sqe* sqe):
        # Describe a send of ``size`` bytes starting at ``data_ptr``.
        self._submit(
            linux.IORING_OP_SEND,
            sqe,
            self.fd,
            self.data_ptr,
            self.size,
            0,
        )

    def complete(self):
        """Pass the raw completion result to the callback unchanged."""
        self.callback(self.res)
cdef class SendMsgWork(Work):
    """Work item that sends several buffers on ``fd`` through one msghdr."""

    def __init__(self, int fd, buffers, callback):
        """Build the iovec array that describes ``buffers``.

        Raises:
            MemoryError: The iovec array could not be allocated.
        """
        cdef libc.iovec* vec

        self.fd = fd
        # Keep the buffers referenced: the iovec entries point into them.
        self.buffers = buffers
        self.callback = callback

        vec = <libc.iovec*>PyMem_RawMalloc(sizeof(libc.iovec) * len(buffers))
        self.msg.msg_iov = vec
        if vec == NULL:
            raise MemoryError()
        self.msg.msg_iovlen = len(buffers)

        for slot, chunk in enumerate(buffers):
            vec[slot].iov_base = <char*>chunk
            vec[slot].iov_len = len(chunk)

    def __dealloc__(self):
        # Nothing to release if allocation never happened or failed.
        if self.msg.msg_iov == NULL:
            return
        PyMem_RawFree(self.msg.msg_iov)

    cdef void submit(self, linux.io_uring_sqe* sqe):
        # The msghdr itself is the "buffer" of the request; length is 1.
        self._submit(
            linux.IORING_OP_SENDMSG,
            sqe,
            self.fd,
            &self.msg,
            1,
            0,
        )

    def complete(self):
        """Report the result to the callback, or raise IOError if negative."""
        if self.res >= 0:
            self.callback(self.res)
            return
        # A negative result carries the errno of the failed operation.
        errno.errno = abs(self.res)
        PyErr_SetFromErrno(IOError)
cdef class RecvWork(Work):
    """Work item that receives into one buffer and reports via a callback."""

    def __init__(self, int fd, buffer, callback):
        self.callback = callback
        self.fd = fd
        # Keep the buffer referenced: ``buffer_ptr`` points into it.
        self.buffer = buffer
        self.buffer_ptr = <char*>buffer

    cdef void submit(self, linux.io_uring_sqe* sqe):
        # The receive may fill at most the current length of the buffer.
        cdef unsigned capacity = len(self.buffer)
        self._submit(
            linux.IORING_OP_RECV,
            sqe,
            self.fd,
            self.buffer_ptr,
            capacity,
            0,
        )

    def complete(self):
        """Report the result to the callback, or raise IOError if negative."""
        if self.res >= 0:
            self.callback(self.res)
            return
        # A negative result carries the errno of the failed operation.
        errno.errno = abs(self.res)
        PyErr_SetFromErrno(IOError)
cdef class RecvMsgWork(Work):
def __init__(self, int fd, buffers, callback):
    """Build the iovec array and the control buffer for a recvmsg request.

    The control buffer is sized with ``CMSG_SPACE`` for a single
    ``unsigned char`` of ancillary data.

    Raises:
        MemoryError: The iovec array could not be allocated.
    """
    cdef:
        size_t ctrl_len = libc.CMSG_SPACE(sizeof(unsigned char))
        libc.iovec* vec

    self.fd = fd
    # Keep the buffers referenced: the iovec entries point into them.
    self.buffers = buffers
    self.callback = callback

    vec = <libc.iovec*>PyMem_RawMalloc(sizeof(libc.iovec) * len(buffers))
    self.msg.msg_iov = vec
    if vec == NULL:
        raise MemoryError()
    self.msg.msg_iovlen = len(buffers)

    for slot, chunk in enumerate(buffers):
        vec[slot].iov_base = <char*>chunk
        vec[slot].iov_len = len(chunk)

    # The bytearray is stored on the instance so the pointer handed to the
    # msghdr stays backed by a live object.
    self.control_msg = bytearray(ctrl_len)
    self.msg.msg_control = <char*>self.control_msg
    self.msg.msg_controllen = ctrl_len
def __dealloc__(self):
    # Free the iovec array built in __init__; nothing to do if it was never
    # allocated.
    if self.msg.msg_iov == NULL:
        return
    PyMem_RawFree(self.msg.msg_iov)
cdef void submit(self, linux.io_uring_sqe* sqe):
    # The msghdr itself is the "buffer" of the request; length is 1.
    self._submit(
        linux.IORING_OP_RECVMSG,
        sqe,
        self.fd,
        &self.msg,
        1,
        0,
    )
def complete(self):
cdef: cdef:
libc.cmsghdr* cmsg unsigned head
unsigned char* cmsg_data linux.io_uring_cqe* cqe
unsigned char record_type int res
# if self.res < 0: head = cq.khead[0]
# errno.errno = abs(self.res) cqe = cq.cqes + (head & cq.kring_mask[0])
# PyErr_SetFromErrno(IOError) data[0] = <void*>cqe.user_data
# return res = cqe.res
app_data = True barrier.io_uring_smp_store_release(cq.khead, head + 1)
if self.msg.msg_controllen: return res
print('msg_controllen:', self.msg.msg_controllen)
cmsg = libc.CMSG_FIRSTHDR(&self.msg)
if cmsg.cmsg_level == libc.SOL_TLS and cmsg.cmsg_type == linux.TLS_GET_RECORD_TYPE:
cmsg_data = libc.CMSG_DATA(cmsg)
record_type = (<unsigned char*>cmsg_data)[0]
if record_type != ssl.SSL3_RT_APPLICATION_DATA:
app_data = False
print(f'cmsg.len={cmsg.cmsg_len}, cmsg.level={cmsg.cmsg_level}, cmsg.type={cmsg.cmsg_type}')
print(f'record type: {record_type}')
print('flags:', self.msg.msg_flags)
self.callback(self.res, app_data)