|
|
@@ -1,8 +1,10 @@ |
|
|
|
#include "megbrain/opr/zmq_rpc.h" |
|
|
|
#include "megbrain/exception.h" |
|
|
|
#include "megbrain_build_config.h" |
|
|
|
|
|
|
|
#if MGB_CUDA |
|
|
|
#include "megbrain/opr/zmq_rpc.h" |
|
|
|
#include "megbrain/common.h" |
|
|
|
#include "megbrain/exception.h" |
|
|
|
|
|
|
|
#include <unistd.h> |
|
|
|
#include <cassert> |
|
|
|
#include <cstdio> |
|
|
@@ -18,6 +20,8 @@ using namespace std; |
|
|
|
using namespace zmq; |
|
|
|
using namespace ZmqRpc; |
|
|
|
|
|
|
|
#define DISCARD_RETVAL MGB_MARK_USED_VAR |
|
|
|
|
|
|
|
ZmqRpcWorker::ZmqRpcWorker(context_t* context, ZmqRpcServerImpl* impl) |
|
|
|
: m_ctx(context), m_runable(0), m_impl(impl) {} |
|
|
|
|
|
|
@@ -57,10 +61,10 @@ void ZmqRpcWorker::work(string uid) { |
|
|
|
if (m_stop) |
|
|
|
break; |
|
|
|
message_t empty; |
|
|
|
socket.recv(empty); |
|
|
|
DISCARD_RETVAL(socket.recv(empty)); |
|
|
|
assert(empty.size() == 0); |
|
|
|
message_t request; |
|
|
|
socket.recv(request); |
|
|
|
DISCARD_RETVAL(socket.recv(request)); |
|
|
|
|
|
|
|
m_mtx.lock(); |
|
|
|
if (--m_runable <= 0) { |
|
|
@@ -144,24 +148,24 @@ void ZmqRpcServer::work() { |
|
|
|
if (items[0].revents & ZMQ_POLLIN) { |
|
|
|
message_t address; |
|
|
|
|
|
|
|
m_backend.recv(address); |
|
|
|
DISCARD_RETVAL(m_backend.recv(address)); |
|
|
|
worker_queue.push({(char*)address.data(), address.size()}); |
|
|
|
|
|
|
|
message_t empty; |
|
|
|
m_backend.recv(empty); |
|
|
|
DISCARD_RETVAL(m_backend.recv(empty)); |
|
|
|
assert(empty.size() == 0); |
|
|
|
|
|
|
|
// the third frame is READY or a client address |
|
|
|
message_t client_address; |
|
|
|
m_backend.recv(client_address); |
|
|
|
DISCARD_RETVAL(m_backend.recv(client_address)); |
|
|
|
string tmp((char*)client_address.data(), client_address.size()); |
|
|
|
if (strcmp(tmp.c_str(), "READY") != 0) { |
|
|
|
empty.rebuild(); |
|
|
|
m_backend.recv(empty); |
|
|
|
DISCARD_RETVAL(m_backend.recv(empty)); |
|
|
|
assert(empty.size() == 0); |
|
|
|
|
|
|
|
message_t respones; |
|
|
|
m_backend.recv(respones); |
|
|
|
DISCARD_RETVAL(m_backend.recv(respones)); |
|
|
|
m_frontend.send(client_address, send_flags::sndmore); |
|
|
|
m_frontend.send(empty, send_flags::sndmore); |
|
|
|
m_frontend.send(respones, send_flags::dontwait); |
|
|
@@ -169,14 +173,14 @@ void ZmqRpcServer::work() { |
|
|
|
} |
|
|
|
if (items[1].revents & ZMQ_POLLIN) { |
|
|
|
message_t address; |
|
|
|
m_frontend.recv(address); |
|
|
|
DISCARD_RETVAL(m_frontend.recv(address)); |
|
|
|
|
|
|
|
message_t empty; |
|
|
|
m_frontend.recv(empty); |
|
|
|
DISCARD_RETVAL(m_frontend.recv(empty)); |
|
|
|
assert(empty.size() == 0); |
|
|
|
|
|
|
|
message_t request; |
|
|
|
m_frontend.recv(request); |
|
|
|
DISCARD_RETVAL(m_frontend.recv(request)); |
|
|
|
|
|
|
|
string worker_uid = worker_queue.front(); |
|
|
|
worker_queue.pop(); |
|
|
@@ -221,7 +225,7 @@ void ZmqRpcClient::add_socket(socket_t* socket) { |
|
|
|
void ZmqRpcClient::request(message_t& request, message_t& reply) { |
|
|
|
socket_t* client = get_socket(); |
|
|
|
client->send(request, send_flags::dontwait); |
|
|
|
client->recv(reply); |
|
|
|
DISCARD_RETVAL(client->recv(reply)); |
|
|
|
add_socket(client); |
|
|
|
} |
|
|
|
#endif |
|
|
|
#endif // MGB_CUDA |