autopush_common/
notification.rs1use std::collections::HashMap;
3#[cfg(feature = "postgres")]
4use std::str::FromStr;
5
6use serde_derive::{Deserialize, Serialize};
7use uuid::Uuid;
8
9#[cfg(feature = "postgres")]
10use crate::db::error::DbError;
11use crate::util::ms_since_epoch;
12
13#[derive(Serialize, Default, Deserialize, Clone, Debug)]
14pub struct Notification {
18 #[serde(rename = "channelID")]
20 pub channel_id: Uuid,
21 pub version: String,
22 #[serde(skip_serializing)]
23 pub timestamp: u64,
24 #[serde(default = "default_ttl", skip_serializing)]
26 pub ttl: u64,
27 #[serde(skip_serializing)]
29 pub topic: Option<String>,
30 #[serde(skip_serializing_if = "Option::is_none")]
31 pub data: Option<String>,
32 #[serde(skip_serializing)]
33 pub sortkey_timestamp: Option<u64>,
34 #[serde(skip_serializing_if = "Option::is_none")]
35 pub headers: Option<HashMap<String, String>>,
36 #[serde(skip_serializing_if = "Option::is_none")]
37 #[cfg(feature = "reliable_report")]
38 pub reliability_id: Option<String>,
39 #[cfg(feature = "reliable_report")]
40 #[serde(skip_serializing_if = "Option::is_none")]
41 pub reliable_state: Option<crate::reliability::ReliabilityState>,
42}
43
44pub const TOPIC_NOTIFICATION_PREFIX: &str = "01";
45pub const STANDARD_NOTIFICATION_PREFIX: &str = "02";
46
47impl Notification {
48 pub fn chidmessageid(&self) -> String {
59 let chid = self.channel_id.as_hyphenated();
60
61 if let Some(ref topic) = self.topic {
62 format!("{TOPIC_NOTIFICATION_PREFIX}:{chid}:{topic}")
63 } else if let Some(sortkey_timestamp) = self.sortkey_timestamp {
64 format!(
65 "{STANDARD_NOTIFICATION_PREFIX}:{}:{}",
66 if sortkey_timestamp == 0 {
67 ms_since_epoch()
68 } else {
69 sortkey_timestamp
70 },
71 chid
72 )
73 } else {
74 warn!("🚨 LEGACY MESSAGE!? {:?} ", self);
75 format!("{}:{}", chid, self.version)
77 }
78 }
79
80 pub fn expiry(&self) -> u64 {
81 self.timestamp + self.ttl
82 }
83
84 pub fn expired(&self, at_sec: u64) -> bool {
87 at_sec >= self.expiry()
88 }
89
90 #[cfg(feature = "reliable_report")]
91 pub async fn record_reliability(
92 &mut self,
93 reliability: &crate::reliability::PushReliability,
94 state: crate::reliability::ReliabilityState,
95 ) {
96 self.reliable_state = reliability
97 .record(
98 &self.reliability_id,
99 state,
100 &self.reliable_state,
101 Some(self.expiry()),
102 )
103 .await
104 .inspect_err(|e| {
105 warn!("🔍⚠️ Unable to record reliability state log: {:?}", e);
106 })
107 .unwrap_or(Some(state));
108 }
109
110 #[cfg(feature = "reliable_report")]
111 pub fn clone_without_reliability_state(&self) -> Self {
112 let mut cloned = self.clone();
113 cloned.reliable_state = None;
114 cloned
115 }
116}
117
118pub(crate) fn default_ttl() -> u64 {
119 0
120}
121
122#[cfg(feature = "postgres")]
123impl TryFrom<&sqlx::postgres::PgRow> for Notification {
125 type Error = DbError;
126
127 fn try_from(row: &sqlx::postgres::PgRow) -> Result<Self, Self::Error> {
128 #[cfg(feature = "reliable_report")]
129 use crate::reliability::ReliabilityState;
130 use sqlx::Row;
131 Ok(Self {
132 channel_id: row
133 .try_get::<&str, _>("channel_id")
134 .map(|v| Uuid::from_str(v).unwrap())
135 .unwrap(),
136 version: row.try_get::<String, _>("version").unwrap(),
137 ttl: row.try_get::<i64, _>("ttl").map(|v| v as u64).unwrap(),
138 topic: row
139 .try_get::<String, _>("topic")
140 .map(|v| if v.is_empty() { None } else { Some(v) })
141 .unwrap_or_default(),
142 timestamp: row
143 .try_get::<i64, _>("timestamp")
144 .map(|v| v as u64)
145 .unwrap(),
146 data: row.try_get::<String, _>("data").map(Some).unwrap(),
147 sortkey_timestamp: row
148 .try_get::<i64, _>("sortkey_timestamp")
149 .map(|v| Some(v as u64))
150 .unwrap_or_default(),
151 headers: row
152 .try_get::<&str, _>("headers")
153 .map(|v| {
154 if v.is_empty() || v == "null" || v == "{}" {
155 return None;
156 }
157 let hdrs: HashMap<String, String> = serde_json::from_str(v).unwrap();
158 Some(hdrs)
159 })
160 .unwrap_or_default(),
161 #[cfg(feature = "reliable_report")]
162 reliability_id: row.try_get::<String, _>("reliability_id").ok(),
163 #[cfg(feature = "reliable_report")]
164 reliable_state: row
165 .try_get::<&str, _>("reliable_state")
166 .map(|v| ReliabilityState::from_str(v).unwrap())
167 .ok(),
168 })
169 }
170}