Mercurial > hg > fxanalyse
diff FXAnalyse.c @ 240:7fd5cb857d07
Add data pubblication through ZMQ socket
author | Daniele Nicolodi <daniele.nicolodi@obspm.fr> |
---|---|
date | Thu, 12 Feb 2015 19:46:54 +0100 |
parents | 78fdba868884 |
children | b1dc2ba9a315 |
line wrap: on
line diff
--- a/FXAnalyse.c Thu Feb 12 19:13:55 2015 +0100 +++ b/FXAnalyse.c Thu Feb 12 19:46:54 2015 +0100 @@ -1,3 +1,4 @@ +#include <zmq.h> #include <tcpsupp.h> #include <utility.h> #include <ansi_c.h> @@ -49,6 +50,28 @@ // data provider thread CmtThreadFunctionID dataProviderThread; +// ZMQ +void *zmqcontext; +void *zmqsocket; + +// utility function to send data through ZMQ socket framed by an envelope +// see "Pub-Sub Message Envelopes" in chapter 2 "Sockets and Patterns" +// of "ZMQ The Guide" http://zguide.zeromq.org/page:all#toc49 +int zmq_xpub(void *socket, char *envelope, void *data, size_t len) +{ + int r; + + r = zmq_send(socket, envelope, strlen(envelope), ZMQ_SNDMORE); + if (r < 0) + return zmq_errno(); + + r = zmq_send(socket, data, len, 0); + if (r < 0) + return zmq_errno(); + + return 0; +} + struct event ev; double utc; @@ -641,6 +664,19 @@ ad9912_set_frequency_w(&ad9912, 2, frequency); GetCtrlVal(MainPanel, PANEL_DDS4, &frequency); ad9912_set_frequency_w(&ad9912, 3, frequency); + + // setup ZMQ pub socket + char *socket; + rv = Ini_GetStringCopy(configuration, "ZMQ", "socket", &socket); + if (! rv) + socket = strdup("tcp://127.0.0.1:3456"); + logmessage(INFO, "data sent to ZMQ socket '%s'", socket); + zmqcontext = zmq_ctx_new(); + zmqsocket = zmq_socket(zmqcontext, ZMQ_PUB); + rv = zmq_bind(zmqsocket, socket); + if (rv) + logmessage(ERROR, "cannot bind ZMQ socket '%s': %s", socket, zmq_strerror(zmq_errno())); + free(socket); // dispose configuration Ini_Dispose(configuration); @@ -1451,6 +1487,11 @@ // send Sr frequency (Math4) to Sr data logger sr_datalogger_send(&datalogger, utc, Math4); + + // publish data through ZMQ + int r = zmq_xpub(zmqsocket, "RAW", &ev, sizeof(ev)); + if (r) + logmessage(ERROR, "cannot send data through ZMQ socket: %s", zmq_strerror(r)); } break; }