comparison FXAnalyse.c @ 240:7fd5cb857d07

Add data publication through ZMQ socket
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Thu, 12 Feb 2015 19:46:54 +0100
parents 78fdba868884
children b1dc2ba9a315
comparison
equal deleted inserted replaced
239:ec81395bf08d 240:7fd5cb857d07
1 #include <zmq.h>
1 #include <tcpsupp.h> 2 #include <tcpsupp.h>
2 #include <utility.h> 3 #include <utility.h>
3 #include <ansi_c.h> 4 #include <ansi_c.h>
4 #include <lowlvlio.h> 5 #include <lowlvlio.h>
5 #include <cvirte.h> 6 #include <cvirte.h>
47 // data queue 48 // data queue
48 CmtTSQHandle dataQueue; 49 CmtTSQHandle dataQueue;
49 // data provider thread 50 // data provider thread
50 CmtThreadFunctionID dataProviderThread; 51 CmtThreadFunctionID dataProviderThread;
51 52
53 // ZMQ
54 void *zmqcontext;
55 void *zmqsocket;
56
57 // utility function to send data through ZMQ socket framed by an envelope
58 // see "Pub-Sub Message Envelopes" in chapter 2 "Sockets and Patterns"
59 // of "ZMQ The Guide" http://zguide.zeromq.org/page:all#toc49
60 int zmq_xpub(void *socket, char *envelope, void *data, size_t len)
61 {
62 int r;
63
64 r = zmq_send(socket, envelope, strlen(envelope), ZMQ_SNDMORE);
65 if (r < 0)
66 return zmq_errno();
67
68 r = zmq_send(socket, data, len, 0);
69 if (r < 0)
70 return zmq_errno();
71
72 return 0;
73 }
74
52 75
53 struct event ev; 76 struct event ev;
54 double utc; 77 double utc;
55 #define Ch1 ev.data[0] 78 #define Ch1 ev.data[0]
56 #define Ch2 ev.data[1] 79 #define Ch2 ev.data[1]
639 ad9912_set_frequency_w(&ad9912, 1, frequency); 662 ad9912_set_frequency_w(&ad9912, 1, frequency);
640 GetCtrlVal(MainPanel, PANEL_DDS3, &frequency); 663 GetCtrlVal(MainPanel, PANEL_DDS3, &frequency);
641 ad9912_set_frequency_w(&ad9912, 2, frequency); 664 ad9912_set_frequency_w(&ad9912, 2, frequency);
642 GetCtrlVal(MainPanel, PANEL_DDS4, &frequency); 665 GetCtrlVal(MainPanel, PANEL_DDS4, &frequency);
643 ad9912_set_frequency_w(&ad9912, 3, frequency); 666 ad9912_set_frequency_w(&ad9912, 3, frequency);
667
668 // setup ZMQ pub socket
669 char *socket;
670 rv = Ini_GetStringCopy(configuration, "ZMQ", "socket", &socket);
671 if (! rv)
672 socket = strdup("tcp://127.0.0.1:3456");
673 logmessage(INFO, "data sent to ZMQ socket '%s'", socket);
674 zmqcontext = zmq_ctx_new();
675 zmqsocket = zmq_socket(zmqcontext, ZMQ_PUB);
676 rv = zmq_bind(zmqsocket, socket);
677 if (rv)
678 logmessage(ERROR, "cannot bind ZMQ socket '%s': %s", socket, zmq_strerror(zmq_errno()));
679 free(socket);
644 680
645 // dispose configuration 681 // dispose configuration
646 Ini_Dispose(configuration); 682 Ini_Dispose(configuration);
647 683
648 // Sr data logger 684 // Sr data logger
1449 for (struct datafile *d = datafiles; d->data; d++) 1485 for (struct datafile *d = datafiles; d->data; d++)
1450 datafile_append(d, id, timestr); 1486 datafile_append(d, id, timestr);
1451 1487
1452 // send Sr frequency (Math4) to Sr data logger 1488 // send Sr frequency (Math4) to Sr data logger
1453 sr_datalogger_send(&datalogger, utc, Math4); 1489 sr_datalogger_send(&datalogger, utc, Math4);
1490
1491 // publish data through ZMQ
1492 int r = zmq_xpub(zmqsocket, "RAW", &ev, sizeof(ev));
1493 if (r)
1494 logmessage(ERROR, "cannot send data through ZMQ socket: %s", zmq_strerror(r));
1454 } 1495 }
1455 break; 1496 break;
1456 } 1497 }
1457 } 1498 }
1458 1499