Skip to content

Commit

Permalink
Improve service address allocation (#3294)
Browse files Browse the repository at this point in the history
* Improve service address allocation
Should fix #3265
  • Loading branch information
karolz-ms authored Apr 3, 2024
1 parent f728973 commit 83bc549
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 14 deletions.
72 changes: 59 additions & 13 deletions src/Aspire.Hosting/Dcp/ApplicationExecutor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
using Microsoft.Extensions.Options;
using Polly;
using Polly.Retry;
using Polly.Timeout;

namespace Aspire.Hosting.Dcp;

Expand Down Expand Up @@ -887,7 +888,9 @@ private async Task CreateServicesAsync(CancellationToken cancellationToken = def
{
AspireEventSource.Instance.DcpServicesCreationStart();

var needAddressAllocated = _appResources.OfType<ServiceAppResource>().Where(sr => !sr.Service.HasCompleteAddress).ToList();
var needAddressAllocated = _appResources.OfType<ServiceAppResource>()
.Where(sr => !sr.Service.HasCompleteAddress && sr.Service.Spec.AddressAllocationMode != AddressAllocationModes.Proxyless)
.ToList();

await CreateResourcesAsync<Service>(cancellationToken).ConfigureAwait(false);

Expand All @@ -897,26 +900,69 @@ private async Task CreateServicesAsync(CancellationToken cancellationToken = def
return;
}

// We do not specify the initial list version, so the watcher will give us all updates to Service objects.
IAsyncEnumerable<(WatchEventType, Service)> serviceChangeEnumerator = kubernetesService.WatchAsync<Service>(cancellationToken: cancellationToken);
await foreach (var (evt, updated) in serviceChangeEnumerator)
var withTimeout = new TimeoutStrategyOptions()
{
if (evt == WatchEventType.Bookmark) { continue; } // Bookmarks do not contain any data.
Timeout = _options.Value.ServiceStartupWatchTimeout
};

var tryTwice = new RetryStrategyOptions()
{
BackoffType = DelayBackoffType.Constant,
MaxDelay = TimeSpan.FromSeconds(1),
UseJitter = true,
MaxRetryAttempts = 1,
ShouldHandle = new PredicateBuilder().Handle<Exception>(),
OnRetry = (retry) =>
{
_logger.LogDebug(
retry.Outcome.Exception,
"Watching for service port allocation ended with an error after {WatchDurationMs} (iteration {Iteration})",
retry.Duration.TotalMilliseconds,
retry.AttemptNumber
);
return ValueTask.CompletedTask;
}
};

var srvResource = needAddressAllocated.Where(sr => sr.Service.Metadata.Name == updated.Metadata.Name).FirstOrDefault();
if (srvResource == null) { continue; } // This service most likely already has full address information, so it is not on needAddressAllocated list.
var execution = new ResiliencePipelineBuilder().AddRetry(tryTwice).AddTimeout(withTimeout).Build();

if (updated.HasCompleteAddress || updated.Spec.AddressAllocationMode == AddressAllocationModes.Proxyless)
await execution.ExecuteAsync(async (attemptCancellationToken) =>
{
IAsyncEnumerable<(WatchEventType, Service)> serviceChangeEnumerator = kubernetesService.WatchAsync<Service>(cancellationToken: attemptCancellationToken);
await foreach (var (evt, updated) in serviceChangeEnumerator)
{
srvResource.Service.ApplyAddressInfoFrom(updated);
needAddressAllocated.Remove(srvResource);
if (evt == WatchEventType.Bookmark) { continue; } // Bookmarks do not contain any data.

var srvResource = needAddressAllocated.FirstOrDefault(sr => sr.Service.Metadata.Name == updated.Metadata.Name);
if (srvResource == null) { continue; } // This service most likely already has full address information, so it is not on needAddressAllocated list.

if (updated.HasCompleteAddress)
{
srvResource.Service.ApplyAddressInfoFrom(updated);
needAddressAllocated.Remove(srvResource);
}

if (needAddressAllocated.Count == 0)
{
return; // We are done
}
}
}, cancellationToken).ConfigureAwait(false);

if (needAddressAllocated.Count == 0)
// If there are still services that need address allocated, try a final direct query in case the watch missed some updates.
foreach(var sar in needAddressAllocated)
{
var dcpSvc = await kubernetesService.GetAsync<Service>(sar.Service.Metadata.Name, cancellationToken: cancellationToken).ConfigureAwait(false);
if (dcpSvc.HasCompleteAddress)
{
sar.Service.ApplyAddressInfoFrom(dcpSvc);
}
else
{
return; // We are done
distributedApplicationLogger.LogWarning("Unable to allocate a network port for service '{ServiceName}'; service may be unreachable and its clients may not work properly.", sar.Service.Metadata.Name);
}
}

}
finally
{
Expand Down Expand Up @@ -1073,7 +1119,7 @@ private void PrepareProjectExecutables()
}
else
{
#pragma warning disable CS0612 // These annotations are obsolete; remove in Aspire Preview 6
#pragma warning disable CS0612 // These annotations are obsolete; remove after Aspire GA
annotationHolder.Annotate(Executable.CSharpProjectPathAnnotation, projectMetadata.ProjectPath);

// ExcludeLaunchProfileAnnotation takes precedence over LaunchProfileAnnotation.
Expand Down
4 changes: 4 additions & 0 deletions src/Aspire.Hosting/Dcp/DcpOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ internal sealed class DcpOptions

public int KubernetesConfigReadRetryIntervalMilliseconds { get; set; } = 100;

public TimeSpan ServiceStartupWatchTimeout { get; set; } = TimeSpan.FromSeconds(10);

public void ApplyApplicationConfiguration(DistributedApplicationOptions appOptions, IConfiguration dcpPublisherConfiguration, IConfiguration publishingConfiguration, IConfiguration coreConfiguration)
{
string? publisher = publishingConfiguration[nameof(PublishingOptions.Publisher)];
Expand Down Expand Up @@ -144,6 +146,8 @@ public void ApplyApplicationConfiguration(DistributedApplicationOptions appOptio
{
throw new InvalidOperationException($"Could not resolve the path to the Aspire application host. The application cannot be run without it.");
}

ServiceStartupWatchTimeout = coreConfiguration.GetValue<TimeSpan>("DOTNET_ASPIRE_SERVICE_STARTUP_WATCH_TIMEOUT", ServiceStartupWatchTimeout);
}

private static string? GetMetadataValue(IEnumerable<AssemblyMetadataAttribute>? assemblyMetadata, string key)
Expand Down
33 changes: 33 additions & 0 deletions src/Aspire.Hosting/Dcp/KubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,13 @@ internal enum DcpApiOperationType
Delete = 3,
Watch = 4,
GetLogSubresource = 5,
Get = 6,
}

internal interface IKubernetesService
{
Task<T> GetAsync<T>(string name, string? namespaceParameter = null, CancellationToken cancellationToken = default)
where T: CustomResource;
Task<T> CreateAsync<T>(T obj, CancellationToken cancellationToken = default)
where T : CustomResource;
Task<List<T>> ListAsync<T>(string? namespaceParameter = null, CancellationToken cancellationToken = default)
Expand Down Expand Up @@ -54,6 +57,36 @@ internal sealed class KubernetesService(ILogger<KubernetesService> logger, IOpti

public TimeSpan MaxRetryDuration { get; set; } = TimeSpan.FromSeconds(20);

public Task<T> GetAsync<T>(string name, string? namespaceParameter = null, CancellationToken cancellationToken = default)
where T : CustomResource
{
var resourceType = GetResourceFor<T>();

return ExecuteWithRetry(
DcpApiOperationType.Get,
resourceType,
async (kubernetes) =>
{
var response = string.IsNullOrEmpty(namespaceParameter)
? await kubernetes.CustomObjects.GetClusterCustomObjectWithHttpMessagesAsync(
GroupVersion.Group,
GroupVersion.Version,
resourceType,
name,
cancellationToken: cancellationToken).ConfigureAwait(false)
: await kubernetes.CustomObjects.GetNamespacedCustomObjectWithHttpMessagesAsync(
GroupVersion.Group,
GroupVersion.Version,
namespaceParameter,
resourceType,
name,
cancellationToken: cancellationToken).ConfigureAwait(false);

return KubernetesJson.Deserialize<T>(response.Body.ToString());
},
cancellationToken);
}

public Task<T> CreateAsync<T>(T obj, CancellationToken cancellationToken = default)
where T : CustomResource
{
Expand Down
15 changes: 14 additions & 1 deletion tests/Aspire.Hosting.Tests/Dcp/MockKubernetesService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,20 @@ internal sealed class MockKubernetesService : IKubernetesService
{
internal sealed record DeletedResource(Type Type, object Value);

public List<object> CreatedResources { get; } = [];
public List<CustomResource> CreatedResources { get; } = [];

public Task<T> GetAsync<T>(string name, string? namespaceParameter = null, CancellationToken _ = default) where T : CustomResource
{
var res = CreatedResources.OfType<T>().FirstOrDefault(r =>
r.Metadata.Name == name &&
string.Equals(r.Metadata.NamespaceProperty ?? string.Empty, namespaceParameter ?? string.Empty)
);
if (res == null)
{
throw new ArgumentException($"Resource '{namespaceParameter ?? ""}/{name}' not found");
}
return Task.FromResult(res);
}

public Task<T> CreateAsync<T>(T obj, CancellationToken cancellationToken = default) where T : CustomResource
{
Expand Down

0 comments on commit 83bc549

Please sign in to comment.