Skip to content

Commit

Permalink
feat: Create subscriptions at a seek target (#1243)
Browse files Browse the repository at this point in the history
Support creating subscriptions at a nominated target location within the message backlog. A seek is performed for publish and event timestamps.

Export subscriptions (pre-release feature) are also supported.
  • Loading branch information
tmdiep authored Oct 6, 2022
1 parent 43ad527 commit c0802c3
Show file tree
Hide file tree
Showing 6 changed files with 412 additions and 2 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.7.0'
implementation 'com.google.cloud:google-cloud-pubsublite:1.7.1'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.7.0"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.7.1"
```

## Authentication
Expand Down
6 changes: 6 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Added method to AdminClient interface (Always okay) -->
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,20 @@ default ApiFuture<Subscription> createSubscription(Subscription subscription) {
ApiFuture<Subscription> createSubscription(
Subscription subscription, BacklogLocation startingOffset);

/**
* Create the provided subscription at the given target location within the message backlog, if it
* does not yet exist.
*
* <p>A seek is initiated if the target location is a publish or event time. If the seek fails,
* the created subscription is not deleted.
*
* @param subscription The subscription to create.
* @param target The target location that the subscription should be initialized to.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
ApiFuture<Subscription> createSubscription(Subscription subscription, SeekTarget target);

/**
* Get the subscription with id {@code id} if it exists.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.cloud.pubsublite.proto.DeleteReservationRequest;
import com.google.cloud.pubsublite.proto.DeleteSubscriptionRequest;
import com.google.cloud.pubsublite.proto.DeleteTopicRequest;
import com.google.cloud.pubsublite.proto.ExportConfig;
import com.google.cloud.pubsublite.proto.GetReservationRequest;
import com.google.cloud.pubsublite.proto.GetSubscriptionRequest;
import com.google.cloud.pubsublite.proto.GetTopicPartitionsRequest;
Expand Down Expand Up @@ -166,6 +167,54 @@ public ApiFuture<Subscription> createSubscription(
.build());
}

@Override
public ApiFuture<Subscription> createSubscription(Subscription subscription, SeekTarget target) {
switch (target.getKind()) {
case BACKLOG_LOCATION:
return createSubscription(subscription, target.backlogLocation());
case PUBLISH_TIME:
case EVENT_TIME:
break;
}

// Export subscriptions must be paused while seeking. The state is later updated to active.
final boolean requiresUpdate =
subscription.hasExportConfig()
&& subscription.getExportConfig().getDesiredState() == ExportConfig.State.ACTIVE;
if (requiresUpdate) {
Subscription.Builder builder = subscription.toBuilder();
builder.getExportConfigBuilder().setDesiredState(ExportConfig.State.PAUSED);
subscription = builder.build();
}

// Request 1: create the subscription.
return ApiFutures.transformAsync(
createSubscription(subscription, BacklogLocation.BEGINNING),
newSubscription -> {
// Request 2: seek the subscription.
SubscriptionPath path = SubscriptionPath.parse(newSubscription.getName());
return ApiFutures.transformAsync(
seekSubscription(path, target).getInitialFuture(),
operation -> {
if (!requiresUpdate) {
return ApiFutures.immediateFuture(newSubscription);
}
// Request 3 (optional): make the export subscription active.
Subscription updatedSubscription =
Subscription.newBuilder()
.setName(newSubscription.getName())
.setExportConfig(
ExportConfig.newBuilder().setDesiredState(ExportConfig.State.ACTIVE))
.build();
FieldMask fieldMask =
FieldMask.newBuilder().addPaths("export_config.desired_state").build();
return updateSubscription(updatedSubscription, fieldMask);
},
SystemExecutors.getFuturesExecutor());
},
SystemExecutors.getFuturesExecutor());
}

@Override
public ApiFuture<Subscription> getSubscription(SubscriptionPath path) {
return serviceClient
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ExportConfig;
import com.google.cloud.pubsublite.proto.ExportConfig.PubSubConfig;
import com.google.cloud.pubsublite.proto.ExportConfig.State;
import com.google.cloud.pubsublite.proto.Reservation;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig;
Expand Down Expand Up @@ -58,6 +61,7 @@ private UnitTestExamples() {}
.put(Reservation.class, exampleReservation())
.put(LocationPath.class, exampleLocationPath())
.put(Offset.class, exampleOffset())
.put(ExportConfig.class, exampleExportConfig())
.build();

public static <T> T example(Class<T> klass) {
Expand Down Expand Up @@ -124,6 +128,13 @@ public static Subscription exampleSubscription() {
.build();
}

public static ExportConfig exampleExportConfig() {
return ExportConfig.newBuilder()
.setDesiredState(State.ACTIVE)
.setPubsubConfig(PubSubConfig.newBuilder().setTopic("pubsub_topic"))
.build();
}

public static ReservationName exampleReservationName() {
return ReservationName.of("example-reservation");
}
Expand Down
Loading

0 comments on commit c0802c3

Please sign in to comment.