diff FXAnalyse.c @ 240:7fd5cb857d07

Add data pubblication through ZMQ socket
author Daniele Nicolodi <daniele.nicolodi@obspm.fr>
date Thu, 12 Feb 2015 19:46:54 +0100
parents 78fdba868884
children b1dc2ba9a315
line wrap: on
line diff
--- a/FXAnalyse.c	Thu Feb 12 19:13:55 2015 +0100
+++ b/FXAnalyse.c	Thu Feb 12 19:46:54 2015 +0100
@@ -1,3 +1,4 @@
+#include <zmq.h>
 #include <tcpsupp.h>
 #include <utility.h>
 #include <ansi_c.h>
@@ -49,6 +50,28 @@
 // data provider thread
 CmtThreadFunctionID dataProviderThread;
 
+// ZMQ
+void *zmqcontext;
+void *zmqsocket;
+
+// utility function to send data through ZMQ socket framed by an envelope
+// see "Pub-Sub Message Envelopes" in chapter 2 "Sockets and Patterns" 
+// of "ZMQ The Guide" http://zguide.zeromq.org/page:all#toc49
+int zmq_xpub(void *socket, char *envelope, void *data, size_t len)
+{
+	int r;
+	
+	r = zmq_send(socket, envelope, strlen(envelope), ZMQ_SNDMORE);
+	if (r < 0)
+		return zmq_errno();
+	
+	r = zmq_send(socket, data, len, 0);
+	if (r < 0)
+		return zmq_errno();
+
+	return 0;
+}
+	
 
 struct event ev;
 double utc;
@@ -641,6 +664,19 @@
 	ad9912_set_frequency_w(&ad9912, 2, frequency);
 	GetCtrlVal(MainPanel, PANEL_DDS4, &frequency);
 	ad9912_set_frequency_w(&ad9912, 3, frequency);
+
+	// setup ZMQ pub socket
+	char *socket;
+	rv = Ini_GetStringCopy(configuration, "ZMQ", "socket", &socket);
+	if (! rv)
+		socket = strdup("tcp://127.0.0.1:3456");
+	logmessage(INFO, "data sent to ZMQ socket '%s'", socket);
+	zmqcontext = zmq_ctx_new();
+	zmqsocket = zmq_socket(zmqcontext, ZMQ_PUB);
+	rv = zmq_bind(zmqsocket, socket);
+	if (rv)
+		logmessage(ERROR, "cannot bind ZMQ socket '%s': %s", socket, zmq_strerror(zmq_errno()));
+	free(socket);
 	
 	// dispose configuration
 	Ini_Dispose(configuration);
@@ -1451,6 +1487,11 @@
 				
 				// send Sr frequency (Math4) to Sr data logger
 				sr_datalogger_send(&datalogger, utc, Math4);
+				
+				// publish data through ZMQ
+				int r = zmq_xpub(zmqsocket, "RAW", &ev, sizeof(ev));
+				if (r)
+					logmessage(ERROR, "cannot send data through ZMQ socket: %s", zmq_strerror(r));
 			}		
 			break;
 	}