Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async story to ClientCore #44277

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,61 @@
package io.clientcore.core.http.client;

import io.clientcore.core.http.models.HttpRequest;
import io.clientcore.core.http.models.RequestOptions;
import io.clientcore.core.http.models.Response;
import io.clientcore.core.implementation.http.client.DefaultHttpClientProvider;
import io.clientcore.core.utils.SharedExecutorService;

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;

/**
* A generic interface for sending HTTP requests and getting responses.
*/
public interface HttpClient {
/**
* Sends the provided request synchronously with contextual information.
* Sends the provided request.
*
* @param request The HTTP request to send.
* @return The response.
* @throws IOException If an I/O error occurs during sending the request or receiving the response.
*/
Response<?> send(HttpRequest request) throws IOException;

/**
* Sends the provided request asynchronously.
* <p>
* The asynchronous request will be sent using the {@link ExecutorService} provided in the
* {@link HttpRequest#getRequestOptions()} ({@link RequestOptions#getAsyncExecutor()}) if the {@link RequestOptions}
* and the {@link ExecutorService} is not null. If either is null, the request will be sent using a shared
* {@link SharedExecutorService}.
* <p>
* If an I/O error occurs while sending the request or receiving the response, the returned
* {@link CompletableFuture} will complete exceptionally.
*
* @param request The HTTP request to send.
* @return A CompletableFuture that will complete with the response.
*/
default CompletableFuture<Response<?>> sendAsync(HttpRequest request) {
ExecutorService asyncExecutor = (request.getRequestOptions() != null)
? request.getRequestOptions().getAsyncExecutor()
: null;

if (asyncExecutor == null) {
asyncExecutor = SharedExecutorService.getInstance();
}

return CompletableFuture.supplyAsync(() -> {
try {
return send(request);
} catch (IOException e) {
throw new CompletionException(e);
}
}, asyncExecutor);
}

/**
* Get a new instance of the {@link HttpClient} that the {@link HttpClientProvider} loaded from the classpath is
* configured to create.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,12 @@
import io.clientcore.core.implementation.http.rest.UriEscapers;
import io.clientcore.core.instrumentation.InstrumentationContext;
import io.clientcore.core.instrumentation.logging.ClientLogger;
import io.clientcore.core.utils.Context;
import io.clientcore.core.models.binarydata.BinaryData;
import io.clientcore.core.utils.Context;
import io.clientcore.core.utils.SharedExecutorService;

import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

/**
Expand Down Expand Up @@ -42,7 +44,7 @@
* To <a href="https://petstore.swagger.io/#/pet/addPet">add a new pet to the pet store</a>, an HTTP POST call should be
* made to the service with the details of the pet that is to be added. The details of the pet are included as the
* request body in JSON format.
*
* <p>
* The JSON structure for the request is defined as follows:
*
* <pre>{@code
Expand Down Expand Up @@ -121,6 +123,7 @@ public final class RequestOptions {
private boolean locked;
private ClientLogger logger;
private InstrumentationContext instrumentationContext;
private ExecutorService asyncExecutor;

/**
* Creates a new instance of {@link RequestOptions}.
Expand Down Expand Up @@ -174,9 +177,7 @@ public ClientLogger getLogger() {
* otherwise a new header will be created.</p>
*
* @param header The header key.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions addHeader(HttpHeader header) {
Expand All @@ -186,7 +187,6 @@ public RequestOptions addHeader(HttpHeader header) {
}

this.requestCallback = this.requestCallback.andThen(request -> request.getHeaders().add(header));

return this;
}

Expand All @@ -197,9 +197,7 @@ public RequestOptions addHeader(HttpHeader header) {
*
* @param header The header key.
* @param value The header value.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setHeader(HttpHeaderName header, String value) {
Expand All @@ -209,7 +207,6 @@ public RequestOptions setHeader(HttpHeaderName header, String value) {
}

this.requestCallback = this.requestCallback.andThen(request -> request.getHeaders().set(header, value));

return this;
}

Expand All @@ -219,9 +216,7 @@ public RequestOptions setHeader(HttpHeaderName header, String value) {
*
* @param parameterName The name of the query parameter.
* @param value The value of the query parameter.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions addQueryParam(String parameterName, String value) {
Expand All @@ -236,9 +231,7 @@ public RequestOptions addQueryParam(String parameterName, String value) {
* @param parameterName The name of the query parameter.
* @param value The value of the query parameter.
* @param encoded Whether this query parameter is already encoded.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions addQueryParam(String parameterName, String value, boolean encoded) {
Expand All @@ -263,9 +256,7 @@ public RequestOptions addQueryParam(String parameterName, String value, boolean
* modifications made on a {@link RequestOptions} object are applied in order on the request.
*
* @param requestCallback The request callback.
*
* @return The updated {@link RequestOptions} object.
*
* @throws NullPointerException If {@code requestCallback} is {@code null}.
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
Expand All @@ -278,17 +269,14 @@ public RequestOptions addRequestCallback(Consumer<HttpRequest> requestCallback)
Objects.requireNonNull(requestCallback, "'requestCallback' cannot be null.");

this.requestCallback = this.requestCallback.andThen(requestCallback);

return this;
}

/**
* Sets the body to send as part of the {@link HttpRequest}.
*
* @param requestBody the request body data
*
* @return The updated {@link RequestOptions} object.
*
* @throws NullPointerException If {@code requestBody} is {@code null}.
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
Expand All @@ -301,17 +289,14 @@ public RequestOptions setBody(BinaryData requestBody) {
Objects.requireNonNull(requestBody, "'requestBody' cannot be null.");

this.requestCallback = this.requestCallback.andThen(request -> request.setBody(requestBody));

return this;
}

/**
* Sets the additional context on the request that is passed during the service call.
*
* @param context Additional context that is passed during the service call.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setContext(Context context) {
Expand All @@ -321,7 +306,6 @@ public RequestOptions setContext(Context context) {
}

this.context = context;

return this;
}

Expand All @@ -331,7 +315,6 @@ public RequestOptions setContext(Context context) {
* @param key The key to add to the context.
* @param value The value to add to the context.
* @return The updated {@link RequestOptions} object.
*
* @see #setContext(Context)
*/
public RequestOptions putContext(Object key, Object value) {
Expand All @@ -341,7 +324,6 @@ public RequestOptions putContext(Object key, Object value) {
}

this.context = this.context.put(key, value);

return this;
}

Expand All @@ -353,9 +335,7 @@ public RequestOptions putContext(Object key, Object value) {
*
* @param responseBodyMode The configuration indicating how the body of the resulting HTTP response should be
* handled.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setResponseBodyMode(ResponseBodyMode responseBodyMode) {
Expand All @@ -365,17 +345,14 @@ public RequestOptions setResponseBodyMode(ResponseBodyMode responseBodyMode) {
}

this.responseBodyMode = responseBodyMode;

return this;
}

/**
* Sets the {@link ClientLogger} used to log the request and response.
*
* @param logger The {@link ClientLogger} used to log the request and response.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setLogger(ClientLogger logger) {
Expand All @@ -385,7 +362,36 @@ public RequestOptions setLogger(ClientLogger logger) {
}

this.logger = logger;
return this;
}

/**
* Gets the {@link ExecutorService} used to execute async operations.
* <p>
* If null, the default {@link SharedExecutorService} will be used.
*
* @return The {@link ExecutorService} used to execute async operations.
*/
public ExecutorService getAsyncExecutor() {
return asyncExecutor;
}

/**
* Sets the {@link ExecutorService} used to execute async operations.
* <p>
* If null, the default {@link SharedExecutorService} will be used.
*
* @param asyncExecutor The {@link ExecutorService} used to execute async operations.
* @return The updated {@link RequestOptions} object.
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setAsyncExecutor(ExecutorService asyncExecutor) {
if (locked) {
throw LOGGER.logThrowableAsError(
new IllegalStateException("This instance of RequestOptions is immutable. Cannot set async executor."));
}

this.asyncExecutor = asyncExecutor;
return this;
}

Expand All @@ -396,7 +402,6 @@ public RequestOptions setLogger(ClientLogger logger) {
*/
private RequestOptions lock() {
locked = true;

return this;
}

Expand Down Expand Up @@ -424,9 +429,7 @@ public InstrumentationContext getInstrumentationContext() {
* Sets the {@link InstrumentationContext} used to instrument the request.
*
* @param instrumentationContext The {@link InstrumentationContext} used to instrument the request.
*
* @return The updated {@link RequestOptions} object.
*
* @throws IllegalStateException if this instance is obtained by calling {@link RequestOptions#none()}.
*/
public RequestOptions setInstrumentationContext(InstrumentationContext instrumentationContext) {
Expand All @@ -436,7 +439,6 @@ public RequestOptions setInstrumentationContext(InstrumentationContext instrumen
}

this.instrumentationContext = instrumentationContext;

return this;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;

/**
* The HTTP pipeline that HTTP requests and responses will flow through.
Expand Down Expand Up @@ -63,12 +64,23 @@ public HttpClient getHttpClient() {
* Sends the request through the pipeline.
*
* @param request THe HTTP request to send.
*
* @return An {@link Response}.
*/
public Response<?> send(HttpRequest request) {
HttpPipelineNextPolicy next = new HttpPipelineNextPolicy(new HttpPipelineCallState(this, request));

return next.process();
}

/**
* Sends the request through the pipeline asynchronously.
*
* @param request The HTTP request to send.
* @return A {@link CompletableFuture} that will complete with the {@link Response}.
*/
public CompletableFuture<Response<?>> sendAsync(HttpRequest request) {
HttpPipelineNextPolicy next = new HttpPipelineNextPolicy(new HttpPipelineCallState(this, request));

return next.processAsync();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.CompletableFuture;

/**
* A type that invokes next policy in the pipeline.
Expand Down Expand Up @@ -49,6 +50,20 @@ public Response<?> process() {
}
}

public CompletableFuture<Response<?>> processAsync() {
// TODO (alzimmer): Do we need a different design for async where this is doing something like
// CompletableFuture.thenCompose or CompletableFuture.thenApply, etc?
// I would imagine in most cases we don't want any thread switching happening while executing the pipeline,
// except maybe in the case of TokenCredentials where they might be making a network call to get a token.
HttpPipelinePolicy nextPolicy = state.getNextPolicy();

if (nextPolicy == null) {
return this.state.getPipeline().getHttpClient().sendAsync(this.state.getHttpRequest());
} else {
return nextPolicy.processAsync(this.state.getHttpRequest(), this);
}
}

/**
* Copies the current state of the {@link HttpPipelineNextPolicy}.
* <p>
Expand Down
Loading
Loading