1
0
Fork 0
mirror of https://gitee.com/fantix/kloop.git synced 2024-11-28 13:21:05 +00:00

Socket connect PoC

This commit is contained in:
Fantix King 2022-04-25 08:02:28 -04:00
parent bb7c378c95
commit 64b105657a
No known key found for this signature in database
GPG key ID: 95304B04071CCDB4
7 changed files with 193 additions and 13 deletions

View file

@ -24,9 +24,6 @@ cdef extern from "sys/socket.h" nogil:
ctypedef int socklen_t
int SOL_TLS
int setsockopt(int socket, int level, int option_name,
const void *option_value, socklen_t option_len);
struct in_addr:
pass
@ -52,6 +49,15 @@ cdef extern from "sys/socket.h" nogil:
unsigned char* CMSG_DATA(cmsghdr* cmsg)
size_t CMSG_SPACE(size_t length)
int socket(int domain, int type, int protocol)
int setsockopt(
int socket,
int level,
int option_name,
const void* option_value,
socklen_t option_len,
)
cdef extern from "arpa/inet.h" nogil:
int inet_pton(int af, char* src, void* dst)

View file

@ -21,6 +21,7 @@ include "./handle.pxd"
include "./queue.pxd"
include "./heapq.pxd"
include "./uring.pxd"
include "./tcp.pxd"
cdef struct Loop:
@ -38,6 +39,7 @@ cdef class KLoopImpl:
object thread_id
Loop loop
cpdef create_future(self)
cdef inline check_closed(self)
cdef inline bint _is_running(self)
cdef inline check_running(self)

View file

