
Manifold – a new networking infrastructure for rtndf

While I have been using MQTT so far for rtndf, I always had in mind using my own infrastructure. I have been developing the concepts on and off since about 2003 and there’s a direct line from the early versions (intended for clusters of robots to form ad-hoc meshes), through SyntroNet and SNC to the latest incarnation called Manifold. It has some nice features such as auto-discovery, optimized distributed multicast, easy resilience and a distributed directory system that makes node discovery really easy.

The Manifold is made up of nodes. The most important node is ManifoldNexus which forms the hyper-connected fabric of the Manifold. The plan is for rtndf apps to become Manifold nodes to take advantage of the capabilities of Manifold. Manifold has APIs for C++ and Python.

Even though it is very new, Manifold is working quite well. Using Python source and sink scripts, it’s possible to get throughput of around 2 GB per second for both end to end (E2E) and multicast traffic. This figure was obtained by sending 5,000 packets of 400,000 bytes each per second on an i7-5820K machine. Between machines, rates are obviously limited by link speeds for large packets. Round-trip E2E latency is around 50µs for small packets, which could probably be improved. Maximum E2E message rate is about 100,000 messages per second between two nodes.

Manifold also potentially lends itself to being used with poll-mode Ethernet links and shared memory links, not to mention DPDK links for inter-machine connectivity. Poll-mode shared memory links are especially effective as latency is minimized and data predominantly bounces off the CPU’s caches. Plenty of work left to do…

picam – a Python rtndf video pipeline processing element for the Raspberry Pi/PiCamera

rtndf now has a simple Python script that allows a Raspberry Pi fitted with a PiCamera to be used as a PPE to generate an MJPEG video stream over MQTT. The resulting stream looks identical to that generated by uvccam (which works with UVC webcams), so picam and uvccam can be used interchangeably as video sources in rtndf pipelines.
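The core of the idea is straightforward: grab JPEG frames from the camera and publish each one to an MQTT topic. The sketch below shows a minimal version of that, assuming the picamera and paho-mqtt packages; the broker address and topic name are placeholders, and the real picam script wraps each frame in rtndf’s JSON message format, which is omitted here.

import io
import time
import picamera
import paho.mqtt.client as mqtt

BROKER = "localhost"          # placeholder broker address
TOPIC = "picam/video"         # placeholder topic name

client = mqtt.Client()
client.connect(BROKER)
client.loop_start()

with picamera.PiCamera() as camera:
    camera.resolution = (1280, 720)
    camera.framerate = 30
    stream = io.BytesIO()
    # use_video_port=True gives fast, video-quality JPEG capture
    for _ in camera.capture_continuous(stream, format='jpeg',
                                       use_video_port=True):
        client.publish(TOPIC, stream.getvalue())
        stream.seek(0)
        stream.truncate()
        time.sleep(1.0 / 30)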

imu and imuview – adding IMU sensing to rtndf data flow pipelines

Up to now the only data sources in rtndf were video and audio. imu is a new Python PPE that can be used to stream IMU data (fused pose, sensor readings etc) into an rtndf data flow pipeline. Another new PPE is imuview, this time a C++ PPE, which can display the resulting stream. The screen capture above shows the data being streamed from a Raspberry Pi SenseHat, which is a full 11-dof sensor.
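For the SenseHat case, the sensor readings are easy to get hold of from Python. The sketch below shows roughly what an imu PPE could look like, assuming the sense_hat and paho-mqtt packages; the JSON layout, topic name and sample rate are illustrative only and not necessarily what the real imu script uses.

import json
import time
from sense_hat import SenseHat
import paho.mqtt.client as mqtt

BROKER = "localhost"          # placeholder broker address
TOPIC = "imu/data"            # placeholder topic name

sense = SenseHat()
client = mqtt.Client()
client.connect(BROKER)
client.loop_start()

while True:
    message = {
        "timestamp": time.time(),
        "fusionPose": sense.get_orientation(),     # fused roll/pitch/yaw (degrees)
        "accel": sense.get_accelerometer_raw(),    # g
        "gyro": sense.get_gyroscope_raw(),         # rad/s
        "compass": sense.get_compass_raw(),        # microtesla
        "pressure": sense.get_pressure(),          # millibars
        "humidity": sense.get_humidity()           # percent
    }
    client.publish(TOPIC, json.dumps(message))
    time.sleep(0.1)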

One of the nice things about using a pub/sub system like MQTT is that it is possible to hook into any of the pipeline links to see what data is flowing. To this end, a future PPE will be a generic viewer. The user just gives it the topic and it determines the type of data and displays it appropriately. A very handy debugging tool!

rtndf – Python scripts for creating streaming data flow processing pipelines

The idea of joining together separate, lightweight processing elements to form complex pipelines is nothing new. DirectX and GStreamer have been doing this kind of thing for a long time. More recently, Apache NiFi has done a similar kind of thing but with Java classes. While Apache NiFi does have a lot of nice features, I really don’t want to live in Java hell.
I have been playing with MQTT for some time now and it is a very easy-to-use publish/subscribe system that’s used in all kinds of places. It seemed like it could be the glue for something…

So that’s really the background for rtnDataFlow, or rtndf as it is now called. It currently uses MQTT as its pub/sub infrastructure but there’s nothing too specific there – MQTT could easily be swapped out for something else if required. The repo consists of a number of pipeline processing elements that can be used to do some (hopefully) useful things. The primary language is Python, although there’s nothing stopping any language being used provided it has an MQTT client and handles the JSON messages correctly. It will even be possible to package pipeline processing elements in Docker containers. This will make deployment of new, complex pipeline processing elements very simple.

The pipeline processing elements are all joined up using topics. Pipeline processing elements can publish to one or more topics and/or subscribe to one or more topics. Because pub/sub systems are intrinsically multicasting, it’s very easy to process data in multiple ways in parallel (for redundancy, performance or functionality). MQTT also allows pipeline processing elements to be distributed on multiple systems, allowing load sharing and heterogeneous computing systems (where only some machines might be fitted with GPUs for example).
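In practice, a pipeline processing element doesn’t need to be much more than an MQTT client with an input topic, an output topic and a processing function in between. The skeleton below is a bare-bones sketch of that pattern using paho-mqtt; the topic names are placeholders, the identity transform stands in for the element’s real work, and the JSON metadata that rtndf messages actually carry is omitted.

import paho.mqtt.client as mqtt

BROKER = "localhost"          # placeholder broker address
IN_TOPIC = "uvccam/video"     # placeholder input topic
OUT_TOPIC = "mype/video"      # placeholder output topic

def process(payload):
    # Replace this with the element's real work (filtering, annotation, ...)
    return payload

def on_connect(client, userdata, flags, rc):
    client.subscribe(IN_TOPIC)

def on_message(client, userdata, msg):
    client.publish(OUT_TOPIC, process(msg.payload))

client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER)
client.loop_forever()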

Obviously, tools are required to design the pipelines and also to manage them at runtime. The design aspect will come from an old code generation project. While that actually generates C and Python code from a design that the user inputs via a graphical interface, the rtnDataFlow version will just make sure all topic names and broker addresses line up correctly and then produce a pipeline configuration file. A special app, rtnFlowControl, will run on each system and will be responsible for implementing the pipeline design specified.

So what’s the point of all of this? I’m tired of writing (or reworking) code multiple times for slightly different applications. My goal is to keep the pipeline processing elements simple enough and tightly focused so that a specific application can be achieved just by wiring together pipeline processing elements. There’ll end up being quite a few of these, of course, and probably most applications will still need custom elements, but it’s better than nothing. My initial use of rtnDataFlow will be to assist with experiments to see how machine learning tools can be used with IoT devices to do interesting things.

Motion detection pipeline processor using Python and OpenCV

I found this interesting tutorial describing ways to use OpenCV to implement motion detection. I thought that this might form the basis of a nice pipeline processing element for rtnDataFlow. Pipeline processing elements receive a stream from an MQTT topic, process it in some way and then output the modified stream on a new MQTT topic, usually in the same form but with appropriate changes. The new script is called modet.py and it takes a Jpeg over MQTT video stream and performs motion detection using OpenCV’s BackgroundSubtractorMOG2. The output stream consists of the input frames annotated with boxes around objects in motion in the frame. The screenshot shows an example. The small box is actually where the code has detected a moving screen saver on the monitor.

It can be tricky to get stable, large boxes rather than a whole bunch of smaller ones that percolate around. The code contains seven tunable parameters that can be modified as required – comments are in the code. Some will be dependent on frame size, some on frame rate. I tuned these parameters for 1280 x 720 frames at 30 frames per second, the default for the uvccam script.
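The sketch below is a condensed version of the general approach (not the actual modet.py script): background subtraction with MOG2, a bit of mask clean-up, then bounding boxes around contours that are large enough to matter. The MIN_AREA and LEARNING_RATE values are just examples of the kind of tunable parameters mentioned above, not the values modet.py actually uses.

import cv2

MIN_AREA = 2000          # ignore contours smaller than this (pixels) - frame size dependent
LEARNING_RATE = 0.01     # background model adaptation rate - frame rate dependent

subtractor = cv2.createBackgroundSubtractorMOG2(detectShadows=False)

def annotate(frame):
    mask = subtractor.apply(frame, learningRate=LEARNING_RATE)
    # Clean up the foreground mask a little before looking for contours
    mask = cv2.threshold(mask, 128, 255, cv2.THRESH_BINARY)[1]
    mask = cv2.dilate(mask, None, iterations=2)
    # findContours returns different tuples across OpenCV versions; [-2] works for both
    contours = cv2.findContours(mask, cv2.RETR_EXTERNAL,
                                cv2.CHAIN_APPROX_SIMPLE)[-2]
    for c in contours:
        if cv2.contourArea(c) < MIN_AREA:
            continue
        x, y, w, h = cv2.boundingRect(c)
        cv2.rectangle(frame, (x, y), (x + w, y + h), (0, 255, 0), 2)
    return frame

if __name__ == '__main__':
    cap = cv2.VideoCapture(0)
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        cv2.imshow('motion', annotate(frame))
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break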

The pipeline I was using for this test looked like this:

uvccam -> modet -> avview

I also tried it with the imageproc pipeline processor just for fun:

uvccam -> imageproc -> modet -> avview

This actually works pretty well too.

Using TensorFlow for things other than machine learning

TensorFlow provides a very convenient dataflow graph framework for not just machine learning applications but really anything where data goes through a number of processing stages. The great thing about using TensorFlow for this is that all the GPU and scaling capabilities are potentially available, along with a Python API for added convenience.

To test this out, I created a simple Python script to act as an image processor that can be inserted into a Jpeg video stream, using MQTT as a way of moving the data around. The collection of scripts to generate, process and display the video can be found here. The imageproc.py script uses TensorFlow to shrink each frame in the stream by a factor of two (using average pooling) and then perform simple edge detection using a discrete Laplacian, implemented as a 2-D convolution. Jpeg encoding and decoding are also performed using TensorFlow functions.
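The sketch below shows roughly how such a graph can be put together, assuming TensorFlow 1.x (current at the time of writing); it is not the actual imageproc.py, and the helper name process_jpeg is mine. Decoding, pooling, the Laplacian convolution and re-encoding are all TensorFlow ops, so the whole thing can potentially run on the GPU.

import tensorflow as tf

jpeg_in = tf.placeholder(tf.string)

# Decode to grayscale and convert to a float NHWC tensor
img = tf.image.decode_jpeg(jpeg_in, channels=1)
img = tf.expand_dims(tf.cast(img, tf.float32), 0)   # [1, H, W, 1]

# Shrink by a factor of two using average pooling
pooled = tf.nn.avg_pool(img, ksize=[1, 2, 2, 1],
                        strides=[1, 2, 2, 1], padding='SAME')

# Discrete Laplacian implemented as a 2-D convolution
kernel = tf.reshape(tf.constant([[0., 1., 0.],
                                 [1., -4., 1.],
                                 [0., 1., 0.]]), [3, 3, 1, 1])
edges = tf.nn.conv2d(pooled, kernel, strides=[1, 1, 1, 1], padding='SAME')

# Back to uint8 and re-encode as Jpeg
out = tf.cast(tf.clip_by_value(edges, 0., 255.), tf.uint8)
jpeg_out = tf.image.encode_jpeg(out[0])

def process_jpeg(sess, jpeg_bytes):
    # Run one frame through the graph and return the processed Jpeg bytes
    return sess.run(jpeg_out, feed_dict={jpeg_in: jpeg_bytes})

if __name__ == '__main__':
    with tf.Session() as sess:
        with open('frame.jpg', 'rb') as f:
            result = process_jpeg(sess, f.read())
        with open('edges.jpg', 'wb') as f:
            f.write(result)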

The frame rate tops out at around 17 frames per second on my i7-2700K/GTX 970 machine (the video source frame rate was 30 frames per second). I am guessing that there is a fixed per-run overhead in TensorFlow – it’s no doubt highly inefficient to run the graph with one image at a time.

There’s no rocket science here and the functionality is trivial. However, it is interesting to think how else TensorFlow can be used. Given the incredible interest and the likelihood of dedicated hardware acceleration one day, there might be considerable value in mapping problems onto TensorFlow graphs.

Processing video streams with TensorFlow and Inception-v3

I am currently working with TensorFlow and I thought it’d be interesting to see what kind of performance I could get when processing video and trying to recognize objects with Inception-v3. While I’d like to get TensorFlow integrated with some of my Qt apps, the whole “build with Bazel” thing is holding that up right now (problems with Eigen includes – one day I’ll get back to that). As a way of taking the path of least resistance, I included TensorFlow in an inline MQTT filter written in Python. It subscribes to a video topic sourced from a webcam and outputs recognized objects in the stream.
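For a flavour of how this fits together, the sketch below subscribes to a video topic and runs each Jpeg frame through a frozen Inception-v3 graph. It assumes TensorFlow 1.x and the pre-trained classify_image_graph_def.pb that ships with TensorFlow’s classic classify_image example, which is just one way of doing it and not necessarily what my filter does. The broker and topic names are placeholders, and the real filter also republishes the annotated frames, which is omitted here.

import numpy as np
import tensorflow as tf
import paho.mqtt.client as mqtt

BROKER = "localhost"                         # placeholder broker address
TOPIC = "uvccam/video"                       # placeholder input topic
GRAPH_PATH = "classify_image_graph_def.pb"   # pre-trained Inception-v3 GraphDef

# Load the frozen Inception-v3 graph once at startup
with tf.gfile.GFile(GRAPH_PATH, 'rb') as f:
    graph_def = tf.GraphDef()
    graph_def.ParseFromString(f.read())
tf.import_graph_def(graph_def, name='')

sess = tf.Session()
softmax = sess.graph.get_tensor_by_name('softmax:0')

def on_message(client, userdata, msg):
    # msg.payload is assumed to be a raw Jpeg frame
    predictions = sess.run(softmax, {'DecodeJpeg/contents:0': msg.payload})
    top = np.argmax(predictions)
    print('top class id: %d, score: %.3f' % (top, predictions[0][top]))

client = mqtt.Client()
client.on_message = on_message
client.connect(BROKER)
client.subscribe(TOPIC)
client.loop_forever()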

As can be seen from the screen capture, it’s currently achieving 11 frames per second using 640 x 480 frames with a GTX 970 GPU. With a GTX 960 GPU, the rate falls to around 8 frames per second. This is pretty much what I have seen with other TensorFlow graphs – the GTX 970 is about 50% faster than a GTX 960, probably due to the restricted memory bus width on the GTX 960.

Hopefully I’ll soon have a 10 series GPU – that should be an interesting comparison.