stream.rs 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145
  1. //
  2. //
  3. use std::time::Duration;
  4. use std::error::Error;
  5. use std::sync::atomic::{AtomicU32, Ordering};
  6. use std::collections::HashMap;
  7. use futures_util::{StreamExt};
  8. use reqwest::Client;
  9. use reqwest::header::{HeaderName, HeaderMap};
/// Process-wide counter that `stream_fetch` increments to assign each request its
/// `request_id`; it starts at 0.
static REQUEST_COUNTER: AtomicU32 = AtomicU32::new(0);
  11. #[derive(Debug, Clone, serde::Serialize)]
  12. pub struct StreamResponse {
  13. request_id: u32,
  14. status: u16,
  15. status_text: String,
  16. headers: HashMap<String, String>
  17. }
  18. #[derive(Clone, serde::Serialize)]
  19. pub struct EndPayload {
  20. request_id: u32,
  21. status: u16,
  22. }
  23. #[derive(Clone, serde::Serialize)]
  24. pub struct ChunkPayload {
  25. request_id: u32,
  26. chunk: bytes::Bytes,
  27. }
  28. #[tauri::command]
  29. pub async fn stream_fetch(
  30. window: tauri::Window,
  31. method: String,
  32. url: String,
  33. headers: HashMap<String, String>,
  34. body: Vec<u8>,
  35. ) -> Result<StreamResponse, String> {
  36. let event_name = "stream-response";
  37. let request_id = REQUEST_COUNTER.fetch_add(1, Ordering::SeqCst);
  38. let mut _headers = HeaderMap::new();
  39. for (key, value) in &headers {
  40. _headers.insert(key.parse::<HeaderName>().unwrap(), value.parse().unwrap());
  41. }
  42. // println!("method: {:?}", method);
  43. // println!("url: {:?}", url);
  44. // println!("headers: {:?}", headers);
  45. // println!("headers: {:?}", _headers);
  46. let method = method.parse::<reqwest::Method>().map_err(|err| format!("failed to parse method: {}", err))?;
  47. let client = Client::builder()
  48. .default_headers(_headers)
  49. .redirect(reqwest::redirect::Policy::limited(3))
  50. .connect_timeout(Duration::new(3, 0))
  51. .build()
  52. .map_err(|err| format!("failed to generate client: {}", err))?;
  53. let mut request = client.request(
  54. method.clone(),
  55. url.parse::<reqwest::Url>().map_err(|err| format!("failed to parse url: {}", err))?
  56. );
  57. if method == reqwest::Method::POST || method == reqwest::Method::PUT || method == reqwest::Method::PATCH {
  58. let body = bytes::Bytes::from(body);
  59. // println!("body: {:?}", body);
  60. request = request.body(body);
  61. }
  62. // println!("client: {:?}", client);
  63. // println!("request: {:?}", request);
  64. let response_future = request.send();
  65. let res = response_future.await;
  66. let response = match res {
  67. Ok(res) => {
  68. // get response and emit to client
  69. let mut headers = HashMap::new();
  70. for (name, value) in res.headers() {
  71. headers.insert(
  72. name.as_str().to_string(),
  73. std::str::from_utf8(value.as_bytes()).unwrap().to_string()
  74. );
  75. }
  76. let status = res.status().as_u16();
  77. tauri::async_runtime::spawn(async move {
  78. let mut stream = res.bytes_stream();
  79. while let Some(chunk) = stream.next().await {
  80. match chunk {
  81. Ok(bytes) => {
  82. // println!("chunk: {:?}", bytes);
  83. if let Err(e) = window.emit(event_name, ChunkPayload{ request_id, chunk: bytes }) {
  84. println!("Failed to emit chunk payload: {:?}", e);
  85. }
  86. }
  87. Err(err) => {
  88. println!("Error chunk: {:?}", err);
  89. }
  90. }
  91. }
  92. if let Err(e) = window.emit(event_name, EndPayload{ request_id, status: 0 }) {
  93. println!("Failed to emit end payload: {:?}", e);
  94. }
  95. });
  96. StreamResponse {
  97. request_id,
  98. status,
  99. status_text: "OK".to_string(),
  100. headers,
  101. }
  102. }
  103. Err(err) => {
  104. let error: String = err.source()
  105. .map(|e| e.to_string())
  106. .unwrap_or_else(|| "Unknown error occurred".to_string());
  107. println!("Error response: {:?}", error);
  108. tauri::async_runtime::spawn( async move {
  109. if let Err(e) = window.emit(event_name, ChunkPayload{ request_id, chunk: error.into() }) {
  110. println!("Failed to emit chunk payload: {:?}", e);
  111. }
  112. if let Err(e) = window.emit(event_name, EndPayload{ request_id, status: 0 }) {
  113. println!("Failed to emit end payload: {:?}", e);
  114. }
  115. });
  116. StreamResponse {
  117. request_id,
  118. status: 599,
  119. status_text: "Error".to_string(),
  120. headers: HashMap::new(),
  121. }
  122. }
  123. };
  124. // println!("Response: {:?}", response);
  125. Ok(response)
  126. }