@ -16,6 +16,7 @@ import functools
import inspect
import os
import reprlib
import socket
import threading
import traceback
@ -42,6 +43,7 @@ include "./handle.pyx"
include "./queue.pyx"
include "./heapq.pyx"
include "./uring.pyx"
include "./tcp.pyx"
cdef long long monotonic_ns() nogil except -1:
@ -169,8 +171,8 @@ cdef inline int loop_run_once(
cdef:
Callback* callback
long long timeout = -1, now
int nready, res
void* data
int nready
RingCallback* cb
if scheduled.tail:
if not filter_cancelled_calls(loop):
@ -188,7 +190,9 @@ cdef inline int loop_run_once(
if nready < 0:
return 0
while nready:
res = ring_cq_pop(&ring.cq, &data)
ring_cq_pop(&ring.cq, &cb)
if cb != NULL and not cb.callback(cb):
return 0
nready -= 1
now = monotonic_ns() + 1
@ -483,7 +487,7 @@ cdef class KLoopImpl:
async def shutdown_default_executor(self):
pass
def create_future(self):
cpdef create_future(self):
return asyncio_Future(loop=self)
def _timer_handle_cancelled(self, handle):
@ -506,7 +510,11 @@ cdef class KLoopImpl:
happy_eyeballs_delay=None,
interleave=None,
):
pass
cdef TCPTransport transport = TCPTransport.new(protocol_factory, self)
r = socket.getaddrinfo(host, port)[0]
host, port = r[-1]
waiter = transport.connect(host, port)
return transport, await waiter
class KLoop(KLoopImpl, asyncio.AbstractEventLoop):

34
src/kloop/tcp.pxd Normal file
View file

@ -0,0 +1,34 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
cdef struct TCPConnect:
int fd
libc.sockaddr_in addr
RingCallback ring_cb
Loop* loop
Callback* cb
cdef class TCPTransport:
cdef:
KLoopImpl loop
int fd
TCPConnect connector
object waiter
object protocol_factory
object host_bytes
Handle handle
@staticmethod
cdef TCPTransport new(object protocol_factory, KLoopImpl loop)
cdef connect(self, host, port)
cdef connect_cb(self)

76
src/kloop/tcp.pyx Normal file
View file

@ -0,0 +1,76 @@
# Copyright (c) 2022 Fantix King http://fantix.pro
# kLoop is licensed under Mulan PSL v2.
# You can use this software according to the terms and conditions of the Mulan PSL v2.
# You may obtain a copy of Mulan PSL v2 at:
# http://license.coscl.org.cn/MulanPSL2
# THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
# EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
# MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
# See the Mulan PSL v2 for more details.
cdef int tcp_connect(TCPConnect* connector) nogil:
return ring_sq_submit_connect(
&connector.loop.ring.sq,
connector.fd,
&connector.addr,
&connector.ring_cb,
)
cdef int tcp_connect_cb(RingCallback* cb) nogil except 0:
cdef TCPConnect* connector = <TCPConnect*>cb.data
return queue_push(&connector.loop.ready, connector.cb)
cdef class TCPTransport:
@staticmethod
cdef TCPTransport new(object protocol_factory, KLoopImpl loop):
cdef TCPTransport rv = TCPTransport.__new__(TCPTransport)
rv.protocol_factory = protocol_factory
rv.loop = loop
return rv
cdef connect(self, host, port):
cdef:
int fd
TCPConnect* c = &self.connector
fd = libc.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
if fd == -1:
PyErr_SetFromErrno(IOError)
return
self.host_bytes = host.encode()
if not libc.inet_pton(
socket.AF_INET, <char*>self.host_bytes, &c.addr.sin_addr
):
PyErr_SetFromErrno(IOError)
return
c.addr.sin_family = socket.AF_INET
c.addr.sin_port = libc.htons(port)
c.fd = self.fd = fd
c.loop = &self.loop.loop
c.ring_cb.callback = tcp_connect_cb
c.ring_cb.data = c
self.handle = Handle(self.connect_cb, (self,), self.loop, None)
c.cb = &self.handle.cb
if not tcp_connect(c):
raise ValueError("Submission queue is full!")
self.waiter = self.loop.create_future()
return self.waiter
cdef connect_cb(self):
if self.connector.ring_cb.res != 0:
try:
errno.errno = abs(self.connector.ring_cb.res)
PyErr_SetFromErrno(IOError)
except IOError as e:
self.waiter.set_exception(e)
return
protocol = self.protocol_factory()
self.waiter.set_result(protocol)
self.loop.call_soon(protocol.connection_made, self)
def get_extra_info(self, x):
return None

View file

@ -50,3 +50,9 @@ cdef struct Ring:
linux.__u8 int_flags
linux.__u8 pad[3]
unsigned pad2
cdef struct RingCallback:
void* data
int res
int (*callback)(RingCallback* cb) nogil except 0

View file

@ -168,14 +168,62 @@ cdef int ring_select(Ring* ring, long long timeout) nogil except -1:
return ready
cdef inline int ring_cq_pop(CompletionQueue* cq, void** data) nogil:
cdef inline void ring_cq_pop(CompletionQueue* cq, RingCallback** callback) nogil:
cdef:
unsigned head
linux.io_uring_cqe* cqe
int res
RingCallback* ret
head = cq.khead[0]
cqe = cq.cqes + (head & cq.kring_mask[0])
data[0] = <void*>cqe.user_data
res = cqe.res
ret = <RingCallback*>cqe.user_data
ret.res = cqe.res
callback[0] = ret
barrier.io_uring_smp_store_release(cq.khead, head + 1)
return res
cdef inline int ring_sq_submit(
SubmissionQueue* sq,
linux.__u8 op,
int fd,
unsigned long addr,
unsigned len,
linux.__u64 offset,
bint link,
RingCallback* callback,
) nogil:
cdef:
unsigned int head, next
linux.io_uring_sqe* sqe
head = barrier.io_uring_smp_load_acquire(sq.khead)
next = sq.sqe_tail + 1
if next - head <= sq.kring_entries[0]:
sqe = &sq.sqes[sq.sqe_tail & sq.kring_mask[0]]
sq.sqe_tail = next
string.memset(sqe, 0, sizeof(linux.io_uring_sqe))
sqe.opcode = op
sqe.fd = fd
sqe.off = offset
sqe.addr = addr
sqe.len = len
if link:
sqe.flags = linux.IOSQE_IO_LINK
sqe.user_data = <linux.__u64>callback
return 1
else:
return 0
cdef int ring_sq_submit_connect(
SubmissionQueue* sq, int fd, libc.sockaddr_in* addr, RingCallback* callback
) nogil:
return ring_sq_submit(
sq,
linux.IORING_OP_CONNECT,
fd,
<unsigned long>addr,
0,
sizeof(addr[0]),
0,
callback,
)