From 6f547cf05ddd1a27c8ec4e107ac227f7f9520ba6 Mon Sep 17 00:00:00 2001 From: kbleeke Date: Thu, 2 Mar 2023 15:34:08 +0100 Subject: asyncify outgoing events --- src/runner.rs | 108 ++++++++++++++++++++++++++++++++++------------------------ 1 file changed, 64 insertions(+), 44 deletions(-) (limited to 'src/runner.rs') diff --git a/src/runner.rs b/src/runner.rs index 9945af3fc..4abccf48b 100644 --- a/src/runner.rs +++ b/src/runner.rs @@ -1,6 +1,6 @@ -use core::cell::Cell; use core::slice; +use embassy_futures::select::{select3, Either3}; use embassy_futures::yield_now; use embassy_net_driver_channel as ch; use embassy_sync::pubsub::PubSubBehavior; @@ -12,9 +12,10 @@ pub use crate::bus::SpiBusCyw43; use crate::consts::*; use crate::events::{EventQueue, EventStatus}; use crate::fmt::Bytes; +use crate::ioctl::{IoctlState, IoctlType, PendingIoctl}; use crate::nvram::NVRAM; use crate::structs::*; -use crate::{events, Core, IoctlState, IoctlType, CHIP, MTU}; +use crate::{events, Core, CHIP, MTU}; #[cfg(feature = "firmware-logs")] struct LogState { @@ -40,7 +41,7 @@ pub struct Runner<'a, PWR, SPI> { ch: ch::Runner<'a, MTU>, bus: Bus, - ioctl_state: &'a Cell, + ioctl_state: &'a IoctlState, ioctl_id: u16, sdpcm_seq: u8, sdpcm_seq_max: u8, @@ -59,7 +60,7 @@ where pub(crate) fn new( ch: ch::Runner<'a, MTU>, bus: Bus, - ioctl_state: &'a Cell, + ioctl_state: &'a IoctlState, events: &'a EventQueue, ) -> Self { Self { @@ -226,19 +227,22 @@ where #[cfg(feature = "firmware-logs")] self.log_read().await; - // Send stuff - // TODO flow control not yet complete - if !self.has_credit() { - warn!("TX stalled"); - } else { - if let IoctlState::Pending { kind, cmd, iface, buf } = self.ioctl_state.get() { - self.send_ioctl(kind, cmd, iface, unsafe { &*buf }).await; - self.ioctl_state.set(IoctlState::Sent { buf }); - } - if !self.has_credit() { - warn!("TX stalled"); - } else { - if let Some(packet) = self.ch.try_tx_buf() { + let ev = || async { + // TODO use IRQs + yield_now().await; + }; + + if self.has_credit() { + let ioctl = self.ioctl_state.wait_pending(); + let tx = self.ch.tx_buf(); + + match select3(ioctl, tx, ev()).await { + Either3::First(PendingIoctl { buf, kind, cmd, iface }) => { + warn!("ioctl"); + self.send_ioctl(kind, cmd, iface, unsafe { &*buf }).await; + } + Either3::Second(packet) => { + warn!("packet"); trace!("tx pkt {:02x}", Bytes(&packet[..packet.len().min(48)])); let mut buf = [0; 512]; @@ -281,28 +285,46 @@ where self.bus.wlan_write(&buf[..(total_len / 4)]).await; self.ch.tx_done(); } + Either3::Third(()) => { + // Receive stuff + let irq = self.bus.read16(FUNC_BUS, REG_BUS_INTERRUPT).await; + + if irq & IRQ_F2_PACKET_AVAILABLE != 0 { + let mut status = 0xFFFF_FFFF; + while status == 0xFFFF_FFFF { + status = self.bus.read32(FUNC_BUS, REG_BUS_STATUS).await; + } + + if status & STATUS_F2_PKT_AVAILABLE != 0 { + let len = (status & STATUS_F2_PKT_LEN_MASK) >> STATUS_F2_PKT_LEN_SHIFT; + self.bus.wlan_read(&mut buf, len).await; + trace!("rx {:02x}", Bytes(&slice8_mut(&mut buf)[..(len as usize).min(48)])); + self.rx(&slice8_mut(&mut buf)[..len as usize]); + } + } + } } - } + } else { + warn!("TX stalled"); + ev().await; - // Receive stuff - let irq = self.bus.read16(FUNC_BUS, REG_BUS_INTERRUPT).await; + // Receive stuff + let irq = self.bus.read16(FUNC_BUS, REG_BUS_INTERRUPT).await; - if irq & IRQ_F2_PACKET_AVAILABLE != 0 { - let mut status = 0xFFFF_FFFF; - while status == 0xFFFF_FFFF { - status = self.bus.read32(FUNC_BUS, REG_BUS_STATUS).await; - } + if irq & IRQ_F2_PACKET_AVAILABLE != 0 { + let mut status = 0xFFFF_FFFF; + while status == 0xFFFF_FFFF { + status = self.bus.read32(FUNC_BUS, REG_BUS_STATUS).await; + } - if status & STATUS_F2_PKT_AVAILABLE != 0 { - let len = (status & STATUS_F2_PKT_LEN_MASK) >> STATUS_F2_PKT_LEN_SHIFT; - self.bus.wlan_read(&mut buf, len).await; - trace!("rx {:02x}", Bytes(&slice8_mut(&mut buf)[..(len as usize).min(48)])); - self.rx(&slice8_mut(&mut buf)[..len as usize]); + if status & STATUS_F2_PKT_AVAILABLE != 0 { + let len = (status & STATUS_F2_PKT_LEN_MASK) >> STATUS_F2_PKT_LEN_SHIFT; + self.bus.wlan_read(&mut buf, len).await; + trace!("rx {:02x}", Bytes(&slice8_mut(&mut buf)[..(len as usize).min(48)])); + self.rx(&slice8_mut(&mut buf)[..len as usize]); + } } } - - // TODO use IRQs - yield_now().await; } } @@ -340,19 +362,17 @@ where let cdc_header = CdcHeader::from_bytes(payload[..CdcHeader::SIZE].try_into().unwrap()); trace!(" {:?}", cdc_header); - if let IoctlState::Sent { buf } = self.ioctl_state.get() { - if cdc_header.id == self.ioctl_id { - if cdc_header.status != 0 { - // TODO: propagate error instead - panic!("IOCTL error {}", cdc_header.status as i32); - } + if cdc_header.id == self.ioctl_id { + if cdc_header.status != 0 { + // TODO: propagate error instead + panic!("IOCTL error {}", cdc_header.status as i32); + } - let resp_len = cdc_header.len as usize; - info!("IOCTL Response: {:02x}", Bytes(&payload[CdcHeader::SIZE..][..resp_len])); + let resp_len = cdc_header.len as usize; + let response = &payload[CdcHeader::SIZE..][..resp_len]; + info!("IOCTL Response: {:02x}", Bytes(response)); - (unsafe { &mut *buf }[..resp_len]).copy_from_slice(&payload[CdcHeader::SIZE..][..resp_len]); - self.ioctl_state.set(IoctlState::Done { resp_len }); - } + self.ioctl_state.ioctl_done(response); } } CHANNEL_TYPE_EVENT => { -- cgit