[go: up one dir, main page]

worker/r2/
mod.rs

1use std::{collections::HashMap, convert::TryInto, ops::Deref};
2
3pub use builder::*;
4
5use js_sys::{JsString, Reflect, Uint8Array};
6use wasm_bindgen::{JsCast, JsValue};
7use wasm_bindgen_futures::JsFuture;
8use worker_sys::{
9    FixedLengthStream as EdgeFixedLengthStream, R2Bucket as EdgeR2Bucket, R2Checksums,
10    R2MultipartUpload as EdgeR2MultipartUpload, R2Object as EdgeR2Object,
11    R2ObjectBody as EdgeR2ObjectBody, R2Objects as EdgeR2Objects,
12    R2UploadedPart as EdgeR2UploadedPart,
13};
14
15use crate::{
16    env::EnvBinding, ByteStream, Date, Error, FixedLengthStream, Headers, ResponseBody, Result,
17};
18
19mod builder;
20
21/// An instance of the R2 bucket binding.
22#[derive(Debug, Clone)]
23pub struct Bucket {
24    inner: EdgeR2Bucket,
25}
26
27impl Bucket {
28    /// Retrieves the [Object] for the given key containing only object metadata, if the key exists.
29    pub async fn head(&self, key: impl Into<String>) -> Result<Option<Object>> {
30        let head_promise = self.inner.head(key.into())?;
31        let value = JsFuture::from(head_promise).await?;
32
33        if value.is_null() {
34            return Ok(None);
35        }
36
37        Ok(Some(Object {
38            inner: ObjectInner::NoBody(value.into()),
39        }))
40    }
41
42    /// Retrieves the [Object] for the given key containing object metadata and the object body if
43    /// the key exists. In the event that a precondition specified in options fails, get() returns
44    /// an [Object] with no body.
45    pub fn get(&self, key: impl Into<String>) -> GetOptionsBuilder<'_> {
46        GetOptionsBuilder {
47            edge_bucket: &self.inner,
48            key: key.into(),
49            only_if: None,
50            range: None,
51        }
52    }
53
54    /// Stores the given `value` and metadata under the associated `key`. Once the write succeeds,
55    /// returns an [Object] containing metadata about the stored Object.
56    ///
57    /// R2 writes are strongly consistent. Once the future resolves, all subsequent read operations
58    /// will see this key value pair globally.
59    pub fn put(&self, key: impl Into<String>, value: impl Into<Data>) -> PutOptionsBuilder<'_> {
60        PutOptionsBuilder {
61            edge_bucket: &self.inner,
62            key: key.into(),
63            value: value.into(),
64            http_metadata: None,
65            custom_metadata: None,
66            checksum: None,
67            checksum_algorithm: "md5".into(),
68        }
69    }
70
71    /// Deletes the given value and metadata under the associated key. Once the delete succeeds,
72    /// returns void.
73    ///
74    /// R2 deletes are strongly consistent. Once the Promise resolves, all subsequent read
75    /// operations will no longer see this key value pair globally.
76    pub async fn delete(&self, key: impl Into<String>) -> Result<()> {
77        let delete_promise = self.inner.delete(key.into())?;
78        JsFuture::from(delete_promise).await?;
79        Ok(())
80    }
81
82    /// Deletes the given values and metadata under the associated keys. Once
83    /// the delete succeeds, returns void.
84    ///
85    /// R2 deletes are strongly consistent. Once the Promise resolves, all
86    /// subsequent read operations will no longer see the provided key value
87    /// pairs globally.
88    ///
89    /// Up to 1000 keys may be deleted per call.
90    pub async fn delete_multiple(&self, keys: Vec<impl Deref<Target = str>>) -> Result<()> {
91        let fut: JsFuture = self
92            .inner
93            .delete_multiple(keys.into_iter().map(|key| JsValue::from(&*key)).collect())?
94            .into();
95        fut.await?;
96        Ok(())
97    }
98
99    /// Returns an [Objects] containing a list of [Objects]s contained within the bucket. By
100    /// default, returns the first 1000 entries.
101    pub fn list(&self) -> ListOptionsBuilder<'_> {
102        ListOptionsBuilder {
103            edge_bucket: &self.inner,
104            limit: None,
105            prefix: None,
106            cursor: None,
107            delimiter: None,
108            include: None,
109        }
110    }
111
112    /// Creates a multipart upload.
113    ///
114    /// Returns a [MultipartUpload] value representing the newly created multipart upload.
115    /// Once the multipart upload has been created, the multipart upload can be immediately
116    /// interacted with globally, either through the Workers API, or through the S3 API.
117    pub fn create_multipart_upload(
118        &self,
119        key: impl Into<String>,
120    ) -> CreateMultipartUploadOptionsBuilder<'_> {
121        CreateMultipartUploadOptionsBuilder {
122            edge_bucket: &self.inner,
123            key: key.into(),
124            http_metadata: None,
125            custom_metadata: None,
126        }
127    }
128
129    /// Returns an object representing a multipart upload with the given `key` and `uploadId`.
130    ///
131    /// The operation does not perform any checks to ensure the validity of the `uploadId`,
132    /// nor does it verify the existence of a corresponding active multipart upload.
133    /// This is done to minimize latency before being able to call subsequent operations on the returned object.
134    pub fn resume_multipart_upload(
135        &self,
136        key: impl Into<String>,
137        upload_id: impl Into<String>,
138    ) -> Result<MultipartUpload> {
139        Ok(MultipartUpload {
140            inner: self
141                .inner
142                .resume_multipart_upload(key.into(), upload_id.into())?
143                .into(),
144        })
145    }
146}
147
148impl EnvBinding for Bucket {
149    const TYPE_NAME: &'static str = "R2Bucket";
150}
151
152impl JsCast for Bucket {
153    fn instanceof(val: &JsValue) -> bool {
154        val.is_instance_of::<EdgeR2Bucket>()
155    }
156
157    fn unchecked_from_js(val: JsValue) -> Self {
158        Self { inner: val.into() }
159    }
160
161    fn unchecked_from_js_ref(val: &JsValue) -> &Self {
162        unsafe { &*(val as *const JsValue as *const Self) }
163    }
164}
165
166impl From<Bucket> for JsValue {
167    fn from(bucket: Bucket) -> Self {
168        JsValue::from(bucket.inner)
169    }
170}
171
172impl AsRef<JsValue> for Bucket {
173    fn as_ref(&self) -> &JsValue {
174        &self.inner
175    }
176}
177
178/// [Object] is created when you [put](Bucket::put) an object into a [Bucket]. [Object] represents
179/// the metadata of an object based on the information provided by the uploader. Every object that
180/// you [put](Bucket::put) into a [Bucket] will have an [Object] created.
181#[derive(Debug)]
182pub struct Object {
183    inner: ObjectInner,
184}
185
186impl Object {
187    pub fn key(&self) -> String {
188        match &self.inner {
189            ObjectInner::NoBody(inner) => inner.key().unwrap(),
190            ObjectInner::Body(inner) => inner.key().unwrap(),
191        }
192    }
193
194    pub fn version(&self) -> String {
195        match &self.inner {
196            ObjectInner::NoBody(inner) => inner.version().unwrap(),
197            ObjectInner::Body(inner) => inner.version().unwrap(),
198        }
199    }
200
201    pub fn size(&self) -> u64 {
202        let size = match &self.inner {
203            ObjectInner::NoBody(inner) => inner.size().unwrap(),
204            ObjectInner::Body(inner) => inner.size().unwrap(),
205        };
206        size.round() as u64
207    }
208
209    pub fn etag(&self) -> String {
210        match &self.inner {
211            ObjectInner::NoBody(inner) => inner.etag().unwrap(),
212            ObjectInner::Body(inner) => inner.etag().unwrap(),
213        }
214    }
215
216    pub fn http_etag(&self) -> String {
217        match &self.inner {
218            ObjectInner::NoBody(inner) => inner.http_etag().unwrap(),
219            ObjectInner::Body(inner) => inner.http_etag().unwrap(),
220        }
221    }
222
223    pub fn uploaded(&self) -> Date {
224        match &self.inner {
225            ObjectInner::NoBody(inner) => inner.uploaded().unwrap(),
226            ObjectInner::Body(inner) => inner.uploaded().unwrap(),
227        }
228        .into()
229    }
230
231    pub fn http_metadata(&self) -> HttpMetadata {
232        match &self.inner {
233            ObjectInner::NoBody(inner) => inner.http_metadata().unwrap(),
234            ObjectInner::Body(inner) => inner.http_metadata().unwrap(),
235        }
236        .into()
237    }
238
239    pub fn checksum(&self) -> R2Checksums {
240        match &self.inner {
241            ObjectInner::NoBody(inner) => inner.checksums().unwrap(),
242            ObjectInner::Body(inner) => inner.checksums().unwrap(),
243        }
244        .into()
245    }
246
247    pub fn custom_metadata(&self) -> Result<HashMap<String, String>> {
248        let metadata = match &self.inner {
249            ObjectInner::NoBody(inner) => inner.custom_metadata().unwrap(),
250            ObjectInner::Body(inner) => inner.custom_metadata().unwrap(),
251        };
252
253        let keys = js_sys::Object::keys(&metadata).to_vec();
254        let mut map = HashMap::with_capacity(keys.len());
255
256        for key in keys {
257            let key = key.unchecked_into::<JsString>();
258            let value = Reflect::get(&metadata, &key)?.dyn_into::<JsString>()?;
259            map.insert(key.into(), value.into());
260        }
261
262        Ok(map)
263    }
264
265    pub fn range(&self) -> Result<Range> {
266        match &self.inner {
267            ObjectInner::NoBody(inner) => inner.range().unwrap(),
268            ObjectInner::Body(inner) => inner.range().unwrap(),
269        }
270        .try_into()
271    }
272
273    pub fn body(&self) -> Option<ObjectBody<'_>> {
274        match &self.inner {
275            ObjectInner::NoBody(_) => None,
276            ObjectInner::Body(body) => Some(ObjectBody { inner: body }),
277        }
278    }
279
280    pub fn body_used(&self) -> Option<bool> {
281        match &self.inner {
282            ObjectInner::NoBody(_) => None,
283            ObjectInner::Body(inner) => Some(inner.body_used().unwrap()),
284        }
285    }
286
287    pub fn write_http_metadata(&self, headers: Headers) -> Result<()> {
288        match &self.inner {
289            ObjectInner::NoBody(inner) => inner.write_http_metadata(headers.0)?,
290            ObjectInner::Body(inner) => inner.write_http_metadata(headers.0)?,
291        };
292
293        Ok(())
294    }
295}
296
297/// The data contained within an [Object].
298#[derive(Debug)]
299pub struct ObjectBody<'body> {
300    inner: &'body EdgeR2ObjectBody,
301}
302
303impl ObjectBody<'_> {
304    /// Reads the data in the [Object] via a [ByteStream].
305    pub fn stream(self) -> Result<ByteStream> {
306        if self.inner.body_used()? {
307            return Err(Error::BodyUsed);
308        }
309
310        let stream = self.inner.body()?;
311        let stream = wasm_streams::ReadableStream::from_raw(stream.unchecked_into());
312        Ok(ByteStream {
313            inner: stream.into_stream(),
314        })
315    }
316
317    /// Returns a [ResponseBody] containing the data in the [Object].
318    ///
319    /// This function can be used to hand off the [Object] data to the workers runtime for streaming
320    /// to the client in a [crate::Response]. This ensures that the worker does not consume CPU time
321    /// while the streaming occurs, which can be significant if instead [ObjectBody::stream] is used.
322    pub fn response_body(self) -> Result<ResponseBody> {
323        if self.inner.body_used()? {
324            return Err(Error::BodyUsed);
325        }
326
327        Ok(ResponseBody::Stream(self.inner.body()?))
328    }
329
330    pub async fn bytes(self) -> Result<Vec<u8>> {
331        let js_buffer = JsFuture::from(self.inner.array_buffer()?).await?;
332        let js_buffer = Uint8Array::new(&js_buffer);
333        let mut bytes = vec![0; js_buffer.length() as usize];
334        js_buffer.copy_to(&mut bytes);
335
336        Ok(bytes)
337    }
338
339    pub async fn text(self) -> Result<String> {
340        String::from_utf8(self.bytes().await?).map_err(|e| Error::RustError(e.to_string()))
341    }
342}
343
344/// [UploadedPart] represents a part that has been uploaded.
345/// [UploadedPart] objects are returned from [upload_part](MultipartUpload::upload_part) operations
346/// and must be passed to the [complete](MultipartUpload::complete) operation.
347#[derive(Debug)]
348pub struct UploadedPart {
349    inner: EdgeR2UploadedPart,
350}
351
352impl UploadedPart {
353    pub fn new(part_number: u16, etag: String) -> Self {
354        let obj = js_sys::Object::new();
355        Reflect::set(
356            &obj,
357            &JsValue::from_str("partNumber"),
358            &JsValue::from_f64(part_number as f64),
359        )
360        .unwrap();
361        Reflect::set(&obj, &JsValue::from_str("etag"), &JsValue::from_str(&etag)).unwrap();
362
363        let val: JsValue = obj.into();
364        Self { inner: val.into() }
365    }
366
367    pub fn part_number(&self) -> u16 {
368        self.inner.part_number().unwrap()
369    }
370
371    pub fn etag(&self) -> String {
372        self.inner.etag().unwrap()
373    }
374}
375
376#[derive(Debug)]
377pub struct MultipartUpload {
378    inner: EdgeR2MultipartUpload,
379}
380
381impl MultipartUpload {
382    /// Uploads a single part with the specified part number to this multipart upload.
383    ///
384    /// Returns an [UploadedPart] object containing the etag and part number.
385    /// These [UploadedPart] objects are required when completing the multipart upload.
386    ///
387    /// Getting hold of a value of this type does not guarantee that there is an active
388    /// underlying multipart upload corresponding to that object.
389    ///
390    /// A multipart upload can be completed or aborted at any time, either through the S3 API,
391    /// or by a parallel invocation of your Worker.
392    /// Therefore it is important to add the necessary error handling code around each operation
393    /// on the [MultipartUpload] object in case the underlying multipart upload no longer exists.
394    pub async fn upload_part(
395        &self,
396        part_number: u16,
397        value: impl Into<Data>,
398    ) -> Result<UploadedPart> {
399        let uploaded_part =
400            JsFuture::from(self.inner.upload_part(part_number, value.into().into())?).await?;
401        Ok(UploadedPart {
402            inner: uploaded_part.into(),
403        })
404    }
405
406    /// Request the upload id.
407    pub async fn upload_id(&self) -> String {
408        self.inner.upload_id().unwrap()
409    }
410
411    /// Aborts the multipart upload.
412    pub async fn abort(&self) -> Result<()> {
413        JsFuture::from(self.inner.abort()?).await?;
414        Ok(())
415    }
416
417    /// Completes the multipart upload with the given parts.
418    /// When the future is ready, the object is immediately accessible globally by any subsequent read operation.
419    pub async fn complete(
420        self,
421        uploaded_parts: impl IntoIterator<Item = UploadedPart>,
422    ) -> Result<Object> {
423        let object = JsFuture::from(
424            self.inner.complete(
425                uploaded_parts
426                    .into_iter()
427                    .map(|part| part.inner.into())
428                    .collect(),
429            )?,
430        )
431        .await?;
432        Ok(Object {
433            inner: ObjectInner::Body(object.into()),
434        })
435    }
436}
437
438/// A series of [Object]s returned by [list](Bucket::list).
439#[derive(Debug)]
440pub struct Objects {
441    inner: EdgeR2Objects,
442}
443
444impl Objects {
445    /// An [Vec] of [Object] matching the [list](Bucket::list) request.
446    pub fn objects(&self) -> Vec<Object> {
447        self.inner
448            .objects()
449            .unwrap()
450            .into_iter()
451            .map(|raw| Object {
452                inner: ObjectInner::NoBody(raw),
453            })
454            .collect()
455    }
456
457    /// If true, indicates there are more results to be retrieved for the current
458    /// [list](Bucket::list) request.
459    pub fn truncated(&self) -> bool {
460        self.inner.truncated().unwrap()
461    }
462
463    /// A token that can be passed to future [list](Bucket::list) calls to resume listing from that
464    /// point. Only present if truncated is true.
465    pub fn cursor(&self) -> Option<String> {
466        self.inner.cursor().unwrap()
467    }
468
469    /// If a delimiter has been specified, contains all prefixes between the specified prefix and
470    /// the next occurrence of the delimiter.
471    ///
472    /// For example, if no prefix is provided and the delimiter is '/', `foo/bar/baz` would return
473    /// `foo` as a delimited prefix. If `foo/` was passed as a prefix with the same structure and
474    /// delimiter, `foo/bar` would be returned as a delimited prefix.
475    pub fn delimited_prefixes(&self) -> Vec<String> {
476        self.inner
477            .delimited_prefixes()
478            .unwrap()
479            .into_iter()
480            .map(Into::into)
481            .collect()
482    }
483}
484
485#[derive(Debug, Clone)]
486pub(crate) enum ObjectInner {
487    NoBody(EdgeR2Object),
488    Body(EdgeR2ObjectBody),
489}
490
491#[derive(Debug)]
492pub enum Data {
493    ReadableStream(web_sys::ReadableStream),
494    Stream(FixedLengthStream),
495    Text(String),
496    Bytes(Vec<u8>),
497    Empty,
498}
499
500impl From<web_sys::ReadableStream> for Data {
501    fn from(stream: web_sys::ReadableStream) -> Self {
502        Data::ReadableStream(stream)
503    }
504}
505
506impl From<FixedLengthStream> for Data {
507    fn from(stream: FixedLengthStream) -> Self {
508        Data::Stream(stream)
509    }
510}
511
512impl From<String> for Data {
513    fn from(value: String) -> Self {
514        Data::Text(value)
515    }
516}
517
518impl From<Vec<u8>> for Data {
519    fn from(value: Vec<u8>) -> Self {
520        Data::Bytes(value)
521    }
522}
523
524impl From<Data> for JsValue {
525    fn from(data: Data) -> Self {
526        match data {
527            Data::ReadableStream(stream) => stream.into(),
528            Data::Stream(stream) => {
529                let stream_sys: EdgeFixedLengthStream = stream.into();
530                stream_sys.readable().into()
531            }
532            Data::Text(text) => JsString::from(text).into(),
533            Data::Bytes(bytes) => {
534                let arr = Uint8Array::new_with_length(bytes.len() as u32);
535                arr.copy_from(&bytes);
536                arr.into()
537            }
538            Data::Empty => JsValue::NULL,
539        }
540    }
541}