Пример асинхронного шаблона Majordomo с использованием нового обновленного API zzock CZMQ-4.1.0, не работающего

onepix спросил: 03 февраля 2018 в 10:12 в: c

После установки zmq и czmq с пивом я попытался скомпилировать и воспроизвести шаблон Asynchronous-Majordomo, но он не сработал, так как он требует czmq v3. Насколько я понял, я попытался обновить его до версии v4, используя zactor, потому что

zthread устарел в пользу zactor http://czmq.zeromq.org/czmq3-0:zthread

Так что теперь следующий код выглядит отлично для меня как обновленный шаблон async-majordomo, но он не работает должным образом. Он не создает нить, когда я запускаю его через мой терминал.

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.#include "czmq.h"
#include <stdlib.h>void dbg_write_in_file(char * txt, int nb_request) {
    FILE * pFile;
    pFile = fopen ("myfile.txt","a");    if (pFile!=NULL)
    {
        fputs (txt, pFile);        char str_nb_request[12];
        sprintf(str_nb_request, "%d", nb_request);
        fputs (str_nb_request, pFile);        fputs ("\n", pFile);
        fclose (pFile);
    }
}static void
client_task (zsock_t *pipe, void *args)
{
    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://localhost:5555");
    printf ("Setting up test...\n");
    zclock_sleep (100);    printf("child 1: parent: %i\n\n", getppid());
    printf("child 1: my pid: %i\n\n", getpid());    int requests;
    int64_t start;    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");        // stuck here /!\        char *reply = zstr_recv (client);
        zstr_free (&reply);        // check if it does something
        dbg_write_in_file("sync round-trip requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");        // check if it does something
        dbg_write_in_file("async round-trip send requests : ", requests);
        // end check
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);        // check if it does something
        dbg_write_in_file("async round-trip rec requests : ", requests);
        // end check
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));    zstr_send (pipe, "done");
}//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:static void
worker_task (zsock_t *pipe, void *args)
{
    printf("child 2: parent: %i\n\n", getppid());
    printf("child 2: my pid: %i\n\n", getpid());    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://localhost:5556");    while (true) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);    
    }
    zsock_destroy (&worker);
}//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:static void
broker_task (zsock_t *pipe, void *args)
{
    printf("child 3: parent: %i\n\n", getppid());
    printf("child 3: my pid: %i\n\n", getpid());    //  Prepare our sockets
    zsock_t *frontend = zsock_new (ZMQ_DEALER);
    zsock_bind (frontend, "tcp://localhost:5555");
    zsock_t *backend = zsock_new (ZMQ_DEALER);
    zsock_bind (backend, "tcp://localhost:5556");
    zmq_proxy (frontend, backend, NULL);    zsock_destroy (&frontend);
    zsock_destroy (&backend);
}//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:int main (void)
{
    //  Create threads
    zactor_t *client = zactor_new (client_task, NULL);
    assert (client);    
    zactor_t *worker = zactor_new (worker_task, NULL);
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, NULL);
    assert (broker);    //  Wait for signal on client pipe
    char *signal = zstr_recv (client);
    zstr_free (&signal);    zactor_destroy (&client);
    zactor_destroy (&worker);
    zactor_destroy (&broker);
    return 0;
}

Когда я запускаю его, похоже, что программа застревает в комментарии

// застрял здесь /! \

Затем, когда я убиваю так как он не заканчивается или вообще ничего печатать, мне нужно нажать пять раз Ctrl + C (^ C). Только тогда он выглядит более наглядным на консоли, как будто он действительно работает.
= & GT; Обратите внимание, что я удаляю все мои шаги printf(), поскольку было действительно беспорядочно читать.

Когда он запускается, он ничего не пишет в файл, вызываемый dbg_write_in_file() , только после отправки пяти Ctrl + C (^ C).

Оба клиента и задача брокера возвращают тот же номер getppid (мой терминал) и getpid как сама программа.

Я использую gcc trippingv4.c -o trippingv4 -L/usr/local/lib -lzmq -lczmq для компиляции.

Когда я пытаюсь его убить:

./trippingv4
Setting up test...
child 1: parent: 60967child 1: my pid: 76853Synchronous round-trip test...
^Cchild 2: parent: 60967child 2: my pid: 76853^Cchild 3: parent: 60967child 3: my pid: 76853^C^C^CE: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:29
E: 18-02-28 00:16:37 [76853]dangling 'PAIR' socket created at src/zsys.c:471
E: 18-02-28 00:16:37 [76853]dangling 'DEALER' socket created at trippingv4.c:89

Обновить

Спасибо за подробный ответ @ user3666197 , В первой части компилятор не компилирует вызов assert, поэтому я просто показываю значение вместо этого и сравниваю визуально, они одинаковы.

int czmqMAJOR,
czmqMINOR,
czmqPATCH;zsys_version ( &czmqMAJOR, &czmqMINOR, &czmqPATCH );
printf( "INF: detected CZMQ ( %d, %d, %d ) -version\n",
         czmqMAJOR,
         czmqMINOR,
         czmqPATCH
         );printf( "INF: CZMQ_VERSION_MAJOR %d, CZMQ_VERSION_MINOR %d, CZMQ_VERSION_PATCH %d\n",
         CZMQ_VERSION_MAJOR,
         CZMQ_VERSION_MINOR,
         CZMQ_VERSION_PATCH
         );

