Hello,
I'm trying to use libmosquitto in an application with many open fds.
Due to issue #1299, the mosquitto_loop() function cannot be relied upon, so I attempted to implement my own alternative based on epoll(), mosquitto_loop_read(), mosquitto_loop_write() and mosquitto_loop_misc() instead, using the answer to #2335 as inspiration.
The code looks like this:
```c
struct epoll_event ev;
int epfd = epoll_create1(0);
if (epfd < 0)
{
    fprintf (stderr, "epfd < 0: %d\n", epfd);
    exit (-1);
}
int sock = mosquitto_socket(mosq);
ev.data.fd = sock;
ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
int err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
if (err != 0)
{
    fprintf (stderr, "err != 0: %d\n", err);
    exit (-1);
}
while ( !stopAsked )
{
    int result;
    if ( mosquitto_want_write(mosq) )
    {
        // Re-arm EPOLLOUT event...
        ev.data.fd = sock;
        ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
        err = epoll_ctl(epfd, EPOLL_CTL_MOD, ev.data.fd, &ev);
    }
    err = epoll_wait(epfd, &ev, 1, MQTT_POLL_INTERVAL_MS);
    if (err < 0)
    {
        fprintf (stderr, "epoll_wait returned %d, errno: %d %s\n", err, errno, strerror(errno));
    }
    else if (err > 0)
    {
        if (ev.events & EPOLLIN)
        {
            result = mosquitto_loop_read(mosq, 1);
            if ( result != MOSQ_ERR_SUCCESS && result != MOSQ_ERR_KEEPALIVE )
            {
                goto __handle_result;
            }
            else if ( result == MOSQ_ERR_KEEPALIVE )
            {
                fprintf (stderr, "mosquitto_loop_read() returned MOSQ_ERR_KEEPALIVE\n");
            }
        }
        if ( (ev.events & EPOLLOUT)
             && mosquitto_want_write(mosq) )
        {
            result = mosquitto_loop_write(mosq, 1);
            if ( result != MOSQ_ERR_SUCCESS && result != MOSQ_ERR_KEEPALIVE )
            {
                goto __handle_result;
            }
            else if ( result == MOSQ_ERR_KEEPALIVE )
            {
                fprintf (stderr, "mosquitto_loop_write() returned MOSQ_ERR_KEEPALIVE\n");
            }
        }
    }
    result = mosquitto_loop_misc(mosq);
    if ( result == MOSQ_ERR_KEEPALIVE )
    {
        fprintf (stderr, "mosquitto_loop_misc() returned MOSQ_ERR_KEEPALIVE\n");
    }
__handle_result:
    switch (result)
    {
        case MOSQ_ERR_SUCCESS:
        case MOSQ_ERR_KEEPALIVE:
            break;
        case MOSQ_ERR_INVAL:
            fprintf (stderr, "mosquitto_loop: Invalid parameter!\n");
            exit (-1);
        case MOSQ_ERR_NOMEM:
            fprintf (stderr, "mosquitto_loop: Out of memory!\n");
            exit (-1);
        case MOSQ_ERR_NO_CONN:
            fprintf (stderr, "mosquitto_loop: No connection. Reconnecting...\n");
            // Stop watching old socket FD...
            ev.data.fd = sock;
            ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
            epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);
            // (Re)connect to broker...
            result = mosquitto_connect(mosq, mqttSubHost, mqttSubPort, MQTT_CONNECT_KEEPALIVE);
            if (result != MOSQ_ERR_SUCCESS)
            {
                fprintf (stderr, "Could not connect to broker: %d (%d/%s)\n", result, errno, strerror(errno));
                exit (-1);
            }
            // Start watching new socket FD...
            ev.data.fd = sock = mosquitto_socket(mosq);
            ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
            err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
            if (err != 0)
            {
                fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, sock );
                exit (-1);
            }
            break;
        case MOSQ_ERR_CONN_LOST:
            fprintf (stderr, "mosquitto_loop: Connection lost. Reconnecting...\n");
            // Stop watching old socket FD...
            ev.data.fd = sock;
            ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
            epoll_ctl(epfd, EPOLL_CTL_DEL, ev.data.fd, &ev);
            // Reconnect...
            mosquitto_reconnect(mosq);
            // Start watching new socket FD...
            ev.data.fd = sock = mosquitto_socket(mosq);
            ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
            err = epoll_ctl(epfd, EPOLL_CTL_ADD, ev.data.fd, &ev);
            if (err != 0)
            {
                fprintf (stderr, "Could not watch new socket, err != 0: %d, socket = %d\n", err, sock );
                exit (-1);
            }
            break;
        case MOSQ_ERR_PROTOCOL:
            fprintf (stderr, "mosquitto_loop: Protocol error!\n");
            break;
        case MOSQ_ERR_ERRNO:
            fprintf (stderr, "mosquitto_loop: syscall error; %s (%d)\n", strerror(errno), errno);
            break;
        default:
            fprintf (stderr, "mosquitto_loop: unknown error %d!\n", result);
            break;
    }
}
```
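In the two reconnect branches above, the old descriptor may already have been closed by the time EPOLL_CTL_DEL runs, and mosquitto_socket() returns -1 when no socket is open (for example after a failed mosquitto_reconnect(), whose result is not checked). A small helper along these lines makes both cases explicit; the name rewatch() is hypothetical and this is only a sketch, not part of libmosquitto.

```c
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/epoll.h>

/* Hypothetical helper: move epoll interest from the old MQTT socket to the
 * new one after a (re)connect. Returns 0 on success, -1 on failure. */
static int rewatch(int epfd, int old_fd, int new_fd, uint32_t events)
{
    /* A closed descriptor is normally dropped from the epoll set
     * automatically (unless it was duplicated), so a failing DEL with
     * EBADF (fd already closed) or ENOENT (fd not registered) is harmless. */
    if (old_fd >= 0 && epoll_ctl(epfd, EPOLL_CTL_DEL, old_fd, NULL) != 0
        && errno != EBADF && errno != ENOENT)
        return -1;

    /* mosquitto_socket() yields -1 when no socket is open; never hand
     * that to epoll_ctl(). */
    if (new_fd < 0)
        return -1;

    struct epoll_event ev = { .events = events, .data.fd = new_fd };
    return epoll_ctl(epfd, EPOLL_CTL_ADD, new_fd, &ev);
}
```

Used in place of the DEL/ADD pairs, the reconnect branch would call something like `rewatch(epfd, sock, mosquitto_socket(mosq), events)` and treat -1 as "not connected yet" instead of exiting.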
I'm encountering stability issues: mosquitto_loop_misc() returns MOSQ_ERR_KEEPALIVE, which is not a documented return value. It seems to be returned from mosquitto__check_keepalive(), but I don't know what to do with this result.
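A keepalive failure is consistent with the MQTT keepalive rule: when the connection has been idle for the keepalive period the client sends a PINGREQ, and if no PINGRESP has been read in time it should close the connection. The sketch below is a simplified, hypothetical model of that rule (ka_state, ka_check() and ka_packet_read() are made-up names, and this is not libmosquitto's implementation). It shows how a PINGRESP that has arrived but was never read, for example because no new epoll event woke the loop, still ends up as a keepalive failure.

```c
#include <stdbool.h>
#include <time.h>

/* Hypothetical, simplified model of an MQTT client's keepalive check. */
enum ka_action { KA_NOTHING, KA_SEND_PINGREQ, KA_GIVE_UP };

struct ka_state {
    time_t keepalive;       /* keepalive interval, in seconds */
    time_t last_in;         /* when a packet was last read from the socket */
    time_t last_out;        /* when a packet was last written */
    bool ping_outstanding;  /* PINGREQ sent, PINGRESP not read yet */
};

/* Only packets that are actually read count; a PINGRESP clears the ping. */
static void ka_packet_read(struct ka_state *s, time_t now, bool is_pingresp)
{
    s->last_in = now;
    if (is_pingresp)
        s->ping_outstanding = false;
}

/* Periodic check, similar in spirit to a "misc" loop call. */
static enum ka_action ka_check(struct ka_state *s, time_t now)
{
    if (now - s->last_out < s->keepalive && now - s->last_in < s->keepalive)
        return KA_NOTHING;
    if (!s->ping_outstanding) {
        s->ping_outstanding = true;
        s->last_in = now;   /* give the broker a full period to answer */
        s->last_out = now;
        return KA_SEND_PINGREQ;
    }
    return KA_GIVE_UP;      /* no PINGRESP read in time: connection is dead */
}
```

If libmosquitto behaves like this model, MOSQ_ERR_KEEPALIVE would mean the connection has already been given up, so handling it like MOSQ_ERR_CONN_LOST rather than like success would be the safer reading. That is an inference, not documented behaviour.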
My client eventually gets disconnected, with the following log on the broker side:
1709911264: Socket error on client XYZ, disconnecting.
With a TLS connection it gets worse, and the client misses a lot of messages (the client is consumer-only, with many topic subscriptions managed from another thread).
Is there something I'm doing wrong, or some way I can work around this issue?
Regards,
Hello,
Here is the complete source code for an epoll-based command-line subscriber exhibiting the issue: https://gist.github.com/erkoln/90a4efbf7c3f94d8c76a9a355939e2a2
I also tested this on Ubuntu 22.04 using the version 2.0.11 deb packages for both the mosquitto broker and the library, as well as with mosquitto 2.0.18 built from the git repository.
The sample code uses a thread to service the MQTT connection, which is not strictly necessary: calling consumer_loop() directly from main() makes no difference, but I added the thread to test whether this was a thread-related issue.
The problem seems to be related to the use of edge-triggered epoll. The epoll fd apparently needs to be rearmed in more cases, but I can't figure out which libmosquitto API I could base that on.
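This matches the edge-triggered pitfall described in the epoll(7) man page: an edge-triggered descriptor is reported again only when new data arrives, so a handler that returns before reading until EAGAIN can leave buffered data unnoticed. The self-contained sketch below reproduces that with a plain socketpair() instead of an MQTT connection. The assumption that a single mosquitto_loop_read(mosq, 1) call may return before the socket is drained is not verified here; the 4-byte "packets" and the function name are hypothetical.

```c
#include <fcntl.h>
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

struct et_result {
    int first_wait;   /* events reported by the first epoll_wait() */
    int second_wait;  /* events reported after a partial read */
    int leftover;     /* bytes still unread after the second wait */
};

/* Edge-triggered epoll: consume only one of two buffered "packets" and
 * check whether epoll reports the remaining data. */
static struct et_result et_partial_read_demo(void)
{
    struct et_result r = { -1, -1, -1 };
    struct epoll_event got;
    char buf[8];
    int sv[2];

    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return r;
    fcntl(sv[0], F_SETFL, fcntl(sv[0], F_GETFL) | O_NONBLOCK);

    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN | EPOLLET, .data.fd = sv[0] };
    if (epfd < 0 || epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) != 0)
        goto out;

    /* The peer sends two 4-byte "packets" back to back. */
    if (write(sv[1], "AAAABBBB", 8) != 8)
        goto out;
    r.first_wait = epoll_wait(epfd, &got, 1, 100);

    /* The handler reads only the first packet and goes back to epoll. */
    if (read(sv[0], buf, 4) != 4)
        goto out;

    /* No new data arrived, so there is no new edge: this times out. */
    r.second_wait = epoll_wait(epfd, &got, 1, 100);

    /* Drain until EAGAIN to show what was left behind. */
    r.leftover = 0;
    for (ssize_t n; (n = read(sv[0], buf, sizeof buf)) > 0; )
        r.leftover += (int)n;

out:
    if (epfd >= 0)
        close(epfd);
    close(sv[0]);
    close(sv[1]);
    return r;
}
```

Here first_wait is 1, second_wait is 0 after the 100 ms timeout, and leftover is 4. With TLS this could plausibly be worse, since records already pulled into OpenSSL's buffers no longer make the socket readable at all.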
Removing EPOLLET everywhere works, but I'm afraid it would waste CPU cycles.
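On the CPU concern: with level-triggered epoll, an idle connection with nothing to read does not wake the loop; busy-looping comes from EPOLLOUT, which a connected socket reports whenever its send buffer has room, i.e. almost always. A common pattern, sketched here as an assumption rather than a documented libmosquitto recipe, is to drop EPOLLET and request EPOLLOUT only while there is something to send, which in the loop above would be driven by mosquitto_want_write(mosq). The helper names are hypothetical and socketpair() stands in for the broker connection.

```c
#include <sys/epoll.h>
#include <sys/socket.h>
#include <unistd.h>

/* Level-triggered interest set: always watch for input, and ask for
 * EPOLLOUT only while output is queued (want_write would come from
 * mosquitto_want_write(mosq) in the real loop). */
static int set_interest(int epfd, int fd, int want_write)
{
    struct epoll_event ev = {
        .events = EPOLLIN | (want_write ? EPOLLOUT : 0),
        .data.fd = fd,
    };
    return epoll_ctl(epfd, EPOLL_CTL_MOD, fd, &ev);
}

/* Polls an idle socket (writable, nothing to read) `rounds` times with a
 * zero timeout and counts how often epoll_wait() reports it ready. */
static int count_ready(int want_write, int rounds)
{
    int sv[2], hits = 0;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return -1;
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = sv[0] };
    if (epfd >= 0 && epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0
        && set_interest(epfd, sv[0], want_write) == 0) {
        for (int i = 0; i < rounds; i++) {
            struct epoll_event got;
            if (epoll_wait(epfd, &got, 1, 0) > 0)
                hits++;
        }
    } else {
        hits = -1;
    }
    if (epfd >= 0)
        close(epfd);
    close(sv[0]);
    close(sv[1]);
    return hits;
}

/* Level-triggered EPOLLIN reports data left over after a partial read. */
static int lt_rereports_leftover(void)
{
    int sv[2], result = -1;
    char buf[4];
    struct epoll_event got;
    if (socketpair(AF_UNIX, SOCK_STREAM, 0, sv) != 0)
        return -1;
    int epfd = epoll_create1(0);
    struct epoll_event ev = { .events = EPOLLIN, .data.fd = sv[0] };
    if (epfd >= 0 && epoll_ctl(epfd, EPOLL_CTL_ADD, sv[0], &ev) == 0
        && write(sv[1], "AAAABBBB", 8) == 8
        && epoll_wait(epfd, &got, 1, 100) == 1
        && read(sv[0], buf, sizeof buf) == 4)
        result = epoll_wait(epfd, &got, 1, 100);
    if (epfd >= 0)
        close(epfd);
    close(sv[0]);
    close(sv[1]);
    return result;
}
```

count_ready(1, 5) returns 5 because every poll wakes up on EPOLLOUT, while count_ready(0, 5) returns 0; lt_rereports_leftover() returns 1 because the unread half of the data is reported again without any new edge.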
Any suggestion would be welcome.
Best regards,
mosquitto version: 1.6.10
platform: CentOS 7.9