MQTT messages sent when connectivity service was stopped are not processed #1868

Open
dimabarbul opened this issue Jan 19, 2024 · 2 comments

Comments

@dimabarbul
Contributor

Recently we had a similar issue, #1767, which was fixed. This issue has a different cause.

When the connectivity service is being stopped, it stops all connections and unsubscribes (link to code) from all MQTT topics. In MQTT, if a client unsubscribes from a topic, the broker no longer keeps new messages for that topic in the client's session, and they will not be delivered on the next connect. This means that even if the connection has "cleanSession" set to false and "qos" 1 or 2, messages published while the connectivity service was stopped will not be delivered when it starts again.
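
To make the effect concrete, here is a minimal sketch of the broker-side behaviour described above. It is a toy in-memory model, not Ditto code and not a real MQTT client or broker API: the class `ToyBrokerSession` and all of its methods are hypothetical, topics are matched by exact name only, and every message is assumed to be QoS 1 or 2 in a persistent session.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

/** Toy model of one persistent MQTT session as kept by a broker (hypothetical, for illustration only). */
final class ToyBrokerSession {
    private final Set<String> subscriptions = new HashSet<>();
    private final Queue<String> queued = new ArrayDeque<>();
    private boolean connected;

    void connect() { connected = true; }
    void disconnect() { connected = false; }
    void subscribe(String topic) { subscriptions.add(topic); }
    void unsubscribe(String topic) { subscriptions.remove(topic); }

    /** The broker keeps a message for this session only if the session still holds the subscription. */
    void publish(String topic, String payload) {
        if (subscriptions.contains(topic)) {
            queued.add(payload);
        }
    }

    /** Returns the queued messages if the client is connected, otherwise nothing. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        while (connected && !queued.isEmpty()) {
            delivered.add(queued.poll());
        }
        return delivered;
    }

    /** Simulates: service stops, a message is published, service starts again. */
    static List<String> messagesAfterRestart(boolean unsubscribeOnStop) {
        ToyBrokerSession session = new ToyBrokerSession();
        session.connect();
        session.subscribe("topic1");
        if (unsubscribeOnStop) {
            session.unsubscribe("topic1"); // what happens today on service stop
        }
        session.disconnect();
        session.publish("topic1", "sent-while-stopped");
        session.connect();
        session.subscribe("topic1"); // the service subscribes again on start
        return session.drain();
    }

    public static void main(String[] args) {
        System.out.println("with unsubscribe:    " + messagesAfterRestart(true));
        System.out.println("without unsubscribe: " + messagesAfterRestart(false));
    }
}
```

In this model the message sent while the service is stopped only survives when the stop path skips the unsubscribe.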

Here are my thoughts on that.
My initial idea was to not unsubscribe for MQTT at all. In cases where we do not need a session (for example, when ditto.connectivity.connection.mqtt.session-expiry-interval is set to 0s), this has no impact, as the broker discards the session when the client disconnects.
But this approach does not work for the edge case where we change the list of topics we want to subscribe to. The broker stores the client's subscriptions as part of the session: if a client subscribed to topic "topic1", then disconnected and connected back, it is still subscribed to "topic1" and receives all new messages for that topic. So if we change the list of topics for a connection without changing the client ID, the client will receive messages from both the previous and the newly subscribed topics.
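
One way to picture the edge case is as a set difference: the topics that remain in the broker session but are no longer configured are exactly the ones that would need an explicit unsubscribe. The sketch below assumes topics are compared as plain strings; `StaleTopics` and `toUnsubscribe` are hypothetical names, not part of Ditto.

```java
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper: finds subscriptions left over in a persistent session after a topic change. */
final class StaleTopics {

    /** Topics present in the previous configuration but missing from the current one. */
    static Set<String> toUnsubscribe(Set<String> previous, Set<String> current) {
        Set<String> stale = new TreeSet<>(previous); // copy, so the input set is not modified
        stale.removeAll(current);
        return stale;
    }

    public static void main(String[] args) {
        Set<String> previous = Set.of("topic1", "topic2");
        Set<String> current = Set.of("topic2", "topic3");
        // "topic1" is still stored in the session unless it is unsubscribed explicitly
        System.out.println(toUnsubscribe(previous, current));
    }
}
```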

@thjaeckle
Member

Good catch, thanks for finding and bringing it up 👍

My thoughts:

  • Yes, I also think we should not unsubscribe from the MqttClient in stopConsuming()
  • But we should unsubscribe in other cases (as you mentioned regarding e.g. the topic change):
    • when a connection is actively closed
    • when a connection is edited
      • even more sophisticated would of course be to only unsubscribe when "relevant" parts were edited, e.g. when the topics to consume from changed
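
The rules in the list above could be sketched as a small decision function. This is only an illustration under assumptions: `StopReason` and `UnsubscribePolicy` are hypothetical types, and nothing here says the MQTT client actor currently has access to the reason or to the old and new topic sets.

```java
import java.util.Set;

/** Hypothetical reasons why consumption from the broker stops. */
enum StopReason { SERVICE_SHUTDOWN, CONNECTION_CLOSED, CONNECTION_EDITED }

/** Hypothetical policy deciding whether to unsubscribe before disconnecting. */
final class UnsubscribePolicy {

    static boolean shouldUnsubscribe(StopReason reason, Set<String> oldTopics, Set<String> newTopics) {
        switch (reason) {
            case SERVICE_SHUTDOWN:
                return false; // keep subscriptions so the broker keeps queueing messages
            case CONNECTION_CLOSED:
                return true; // the connection was closed actively
            case CONNECTION_EDITED:
                return !oldTopics.equals(newTopics); // only when the consumed topics changed
            default:
                throw new IllegalArgumentException("Unknown reason: " + reason);
        }
    }

    public static void main(String[] args) {
        Set<String> topics = Set.of("topic1");
        System.out.println(shouldUnsubscribe(StopReason.SERVICE_SHUTDOWN, topics, topics));
        System.out.println(shouldUnsubscribe(StopReason.CONNECTION_EDITED, topics, Set.of("topic2")));
    }
}
```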

@dimabarbul
Contributor Author

dimabarbul commented Jan 28, 2024

I'm fairly new to Akka/Pekko, so I might have this wrong. But from what I've read, I don't see a way (for now) to distinguish between closing a connection and modifying a connection.
I checked CloseConnectionStrategy.java and ModifyConnectionStrategy.java, and they do the same things when an open connection should now be closed: close the connection and stop the client actors:

    actions = Arrays.asList(ConnectionAction.PERSIST_AND_APPLY_EVENT, ConnectionAction.UPDATE_SUBSCRIPTIONS, ConnectionAction.CLOSE_CONNECTION,
            ConnectionAction.STOP_CLIENT_ACTORS, ConnectionAction.SEND_RESPONSE);

Moreover, modifying an open connection closes it first, then stops the actors, and then opens it again. This leaves the MQTT actor unaware of whether we want to create a brand new connection or update an existing one. So the decision needs to be moved to a higher level, to ConnectionPersistenceActor, I guess.

Modifying a closed connection apparently does not unsubscribe at all. So if we modify a closed connection and it stays closed, we'll need to unsubscribe when the connection is opened.

That said, deciding whether the MQTT client should unsubscribe (from some or all topics) does not seem straightforward.

To be honest, I can't think of an intuitive way to tell whether we want to just close the connection or close it and discard the saved session. Maybe we could introduce a header, e.g. ditto-mqtt-clear-session, process it when the connection is being closed (for example, when processing CloseConnection or ModifyConnection commands), and not unsubscribe anywhere else? I'm not a fan of magic headers, but guessing whether the user wants to clear the session seems complicated and error-prone. Introducing new commands only for MQTT (like CloseConnectionAndClear) does not seem reasonable to me.
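
As a rough sketch of the header idea: the check itself could be as small as the snippet below. The header name comes from the proposal above and does not exist in Ditto today. The class `ClearSessionHeader`, the plain `Map<String, String>` used in place of Ditto's header type, and the default of "do not clear" when the header is absent are all assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical check for the proposed header; not an existing Ditto API. */
final class ClearSessionHeader {

    static final String HEADER_NAME = "ditto-mqtt-clear-session";

    /** True only if the header is present with the value "true" (case-insensitive). */
    static boolean clearSessionRequested(Map<String, String> commandHeaders) {
        return Boolean.parseBoolean(commandHeaders.getOrDefault(HEADER_NAME, "false"));
    }

    public static void main(String[] args) {
        // Without the header, the session and its subscriptions would be kept
        System.out.println(clearSessionRequested(Map.of()));
        // With the header, the client would unsubscribe while the connection is being closed
        System.out.println(clearSessionRequested(Map.of(HEADER_NAME, "true")));
    }
}
```

Defaulting to "keep the session" matches the goal of this issue: messages are only dropped when the user asks for it explicitly.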


2 participants