Выход:

INF: detected CZMQ ( 4, 1, 0 ) -version
INF: CZMQ_VERSION_MAJOR 4, CZMQ_VERSION_MINOR 1, CZMQ_VERSION_PATCH 0

Вызов zsys_info компилируется, но ничего не показывает на терминале, даже с fflush(stdout) на всякий случай, поэтому я просто использовал printf:

INF: This system's Context() limit is 65535 ZeroMQ socketsINF: current state of the global Context()-instance has:
     ( 1 )-IO-threads ready
     ( 1 )-ZMQ_BLOCKY state

Затем я изменил значение глобального контекстного потока с помощью zsys_set_io_threads(2) и / или zmq_ctx_set (aGlobalCONTEXT, ZMQ_BLOCKY, false);, все еще заблокированного. Похоже, zactor не работает с потоками систем, поскольку zthread был ... или не дает аналогичного поведения. Учитывая мой опыт в zeromq (также нулевой), вероятно, я пытаюсь сделать что-то, что не может быть достигнуто.

Обновление разрешено, но неproper

Моя основная ошибка заключалась в том, чтобы не правильно инициализировать экземпляр zactor

Актерская функция ДОЛЖНА вызывать zsock_signal (pipe) при инициализации и ДОЛЖНА прослушивать канал и выйти из команды $ TERM.

И чтобы не заблокировать выполнение прокси-сервера zactor до того, как он назовет zactor_destroy (&proxy);

, я даю последний код ниже, но вам все равно нужно выйти в конце с помощью Ctrl + C , потому что я не понял, как правильно управлять сигналом $TERM. Кроме того, zactor по-прежнему не использует системные адды. Вероятно, это дизайн, но я не знаю, как это работает за деревом.

//  Round-trip demonstrator
//  While this example runs in a single process, that is just to make
//  it easier to start and stop the example. The client task signals to
//  main when it's ready.#include <czmq.h>static void
client_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Client"));
    zsock_signal (pipe, 0);    zsock_t *client = zsock_new (ZMQ_DEALER);
    zsock_connect (client, "tcp://127.0.0.1:5555");    printf ("Setting up test...\n");
    zclock_sleep (100);    int requests;
    int64_t start;    printf ("Synchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 10000; requests++) {
        zstr_send (client, "hello");        zmsg_t *msgh = zmsg_recv (client);
        zmsg_destroy (&msgh);    }
    printf (" %d calls/second\n",
        (1000 * 10000) / (int) (zclock_time () - start));    printf ("Asynchronous round-trip test...\n");
    start = zclock_time ();
    for (requests = 0; requests < 100000; requests++) {
        zstr_send (client, "hello");
    }
    for (requests = 0; requests < 100000; requests++) {
        char *reply = zstr_recv (client);
        zstr_free (&reply);
    }
    printf (" %d calls/second\n",
        (1000 * 100000) / (int) (zclock_time () - start));    zstr_send (pipe, "done");
    printf("send 'done' to pipe\n");
}//  Here is the worker task. All it does is receive a message, and
//  bounce it back the way it came:static void
worker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Worker"));
    zsock_signal (pipe, 0);    zsock_t *worker = zsock_new (ZMQ_DEALER);
    zsock_connect (worker, "tcp://127.0.0.1:5556");    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (worker);
        zmsg_send (&msg, worker);
        // zstr_send (worker, "hello back"); // Give better perf I don't know why    }
    zsock_destroy (&worker);
}//  Here is the broker task. It uses the zmq_proxy function to switch
//  messages between frontend and backend:static void
broker_task (zsock_t *pipe, void *args)
{
    assert (streq ((char *) args, "Hello, Task"));
    zsock_signal (pipe, 0);    //  Prepare our proxy and its sockets
    zactor_t *proxy = zactor_new (zproxy, NULL);
    zstr_sendx (proxy, "FRONTEND", "DEALER", "tcp://127.0.0.1:5555", NULL);
    zsock_wait (proxy);
    zstr_sendx (proxy, "BACKEND", "DEALER", "tcp://127.0.0.1:5556", NULL);
    zsock_wait (proxy);    bool terminated = false;
    while (!terminated) {
        zmsg_t *msg = zmsg_recv (pipe);
        if (!msg)
            break;              //  Interrupted
        char *command = zmsg_popstr (msg);        if (streq (command, "$TERM")) {
            terminated = true;
            printf("broker received $TERM\n");
        }        freen (command);
        zmsg_destroy (&msg);
    }    zactor_destroy (&proxy);
}//  Finally, here's the main task, which starts the client, worker, and
//  broker, and then runs until the client signals it to stop:int main (void)
{    //  Create threads
    zactor_t *client = zactor_new (client_task, "Hello, Client");
    assert (client);
    zactor_t *worker = zactor_new (worker_task, "Hello, Worker");
    assert (worker);
    zactor_t *broker = zactor_new (broker_task, "Hello, Task");
    assert (broker);    char *signal = zstr_recv (client);
    printf("signal %s\n", signal);
    zstr_free (&signal);    zactor_destroy (&client);
    printf("client done\n");
    zactor_destroy (&worker);
    printf("worker done\n");
    zactor_destroy (&broker);
    printf("broker done\n");    return 0;
}

0 ответов