aboutsummaryrefslogtreecommitdiff
path: root/embassy-net/src
diff options
context:
space:
mode:
authorUlf Lilleengen <[email protected]>2022-08-08 16:51:34 +0200
committerUlf Lilleengen <[email protected]>2022-08-08 16:51:34 +0200
commit18671b94ba173d6b5c2d2ec5e3569e39a03b61bb (patch)
tree23ec7aa298e049957c307b7c1b973201bf7f5f7d /embassy-net/src
parentb7b4c84067e0e85fa641a540458438297176f2e4 (diff)
Implement embedded-nal-async traits for embassy-net
Diffstat (limited to 'embassy-net/src')
-rw-r--r--embassy-net/src/tcp.rs167
1 files changed, 167 insertions, 0 deletions
diff --git a/embassy-net/src/tcp.rs b/embassy-net/src/tcp.rs
index c18391ace..96a6dfe28 100644
--- a/embassy-net/src/tcp.rs
+++ b/embassy-net/src/tcp.rs
@@ -328,3 +328,170 @@ impl<'d> embedded_io::asynch::Write for TcpWriter<'d> {
328 self.io.flush() 328 self.io.flush()
329 } 329 }
330} 330}
331
332#[cfg(feature = "unstable-traits")]
333pub mod client {
334 use core::mem::MaybeUninit;
335 use core::ptr::NonNull;
336
337 use atomic_polyfill::{AtomicBool, Ordering};
338 use embedded_nal_async::IpAddr;
339
340 use super::*;
341
342 pub struct TcpClient<'d, D: Device, const N: usize, const TX_SZ: usize = 1024, const RX_SZ: usize = 1024> {
343 stack: &'d Stack<D>,
344 tx: &'d BufferPool<TX_SZ, N>,
345 rx: &'d BufferPool<RX_SZ, N>,
346 }
347
348 impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpClient<'d, D, N, TX_SZ, RX_SZ> {
349 pub fn new(stack: &'d Stack<D>, tx: &'d BufferPool<TX_SZ, N>, rx: &'d BufferPool<RX_SZ, N>) -> Self {
350 Self { stack, tx, rx }
351 }
352 }
353
354 impl<'d, D: Device, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_nal_async::TcpConnect
355 for TcpClient<'d, D, N, TX_SZ, RX_SZ>
356 {
357 type Error = Error;
358 type Connection<'m> = TcpConnection<'m, N, TX_SZ, RX_SZ> where Self: 'm;
359 type ConnectFuture<'m> = impl Future<Output = Result<Self::Connection<'m>, Self::Error>> + 'm
360 where
361 Self: 'm;
362
363 fn connect<'m>(&'m self, remote: embedded_nal_async::SocketAddr) -> Self::ConnectFuture<'m> {
364 async move {
365 let addr: crate::IpAddress = match remote.ip() {
366 IpAddr::V4(addr) => crate::IpAddress::Ipv4(crate::Ipv4Address::from_bytes(&addr.octets())),
367 #[cfg(feature = "proto-ipv6")]
368 IpAddr::V6(addr) => crate::IpAddress::Ipv6(crate::Ipv6Address::from_bytes(&addr.octets())),
369 #[cfg(not(feature = "proto-ipv6"))]
370 IpAddr::V6(_) => panic!("ipv6 support not enabled"),
371 };
372 let remote_endpoint = (addr, remote.port());
373 let mut socket = TcpConnection::new(&self.stack, self.tx, self.rx)?;
374 socket
375 .socket
376 .connect(remote_endpoint)
377 .await
378 .map_err(|_| Error::ConnectionReset)?;
379 Ok(socket)
380 }
381 }
382 }
383
384 pub struct TcpConnection<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> {
385 socket: TcpSocket<'d>,
386 tx: &'d BufferPool<TX_SZ, N>,
387 rx: &'d BufferPool<RX_SZ, N>,
388 txb: NonNull<[u8; TX_SZ]>,
389 rxb: NonNull<[u8; RX_SZ]>,
390 }
391
392 impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> TcpConnection<'d, N, TX_SZ, RX_SZ> {
393 fn new<D: Device>(
394 stack: &'d Stack<D>,
395 tx: &'d BufferPool<TX_SZ, N>,
396 rx: &'d BufferPool<RX_SZ, N>,
397 ) -> Result<Self, Error> {
398 let mut txb = tx.alloc().ok_or(Error::ConnectionReset)?;
399 let mut rxb = rx.alloc().ok_or(Error::ConnectionReset)?;
400 Ok(Self {
401 socket: unsafe { TcpSocket::new(stack, rxb.as_mut(), txb.as_mut()) },
402 tx,
403 rx,
404 txb,
405 rxb,
406 })
407 }
408 }
409
410 impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> Drop for TcpConnection<'d, N, TX_SZ, RX_SZ> {
411 fn drop(&mut self) {
412 unsafe {
413 self.socket.close();
414 self.rx.free(self.rxb);
415 self.tx.free(self.txb);
416 }
417 }
418 }
419
420 impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::Io
421 for TcpConnection<'d, N, TX_SZ, RX_SZ>
422 {
423 type Error = Error;
424 }
425
426 impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Read
427 for TcpConnection<'d, N, TX_SZ, RX_SZ>
428 {
429 type ReadFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
430 where
431 Self: 'a;
432
433 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Self::ReadFuture<'a> {
434 self.socket.read(buf)
435 }
436 }
437
438 impl<'d, const N: usize, const TX_SZ: usize, const RX_SZ: usize> embedded_io::asynch::Write
439 for TcpConnection<'d, N, TX_SZ, RX_SZ>
440 {
441 type WriteFuture<'a> = impl Future<Output = Result<usize, Self::Error>>
442 where
443 Self: 'a;
444
445 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Self::WriteFuture<'a> {
446 self.socket.write(buf)
447 }
448
449 type FlushFuture<'a> = impl Future<Output = Result<(), Self::Error>>
450 where
451 Self: 'a;
452
453 fn flush<'a>(&'a mut self) -> Self::FlushFuture<'a> {
454 self.socket.flush()
455 }
456 }
457
458 pub type BufferPool<const BUFSZ: usize, const N: usize> = Pool<[u8; BUFSZ], N>;
459
460 pub struct Pool<T, const N: usize> {
461 used: [AtomicBool; N],
462 data: [UnsafeCell<MaybeUninit<T>>; N],
463 }
464
465 impl<T, const N: usize> Pool<T, N> {
466 const VALUE: AtomicBool = AtomicBool::new(false);
467 const UNINIT: UnsafeCell<MaybeUninit<T>> = UnsafeCell::new(MaybeUninit::uninit());
468
469 pub const fn new() -> Self {
470 Self {
471 used: [Self::VALUE; N],
472 data: [Self::UNINIT; N],
473 }
474 }
475 }
476
477 impl<T, const N: usize> Pool<T, N> {
478 fn alloc(&self) -> Option<NonNull<T>> {
479 for n in 0..N {
480 if self.used[n].swap(true, Ordering::SeqCst) == false {
481 let p = self.data[n].get() as *mut T;
482 return Some(unsafe { NonNull::new_unchecked(p) });
483 }
484 }
485 None
486 }
487
488 /// safety: p must be a pointer obtained from self.alloc that hasn't been freed yet.
489 unsafe fn free(&self, p: NonNull<T>) {
490 let origin = self.data.as_ptr() as *mut T;
491 let n = p.as_ptr().offset_from(origin);
492 assert!(n >= 0);
493 assert!((n as usize) < N);
494 self.used[n as usize].store(false, Ordering::SeqCst);
495 }
496 }
497}