-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added scheduled session cleanup #10802
Added scheduled session cleanup #10802
Conversation
…ore last session command processed
application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java
Outdated
Show resolved
Hide resolved
application/src/main/java/org/thingsboard/server/service/ws/DefaultWebSocketService.java
Outdated
Show resolved
Hide resolved
@@ -539,6 +539,11 @@ public void close(WebSocketSessionRef sessionRef, CloseStatus reason) throws IOE | |||
} | |||
} | |||
|
|||
@Override | |||
public boolean isOpen(String sessionId) { | |||
return externalSessionMap.containsKey(sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that we should additionally check the actual native WS session:
sessionMd.session.isOpen()
.
@ashvayka Do you agree?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure. If the session is removed from the map, it is no longer used.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just in case :)
wsSessionsMap.values().forEach(md -> { | ||
String sessionId = md.getSessionRef().getSessionId(); | ||
if (!msgEndpoint.isOpen(sessionId)) { | ||
log.info("Cleaning up stale session [sessionId: {}, tenantId: {}, userId: {}] ", sessionId, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use standard way of logging: "[tenantId][userId][sessionId] Cleaning up stale session"
md.getSessionRef().getSecurityCtx().getTenantId(), md.getSessionRef().getSecurityCtx().getId()); | ||
wsSessionsMap.remove(sessionId); | ||
oldSubService.cancelAllSessionSubscriptions(sessionId); | ||
entityDataSubService.cancelAllSessionSubscriptions(sessionId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't we need to also call processSessionClose(sessionRef)
?
@@ -107,6 +113,8 @@ public void initExecutor() { | |||
subscriptionUpdateExecutor = ThingsBoardExecutors.newWorkStealingPool(20, getClass()); | |||
tsCallBackExecutor = Executors.newSingleThreadExecutor(ThingsBoardThreadFactory.forName("ts-sub-callback")); | |||
serviceId = serviceInfoProvider.getServiceId(); | |||
staleSessionCleanupExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("stale-session-cleanup")); | |||
staleSessionCleanupExecutor.scheduleWithFixedDelay(this::cleanupStaleSessions, 0, 60000, TimeUnit.MILLISECONDS); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Initial delay - also 60 seconds
if (sessionSubscriptions.isEmpty()) { | ||
subscriptionsBySessionId.remove(sessionId); | ||
ModifySubscriptionResult modifySubscriptionResult = null; | ||
subsLock.lock(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move up
} | ||
|
||
private void pushSubscriptionEvent(ModifySubscriptionResult modifySubResult) { | ||
TbEntitySubEvent event = modifySubResult.getEvent(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
try catch with logging (warn) for both pushSubscriptionEvent and modifySubscription
private TbSubscription<?> missedUpdatesCandidate; | ||
private TbEntitySubEvent event; | ||
|
||
public boolean isEmpty() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
hasEvent
modifySubscription(subscription.getTenantId(), subscription.getEntityId(), subscription, false); | ||
} catch (Exception e) { | ||
log.warn("[{}][{}] Failed to remove subscription {} due to ", subscription.getTenantId(), subscription.getEntityId(), subscription, e); | ||
List<ModifySubscriptionResult> modifySubscriptionResults = new ArrayList<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-> SubscriptionModificationResult
, for variable name might just use 'result(s)'
Pull Request description
Added scheduled session cleanup for cases when session was closed before last session command processed
General checklist
Front-End feature checklist
Back-End feature checklist