Mercurial > hg > fxanalyse
comparison FXAnalyse.c @ 240:7fd5cb857d07
Add data publication through ZMQ socket
author | Daniele Nicolodi <daniele.nicolodi@obspm.fr> |
---|---|
date | Thu, 12 Feb 2015 19:46:54 +0100 |
parents | 78fdba868884 |
children | b1dc2ba9a315 |
comparison
equal
deleted
inserted
replaced
239:ec81395bf08d | 240:7fd5cb857d07 |
---|---|
1 #include <zmq.h> | |
1 #include <tcpsupp.h> | 2 #include <tcpsupp.h> |
2 #include <utility.h> | 3 #include <utility.h> |
3 #include <ansi_c.h> | 4 #include <ansi_c.h> |
4 #include <lowlvlio.h> | 5 #include <lowlvlio.h> |
5 #include <cvirte.h> | 6 #include <cvirte.h> |
47 // data queue | 48 // data queue |
48 CmtTSQHandle dataQueue; | 49 CmtTSQHandle dataQueue; |
49 // data provider thread | 50 // data provider thread |
50 CmtThreadFunctionID dataProviderThread; | 51 CmtThreadFunctionID dataProviderThread; |
51 | 52 |
53 // ZMQ | |
54 void *zmqcontext; | |
55 void *zmqsocket; | |
56 | |
57 // utility function to send data through ZMQ socket framed by an envelope | |
58 // see "Pub-Sub Message Envelopes" in chapter 2 "Sockets and Patterns" | |
59 // of "ZMQ The Guide" http://zguide.zeromq.org/page:all#toc49 | |
60 int zmq_xpub(void *socket, char *envelope, void *data, size_t len) | |
61 { | |
62 int r; | |
63 | |
64 r = zmq_send(socket, envelope, strlen(envelope), ZMQ_SNDMORE); | |
65 if (r < 0) | |
66 return zmq_errno(); | |
67 | |
68 r = zmq_send(socket, data, len, 0); | |
69 if (r < 0) | |
70 return zmq_errno(); | |
71 | |
72 return 0; | |
73 } | |
74 | |
52 | 75 |
53 struct event ev; | 76 struct event ev; |
54 double utc; | 77 double utc; |
55 #define Ch1 ev.data[0] | 78 #define Ch1 ev.data[0] |
56 #define Ch2 ev.data[1] | 79 #define Ch2 ev.data[1] |
639 ad9912_set_frequency_w(&ad9912, 1, frequency); | 662 ad9912_set_frequency_w(&ad9912, 1, frequency); |
640 GetCtrlVal(MainPanel, PANEL_DDS3, &frequency); | 663 GetCtrlVal(MainPanel, PANEL_DDS3, &frequency); |
641 ad9912_set_frequency_w(&ad9912, 2, frequency); | 664 ad9912_set_frequency_w(&ad9912, 2, frequency); |
642 GetCtrlVal(MainPanel, PANEL_DDS4, &frequency); | 665 GetCtrlVal(MainPanel, PANEL_DDS4, &frequency); |
643 ad9912_set_frequency_w(&ad9912, 3, frequency); | 666 ad9912_set_frequency_w(&ad9912, 3, frequency); |
667 | |
668 // setup ZMQ pub socket | |
669 char *socket; | |
670 rv = Ini_GetStringCopy(configuration, "ZMQ", "socket", &socket); | |
671 if (! rv) | |
672 socket = strdup("tcp://127.0.0.1:3456"); | |
673 logmessage(INFO, "data sent to ZMQ socket '%s'", socket); | |
674 zmqcontext = zmq_ctx_new(); | |
675 zmqsocket = zmq_socket(zmqcontext, ZMQ_PUB); | |
676 rv = zmq_bind(zmqsocket, socket); | |
677 if (rv) | |
678 logmessage(ERROR, "cannot bind ZMQ socket '%s': %s", socket, zmq_strerror(zmq_errno())); | |
679 free(socket); | |
644 | 680 |
645 // dispose configuration | 681 // dispose configuration |
646 Ini_Dispose(configuration); | 682 Ini_Dispose(configuration); |
647 | 683 |
648 // Sr data logger | 684 // Sr data logger |
1449 for (struct datafile *d = datafiles; d->data; d++) | 1485 for (struct datafile *d = datafiles; d->data; d++) |
1450 datafile_append(d, id, timestr); | 1486 datafile_append(d, id, timestr); |
1451 | 1487 |
1452 // send Sr frequency (Math4) to Sr data logger | 1488 // send Sr frequency (Math4) to Sr data logger |
1453 sr_datalogger_send(&datalogger, utc, Math4); | 1489 sr_datalogger_send(&datalogger, utc, Math4); |
1490 | |
1491 // publish data through ZMQ | |
1492 int r = zmq_xpub(zmqsocket, "RAW", &ev, sizeof(ev)); | |
1493 if (r) | |
1494 logmessage(ERROR, "cannot send data through ZMQ socket: %s", zmq_strerror(r)); | |
1454 } | 1495 } |
1455 break; | 1496 break; |
1456 } | 1497 } |
1457 } | 1498 } |
1458 | 1499 |