123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642 |
- /* Copyright (C) 2016 Max Riechelmann <max.riechelmann@student.kit.edu>
- (Karlsruhe Institute of Technology)
- This library is free software; you can redistribute it and/or modify it
- under the terms of the GNU Lesser General Public License as published by the
- Free Software Foundation; either version 2.1 of the License, or (at your
- option) any later version.
- This library is distributed in the hope that it will be useful, but WITHOUT
- ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
- FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
- details.
- You should have received a copy of the GNU Lesser General Public License along
- with this library; if not, write to the Free Software Foundation, Inc., 51
- Franklin St, Fifth Floor, Boston, MA 02110, USA
- */
- #include "myco-daemon.h"
- #include "../src/myco-memory.c"
- #include "../src/myco-modules.h"
- #define DEBUG 0
- myco_agent *first_agent = NULL;
- myco_resource *first_resource = NULL;
- in_addr_t myco_daemon_node_ip;
- myco_agent *myco_daemon_find_agent(const char *agent_name) {
- myco_agent *current_agent;
- current_agent = first_agent;
- while (current_agent != NULL) {
- if (strcmp(current_agent->name, agent_name) == 0) {
- return current_agent;
- }
- current_agent = current_agent->next;
- }
- return NULL;
- }
- int myco_daemon_register_agent(message msg) {
- myco_agent *current_agent;
- if (myco_daemon_find_agent(msg.agent_name) != NULL) {
- sprintf(msg.message, "ERROR: agent %s already exists\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- if (first_agent == NULL) {
- first_agent = malloc(sizeof(myco_agent));
- first_agent->next = NULL;
- first_agent->prev = NULL;
- sprintf(first_agent->name, "%s", msg.agent_name);
- first_agent->message_queue_id = msg.agent_message_queue_id;
- } else {
- current_agent = first_agent;
- while (current_agent->next != NULL) {
- current_agent = current_agent->next;
- }
- current_agent->next = malloc(sizeof(myco_agent));
- current_agent->next->prev = current_agent;
- current_agent->next->next = NULL;
- sprintf(current_agent->next->name, "%s", msg.agent_name);
- current_agent->next->message_queue_id = msg.agent_message_queue_id;
- }
- sprintf(msg.message, "SUCCESS: agent %s registered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- int myco_daemon_unregister_agent(message msg) {
- myco_agent *current_agent;
- myco_resource *current_resource;
- current_agent = myco_daemon_find_agent(msg.agent_name);
- if ((current_resource = myco_daemon_find_resource_by_agent(msg.agent_name)) != NULL) {
- sprintf(msg.message, "ERROR: agent %s still has resource %s\n", msg.agent_name, current_resource->name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- if (current_agent == NULL) {
- sprintf(msg.message, "ERROR: agent %s could not be unregistered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // Element in the middle
- if (current_agent->next != NULL && current_agent->prev != NULL) {
- current_agent->prev->next = current_agent->next;
- current_agent->next->prev = current_agent->prev;
- free(current_agent);
- sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // First element
- if (current_agent->next != NULL && current_agent->prev == NULL) {
- first_agent = current_agent->next;
- current_agent->next->prev = NULL;
- free(current_agent);
- sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // Last element
- if (current_agent->next == NULL && current_agent->prev != NULL) {
- current_agent->prev->next = NULL;
- free(current_agent);
- sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // Only remaining
- if (current_agent->next == NULL && current_agent->prev == NULL) {
- first_agent = NULL;
- free(current_agent);
- sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- sprintf(msg.message, "ERROR: agent %s could not be unregistered - data structure seems to be damaged!\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- myco_resource *myco_daemon_find_resource_by_agent(const char *agent_name) {
- myco_resource *current_resource;
- current_resource = first_resource;
- while (current_resource != NULL) {
- if (strcmp(current_resource->agent, agent_name) == 0) {
- return current_resource;
- }
- current_resource = current_resource->next;
- }
- return NULL;
- }
- myco_resource *myco_daemon_find_resource(const char *resource_name) {
- myco_resource *current_resource;
- current_resource = first_resource;
- while (current_resource != NULL) {
- if (strcmp(current_resource->name, resource_name) == 0) {
- return current_resource;
- }
- current_resource = current_resource->next;
- }
- return NULL;
- }
- int myco_daemon_register_resource(message msg) {
- myco_resource *current_resource;
- if (myco_daemon_find_agent(msg.agent_name) == NULL) {
- sprintf(msg.message, "ERROR: agent %s does not exist\n", msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- if (myco_daemon_find_resource(msg.resource_name) != NULL) {
- sprintf(msg.message, "ERROR: resource %s already exists\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // Insert as first element
- if (first_resource == NULL) {
- first_resource = malloc(sizeof(myco_resource));
- first_resource->next = NULL;
- first_resource->prev = NULL;
- sprintf(first_resource->name, "%s", msg.resource_name);
- sprintf(first_resource->agent, "%s", msg.agent_name);
- first_resource->pid = msg.sender_pid;
- first_resource->pointer = msg.resource_pointer;
- first_resource->size = msg.resource_size;
- first_resource->read_locked = 0;
- first_resource->transfer_locked = 1;
- first_resource->transactional = msg.resource_transactional;
- first_resource->version = 0;
- first_resource->node_ip = myco_daemon_node_ip;
- } else {
- // Insert as last element
- current_resource = first_resource;
- while (current_resource->next != NULL) {
- current_resource = current_resource->next;
- }
- current_resource->next = malloc(sizeof(myco_resource));
- current_resource->next->prev = current_resource;
- current_resource->next->next = NULL;
- sprintf(current_resource->next->name, "%s", msg.resource_name);
- sprintf(current_resource->next->agent, "%s", msg.agent_name);
- current_resource->next->pid = msg.sender_pid;
- current_resource->next->pointer = msg.resource_pointer;
- current_resource->next->size = msg.resource_size;
- current_resource->next->transfer_locked = 1;
- current_resource->next->read_locked = 0;
- current_resource->next->transactional = msg.resource_transactional;
- current_resource->next->version = 0;
- current_resource->next->node_ip = myco_daemon_node_ip;
- }
- sprintf(msg.message, "SUCCESS: resource %s registered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- int myco_daemon_unregister_resource(message msg) {
- myco_resource *current_resource;
- current_resource = myco_daemon_find_resource(msg.resource_name);
- if (current_resource == NULL) {
- sprintf(msg.message, "ERROR: resource %s could not be unregistered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // Element in the middle
- if (current_resource->next != NULL && current_resource->prev != NULL) {
- current_resource->prev->next = current_resource->next;
- current_resource->next->prev = current_resource->prev;
- free(current_resource);
- sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // First element
- if (current_resource->next != NULL && current_resource->prev == NULL) {
- first_resource = current_resource->next;
- current_resource->next->prev = NULL;
- free(current_resource);
- sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // Last element
- if (current_resource->next == NULL && current_resource->prev != NULL) {
- current_resource->prev->next = NULL;
- free(current_resource);
- sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- // Only remaining
- if (current_resource->next == NULL && current_resource->prev == NULL) {
- first_resource = NULL;
- free(current_resource);
- sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- sprintf(msg.message, "FATAL ERROR: resource %s could not be unregistered - data structure seems to be damaged!\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- int myco_daemon_request_resource(message msg) {
- myco_resource *current_resource;
- // TODO: Uncommented for performance tests...
- current_resource = myco_daemon_find_resource(msg.resource_name);
- // Check if resource exists on this node
- /*if ((current_resource = myco_daemon_find_resource(msg.resource_name)) != NULL) {
- // Check if agent that requests already owns the resource
- if (strcmp(current_resource->agent, msg.agent_name) == 0) {
- sprintf(msg.message, "ERROR: resource %s already belongs to agent %s\n", msg.resource_name, msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // Check if resource is transactional
- if (current_resource->transactional != RESOURCE_TRANSACTIONAL) {
- sprintf(msg.message, "ERROR: resource %s is not transactional\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- */
-
- // Check if resource resides on same node
- // printf("\n The adress is: %s\n", inet_ntoa(*(struct in_addr *)¤t_resource->node_ip));
- // if (current_resource->node_ip == myco_daemon_node_ip) {
-
- // If on same node, send information
- msg.sender_pid = current_resource->pid;
- msg.resource_size = current_resource->size;
- msg.resource_pointer = current_resource->pointer;
- sprintf(msg.message, "SUCCESS: resource found on same node, sending information\n");
- myco_send(msg.agent_message_queue_id, msg);
- msg = myco_receive(msg.agent_message_queue_id);
- if (strcmp(msg.message, "RESOURCE GRANTED") == 0) {
- // Transfer ownership of resource to agent
- sprintf(current_resource->agent, "%s", msg.agent_name);
- current_resource->pid = msg.sender_pid;
- current_resource->pointer = msg.resource_pointer;
- } else {
- fprintf(stderr, "ERROR: myco_daemon_request: %s\n", strerror(errno));
- return -1;
- }
- /*} else {
- // Handle case where resource is on another node
- // TODO: Request Resource information from indexer
- // Fetch resource
- myco_network_fetch_resource(msg.resource_pointer, msg.sender_pid, msg.resource_size);
- // if no resource registered with indexer:
- if (0 == 1) {
- sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- }
- */
- return 0;
- }
- int myco_daemon_request_list(message msg, pid_t pid) {
- myco_resource *current_resource;
- current_resource = first_resource;
- char *resource_pointer;
- char tmp[64];
- int resource_size = 0;
- if (current_resource == NULL) {
- sprintf(msg.message, "ERROR: no resources\n");
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- while (current_resource != NULL) {
- resource_size += strlen(current_resource->name) + strlen(current_resource->agent) + strlen("1");
- resource_size += strlen("(,,,);");
- resource_size += 22;
- current_resource = current_resource->next;
- }
- resource_pointer = malloc(resource_size);
- memset(resource_pointer, 0, resource_size);
- current_resource = first_resource;
- while (current_resource != NULL) {
- strcat(resource_pointer, current_resource->name);
- strcat(resource_pointer, "(");
- strcat(resource_pointer, current_resource->agent);
- strcat(resource_pointer, ",");
- sprintf(tmp, "%d", current_resource->size);
- strcat(resource_pointer, tmp);
- strcat(resource_pointer, ",");
- sprintf(tmp, "%p", current_resource->pointer);
- strcat(resource_pointer, tmp);
- strcat(resource_pointer, ",");
- sprintf(tmp, "%d", current_resource->read_locked);
- strcat(resource_pointer, tmp);
- strcat(resource_pointer, ",");
- sprintf(tmp, "%d", current_resource->transfer_locked);
- strcat(resource_pointer, tmp);
- strcat(resource_pointer, ",");
- if (current_resource->transactional) {
- strcat(resource_pointer, "1");
- } else {
- strcat(resource_pointer, "0");
- }
- strcat(resource_pointer, ");");
- current_resource = current_resource->next;
- }
- msg.sender_pid = pid;
- msg.resource_size = resource_size;
- msg.resource_pointer = (void *)resource_pointer;
- sprintf(msg.message, "SUCCESS: sending resource list\n");
- myco_send(msg.agent_message_queue_id, msg);
- msg = myco_receive(msg.agent_message_queue_id);
- if (strcmp(msg.message, "RESOURCE LIST GRANTED") == 0) {
- free(resource_pointer);
- } else {
- fprintf(stderr, "ERROR: resource list was not granted: %s\n", strerror(errno));
- free(resource_pointer);
- return -1;
- }
- return 0;
- }
- int myco_daemon_write_remote_resource(message msg, int force) {
- myco_resource *current_resource;
- // Check if resource exists
- if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
- sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- if (force == 1) {
- sprintf(msg.message, "SUCCESS: forcing to overwrite resource %s, sending information\n", msg.resource_name);
- msg.resource_pointer = current_resource->pointer;
- msg.resource_size = current_resource->size;
- msg.sender_pid = current_resource->pid;
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- } else {
- if (current_resource->version > msg.version) {
- sprintf(msg.message, "ERROR: resource %s is newer than your copied version\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- } else {
- sprintf(msg.message, "SUCCESS: version updated, sending information for resource %s\n", msg.resource_name);
- msg.resource_pointer = current_resource->pointer;
- msg.resource_size = current_resource->size;
- msg.sender_pid = current_resource->pid;
- myco_send(msg.agent_message_queue_id, msg);
- current_resource->version += 1;
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- }
- }
- }
- int myco_daemon_read_remote_resource(message msg) {
- myco_resource *current_resource;
- // Check if resource exists
- if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
- sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // Check if agent that requests already owns the resource
- if (strcmp(current_resource->agent, msg.agent_name) == 0) {
- sprintf(msg.message, "ERROR: resource %s already belongs to agent %s\n", msg.resource_name, msg.agent_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- // TODO: Handle case where resource is on another node (another agent)
- // If on same node, send information
- msg.sender_pid = current_resource->pid;
- msg.resource_size = current_resource->size;
- msg.resource_pointer = current_resource->pointer;
- sprintf(msg.message, "SUCCESS: resource found on same node, sending information\n");
- myco_send(msg.agent_message_queue_id, msg);
- msg = myco_receive(msg.agent_message_queue_id);
- if (strcmp(msg.message, "RESOURCE READ") == 0) {
- } else {
- fprintf(stderr, "ERROR: myco_daemon_reead_remote: %s\n", strerror(errno));
- return -1;
- }
- return 0;
- }
- int myco_daemon_lock_resource(message msg) {
- myco_resource *current_resource;
- current_resource = myco_daemon_find_resource(msg.resource_name);
- if (current_resource != NULL && current_resource->transfer_locked == 1 && strcmp(current_resource->agent, msg.agent_name) == 0) {
- current_resource->read_locked = 1;
- sprintf(msg.message, "SUCCESS: resource %s locked\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- } else {
- sprintf(msg.message, "ERROR: resource %s could not be locked\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- }
- int myco_daemon_release_resource(message msg) {
- myco_resource *current_resource;
- current_resource = myco_daemon_find_resource(msg.resource_name);
- if (current_resource != NULL && strcmp(current_resource->agent, msg.agent_name) == 0) {
- current_resource->transfer_locked = 0;
- sprintf(msg.message, "SUCCESS: resource %s released\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- } else {
- sprintf(msg.message, "ERROR: resource %s could not be released\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- }
- int myco_daemon_unlock_resource(message msg) {
- myco_resource *current_resource;
- current_resource = myco_daemon_find_resource(msg.resource_name);
- if (current_resource != NULL && current_resource->transfer_locked == 1 && strcmp(current_resource->agent, msg.agent_name) == 0) {
- current_resource->read_locked = 0;
- sprintf(msg.message, "SUCCESS: resource %s unlocked\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return 0;
- } else {
- sprintf(msg.message, "ERROR: resource %s could not be locked\n", msg.resource_name);
- myco_send(msg.agent_message_queue_id, msg);
- return -1;
- }
- }
- int myco_daemon_connect(const char* ip, int port) {
- struct sockaddr_in address;
- int mysocket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
- address.sin_family = AF_INET;
- address.sin_port = htons (port);
- inet_aton (ip, &address.sin_addr);
- connect (mysocket, (struct sockaddr *) &address, sizeof (address));
- return mysocket;
- }
- int myco_daemon_start(pid_t pid, const char *node_ip, const char *indexer_ip) {
- myco_daemon_node_ip = inet_network(node_ip);
- char *buffer = malloc (1024);
- int daemon_message_queue_id;
- message msg = {0};
- // Connect to indexer
- int socket = myco_daemon_connect(indexer_ip, 15000);
- // Create message queue
- daemon_message_queue_id = myco_create_global_message_queue();
- if (daemon_message_queue_id < 0) {
- fprintf(stderr, "ERROR: Message queue could not be created. %s\n", strerror(errno));
- return -1;
- }
- // Receive messages
- while (1) {
- // Receive messages from indexer
- int size = recv (socket, buffer, 1023, 0);
- if( size > 0)
- buffer[size] = '\0';
- printf ("%s\n", buffer);
- // Receive messages from agents
- msg = myco_receive(daemon_message_queue_id);
- if (DEBUG) {
- printf("%d, %s, %s, %s, %p, %d, %d, %d, %d", msg.agent_message_queue_id, msg.agent_name, msg.message, \
- msg.resource_name, msg.resource_pointer, msg.resource_size, msg.resource_transactional, msg.sender_pid, msg.version);
- }
- if (msg.message == NULL) {
- fprintf(stderr, "FATAL ERROR: No message could be received. %s\n", strerror(errno));
- return -1;
- } else {
- // Handle indexer
- char tcp_message[MESSAGE_LENGTH + RESOURCE_NAME_LENGTH + AGENT_NAME_LENGTH];
- strcpy(tcp_message, msg.message);
- strcat(tcp_message, ";");
- if (msg.agent_name[0] != '\0') {
- strcat(tcp_message, msg.agent_name);
- } else {
- strcat(tcp_message, "-");
- }
- strcat(tcp_message, ";");
- if (msg.resource_name[0] != '\0') {
- strcat(tcp_message, msg.resource_name);
- } else {
- strcat(tcp_message, "-");
- }
- send(socket, tcp_message, strlen(tcp_message), 0);
- // Handle agents
- if (strcmp(msg.message, "REGISTER AGENT") == 0) {
- myco_daemon_register_agent(msg);
- }
- if (strcmp(msg.message, "UNREGISTER AGENT") == 0) {
- myco_daemon_unregister_agent(msg);
- }
- if (strcmp(msg.message, "REGISTER RESOURCE") == 0) {
- myco_daemon_register_resource(msg);
- }
- if (strcmp(msg.message, "UNREGISTER RESOURCE") == 0) {
- myco_daemon_unregister_resource(msg);
- }
- if (strcmp(msg.message, "REQUEST RESOURCE") == 0) {
- myco_daemon_request_resource(msg);
- }
- if (strcmp(msg.message, "REQUEST LIST") == 0) {
- myco_daemon_request_list(msg, pid);
- }
- if (strcmp(msg.message, "WRITE REMOTE RESOURCE") == 0) {
- myco_daemon_write_remote_resource(msg, 0);
- }
- if (strcmp(msg.message, "FORCE WRITE REMOTE RESOURCE") == 0) {
- myco_daemon_write_remote_resource(msg, 1);
- }
- if (strcmp(msg.message, "READ REMOTE RESOURCE") == 0) {
- myco_daemon_read_remote_resource(msg);
- }
- if (strcmp(msg.message, "LOCK RESOURCE") == 0) {
- myco_daemon_lock_resource(msg);
- }
- if (strcmp(msg.message, "UNLOCK RESOURCE") == 0) {
- myco_daemon_unlock_resource(msg);
- }
- if (strcmp(msg.message, "RELEASE RESOURCE") == 0) {
- myco_daemon_release_resource(msg);
- }
- }
- }
- close (socket);
- if (myco_remove_message_queue(daemon_message_queue_id) == -1) {
- fprintf(stderr, "FATAL ERROR: could not remove message queue %d\n", daemon_message_queue_id);
- return -1;
- }
- return EXIT_SUCCESS;
- }
|