1
0
Fork 0
mirror of https://gitee.com/fantix/kloop.git synced 2024-11-22 10:21:25 +00:00

Implement UDP recv, basic DNS working!

This commit is contained in:
Fantix King 2022-05-21 13:04:34 -04:00
parent f3c43bf145
commit 91691e5996
No known key found for this signature in database
GPG key ID: 95304B04071CCDB4
5 changed files with 133 additions and 20 deletions

View file

@ -30,7 +30,7 @@ pub struct CResolve {
} }
#[repr(C)] #[repr(C)]
pub struct UDPSend { pub struct UDPAction {
_data: [u8; 0], _data: [u8; 0],
_marker: marker::PhantomData<(*mut u8, marker::PhantomPinned)>, _marker: marker::PhantomData<(*mut u8, marker::PhantomPinned)>,
} }
@ -41,7 +41,7 @@ extern "C" {
pub fn resolve_prep_addr(resolve: *const CResolve) -> *mut libc::sockaddr; pub fn resolve_prep_addr(resolve: *const CResolve) -> *mut libc::sockaddr;
pub fn resolve_done_cb(resolve: *const CResolve) -> libc::c_int; pub fn resolve_done_cb(resolve: *const CResolve) -> libc::c_int;
pub fn udp_bind(addr: *const libc::sockaddr, addrlen: libc::socklen_t) -> libc::c_int; pub fn udp_bind(addr: *const libc::sockaddr, addrlen: libc::socklen_t) -> libc::c_int;
pub fn udp_send_init(fd: libc::c_int, resolver: *const CResolver) -> libc::c_ulong; pub fn udp_action_init(fd: libc::c_int, resolver: *const CResolver) -> libc::c_ulong;
pub fn udp_send_poll( pub fn udp_send_poll(
send: libc::c_ulong, send: libc::c_ulong,
data: *const u8, data: *const u8,
@ -50,4 +50,12 @@ extern "C" {
addrlen: libc::socklen_t, addrlen: libc::socklen_t,
waker: *mut Waker, waker: *mut Waker,
) -> libc::c_int; ) -> libc::c_int;
pub fn udp_recv_poll(
recv: libc::c_ulong,
data: *mut u8,
datalen: libc::size_t,
waker: *mut Waker,
) -> libc::c_int;
pub fn udp_get_addr(action: libc::c_ulong) -> *const libc::sockaddr;
pub fn udp_action_free(action: libc::c_ulong);
} }

View file

