Skip to content

Commit

Permalink
fix: Flush messages in serial executor before handling admin seek (#1079
Browse files Browse the repository at this point in the history
)

Messages queued in the serial executor should not be delivered post-seek.
  • Loading branch information
tmdiep authored Mar 17, 2022
1 parent c293c24 commit 49bed38
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 37 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.4.12'
implementation 'com.google.cloud:google-cloud-pubsublite:1.5.0'
```

If you are using SBT, add this to your dependencies

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.4.12"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.5.0"
```

## Authentication
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,27 +16,44 @@

package com.google.cloud.pubsublite.internal;

import com.google.common.util.concurrent.Monitor.Guard;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.annotation.concurrent.GuardedBy;

/** An executor that runs tasks sequentially. */
public final class SerialExecutor implements AutoCloseable, Executor {
private final Executor executor;
private final AtomicBoolean isShutdown = new AtomicBoolean(false);

@GuardedBy("this")
private final CloseableMonitor monitor = new CloseableMonitor();
private final Guard isInactive =
new Guard(monitor.monitor) {
@Override
public boolean isSatisfied() {
return !isTaskActive;
}
};

@GuardedBy("monitor.monitor")
private final Queue<Runnable> tasks;

@GuardedBy("this")
@GuardedBy("monitor.monitor")
private boolean isTaskActive;

@GuardedBy("monitor.monitor")
private boolean isShutdown;

public SerialExecutor(Executor executor) {
this.executor = executor;
this.tasks = new ArrayDeque<>();
this.isTaskActive = false;
this.isShutdown = false;
}

/** Waits until there are no active tasks. */
public void waitUntilInactive() {
try (CloseableMonitor.Hold h = monitor.enterWhenUninterruptibly(isInactive)) {}
}

/**
Expand All @@ -45,34 +62,45 @@ public SerialExecutor(Executor executor) {
*/
@Override
public void close() {
isShutdown.set(true);
try (CloseableMonitor.Hold h = monitor.enter()) {
isShutdown = true;
}
}

@Override
public synchronized void execute(Runnable r) {
if (isShutdown.get()) {
return;
public void execute(Runnable r) {
try (CloseableMonitor.Hold h = monitor.enter()) {
if (isShutdown) {
return;
}
tasks.add(
() -> {
try {
if (shouldExecuteTask()) {
r.run();
}
} finally {
scheduleNextTask();
}
});
if (!isTaskActive) {
scheduleNextTask();
}
}
tasks.add(
() -> {
if (isShutdown.get()) {
return;
}
try {
r.run();
} finally {
scheduleNextTask();
}
});
if (!isTaskActive) {
scheduleNextTask();
}

private boolean shouldExecuteTask() {
try (CloseableMonitor.Hold h = monitor.enter()) {
return !isShutdown;
}
}

private synchronized void scheduleNextTask() {
isTaskActive = !tasks.isEmpty();
if (isTaskActive) {
executor.execute(tasks.poll());
private void scheduleNextTask() {
try (CloseableMonitor.Hold h = monitor.enter()) {
isTaskActive = !tasks.isEmpty() && !isShutdown;
if (isTaskActive) {
executor.execute(tasks.poll());
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,10 @@ public void triggerReinitialize(CheckedApiException streamError) {
}
if (ResetSignal.isResetSignal(streamError)) {
try {
// Flush pre-seek messages.
messageDeliveryExecutor.waitUntilInactive();
if (resetHandler.handleReset()) {
// Wait for cursor commit.
reset();
}
} catch (CheckedApiException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,10 @@
package com.google.cloud.pubsublite.internal;

import static com.google.common.truth.Truth.assertThat;
import static java.util.concurrent.TimeUnit.SECONDS;

import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
Expand All @@ -35,18 +33,16 @@ public final class SerialExecutorTest {
public void serializesTasks() throws Exception {
final int numTasks = 100;
List<Integer> receivedSequences = new ArrayList<>();
CountDownLatch tasksDone = new CountDownLatch(numTasks);
for (int i = 0; i < numTasks; i++) {
int sequence = i;
executor.execute(
() -> {
synchronized (receivedSequences) {
receivedSequences.add(sequence);
}
tasksDone.countDown();
});
}
assertThat(tasksDone.await(30, SECONDS)).isTrue();
executor.waitUntilInactive();

for (int i = 0; i < receivedSequences.size(); i++) {
assertThat(receivedSequences.get(i)).isEqualTo(i);
Expand All @@ -56,19 +52,17 @@ public void serializesTasks() throws Exception {
@Test
public void closeDiscardsTasks() throws Exception {
List<Integer> receivedSequences = new ArrayList<>();
CountDownLatch tasksDone = new CountDownLatch(1);
for (int i = 0; i < 10; i++) {
int sequence = i;
executor.execute(
() -> {
synchronized (receivedSequences) {
receivedSequences.add(sequence);
}
tasksDone.countDown();
executor.close();
});
}
assertThat(tasksDone.await(10, SECONDS)).isTrue();
executor.waitUntilInactive();

assertThat(receivedSequences).containsExactly(0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -354,8 +354,6 @@ public void reinitialize_handlesSuccessfulReset() throws Exception {
SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10));
CountDownLatch messagesReceived = countdownMessageBatches(1);
leakedResponseObserver.onResponse(messages);
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
verify(mockMessageConsumer).accept(messages);

doAnswer(
args -> {
Expand All @@ -369,6 +367,8 @@ public void reinitialize_handlesSuccessfulReset() throws Exception {
// from the committed cursor upon reconnect.
when(mockResetHandler.handleReset()).thenReturn(true);
subscriber.triggerReinitialize(TestResetSignal.newCheckedException());
assertThat(messagesReceived.await(10, SECONDS)).isTrue();
verify(mockMessageConsumer).accept(messages); // Pre-seek messages always received.
verify(mockSubscriberFactory, times(2)).New(any(), any(), eq(initialRequest()));
verify(mockConnectedSubscriber2)
.allowFlow(
Expand Down

0 comments on commit 49bed38

Please sign in to comment.