diff --git a/src/bin/avml-upload.rs b/src/bin/avml-upload.rs
index da13ffa..895ae67 100644
--- a/src/bin/avml-upload.rs
+++ b/src/bin/avml-upload.rs
@@ -7,7 +7,7 @@
 #![deny(clippy::manual_assert)]
 #![deny(clippy::indexing_slicing)]
 
-use avml::{put, BlobUploader, Error};
+use avml::{put, BlobUploader, Error, DEFAULT_CONCURRENCY};
 use clap::{Parser, Subcommand};
 use std::path::PathBuf;
 use tokio::runtime::Runtime;
@@ -38,8 +38,8 @@ enum Commands {
         url: Url,
 
         /// specify blob upload concurrency
-        #[arg(long)]
-        sas_block_concurrency: Option<usize>,
+        #[arg(long, default_value_t = DEFAULT_CONCURRENCY)]
+        sas_block_concurrency: usize,
 
         /// specify maximum block size in MiB
         #[arg(long)]
diff --git a/src/bin/avml.rs b/src/bin/avml.rs
index 07a3ed0..170215a 100644
--- a/src/bin/avml.rs
+++ b/src/bin/avml.rs
@@ -51,8 +51,8 @@ struct Config {
 
     /// specify blob upload concurrency
     #[cfg(feature = "blobstore")]
-    #[arg(long)]
-    sas_block_concurrency: Option<usize>,
+    #[arg(long, default_value_t = avml::DEFAULT_CONCURRENCY)]
+    sas_block_concurrency: usize,
 
     /// name of the file to write to on local system
     filename: PathBuf,
diff --git a/src/lib.rs b/src/lib.rs
index f15ff60..3d38541 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -14,7 +14,7 @@ mod upload;
 mod write_counter;
 
 #[cfg(feature = "blobstore")]
-pub use crate::upload::blobstore::BlobUploader;
+pub use crate::upload::blobstore::{BlobUploader, DEFAULT_CONCURRENCY};
 
 #[cfg(feature = "put")]
 pub use crate::upload::http::put;
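A note on the CLI changes above: clap's `default_value_t` takes a value of the field's own type, which is what lets `sas_block_concurrency` drop its `Option` wrapper. A minimal sketch of the pattern, assuming clap 4 with the `derive` feature (the `Cli` struct here is illustrative, not AVML's):

```rust
use clap::Parser;

const DEFAULT_CONCURRENCY: usize = 10;

#[derive(Parser)]
struct Cli {
    /// specify blob upload concurrency
    #[arg(long, default_value_t = DEFAULT_CONCURRENCY)]
    sas_block_concurrency: usize,
}

fn main() {
    // Without --sas-block-concurrency on the command line, the const is
    // used directly; `--help` renders it as "[default: 10]".
    let cli = Cli::parse();
    println!("concurrency: {}", cli.sas_block_concurrency);
}
```

This is why the binaries no longer need `Option`-unwrapping fallbacks: the parsed value is always populated.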
diff --git a/src/upload/blobstore.rs b/src/upload/blobstore.rs
index 25faaed..68d9389 100644
--- a/src/upload/blobstore.rs
+++ b/src/upload/blobstore.rs
@@ -41,26 +41,56 @@ pub enum Error {
 
 type Result<T> = std::result::Result<T, Error>;
 
-// https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage
+/// Maximum number of blocks
+///
+/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
 const BLOB_MAX_BLOCKS: usize = 50_000;
+
+/// Maximum size of any single block
+///
+/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
 const BLOB_MAX_BLOCK_SIZE: usize = ONE_MB.saturating_mul(4000);
+
+/// Maximum total size of a file
+///
+/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
 const BLOB_MAX_FILE_SIZE: usize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE);
 
-// trigger's the "high-throughput block blobs" on all storage accounts
-// https://azure.microsoft.com/en-us/blog/high-throughput-with-azure-blob-storage/
+/// Minimum block size, which is required to trigger the "high-throughput block
+/// blobs" feature on all storage accounts
+///
+/// <https://azure.microsoft.com/en-us/blog/high-throughput-with-azure-blob-storage/>
 const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5);
 
-// Azure's default max request rate for a storage account is 20,000 per second.
-// By keeping to 10 or fewer concurrent upload threads, AVML can be used to
-// simultaneously upload images from 1000 different hosts concurrently (a full
-// VM scaleset) to a single default storage account.
-//
-// https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts
+/// Azure's default max request rate for a storage account is 20,000 per second.
+/// By keeping to 10 or fewer concurrent upload threads, AVML can be used to
+/// simultaneously upload images from 1000 different hosts concurrently (a full
+/// VM scaleset) to a single default storage account.
+///
+/// <https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts>
 const MAX_CONCURRENCY: usize = 10;
 
-// if we're uploading *huge* files, use 100MB chunks
+/// Azure's default max request rate for a storage account is 20,000 per second.
+/// By keeping to 10 or fewer concurrent upload threads, AVML can be used to
+/// simultaneously upload images from 1000 different hosts concurrently (a full
+/// VM scaleset) to a single default storage account.
+///
+/// <https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts>
+pub const DEFAULT_CONCURRENCY: usize = 10;
+
+/// As chunks stay in memory until the upload is complete (to enable automatic
+/// retries in the case of TCP or HTTP errors), chunk sizes for huge files are
+/// capped to 100MB each
 const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100);
+
+/// Try to keep under 500MB in flight. If that's not possible due to the block
+/// size, concurrency will be disabled.
+const MEMORY_THRESHOLD: usize = 500 * ONE_MB;
+
+/// When uploading a file without a known size, such as a stream, assume a
+/// size of 1TB
+const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024;
+
 pub struct UploadBlock {
     id: Bytes,
     data: Bytes,
@@ -110,14 +140,12 @@ impl TryFrom<&Url> for SasToken {
 }
 
 fn calc_concurrency(
-    file_size: Option<usize>,
+    file_size: usize,
     block_size: Option<usize>,
-    upload_concurrency: Option<usize>,
+    upload_concurrency: usize,
 ) -> Result<(usize, usize)> {
-    if let Some(file_size) = file_size {
-        if file_size > BLOB_MAX_FILE_SIZE {
-            return Err(Error::TooLarge);
-        }
+    if file_size > BLOB_MAX_FILE_SIZE {
+        return Err(Error::TooLarge);
     }
 
     let block_size = match block_size {
@@ -128,13 +156,12 @@
             // if the file is small enough to fit with 5MB blocks, use that
             // to reduce impact for failure retries and increase
             // concurrency.
-            Some(x) if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE,
+            x if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE,
             // if the file is large enough that we can fit with 100MB blocks, use that.
-            Some(x) if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE,
+            x if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE,
             // otherwise, just use the smallest block size that will fit
             // within MAX BLOCKS to reduce memory pressure
-            Some(x) => (x / BLOB_MAX_BLOCKS) + 1,
-            None => REASONABLE_BLOCK_SIZE,
+            x => (x / BLOB_MAX_BLOCKS) + 1,
         }
     }
     // minimum required to hit high-throughput block blob performance thresholds
@@ -148,12 +175,8 @@
 
     let upload_concurrency = match upload_concurrency {
         // manually specifying concurrency of 0 will disable concurrency
-        Some(0) => 1,
-        Some(x) => x,
-        // if the user didn't specify concurrency, always try to keep under
-        // 200MB in flight. If that's not possible due to block size, disable
-        // concurrency.
-        None => match (200 * ONE_MB).saturating_div(block_size) {
+        0 | 1 => 1,
+        _ => match (MEMORY_THRESHOLD).saturating_div(block_size) {
             0 => 1,
             // cap the number of concurrent threads to reduce concurrency issues
             // at the server end.
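To make the new sizing behavior concrete, here is a rough standalone sketch of the arithmetic `calc_concurrency` now performs. It is simplified (no error handling, no user-specified block size, and the `MAX_CONCURRENCY` cap sits just outside the hunk's context above), so treat it as an illustration rather than the function itself:

```rust
const ONE_MB: usize = 1 << 20;
const BLOB_MAX_BLOCKS: usize = 50_000;
const BLOB_MIN_BLOCK_SIZE: usize = 5 * ONE_MB;
const REASONABLE_BLOCK_SIZE: usize = 100 * ONE_MB;
const MEMORY_THRESHOLD: usize = 500 * ONE_MB;
const MAX_CONCURRENCY: usize = 10;

/// Simplified mirror of the selection logic in the hunks above.
fn sketch(file_size: usize, upload_concurrency: usize) -> (usize, usize) {
    // smallest block size that keeps the block count under 50,000
    let block_size = if file_size < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS {
        BLOB_MIN_BLOCK_SIZE
    } else if file_size < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS {
        REASONABLE_BLOCK_SIZE
    } else {
        (file_size / BLOB_MAX_BLOCKS) + 1
    };
    // keep block_size * concurrency under the 500MB in-flight budget
    let concurrency = match upload_concurrency {
        0 | 1 => 1,
        _ => match MEMORY_THRESHOLD / block_size {
            0 => 1,
            x => usize::min(x, MAX_CONCURRENCY),
        },
    };
    (block_size, concurrency)
}

fn main() {
    // 400MB file: 5MB blocks fit, and 500MB / 5MB caps out at MAX_CONCURRENCY
    assert_eq!(sketch(400 * ONE_MB, 10), (5 * ONE_MB, 10));
    // 4TB file: 100MB blocks, and 500MB / 100MB = 5 concurrent uploads
    assert_eq!(sketch(4 * 1024 * 1024 * ONE_MB, 10), (100 * ONE_MB, 5));
}
```

The raised threshold (500MB, up from the previous 200MB) is what moves the result for 100MB blocks from 2 uploaders to 5, which is why the test expectations below change.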
@@ -164,12 +187,30 @@
     Ok((block_size, upload_concurrency))
 }
 
+/// Concurrently upload a Stream/File to an Azure Blob Store using a SAS URL.
+///
+/// ```rust,no_run
+/// use avml::BlobUploader;
+/// # use url::Url;
+/// # use avml::Result;
+/// # use std::path::Path;
+/// # async fn upload() -> Result<()> {
+/// let sas_url = Url::parse("https://contoso.com/container_name/blob_name?sas_token_here=1")
+///     .expect("url parsing failed");
+/// let path = Path::new("/tmp/image.lime");
+/// let uploader = BlobUploader::new(&sas_url)?
+///     .block_size(Some(100))
+///     .concurrency(5);
+/// uploader.upload_file(&path).await?;
+/// # Ok(())
+/// # }
+/// ```
 #[derive(Clone)]
 pub struct BlobUploader {
     client: BlobClient,
-    size: Option<usize>,
+    size: usize,
     block_size: Option<usize>,
-    concurrency: Option<usize>,
+    concurrency: usize,
     sender: Sender<UploadBlock>,
     receiver: Receiver<UploadBlock>,
 }
@@ -194,9 +235,9 @@
 
         Self {
             client,
-            size: None,
+            size: DEFAULT_FILE_SIZE,
             block_size: None,
-            concurrency: None,
+            concurrency: DEFAULT_CONCURRENCY,
             sender,
             receiver,
         }
@@ -207,7 +248,7 @@
     /// If the anticipated upload size is not specified, the maximum file
     /// uploaded will be approximately 5TB.
     #[must_use]
-    pub fn size(self, size: Option<usize>) -> Self {
+    pub fn size(self, size: usize) -> Self {
         Self { size, ..self }
     }
 
@@ -224,7 +265,7 @@
     }
 
     #[must_use]
-    pub fn concurrency(self, concurrency: Option<usize>) -> Self {
+    pub fn concurrency(self, concurrency: usize) -> Self {
         Self {
             concurrency,
             ..self
@@ -240,7 +281,6 @@
             .await?
             .len()
             .try_into()
-            .map(Some)
             .map_err(|_| Error::SizeConversion)?;
 
         self.size = file_size;
@@ -280,7 +320,9 @@
     }
 
     async fn uploaders(&self, count: usize) -> Result<()> {
-        let status = Status::new(self.size.map(|x| x as u64));
+        let status = Status::new(Some(
+            self.size.try_into().map_err(|_| Error::SizeConversion)?,
+        ));
 
         let uploaders: Vec<_> = (0..usize::max(1, count))
             .map(|_| {
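For readers new to this file: `uploaders(count)` spawns `count` tasks that all pull `UploadBlock`s from the shared `receiver`. The shape of that fan-out, sketched with `async_channel` and stub types (an assumption consistent with `Receiver` being clonable in the `Clone` derive above, but an illustration of the pattern, not AVML's actual implementation):

```rust
use async_channel::bounded;
use tokio::task::JoinSet;

// stand-in for the real UploadBlock { id: Bytes, data: Bytes }
struct UploadBlock {
    id: u8,
    data: Vec<u8>,
}

#[tokio::main]
async fn main() {
    // a bounded queue applies backpressure to whatever fills it
    let (sender, receiver) = bounded::<UploadBlock>(1);

    let mut workers = JoinSet::new();
    for _ in 0..usize::max(1, 5) {
        let receiver = receiver.clone();
        workers.spawn(async move {
            // each worker drains blocks until the sender side is closed
            while let Ok(block) = receiver.recv().await {
                // a real worker would PUT the block to blob storage here
                println!("block {}: {} bytes", block.id, block.data.len());
            }
        });
    }

    for id in 0..3 {
        let block = UploadBlock { id, data: vec![0; 1024] };
        sender.send(block).await.expect("workers stopped early");
    }
    drop(sender); // closing the channel lets the workers finish

    while workers.join_next().await.is_some() {}
}
```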
@@ -384,77 +426,66 @@ mod tests {
     #[test]
     fn test_calc_concurrency() -> Result<()> {
         assert_eq!(
-            (BLOB_MIN_BLOCK_SIZE, 1),
-            calc_concurrency(Some(ONE_MB * 300), Some(1), Some(1))?,
+            (BLOB_MIN_BLOCK_SIZE, 10),
+            calc_concurrency(ONE_MB * 300, Some(1), DEFAULT_CONCURRENCY)?,
             "specified blocksize would overflow block count, so we use the minimum block size"
         );
 
         assert_eq!(
             (BLOB_MIN_BLOCK_SIZE, 10),
-            calc_concurrency(Some(ONE_GB * 30), Some(ONE_MB), None)?,
-            "specifying block size of ONE_MB"
+            calc_concurrency(ONE_GB * 30, Some(ONE_MB), DEFAULT_CONCURRENCY)?,
+            "30GB file, 1MB blocks"
         );
 
         assert_eq!(
-            (ONE_MB * 100, 2),
-            calc_concurrency(Some(ONE_GB * 30), Some(ONE_MB * 100), None)?,
-            "specifying block size of 100MB but no concurrency"
+            (ONE_MB * 100, 5),
+            calc_concurrency(ONE_GB * 30, Some(ONE_MB * 100), DEFAULT_CONCURRENCY)?,
+            "30GB file, 100MB block size"
         );
 
         assert_eq!(
             (5 * ONE_MB, 10),
-            calc_concurrency(Some(ONE_MB * 400), None, None)?,
-            "uploading 400MB file, 5MB chunks, 10 uploaders",
+            calc_concurrency(ONE_MB * 400, None, DEFAULT_CONCURRENCY)?,
+            "400MB file, no block size"
         );
 
         assert_eq!(
             (5 * ONE_MB, 10),
-            calc_concurrency(Some(ONE_GB * 16), None, None)?,
-            "uploading 50,000 MB file. 5MB chunks, 10 uploaders",
+            calc_concurrency(ONE_GB * 16, None, DEFAULT_CONCURRENCY)?,
+            "16GB file, no block size"
         );
 
         assert_eq!(
             (5 * ONE_MB, 10),
-            calc_concurrency(Some(ONE_GB * 32), None, None)?,
-            "uploading 32GB file"
-        );
-
-        assert_eq!(
-            (ONE_MB * 100, 2),
-            calc_concurrency(Some(ONE_TB), None, None)?,
-            "uploading 1TB file"
+            calc_concurrency(ONE_GB * 32, None, DEFAULT_CONCURRENCY)?,
+            "32GB file, no block size",
         );
 
         assert_eq!(
-            (100 * ONE_MB, 2),
-            calc_concurrency(Some(ONE_TB * 4), None, None)?,
-            "uploading 5TB file. 100MB chunks, 2 uploaders"
+            (ONE_MB * 100, 5),
+            calc_concurrency(ONE_TB, None, DEFAULT_CONCURRENCY)?,
+            "1TB file, no block size"
         );
 
         assert_eq!(
-            (100 * ONE_MB, 2),
-            calc_concurrency(Some(ONE_TB * 4), Some(0), None)?,
-            "uploading 5TB file with zero blocksize. 100MB chunks, 2 uploaders"
+            (100 * ONE_MB, 5),
+            calc_concurrency(ONE_TB * 4, None, DEFAULT_CONCURRENCY)?,
+            "4TB file, no block size"
         );
 
         assert_eq!(
-            (100 * ONE_MB, 1),
-            calc_concurrency(Some(ONE_TB * 4), None, Some(0))?,
-            "uploading 5TB file with zero concurrency. 100MB chunks, 1 uploader"
+            (100 * ONE_MB, 5),
+            calc_concurrency(ONE_TB * 4, Some(0), DEFAULT_CONCURRENCY)?,
+            "4TB file, zero block size"
         );
 
         let (block_size, uploaders_count) =
-            calc_concurrency(Some(ONE_TB.saturating_mul(32)), None, None)?;
+            calc_concurrency(ONE_TB.saturating_mul(32), None, DEFAULT_CONCURRENCY)?;
         assert!(block_size > REASONABLE_BLOCK_SIZE && block_size < BLOB_MAX_BLOCK_SIZE);
         assert_eq!(uploaders_count, 1);
 
         assert!(
-            calc_concurrency(
-                Some((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1),
-                None,
-                None
-            )
-            .is_err(),
+            calc_concurrency((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1, None, 10).is_err(),
            "files beyond max size should fail"
         );
 
         Ok(())
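As a cross-check on the updated expectations, the new values fall directly out of the constants. A small standalone calculation (names local to this snippet, 64-bit `usize` assumed):

```rust
const ONE_MB: usize = 1 << 20;
const ONE_TB: usize = ONE_MB * ONE_MB;
const BLOB_MAX_BLOCKS: usize = 50_000;

fn main() {
    // 4TB still fits in 50,000 blocks of 100MB (~4.77TB total), so the
    // "reasonable" 100MB block size is chosen...
    assert!(ONE_TB * 4 < 100 * ONE_MB * BLOB_MAX_BLOCKS);
    // ...and the 500MB in-flight budget / 100MB blocks = 5 uploaders.
    assert_eq!((500 * ONE_MB) / (100 * ONE_MB), 5);

    // 32TB needs ~671MB blocks to stay under 50,000 blocks, which blows
    // the 500MB budget, so concurrency degrades to a single uploader.
    let block_size = (ONE_TB * 32 / BLOB_MAX_BLOCKS) + 1;
    assert!(block_size > 100 * ONE_MB);
    assert_eq!((500 * ONE_MB) / block_size, 0);
}
```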