Skip to content

Commit

Permalink
fix: retry Cancelled stream errors (#1124)
Browse files Browse the repository at this point in the history
* fix: retry Cancelled stream errors

* Fix lint
  • Loading branch information
tmdiep authored Apr 26, 2022
1 parent ab92e3f commit 6d54da8
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ public final class ErrorCodes {
Code.INTERNAL,
Code.UNAVAILABLE,
Code.UNKNOWN,
Code.RESOURCE_EXHAUSTED);
Code.RESOURCE_EXHAUSTED,
Code.CANCELLED);

public static boolean IsRetryableForStreams(Code code) {
return STREAM_RETRYABLE_CODES.contains(code);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ private void rebatchForRestart() {
messages.add(UnbatchedMessage.of(batch.messages.get(i), batch.messageFutures.get(i)));
}
}
logger.atFiner().log("Re-publishing %s messages after reconnection", messages.size());
logger.atFiner().log(
"Re-publishing %s messages after reconnection for partition %s",
messages.size(), initialRequest.getInitialRequest().getPartition());
long size = 0;
int count = 0;
Queue<UnbatchedMessage> currentBatch = new ArrayDeque<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ public void reinitialize(StreamRequestT initialRequest) {
lastInitialRequest = initialRequest;
logger.atFiner().log("Start initializing connection for %s", streamDescription());
currentConnection = connectionFactory.New(streamFactory, this, lastInitialRequest);
logger.atFiner().log("Initialized connection for %s", streamDescription());
logger.atFiner().log("Finished initializing connection for %s", streamDescription());
}
}

Expand All @@ -123,7 +123,6 @@ protected void doStop() {
notifyFailed(t);
return;
}
logger.atFine().log("Terminated connection for %s", streamDescription());
notifyStopped();
}

Expand All @@ -144,6 +143,7 @@ void setPermanentError(Throwable error) {
if (completed) return;
completed = true;
}
logger.atInfo().withCause(error).log("Permanent error occurred for %s", streamDescription());
notifyFailed(error);
}

Expand Down

0 comments on commit 6d54da8

Please sign in to comment.