diff options
Diffstat (limited to 'src/lib.rs')
| -rw-r--r-- | src/lib.rs | 230 |
1 files changed, 168 insertions, 62 deletions
| @@ -1,10 +1,14 @@ | |||
| 1 | #![no_std] | 1 | #![no_std] |
| 2 | 2 | ||
| 3 | use core::{cell::RefCell, net::{Ipv4Addr, SocketAddrV4}, task::Waker}; | 3 | use core::{ |
| 4 | cell::RefCell, | ||
| 5 | net::{Ipv4Addr, SocketAddrV4}, | ||
| 6 | task::Waker, | ||
| 7 | }; | ||
| 4 | 8 | ||
| 5 | use embassy_net::tcp::TcpSocket; | 9 | use embassy_net::tcp::TcpSocket; |
| 6 | use embassy_sync::waitqueue::AtomicWaker; | 10 | use embassy_sync::waitqueue::AtomicWaker; |
| 7 | use embassy_time::Timer; | 11 | use embassy_time::{Duration, Timer}; |
| 8 | use heapless::{ | 12 | use heapless::{ |
| 9 | Vec, VecView, | 13 | Vec, VecView, |
| 10 | string::{String, StringView}, | 14 | string::{String, StringView}, |
| @@ -432,7 +436,11 @@ pub fn new<'a>(resources: &'a mut DeviceResources, config: DeviceConfig) -> Devi | |||
| 432 | } | 436 | } |
| 433 | } | 437 | } |
| 434 | 438 | ||
| 435 | fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityStorage) -> Entity<'a> { | 439 | fn create_entity<'a>( |
| 440 | device: &Device<'a>, | ||
| 441 | config: EntityConfig, | ||
| 442 | storage: EntityStorage, | ||
| 443 | ) -> Entity<'a> { | ||
| 436 | let index = 'outer: { | 444 | let index = 'outer: { |
| 437 | for idx in 0..device.entities.len() { | 445 | for idx in 0..device.entities.len() { |
| 438 | if device.entities[idx].borrow().is_none() { | 446 | if device.entities[idx].borrow().is_none() { |
| @@ -457,7 +465,11 @@ fn create_entity<'a>(device: &Device<'a>, config: EntityConfig, storage: EntityS | |||
| 457 | } | 465 | } |
| 458 | } | 466 | } |
| 459 | 467 | ||
| 460 | pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorConfig) -> Sensor<'a> { | 468 | pub fn create_sensor<'a>( |
| 469 | device: &Device<'a>, | ||
| 470 | id: &'static str, | ||
| 471 | config: SensorConfig, | ||
| 472 | ) -> Sensor<'a> { | ||
| 461 | let mut entity_config = EntityConfig::default(); | 473 | let mut entity_config = EntityConfig::default(); |
| 462 | entity_config.id = id; | 474 | entity_config.id = id; |
| 463 | config.populate(&mut entity_config); | 475 | config.populate(&mut entity_config); |
| @@ -470,16 +482,28 @@ pub fn create_sensor<'a>(device: &Device<'a>, id: &'static str, config: SensorCo | |||
| 470 | Sensor::new(entity) | 482 | Sensor::new(entity) |
| 471 | } | 483 | } |
| 472 | 484 | ||
| 473 | pub fn create_button<'a>(device: &Device<'a>, id: &'static str, config: ButtonConfig) -> Button<'a> { | 485 | pub fn create_button<'a>( |
| 486 | device: &Device<'a>, | ||
| 487 | id: &'static str, | ||
| 488 | config: ButtonConfig, | ||
| 489 | ) -> Button<'a> { | ||
| 474 | let mut entity_config = EntityConfig::default(); | 490 | let mut entity_config = EntityConfig::default(); |
| 475 | entity_config.id = id; | 491 | entity_config.id = id; |
| 476 | config.populate(&mut entity_config); | 492 | config.populate(&mut entity_config); |
| 477 | 493 | ||
| 478 | let entity = create_entity(device, entity_config, EntityStorage::Button(Default::default())); | 494 | let entity = create_entity( |
| 495 | device, | ||
| 496 | entity_config, | ||
| 497 | EntityStorage::Button(Default::default()), | ||
| 498 | ); | ||
| 479 | Button::new(entity) | 499 | Button::new(entity) |
| 480 | } | 500 | } |
| 481 | 501 | ||
| 482 | pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberConfig) -> Number<'a> { | 502 | pub fn create_number<'a>( |
| 503 | device: &Device<'a>, | ||
| 504 | id: &'static str, | ||
| 505 | config: NumberConfig, | ||
| 506 | ) -> Number<'a> { | ||
| 483 | let mut entity_config = EntityConfig::default(); | 507 | let mut entity_config = EntityConfig::default(); |
| 484 | entity_config.id = id; | 508 | entity_config.id = id; |
| 485 | config.populate(&mut entity_config); | 509 | config.populate(&mut entity_config); |
| @@ -495,7 +519,11 @@ pub fn create_number<'a>(device: &Device<'a>, id: &'static str, config: NumberCo | |||
| 495 | Number::new(entity) | 519 | Number::new(entity) |
| 496 | } | 520 | } |
| 497 | 521 | ||
| 498 | pub fn create_switch<'a>(device: &Device<'a>, id: &'static str, config: SwitchConfig) -> Switch<'a> { | 522 | pub fn create_switch<'a>( |
| 523 | device: &Device<'a>, | ||
| 524 | id: &'static str, | ||
| 525 | config: SwitchConfig, | ||
| 526 | ) -> Switch<'a> { | ||
| 499 | let mut entity_config = EntityConfig::default(); | 527 | let mut entity_config = EntityConfig::default(); |
| 500 | entity_config.id = id; | 528 | entity_config.id = id; |
| 501 | config.populate(&mut entity_config); | 529 | config.populate(&mut entity_config); |
| @@ -531,6 +559,8 @@ pub fn create_binary_sensor<'a>( | |||
| 531 | pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { | 559 | pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Result<(), Error> { |
| 532 | use core::fmt::Write; | 560 | use core::fmt::Write; |
| 533 | 561 | ||
| 562 | const MQTT_TIMEOUT: Duration = Duration::from_secs(30); | ||
| 563 | |||
| 534 | device.availability_topic_buffer.clear(); | 564 | device.availability_topic_buffer.clear(); |
| 535 | write!( | 565 | write!( |
| 536 | device.availability_topic_buffer, | 566 | device.availability_topic_buffer, |
| @@ -549,15 +579,24 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 549 | will_retain: true, | 579 | will_retain: true, |
| 550 | ..Default::default() | 580 | ..Default::default() |
| 551 | }; | 581 | }; |
| 552 | if let Err(err) = client | 582 | match embassy_time::with_timeout( |
| 553 | .connect_with(device.config.device_id, connect_params) | 583 | MQTT_TIMEOUT, |
| 554 | .await | 584 | client.connect_with(device.config.device_id, connect_params), |
| 585 | ) | ||
| 586 | .await | ||
| 555 | { | 587 | { |
| 556 | crate::log::error!( | 588 | Ok(Ok(())) => {} |
| 557 | "mqtt connect failed with: {:?}", | 589 | Ok(Err(err)) => { |
| 558 | crate::log::Debug2Format(&err) | 590 | crate::log::error!( |
| 559 | ); | 591 | "mqtt connect failed with: {:?}", |
| 560 | return Err(Error::new("mqtt connection failed")); | 592 | crate::log::Debug2Format(&err) |
| 593 | ); | ||
| 594 | return Err(Error::new("mqtt connection failed")); | ||
| 595 | } | ||
| 596 | Err(_) => { | ||
| 597 | crate::log::error!("mqtt connect timed out"); | ||
| 598 | return Err(Error::new("mqtt connect timed out")); | ||
| 599 | } | ||
| 561 | } | 600 | } |
| 562 | 601 | ||
| 563 | crate::log::debug!("sending discover messages"); | 602 | crate::log::debug!("sending discover messages"); |
| @@ -635,7 +674,8 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 635 | discovery | 674 | discovery |
| 636 | ); | 675 | ); |
| 637 | 676 | ||
| 638 | device.discovery_buffer | 677 | device |
| 678 | .discovery_buffer | ||
| 639 | .resize(device.discovery_buffer.capacity(), 0) | 679 | .resize(device.discovery_buffer.capacity(), 0) |
| 640 | .unwrap(); | 680 | .unwrap(); |
| 641 | let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer) | 681 | let n = serde_json_core::to_slice(&discovery, &mut device.discovery_buffer) |
| @@ -645,48 +685,73 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 645 | 685 | ||
| 646 | let discovery_topic = device.discovery_topic_buffer.as_str(); | 686 | let discovery_topic = device.discovery_topic_buffer.as_str(); |
| 647 | crate::log::debug!("sending discovery to topic '{}'", discovery_topic); | 687 | crate::log::debug!("sending discovery to topic '{}'", discovery_topic); |
| 648 | if let Err(err) = client | 688 | match embassy_time::with_timeout( |
| 649 | .publish(discovery_topic, &device.discovery_buffer) | 689 | MQTT_TIMEOUT, |
| 650 | .await | 690 | client.publish(discovery_topic, &device.discovery_buffer), |
| 691 | ) | ||
| 692 | .await | ||
| 651 | { | 693 | { |
| 652 | crate::log::error!( | 694 | Ok(Ok(_)) => {} |
| 653 | "mqtt discovery publish failed with: {:?}", | 695 | Ok(Err(err)) => { |
| 654 | crate::log::Debug2Format(&err) | 696 | crate::log::error!( |
| 655 | ); | 697 | "mqtt discovery publish failed with: {:?}", |
| 656 | return Err(Error::new("mqtt discovery publish failed")); | 698 | crate::log::Debug2Format(&err) |
| 699 | ); | ||
| 700 | return Err(Error::new("mqtt discovery publish failed")); | ||
| 701 | } | ||
| 702 | Err(_) => { | ||
| 703 | crate::log::error!("mqtt discovery publish timed out"); | ||
| 704 | return Err(Error::new("mqtt discovery publish timed out")); | ||
| 705 | } | ||
| 657 | } | 706 | } |
| 658 | 707 | ||
| 659 | let command_topic = device.command_topic_buffer.as_str(); | 708 | let command_topic = device.command_topic_buffer.as_str(); |
| 660 | crate::log::debug!("subscribing to command topic '{}'", command_topic); | 709 | crate::log::debug!("subscribing to command topic '{}'", command_topic); |
| 661 | if let Err(err) = client.subscribe(command_topic).await { | 710 | match embassy_time::with_timeout(MQTT_TIMEOUT, client.subscribe(command_topic)).await { |
| 662 | crate::log::error!( | 711 | Ok(Ok(_)) => {} |
| 663 | "mqtt subscribe to '{}' failed with: {:?}", | 712 | Ok(Err(err)) => { |
| 664 | command_topic, | 713 | crate::log::error!( |
| 665 | crate::log::Debug2Format(&err) | 714 | "mqtt subscribe to '{}' failed with: {:?}", |
| 666 | ); | 715 | command_topic, |
| 667 | return Err(Error::new( | 716 | crate::log::Debug2Format(&err) |
| 668 | "mqtt subscription to entity command topic failed", | 717 | ); |
| 669 | )); | 718 | return Err(Error::new( |
| 719 | "mqtt subscription to entity command topic failed", | ||
| 720 | )); | ||
| 721 | } | ||
| 722 | Err(_) => { | ||
| 723 | crate::log::error!("mqtt subscribe to '{}' timed out", command_topic); | ||
| 724 | return Err(Error::new("mqtt subscribe timed out")); | ||
| 725 | } | ||
| 670 | } | 726 | } |
| 671 | } | 727 | } |
| 672 | 728 | ||
| 673 | if let Err(err) = client | 729 | match embassy_time::with_timeout( |
| 674 | .publish_with( | 730 | MQTT_TIMEOUT, |
| 675 | availability_topic, | 731 | client.publish_with( |
| 676 | AVAILABLE_PAYLOAD.as_bytes(), | 732 | availability_topic, |
| 677 | embedded_mqtt::PublishParams { | 733 | AVAILABLE_PAYLOAD.as_bytes(), |
| 678 | retain: true, | 734 | embedded_mqtt::PublishParams { |
| 679 | ..Default::default() | 735 | retain: true, |
| 680 | }, | 736 | ..Default::default() |
| 681 | ) | 737 | }, |
| 682 | .await | 738 | ), |
| 683 | { | 739 | ) |
| 740 | .await | ||
| 741 | { | ||
| 742 | Ok(Ok(_)) => {} | ||
| 743 | Ok(Err(err)) => { | ||
| 684 | crate::log::error!( | 744 | crate::log::error!( |
| 685 | "mqtt availability publish failed with: {:?}", | 745 | "mqtt availability publish failed with: {:?}", |
| 686 | crate::log::Debug2Format(&err) | 746 | crate::log::Debug2Format(&err) |
| 687 | ); | 747 | ); |
| 688 | return Err(Error::new("mqtt availability publish failed")); | 748 | return Err(Error::new("mqtt availability publish failed")); |
| 689 | } | 749 | } |
| 750 | Err(_) => { | ||
| 751 | crate::log::error!("mqtt availability publish timed out"); | ||
| 752 | return Err(Error::new("mqtt availability publish timed out")); | ||
| 753 | } | ||
| 754 | } | ||
| 690 | 755 | ||
| 691 | 'outer_loop: loop { | 756 | 'outer_loop: loop { |
| 692 | use core::fmt::Write; | 757 | use core::fmt::Write; |
| @@ -749,7 +814,14 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 749 | } | 814 | } |
| 750 | 815 | ||
| 751 | let state_topic = device.state_topic_buffer.as_str(); | 816 | let state_topic = device.state_topic_buffer.as_str(); |
| 752 | if let Err(err) = client.publish(state_topic, device.publish_buffer).await { | 817 | match embassy_time::with_timeout( |
| 818 | MQTT_TIMEOUT, | ||
| 819 | client.publish(state_topic, device.publish_buffer), | ||
| 820 | ) | ||
| 821 | .await | ||
| 822 | { | ||
| 823 | Ok(Ok(_)) => {} | ||
| 824 | Ok(Err(err)) => { | ||
| 753 | crate::log::error!( | 825 | crate::log::error!( |
| 754 | "mqtt state publish on topic '{}' failed with: {:?}", | 826 | "mqtt state publish on topic '{}' failed with: {:?}", |
| 755 | state_topic, | 827 | state_topic, |
| @@ -757,12 +829,22 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 757 | ); | 829 | ); |
| 758 | return Err(Error::new("mqtt publish failed")); | 830 | return Err(Error::new("mqtt publish failed")); |
| 759 | } | 831 | } |
| 832 | Err(_) => { | ||
| 833 | crate::log::error!("mqtt state publish on topic '{}' timed out", state_topic); | ||
| 834 | return Err(Error::new("mqtt publish timed out")); | ||
| 835 | } | ||
| 760 | } | 836 | } |
| 837 | } | ||
| 761 | 838 | ||
| 762 | let receive = client.receive(); | 839 | let receive = client.receive(); |
| 763 | let waker = wait_on_atomic_waker(device.waker); | 840 | let waker = wait_on_atomic_waker(device.waker); |
| 764 | let publish = match embassy_futures::select::select(receive, waker).await { | 841 | let publish = match embassy_time::with_timeout( |
| 765 | embassy_futures::select::Either::First(packet) => match packet { | 842 | MQTT_TIMEOUT, |
| 843 | embassy_futures::select::select(receive, waker), | ||
| 844 | ) | ||
| 845 | .await | ||
| 846 | { | ||
| 847 | Ok(embassy_futures::select::Either::First(packet)) => match packet { | ||
| 766 | Ok(embedded_mqtt::Packet::Publish(publish)) => publish, | 848 | Ok(embedded_mqtt::Packet::Publish(publish)) => publish, |
| 767 | Err(err) => { | 849 | Err(err) => { |
| 768 | crate::log::error!( | 850 | crate::log::error!( |
| @@ -773,7 +855,11 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 773 | } | 855 | } |
| 774 | _ => continue, | 856 | _ => continue, |
| 775 | }, | 857 | }, |
| 776 | embassy_futures::select::Either::Second(_) => continue, | 858 | Ok(embassy_futures::select::Either::Second(_)) => continue, |
| 859 | Err(_) => { | ||
| 860 | crate::log::error!("mqtt receive timed out"); | ||
| 861 | return Err(Error::new("mqtt receive timed out")); | ||
| 862 | } | ||
| 777 | }; | 863 | }; |
| 778 | 864 | ||
| 779 | let entity = 'entity_search_block: { | 865 | let entity = 'entity_search_block: { |
| @@ -816,12 +902,24 @@ pub async fn run<T: Transport>(device: &mut Device<'_>, transport: &mut T) -> Re | |||
| 816 | ); | 902 | ); |
| 817 | 903 | ||
| 818 | let data_len = publish.data_len; | 904 | let data_len = publish.data_len; |
| 819 | if let Err(err) = client.receive_data(&mut read_buffer[..data_len]).await { | 905 | match embassy_time::with_timeout( |
| 820 | crate::log::error!( | 906 | MQTT_TIMEOUT, |
| 821 | "mqtt receive data failed with: {:?}", | 907 | client.receive_data(&mut read_buffer[..data_len]), |
| 822 | crate::log::Debug2Format(&err) | 908 | ) |
| 823 | ); | 909 | .await |
| 824 | return Err(Error::new("mqtt receive data failed")); | 910 | { |
| 911 | Ok(Ok(())) => {} | ||
| 912 | Ok(Err(err)) => { | ||
| 913 | crate::log::error!( | ||
| 914 | "mqtt receive data failed with: {:?}", | ||
| 915 | crate::log::Debug2Format(&err) | ||
| 916 | ); | ||
| 917 | return Err(Error::new("mqtt receive data failed")); | ||
| 918 | } | ||
| 919 | Err(_) => { | ||
| 920 | crate::log::error!("mqtt receive data timed out"); | ||
| 921 | return Err(Error::new("mqtt receive data timed out")); | ||
| 922 | } | ||
| 825 | } | 923 | } |
| 826 | 924 | ||
| 827 | let command = match str::from_utf8(&read_buffer[..data_len]) { | 925 | let command = match str::from_utf8(&read_buffer[..data_len]) { |
| @@ -990,13 +1088,21 @@ pub async fn connect_and_run( | |||
| 990 | let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); | 1088 | let mut socket = TcpSocket::new(stack, &mut rx_buffer, &mut tx_buffer); |
| 991 | socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); | 1089 | socket.set_timeout(Some(embassy_time::Duration::from_secs(10))); |
| 992 | 1090 | ||
| 993 | if let Err(err) = socket.connect(addr).await { | 1091 | let connect_fut = embassy_time::with_timeout(Duration::from_secs(10), socket.connect(addr)); |
| 994 | crate::log::error!( | 1092 | match connect_fut.await { |
| 995 | "TCP connect to {} failed with: {:?}", | 1093 | Ok(Err(err)) => { |
| 996 | addr, | 1094 | crate::log::error!( |
| 997 | crate::log::Debug2Format(&err) | 1095 | "TCP connect to {} failed with: {:?}", |
| 998 | ); | 1096 | addr, |
| 999 | continue; | 1097 | crate::log::Debug2Format(&err) |
| 1098 | ); | ||
| 1099 | continue; | ||
| 1100 | } | ||
| 1101 | Err(_) => { | ||
| 1102 | crate::log::error!("TCP connect to {} timed out", addr); | ||
| 1103 | continue; | ||
| 1104 | } | ||
| 1105 | _ => {} | ||
| 1000 | } | 1106 | } |
| 1001 | 1107 | ||
| 1002 | socket.set_timeout(None); | 1108 | socket.set_timeout(None); |