@ -18,11 +18,12 @@ use futures_io::{AsyncRead, AsyncWrite};
use libc::{sockaddr, socklen_t}; use libc::{sockaddr, socklen_t};
use std::fmt::Debug; use std::fmt::Debug;
use std::future::Future; use std::future::Future;
use std::net::SocketAddr; use std::net::{Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6};
use std::pin::Pin; use std::pin::Pin;
use std::task::{Context, Poll, Waker}; use std::task::{Context, Poll, Waker};
use std::time::Duration; use std::time::Duration;
use std::{io, mem}; use std::{io, mem};
use futures_util::future::Ready;
use trust_dns_proto::error::ProtoError; use trust_dns_proto::error::ProtoError;
use trust_dns_proto::tcp::{Connect, DnsTcpStream}; use trust_dns_proto::tcp::{Connect, DnsTcpStream};
use trust_dns_proto::udp::UdpSocket; use trust_dns_proto::udp::UdpSocket;
@ -99,6 +100,7 @@ impl DnsTcpStream for KLoopTcp {
pub struct KLoopUdp { pub struct KLoopUdp {
fd: libc::c_int, fd: libc::c_int,
send: libc::c_ulong, send: libc::c_ulong,
recv: libc::c_ulong,
} }
#[async_trait] #[async_trait]
@ -112,8 +114,9 @@ impl UdpSocket for KLoopUdp {
let resolver = resolver.borrow().unwrap(); let resolver = resolver.borrow().unwrap();
let resolver = unsafe { resolver.as_ref() }.unwrap(); let resolver = unsafe { resolver.as_ref() }.unwrap();
let resolver = resolver.c_resolver; let resolver = resolver.c_resolver;
let send = unsafe { kloop::udp_send_init(fd, resolver) }; let send = unsafe { kloop::udp_action_init(fd, resolver) };
Ok(KLoopUdp { fd, send }) let recv = unsafe { kloop::udp_action_init(fd, resolver) };
Ok(KLoopUdp { fd, send, recv })
}) })
} }
@ -122,8 +125,22 @@ impl UdpSocket for KLoopUdp {
cx: &mut Context<'_>, cx: &mut Context<'_>,
buf: &mut [u8], buf: &mut [u8],
) -> Poll<io::Result<(usize, SocketAddr)>> { ) -> Poll<io::Result<(usize, SocketAddr)>> {
println!("TODO: poll_recv_from"); let waker = Box::new(cx.waker().clone());
todo!() match unsafe {
kloop::udp_recv_poll(
self.recv,
buf.as_mut_ptr(),
buf.len(),
Box::into_raw(waker),
)
} {
res if res > 0 => {
Poll::Ready(unsafe {
ptr_to_socket_addr(kloop::udp_get_addr(self.recv))
}.map(|addr| (res as usize, addr)))
}
_ => Poll::Pending,
}
} }
fn poll_send_to( fn poll_send_to(
@ -154,6 +171,15 @@ impl UdpSocket for KLoopUdp {
} }
} }
impl Drop for KLoopUdp {
fn drop(&mut self) {
unsafe {
kloop::udp_action_free(self.send);
kloop::udp_action_free(self.recv);
}
}
}
fn socket_addr_as_ptr(addr: SocketAddr) -> (*const sockaddr, socklen_t) { fn socket_addr_as_ptr(addr: SocketAddr) -> (*const sockaddr, socklen_t) {
match addr { match addr {
SocketAddr::V4(ref a) => ( SocketAddr::V4(ref a) => (
@ -167,6 +193,32 @@ fn socket_addr_as_ptr(addr: SocketAddr) -> (*const sockaddr, socklen_t) {
} }
} }
unsafe fn ptr_to_socket_addr(
addr: *const libc::sockaddr,
) -> io::Result<SocketAddr> {
match (*addr).sa_family as libc::c_int {
libc::AF_INET => {
let addr: &libc::sockaddr_in = &*(addr as *const libc::sockaddr_in);
let ip = Ipv4Addr::from(addr.sin_addr.s_addr.to_ne_bytes());
let port = u16::from_be(addr.sin_port);
Ok(SocketAddr::V4(SocketAddrV4::new(ip, port)))
}
libc::AF_INET6 => {
let addr: &libc::sockaddr_in6 = &*(addr as *const libc::sockaddr_in6);
let ip = Ipv6Addr::from(addr.sin6_addr.s6_addr);
let port = u16::from_be(addr.sin6_port);
Ok(SocketAddr::V6(SocketAddrV6::new(
ip,
port,
addr.sin6_flowinfo,
addr.sin6_scope_id,
)))
}
_ => Err(io::ErrorKind::InvalidInput.into()),
}
}
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub struct KLoopHandle; pub struct KLoopHandle;

View file

@ -9,10 +9,11 @@
# See the Mulan PSL v2 for more details. # See the Mulan PSL v2 for more details.
cdef struct UDPSend: cdef struct UDPAction:
int fd int fd
libc.iovec vec libc.iovec vec
libc.msghdr msg libc.msghdr msg
RingCallback callback RingCallback callback
CResolver* resolver CResolver* resolver
void* rust_waker void* rust_waker
libc.sockaddr addr

View file

@ -19,41 +19,43 @@ cdef extern int udp_bind(libc.sockaddr* addr, libc.socklen_t addrlen) nogil:
return fd return fd
cdef extern unsigned long udp_send_init(int fd, CResolver* resolver) nogil: cdef extern unsigned long udp_action_init(int fd, CResolver* resolver) nogil:
cdef UDPSend* rv cdef UDPAction* rv
rv = <UDPSend*>PyMem_RawMalloc(sizeof(UDPSend)) rv = <UDPAction*>PyMem_RawMalloc(sizeof(UDPAction))
if rv == NULL: if rv == NULL:
return 0 return 0
string.memset(rv, 0, sizeof(UDPSend)) string.memset(rv, 0, sizeof(UDPAction))
rv.fd = fd rv.fd = fd
rv.resolver = resolver rv.resolver = resolver
rv.msg.msg_iov = &rv.vec rv.msg.msg_iov = &rv.vec
rv.msg.msg_iovlen = 1 rv.msg.msg_iovlen = 1
rv.callback.data = <void*>rv rv.callback.data = <void*>rv
rv.callback.callback = udp_send_cb rv.callback.callback = udp_action_cb
rv.msg.msg_name = <void*>&rv.addr
rv.msg.msg_namelen = sizeof(rv.addr)
return <unsigned long>rv return <unsigned long>rv
cdef int udp_send_cb(RingCallback* cb) nogil except 0: cdef int udp_action_cb(RingCallback* cb) nogil except 0:
cdef UDPSend* send = <UDPSend*>cb.data cdef UDPAction* action = <UDPAction*>cb.data
waker_wake(send.rust_waker) waker_wake(action.rust_waker)
resolver_run_until_stalled(send.resolver.rust_resolver) resolver_run_until_stalled(action.resolver.rust_resolver)
return 1 return 1
cdef extern int udp_send_poll( cdef extern int udp_send_poll(
unsigned long send_in, unsigned long send_in,
char* data, const char* data,
size_t datalen, size_t datalen,
libc.sockaddr* addr, libc.sockaddr* addr,
libc.socklen_t addrlen, libc.socklen_t addrlen,
void* waker, void* waker,
) nogil: ) nogil:
cdef UDPSend* send = <UDPSend*>send_in cdef UDPAction* send = <UDPAction*>send_in
if send.vec.iov_base == NULL: if send.vec.iov_base == NULL:
send.vec.iov_base = data send.vec.iov_base = data
send.vec.iov_len = datalen send.vec.iov_len = datalen
send.msg.msg_name = <void*>addr send.addr = addr[0]
send.msg.msg_namelen = addrlen send.msg.msg_namelen = addrlen
send.rust_waker = waker send.rust_waker = waker
return ring_sq_submit_sendmsg( return ring_sq_submit_sendmsg(
@ -67,3 +69,36 @@ cdef extern int udp_send_poll(
if send.vec.iov_base != data or send.vec.iov_len != datalen: if send.vec.iov_base != data or send.vec.iov_len != datalen:
return -1 return -1
return send.callback.res or -1 return send.callback.res or -1
cdef extern int udp_recv_poll(
unsigned long recv_in,
char* data,
size_t datalen,
void* waker,
) nogil:
cdef UDPAction* recv = <UDPAction*>recv_in
if recv.vec.iov_base == NULL:
recv.vec.iov_base = data
recv.vec.iov_len = datalen
recv.rust_waker = waker
return ring_sq_submit_recvmsg(
&recv.resolver.loop.ring.sq,
recv.fd,
&recv.msg,
&recv.callback,
) - 1
else:
waker_forget(waker)
if recv.vec.iov_base != data or recv.vec.iov_len != datalen:
return -1
return recv.callback.res or -1
cdef extern libc.sockaddr* udp_get_addr(unsigned long recv_in) nogil:
cdef UDPAction* recv = <UDPAction*>recv_in
return &recv.addr
cdef extern void udp_action_free(unsigned long action) nogil:
PyMem_RawFree(<void*>action)

View file

@ -311,3 +311,20 @@ cdef int ring_sq_submit_sendmsg(
0, 0,
callback, callback,
) else 0 ) else 0
cdef int ring_sq_submit_recvmsg(
SubmissionQueue* sq,
int fd,
const libc.msghdr *msg,
RingCallback* callback,
) nogil:
return 1 if ring_sq_submit(
sq,
linux.IORING_OP_RECVMSG,
fd,
<unsigned long>msg,
1,
0,
0,
callback,
) else 0