Skip to content

Commit

Permalink
simplify upload configuration (#191)
Browse files Browse the repository at this point in the history
  • Loading branch information
bmc-msft authored Oct 26, 2022
1 parent 6507e21 commit a3a3312
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 75 deletions.
6 changes: 3 additions & 3 deletions src/bin/avml-upload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
#![deny(clippy::manual_assert)]
#![deny(clippy::indexing_slicing)]

use avml::{put, BlobUploader, Error};
use avml::{put, BlobUploader, Error, DEFAULT_CONCURRENCY};
use clap::{Parser, Subcommand};
use std::path::PathBuf;
use tokio::runtime::Runtime;
Expand Down Expand Up @@ -38,8 +38,8 @@ enum Commands {
url: Url,

/// specify blob upload concurrency
#[arg(long)]
sas_block_concurrency: Option<usize>,
#[arg(long, default_value_t=DEFAULT_CONCURRENCY)]
sas_block_concurrency: usize,

/// specify maximum block size in MiB
#[arg(long)]
Expand Down
4 changes: 2 additions & 2 deletions src/bin/avml.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ struct Config {

/// specify blob upload concurrency
#[cfg(feature = "blobstore")]
#[arg(long)]
sas_block_concurrency: Option<usize>,
#[arg(long, default_value_t=avml::DEFAULT_CONCURRENCY)]
sas_block_concurrency: usize,

/// name of the file to write to on local system
filename: PathBuf,
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ mod upload;
mod write_counter;

#[cfg(feature = "blobstore")]
pub use crate::upload::blobstore::BlobUploader;
pub use crate::upload::blobstore::{BlobUploader, DEFAULT_CONCURRENCY};

#[cfg(feature = "put")]
pub use crate::upload::http::put;
Expand Down
169 changes: 100 additions & 69 deletions src/upload/blobstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,56 @@ pub enum Error {

type Result<T> = std::result::Result<T, Error>;

// https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage
/// Maximum number of blocks
///
/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
const BLOB_MAX_BLOCKS: usize = 50_000;

/// Maximum size of any single block
///
/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
const BLOB_MAX_BLOCK_SIZE: usize = ONE_MB.saturating_mul(4000);

/// Maximum total size of a file
///
/// <https://docs.microsoft.com/en-us/azure/storage/blobs/scalability-targets#scale-targets-for-blob-storage>
const BLOB_MAX_FILE_SIZE: usize = BLOB_MAX_BLOCKS.saturating_mul(BLOB_MAX_BLOCK_SIZE);

// trigger's the "high-throughput block blobs" on all storage accounts
// https://azure.microsoft.com/en-us/blog/high-throughput-with-azure-blob-storage/
/// Minimum block size, which is required to trigger the "high-throughput block
/// blobs" feature on all storage accounts
///
/// <https://azure.microsoft.com/en-us/blog/high-throughput-with-azure-blob-storage/>
const BLOB_MIN_BLOCK_SIZE: usize = ONE_MB.saturating_mul(5);

// Azure's default max request rate for a storage account is 20,000 per second.
// By keeping to 10 or fewer concurrent upload threads, AVML can be used to
// simultaneously upload images from 1000 different hosts concurrently (a full
// VM scaleset) to a single default storage account.
//
// https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts
/// Azure's default max request rate for a storage account is 20,000 per second.
/// By keeping to 10 or fewer concurrent upload threads, AVML can be used to
/// simultaneously upload images from 1000 different hosts concurrently (a full
/// VM scaleset) to a single default storage account.
///
/// <https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts>
const MAX_CONCURRENCY: usize = 10;

// if we're uploading *huge* files, use 100MB chunks
/// Default number of concurrent upload threads.
///
/// Kept at 10 or fewer so that AVML can simultaneously upload images from
/// 1000 different hosts concurrently (a full VM scaleset) to a single default
/// storage account, given Azure's default max request rate of 20,000 per
/// second for a storage account.
///
/// <https://docs.microsoft.com/en-us/azure/storage/common/scalability-targets-standard-account#scale-targets-for-standard-storage-accounts>
pub const DEFAULT_CONCURRENCY: usize = 10;

/// Because chunks stay in memory until the upload is complete (to enable
/// automatic retries in the case of TCP or HTTP errors), chunk sizes for huge
/// files are capped at 100MB each
const REASONABLE_BLOCK_SIZE: usize = ONE_MB.saturating_mul(100);

/// Try to keep under 500MB in flight. If that's not possible due to the
/// block size, concurrency will be disabled.
const MEMORY_THRESHOLD: usize = 500 * ONE_MB;

/// When uploading a file without a known size, such as when uploading a
/// stream of unknown length, assume a 1TB file size
const DEFAULT_FILE_SIZE: usize = 1024 * 1024 * 1024 * 1024;

pub struct UploadBlock {
id: Bytes,
data: Bytes,
Expand Down Expand Up @@ -110,14 +140,12 @@ impl TryFrom<&Url> for SasToken {
}

fn calc_concurrency(
file_size: Option<usize>,
file_size: usize,
block_size: Option<usize>,
upload_concurrency: Option<usize>,
upload_concurrency: usize,
) -> Result<(usize, usize)> {
if let Some(file_size) = file_size {
if file_size > BLOB_MAX_FILE_SIZE {
return Err(Error::TooLarge);
}
if file_size > BLOB_MAX_FILE_SIZE {
return Err(Error::TooLarge);
}

let block_size = match block_size {
Expand All @@ -128,13 +156,12 @@ fn calc_concurrency(
// if the file is small enough to fit with 5MB blocks, use that
// to reduce impact for failure retries and increase
// concurrency.
Some(x) if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE,
x if (x < BLOB_MIN_BLOCK_SIZE * BLOB_MAX_BLOCKS) => BLOB_MIN_BLOCK_SIZE,
// if the file is large enough that we can fit with 100MB blocks, use that.
Some(x) if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE,
x if (x < REASONABLE_BLOCK_SIZE * BLOB_MAX_BLOCKS) => REASONABLE_BLOCK_SIZE,
// otherwise, just use the smallest block size that will fit
// within MAX BLOCKS to reduce memory pressure
Some(x) => (x / BLOB_MAX_BLOCKS) + 1,
None => REASONABLE_BLOCK_SIZE,
x => (x / BLOB_MAX_BLOCKS) + 1,
}
}
// minimum required to hit high-throughput block blob performance thresholds
Expand All @@ -148,12 +175,8 @@ fn calc_concurrency(

let upload_concurrency = match upload_concurrency {
// manually specifying concurrency of 0 will disable concurrency
Some(0) => 1,
Some(x) => x,
// if the user didn't specify concurrency, always try to keep under
// 200MB in flight. If that's not possible due to block size, disable
// concurrency.
None => match (200 * ONE_MB).saturating_div(block_size) {
0 | 1 => 1,
_ => match (MEMORY_THRESHOLD).saturating_div(block_size) {
0 => 1,
// cap the number of concurrent threads to reduce concurrency issues
// at the server end.
Expand All @@ -164,12 +187,30 @@ fn calc_concurrency(
Ok((block_size, upload_concurrency))
}

/// Concurrently upload a Stream/File to an Azure Blob Store using a SAS URL.
///
/// ```rust,no_run
/// use avml::BlobUploader;
/// # use url::Url;
/// # use avml::Result;
/// # use std::path::Path;
/// # async fn upload() -> Result<()> {
/// let sas_url = Url::parse("https://contoso.com/container_name/blob_name?sas_token_here=1")
/// .expect("url parsing failed");
/// let path = Path::new("/tmp/image.lime");
/// let uploader = BlobUploader::new(&sas_url)?
/// .block_size(Some(100))
/// .concurrency(5);
/// uploader.upload_file(&path).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Clone)]
pub struct BlobUploader {
client: BlobClient,
size: Option<usize>,
size: usize,
block_size: Option<usize>,
concurrency: Option<usize>,
concurrency: usize,
sender: Sender<UploadBlock>,
receiver: Receiver<UploadBlock>,
}
Expand All @@ -194,9 +235,9 @@ impl BlobUploader {

Self {
client,
size: None,
size: DEFAULT_FILE_SIZE,
block_size: None,
concurrency: None,
concurrency: DEFAULT_CONCURRENCY,
sender,
receiver,
}
Expand All @@ -207,7 +248,7 @@ impl BlobUploader {
/// If the anticipated upload size is not specified, the maximum file
/// uploaded will be approximately 5TB.
#[must_use]
pub fn size(self, size: Option<usize>) -> Self {
pub fn size(self, size: usize) -> Self {
Self { size, ..self }
}

Expand All @@ -224,7 +265,7 @@ impl BlobUploader {
}

#[must_use]
pub fn concurrency(self, concurrency: Option<usize>) -> Self {
pub fn concurrency(self, concurrency: usize) -> Self {
Self {
concurrency,
..self
Expand All @@ -240,7 +281,6 @@ impl BlobUploader {
.await?
.len()
.try_into()
.map(Some)
.map_err(|_| Error::SizeConversion)?;

self.size = file_size;
Expand Down Expand Up @@ -280,7 +320,9 @@ impl BlobUploader {
}

async fn uploaders(&self, count: usize) -> Result<()> {
let status = Status::new(self.size.map(|x| x as u64));
let status = Status::new(Some(
self.size.try_into().map_err(|_| Error::SizeConversion)?,
));

let uploaders: Vec<_> = (0..usize::max(1, count))
.map(|_| {
Expand Down Expand Up @@ -384,77 +426,66 @@ mod tests {
#[test]
fn test_calc_concurrency() -> Result<()> {
assert_eq!(
(BLOB_MIN_BLOCK_SIZE, 1),
calc_concurrency(Some(ONE_MB * 300), Some(1), Some(1))?,
(BLOB_MIN_BLOCK_SIZE, 10),
calc_concurrency(ONE_MB * 300, Some(1), DEFAULT_CONCURRENCY)?,
"specified blocksize would overflow block count, so we use the minimum block size"
);

assert_eq!(
(BLOB_MIN_BLOCK_SIZE, 10),
calc_concurrency(Some(ONE_GB * 30), Some(ONE_MB), None)?,
"specifying block size of ONE_MB"
calc_concurrency(ONE_GB * 30, Some(ONE_MB), DEFAULT_CONCURRENCY)?,
"30GB file, 1MB blocks"
);

assert_eq!(
(ONE_MB * 100, 2),
calc_concurrency(Some(ONE_GB * 30), Some(ONE_MB * 100), None)?,
"specifying block size of 100MB but no concurrency"
(ONE_MB * 100, 5),
calc_concurrency(ONE_GB * 30, Some(ONE_MB * 100), DEFAULT_CONCURRENCY)?,
"30GB file, 100MB block size"
);

assert_eq!(
(5 * ONE_MB, 10),
calc_concurrency(Some(ONE_MB * 400), None, None)?,
"uploading 400MB file, 5MB chunks, 10 uploaders",
calc_concurrency(ONE_MB * 400, None, DEFAULT_CONCURRENCY)?,
"400MB file, no block size"
);

assert_eq!(
(5 * ONE_MB, 10),
calc_concurrency(Some(ONE_GB * 16), None, None)?,
"uploading 50,000 MB file. 5MB chunks, 10 uploaders",
calc_concurrency(ONE_GB * 16, None, DEFAULT_CONCURRENCY)?,
"16GB file, no block size"
);

assert_eq!(
(5 * ONE_MB, 10),
calc_concurrency(Some(ONE_GB * 32), None, None)?,
"uploading 32GB file"
);

assert_eq!(
(ONE_MB * 100, 2),
calc_concurrency(Some(ONE_TB), None, None)?,
"uploading 1TB file"
calc_concurrency(ONE_GB * 32, None, DEFAULT_CONCURRENCY)?,
"32GB file, no block size",
);

assert_eq!(
(100 * ONE_MB, 2),
calc_concurrency(Some(ONE_TB * 4), None, None)?,
"uploading 5TB file. 100MB chunks, 2 uploaders"
(ONE_MB * 100, 5),
calc_concurrency(ONE_TB, None, DEFAULT_CONCURRENCY)?,
"1TB file, no block size"
);

assert_eq!(
(100 * ONE_MB, 2),
calc_concurrency(Some(ONE_TB * 4), Some(0), None)?,
"uploading 5TB file with zero blocksize. 100MB chunks, 2 uploaders"
(100 * ONE_MB, 5),
calc_concurrency(ONE_TB * 4, None, DEFAULT_CONCURRENCY)?,
"4TB file, no block size"
);

assert_eq!(
(100 * ONE_MB, 1),
calc_concurrency(Some(ONE_TB * 4), None, Some(0))?,
"uploading 5TB file with zero concurrency. 100MB chunks, 1 uploader"
(100 * ONE_MB, 5),
calc_concurrency(ONE_TB * 4, Some(0), DEFAULT_CONCURRENCY)?,
"4TB file, zero block size"
);

let (block_size, uploaders_count) =
calc_concurrency(Some(ONE_TB.saturating_mul(32)), None, None)?;
calc_concurrency(ONE_TB.saturating_mul(32), None, DEFAULT_CONCURRENCY)?;
assert!(block_size > REASONABLE_BLOCK_SIZE && block_size < BLOB_MAX_BLOCK_SIZE);
assert_eq!(uploaders_count, 1);

assert!(
calc_concurrency(
Some((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1),
None,
None
)
.is_err(),
calc_concurrency((BLOB_MAX_BLOCKS * BLOB_MAX_BLOCK_SIZE) + 1, None, 10).is_err(),
"files beyond max size should fail"
);
Ok(())
Expand Down

0 comments on commit a3a3312

Please sign in to comment.