
javascript - ZeroMQ with node.js pipeline sink stops receiving messages after a while - Stack Overflow


I've been trying to set up a ventilator / worker / sink pattern in order to crawl pages, but I never got past the testing phase. The one particularity of my setup is that the sink lives in the same process as the ventilator. All nodes use the ipc:// transport. For the moment only test messages are exchanged: the ventilator sends tasks, workers receive them, wait, and then send a confirmation to the sink.

Symptoms: after some time (generally less than 5 minutes) the sink stops receiving confirmation messages, even though the ventilator keeps on sending tasks and workers keep on receiving them and sending confirmations.

I know that confirmations are sent because if I restart my sink, it gets all the missing messages on startup.

I thought ZeroMQ dealt with auto-reconnect.

ventilator/sink

var zmq = require('zmq');

// PUSH socket: distributes tasks to the workers
var push = zmq.socket('push');
// PULL socket: collects confirmations from the workers (the sink)
var sink = zmq.socket('pull');
var pi = 0;

// Send a two-frame test message every 2 seconds
setInterval(function() {
    push.send(['ping', pi++], zmq.ZMQ_SNDMORE);
    push.send('end');
}, 2000);

push.bind('ipc://crawl.ipc');
sink.bind('ipc://crawl-sink.ipc');

sink.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
    console.log('got message', args.join(' '));
});

worker.js

var zmq = require('zmq');

// PULL socket: receives tasks from the ventilator
var pull = zmq.socket('pull');
// PUSH socket: sends confirmations to the sink
var sink = zmq.socket('push');
// opt holds the endpoint addresses; how it is populated is not shown here
sink.connect(opt.sink); // e.g. 'ipc://crawl-sink.ipc'
pull.connect(opt.push); // e.g. 'ipc://crawl.ipc'

pull.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
    console.log('got job ', args.join(' '));
    // Simulate work with a random delay of up to 5 seconds
    setTimeout(function() {
        console.log('job done ', args.join(' '));
        sink.send(['job done', args.join(' ')]);
    }, Math.random() * 5 * 1000);
});

EDIT I tried moving the sink to another process and it seems to work. However, I would really like it to live in the same process as the ventilator, and I observed similar behaviour when dealing with more than one zmq socket per process, regardless of the pattern used.
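
For reference, this is roughly what the separate-process workaround looks like; a minimal sketch reusing the endpoints from the snippets above, with sink.js as a hypothetical file name:

sink.js

var zmq = require('zmq');

// Standalone sink process: it now owns the PULL endpoint instead of the ventilator
var sink = zmq.socket('pull');
sink.bind('ipc://crawl-sink.ipc');

sink.on('message', function() {
    var args = [].slice.apply(arguments).map(function(e) { return e.toString(); });
    console.log('got message', args.join(' '));
});

The ventilator process then keeps only the push socket; the workers are unchanged.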

EDIT I'm using this module: https://github.com/JustinTulloss/zeromq.node


asked Feb 5, 2013 at 14:07 by Floby; edited Mar 7, 2013 at 8:44
  • Have you tried other forms of transport? – robertklep Commented Mar 5, 2013 at 15:35
  • yes, mainly tcp, and the problem remains. – Floby Commented Mar 5, 2013 at 16:13
  • I've been facing a similar issue, but with TCP. The push socket is part of a C# application, though, and I am trying to receive the data in a pull socket in node.js. It works perfectly, but it randomly stops working/listening after some time. I am not able to pinpoint when it stops working either. – Munim Commented Mar 7, 2013 at 5:26

2 Answers


I don't necessarily expect this answer to be accepted, but I'm placing it here for reference. There is a very faithful node-only module called Axon which is inspired by ZeroMQ.

  • Axon has no compiled dependencies, and re-creates the same socket types as ZeroMQ.
  • Axon also builds upon the pub/sub socket type to create a network event-emitter.
  • Finally, ZMQ's req/rep socket does not work with Node.js because ZMQ expects the reply to occur synchronously. Being native Node, the Axon library handles the req/rep pattern properly.

Note: ZMQ and Axon are not interoperable.
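
To give a feel for the API, here is a minimal push/pull sketch using Axon; the port number is an arbitrary choice, not something from the question:

var axon = require('axon');

// Ventilator side: a push socket fans messages out to connected pull sockets
var push = axon.socket('push');
push.bind(3000);

var pi = 0;
setInterval(function() {
    push.send('ping ' + pi++);
}, 2000);

// Worker side (normally a separate process): a pull socket receives them
var pull = axon.socket('pull');
pull.connect(3000);
pull.on('message', function(msg) {
    console.log('got message', msg.toString());
});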

I'm not sure if you are using the base AMQP client or a package that uses it under the covers, but I am having similar issues with RabbitMQ: messages stop arriving even though the actual process is still running (setInterval keeps firing).

I am running my services/workers via cluster.fork from the main process. There are listeners in the main process that re-launch the workers/services upon exit. Inside my worker I have a setInterval that runs every X seconds; if no work was done during that time, the worker calls process.exit (and the main process listener launches a new fork). This works out as enough resiliency for me: by having several workers running (listening on the queue), work still gets done.
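
As a rough illustration of that supervision loop (the 30-second idle window and exit code are my own choices, not from the answer):

var cluster = require('cluster');

if (cluster.isMaster) {
    cluster.fork();
    // Re-launch a replacement whenever a worker exits
    cluster.on('exit', function() {
        cluster.fork();
    });
} else {
    var didWork = false;
    // ... the queue/message handler should set didWork = true ...
    // If no work was done during the last interval, exit so the
    // master forks a fresh worker with fresh sockets
    setInterval(function() {
        if (!didWork) process.exit(1);
        didWork = false;
    }, 30 * 1000);
}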

As the other answer suggested, I've been considering a switch to Axon, as all my interfaces to the MQ currently go through Node. My other systems interface via a Node.js-driven API service; for that matter, it probably wouldn't be too hard to expose whatever you need via an API service.
