From 91691e5996d25a9f17cef01d0213ca1e76f4b4f8 Mon Sep 17 00:00:00 2001 From: Fantix King Date: Sat, 21 May 2022 13:04:34 -0400 Subject: [PATCH] Implement UDP recv, basic DNS working! --- resolver/src/kloop.rs | 12 ++++++-- resolver/src/runtime.rs | 62 +++++++++++++++++++++++++++++++++++++---- src/kloop/udp.pxd | 3 +- src/kloop/udp.pyx | 59 +++++++++++++++++++++++++++++++-------- src/kloop/uring.pyx | 17 +++++++++++ 5 files changed, 133 insertions(+), 20 deletions(-) diff --git a/resolver/src/kloop.rs b/resolver/src/kloop.rs index a9c91db..5876098 100644 --- a/resolver/src/kloop.rs +++ b/resolver/src/kloop.rs @@ -30,7 +30,7 @@ pub struct CResolve { } #[repr(C)] -pub struct UDPSend { +pub struct UDPAction { _data: [u8; 0], _marker: marker::PhantomData<(*mut u8, marker::PhantomPinned)>, } @@ -41,7 +41,7 @@ extern "C" { pub fn resolve_prep_addr(resolve: *const CResolve) -> *mut libc::sockaddr; pub fn resolve_done_cb(resolve: *const CResolve) -> libc::c_int; pub fn udp_bind(addr: *const libc::sockaddr, addrlen: libc::socklen_t) -> libc::c_int; - pub fn udp_send_init(fd: libc::c_int, resolver: *const CResolver) -> libc::c_ulong; + pub fn udp_action_init(fd: libc::c_int, resolver: *const CResolver) -> libc::c_ulong; pub fn udp_send_poll( send: libc::c_ulong, data: *const u8, @@ -50,4 +50,12 @@ extern "C" { addrlen: libc::socklen_t, waker: *mut Waker, ) -> libc::c_int; + pub fn udp_recv_poll( + recv: libc::c_ulong, + data: *mut u8, + datalen: libc::size_t, + waker: *mut Waker, + ) -> libc::c_int; + pub fn udp_get_addr(action: libc::c_ulong) -> *const libc::sockaddr; + pub fn udp_action_free(action: libc::c_ulong); } diff --git a/resolver/src/runtime.rs b/resolver/src/runtime.rs index 1bad4a7..df5c9a6 100644 --- a/resolver/src/runtime.rs +++ b/resolver/src/runtime.rs @@ -18,11 +18,12 @@ use futures_io::{AsyncRead, AsyncWrite}; use libc::{sockaddr, socklen_t}; use std::fmt::Debug; use std::future::Future; -use std::net::SocketAddr; +use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6}; use std::pin::Pin; use std::task::{Context, Poll, Waker}; use std::time::Duration; use std::{io, mem}; +use futures_util::future::Ready; use trust_dns_proto::error::ProtoError; use trust_dns_proto::tcp::{Connect, DnsTcpStream}; use trust_dns_proto::udp::UdpSocket; @@ -99,6 +100,7 @@ impl DnsTcpStream for KLoopTcp { pub struct KLoopUdp { fd: libc::c_int, send: libc::c_ulong, + recv: libc::c_ulong, } #[async_trait] @@ -112,8 +114,9 @@ impl UdpSocket for KLoopUdp { let resolver = resolver.borrow().unwrap(); let resolver = unsafe { resolver.as_ref() }.unwrap(); let resolver = resolver.c_resolver; - let send = unsafe { kloop::udp_send_init(fd, resolver) }; - Ok(KLoopUdp { fd, send }) + let send = unsafe { kloop::udp_action_init(fd, resolver) }; + let recv = unsafe { kloop::udp_action_init(fd, resolver) }; + Ok(KLoopUdp { fd, send, recv }) }) } @@ -122,8 +125,22 @@ impl UdpSocket for KLoopUdp { cx: &mut Context<'_>, buf: &mut [u8], ) -> Poll> { - println!("TODO: poll_recv_from"); - todo!() + let waker = Box::new(cx.waker().clone()); + match unsafe { + kloop::udp_recv_poll( + self.recv, + buf.as_mut_ptr(), + buf.len(), + Box::into_raw(waker), + ) + } { + res if res > 0 => { + Poll::Ready(unsafe { + ptr_to_socket_addr(kloop::udp_get_addr(self.recv)) + }.map(|addr| (res as usize, addr))) + } + _ => Poll::Pending, + } } fn poll_send_to( @@ -154,6 +171,15 @@ impl UdpSocket for KLoopUdp { } } +impl Drop for KLoopUdp { + fn drop(&mut self) { + unsafe { + kloop::udp_action_free(self.send); + kloop::udp_action_free(self.recv); + } + } +} + fn socket_addr_as_ptr(addr: SocketAddr) -> (*const sockaddr, socklen_t) { match addr { SocketAddr::V4(ref a) => ( @@ -167,6 +193,32 @@ fn socket_addr_as_ptr(addr: SocketAddr) -> (*const sockaddr, socklen_t) { } } + +unsafe fn ptr_to_socket_addr( + addr: *const libc::sockaddr, +) -> io::Result { + match (*addr).sa_family as libc::c_int { + libc::AF_INET => { + let addr: &libc::sockaddr_in = &*(addr as *const libc::sockaddr_in); + let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes()); + let port = u16::from_be(addr.sin_port); + Ok(SocketAddr::V4(SocketAddrV4::new(ip, port))) + } + libc::AF_INET6 => { + let addr: &libc::sockaddr_in6 = &*(addr as *const libc::sockaddr_in6); + let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr); + let port = u16::from_be(addr.sin6_port); + Ok(SocketAddr::V6(SocketAddrV6::new( + ip, + port, + addr.sin6_flowinfo, + addr.sin6_scope_id, + ))) + } + _ => Err(io::ErrorKind::InvalidInput.into()), + } +} + #[derive(Clone, Copy)] pub struct KLoopHandle; diff --git a/src/kloop/udp.pxd b/src/kloop/udp.pxd index fb55ed9..dd286a0 100644 --- a/src/kloop/udp.pxd +++ b/src/kloop/udp.pxd @@ -9,10 +9,11 @@ # See the Mulan PSL v2 for more details. -cdef struct UDPSend: +cdef struct UDPAction: int fd libc.iovec vec libc.msghdr msg RingCallback callback CResolver* resolver void* rust_waker + libc.sockaddr addr diff --git a/src/kloop/udp.pyx b/src/kloop/udp.pyx index 0c205b1..b134a5f 100644 --- a/src/kloop/udp.pyx +++ b/src/kloop/udp.pyx @@ -19,41 +19,43 @@ cdef extern int udp_bind(libc.sockaddr* addr, libc.socklen_t addrlen) nogil: return fd -cdef extern unsigned long udp_send_init(int fd, CResolver* resolver) nogil: - cdef UDPSend* rv - rv = PyMem_RawMalloc(sizeof(UDPSend)) +cdef extern unsigned long udp_action_init(int fd, CResolver* resolver) nogil: + cdef UDPAction* rv + rv = PyMem_RawMalloc(sizeof(UDPAction)) if rv == NULL: return 0 - string.memset(rv, 0, sizeof(UDPSend)) + string.memset(rv, 0, sizeof(UDPAction)) rv.fd = fd rv.resolver = resolver rv.msg.msg_iov = &rv.vec rv.msg.msg_iovlen = 1 rv.callback.data = rv - rv.callback.callback = udp_send_cb + rv.callback.callback = udp_action_cb + rv.msg.msg_name = &rv.addr + rv.msg.msg_namelen = sizeof(rv.addr) return rv -cdef int udp_send_cb(RingCallback* cb) nogil except 0: - cdef UDPSend* send = cb.data - waker_wake(send.rust_waker) - resolver_run_until_stalled(send.resolver.rust_resolver) +cdef int udp_action_cb(RingCallback* cb) nogil except 0: + cdef UDPAction* action = cb.data + waker_wake(action.rust_waker) + resolver_run_until_stalled(action.resolver.rust_resolver) return 1 cdef extern int udp_send_poll( unsigned long send_in, - char* data, + const char* data, size_t datalen, libc.sockaddr* addr, libc.socklen_t addrlen, void* waker, ) nogil: - cdef UDPSend* send = send_in + cdef UDPAction* send = send_in if send.vec.iov_base == NULL: send.vec.iov_base = data send.vec.iov_len = datalen - send.msg.msg_name = addr + send.addr = addr[0] send.msg.msg_namelen = addrlen send.rust_waker = waker return ring_sq_submit_sendmsg( @@ -67,3 +69,36 @@ cdef extern int udp_send_poll( if send.vec.iov_base != data or send.vec.iov_len != datalen: return -1 return send.callback.res or -1 + + +cdef extern int udp_recv_poll( + unsigned long recv_in, + char* data, + size_t datalen, + void* waker, +) nogil: + cdef UDPAction* recv = recv_in + if recv.vec.iov_base == NULL: + recv.vec.iov_base = data + recv.vec.iov_len = datalen + recv.rust_waker = waker + return ring_sq_submit_recvmsg( + &recv.resolver.loop.ring.sq, + recv.fd, + &recv.msg, + &recv.callback, + ) - 1 + else: + waker_forget(waker) + if recv.vec.iov_base != data or recv.vec.iov_len != datalen: + return -1 + return recv.callback.res or -1 + + +cdef extern libc.sockaddr* udp_get_addr(unsigned long recv_in) nogil: + cdef UDPAction* recv = recv_in + return &recv.addr + + +cdef extern void udp_action_free(unsigned long action) nogil: + PyMem_RawFree(action) diff --git a/src/kloop/uring.pyx b/src/kloop/uring.pyx index 34d36e0..12d41a7 100644 --- a/src/kloop/uring.pyx +++ b/src/kloop/uring.pyx @@ -311,3 +311,20 @@ cdef int ring_sq_submit_sendmsg( 0, callback, ) else 0 + +cdef int ring_sq_submit_recvmsg( + SubmissionQueue* sq, + int fd, + const libc.msghdr *msg, + RingCallback* callback, +) nogil: + return 1 if ring_sq_submit( + sq, + linux.IORING_OP_RECVMSG, + fd, + msg, + 1, + 0, + 0, + callback, + ) else 0