Browse Source

resource transfer between agents on the same node

max 8 years ago
parent
commit
b8ae334417
9 changed files with 97 additions and 37 deletions
  1. 18 11
      src/myco-agent.c
  2. 2 2
      src/myco-agent.h
  3. 56 4
      src/myco-daemon.c
  4. 3 0
      src/myco-daemon.h
  5. 5 0
      src/myco-ipc.h
  6. 0 1
      src/myco-worker.c
  7. 12 6
      test/mycoagent.c
  8. 0 12
      test/mycoagent2.c
  9. 1 1
      test/worker.c

+ 18 - 11
src/myco-agent.c

@@ -14,6 +14,7 @@
    */
 
 #include "myco-agent.h"
+#include "../src/myco-memory.c"
 
 #define DEBUG 1
 
@@ -62,12 +63,13 @@ int myco_agent_unregister (const char *agent_name, int agent_message_queue_id) {
     return 0;
 }
 
-int myco_agent_register_resource (int agent_message_queue_id, const char *agent_name, const char *resource_name, int resource_transactional) {
+int myco_agent_register_resource (int agent_message_queue_id, const char *agent_name, const char *resource_name, int resource_transactional, pid_t pid) {
     message msg;
     msg.agent_message_queue_id = agent_message_queue_id;
     sprintf(msg.agent_name, "%s", agent_name);
     sprintf(msg.resource_name, "%s", resource_name);
     msg.resource_transactional = resource_transactional;
+    msg.sender_pid = pid;
     sprintf(msg.message, "REGISTER RESOURCE\n");
 
     msg = myco_send_and_receive(msg, myco_get_global_message_queue(), agent_message_queue_id);
@@ -102,8 +104,8 @@ int myco_agent_unregister_resource (int agent_message_queue_id, const char *reso
     return 0;
 }
 
-int myco_agent_request_resource(int agent_message_queue_id, char *resource_name) {
-    char *resource_pointer;
+struct resource myco_agent_request_resource(int agent_message_queue_id, char *resource_name) {
+    struct resource myresource;
     message msg;
     msg.agent_message_queue_id = agent_message_queue_id;
     sprintf(msg.resource_name, "%s", resource_name);
@@ -116,17 +118,22 @@ int myco_agent_request_resource(int agent_message_queue_id, char *resource_name)
     }
 
     if (strncmp(msg.message, "SUCCESS:", 8) != 0) {
-        return -1;
+        myresource.size = -1;
+        return myresource;
     }
 
-    resource_pointer = malloc(msg.resource_size);
+    myresource.pointer = myco_malloc(msg.resource_size);
+    myresource.size = msg.resource_size;
 
-    if (myco_read_transactional(msg.sender_pid, msg.resource_pointer, msg.resource_size, (void *)resource_pointer, msg.resource_size) == -1) {
+    // TODO: return size and start address
+
+    if (myco_read_transactional(msg.sender_pid, msg.resource_pointer, msg.resource_size, myresource.pointer, myresource.size) == -1) {
         fprintf(stderr, "FATAL ERROR in myco_read_transactional() %s\n", strerror(errno));
-        return -1;
+        myresource.size = -1;
+        return myresource;
     }
 
-    return 0;
+    return myresource;
 }
 
 int myco_agent_request_resource_list(int agent_message_queue_id) {
@@ -148,14 +155,14 @@ int myco_agent_request_resource_list(int agent_message_queue_id) {
         return -1;
     }
 
-    resource_pointer = malloc(msg.resource_size);
+    resource_pointer = myco_malloc(msg.resource_size);
 
     if (myco_read_transactional(msg.sender_pid, msg.resource_pointer, msg.resource_size, (void *)resource_pointer, msg.resource_size) == -1) {
         printf("ERROR: %s\n", strerror(errno));
         return -1;
     }
 
-    sprintf(msg.message, "RESOURCE GRANTED\n");
+    sprintf(msg.message, "RESOURCE LIST GRANTED\n");
     myco_send(agent_message_queue_id, msg);
 
     printf("%s\n", resource_pointer);
@@ -166,7 +173,7 @@ int myco_agent_request_resource_list(int agent_message_queue_id) {
         split = strtok(NULL, ";");
     }
     */
-    free(resource_pointer);
+    myco_free(resource_pointer);
 
     return 0;
 }

+ 2 - 2
src/myco-agent.h

@@ -60,7 +60,7 @@ int myco_agent_unregister (const char *agent_name, int message_queue_id);
  *
  * Returns: message_queue_id on success, -1 on error 
  */
-int myco_agent_register_resource (int message_queue_id, const char *agent_name, const char *resource_name, int resource_transactional);
+int myco_agent_register_resource (int message_queue_id, const char *agent_name, const char *resource_name, int resource_transactional, pid_t pid);
 
 /**
  * myco_agent_unregister_resource:
@@ -78,7 +78,7 @@ int myco_agent_unregister_resource (int message_queue_id, const char *resource_n
  *
  * Returns: message_queue_id on success, -1 on error 
  */
-int myco_agent_request_resource(int message_queue_id, char *resource_name);
+struct resource myco_agent_request_resource(int message_queue_id, char *resource_name);
 
 /**
  * myco_agent_request_resource_list:

+ 56 - 4
src/myco-daemon.c

@@ -14,6 +14,7 @@
    */
 
 #include "myco-daemon.h"
+#include "../src/myco-memory.c"
 
 myco_agent *first_agent = NULL;
 myco_resource *first_resource = NULL;
@@ -181,6 +182,9 @@ int myco_daemon_register_resource(message msg) {
         first_resource->prev = NULL;
         sprintf(first_resource->name, "%s", msg.resource_name);
         sprintf(first_resource->agent, "%s", msg.agent_name);
+        first_resource->pid = msg.sender_pid;
+        first_resource->pointer = msg.resource_pointer;
+        first_resource->size = msg.resource_size;
         first_resource->transactional = msg.resource_transactional;
     } else {
         // Insert as last element
@@ -193,6 +197,9 @@ int myco_daemon_register_resource(message msg) {
         current_resource->next->next = NULL;
         sprintf(current_resource->next->name, "%s", msg.resource_name);
         sprintf(current_resource->next->agent, "%s", msg.agent_name);
+        current_resource->next->pid = msg.sender_pid;
+        current_resource->next->pointer = msg.resource_pointer;
+        current_resource->next->size = msg.resource_size;
         current_resource->next->transactional = msg.resource_transactional;
     }
 
@@ -263,6 +270,51 @@ int myco_daemon_unregister_resource(message msg) {
 }
 
 int myco_daemon_request_resource(message msg) {
+    myco_resource *current_resource;
+    
+    // Check if resource exists
+    if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
+        sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
+        myco_send(msg.agent_message_queue_id, msg);
+        return -1;
+    }
+    // Check if agent that requests already owns the resource
+    if (strcmp(current_resource->agent, msg.agent_name) == 0) {
+        sprintf(msg.message, "ERROR: resource %s already belongs to agent %s\n", msg.resource_name, msg.agent_name);
+        myco_send(msg.agent_message_queue_id, msg);
+        return -1;
+    }
+    // Check if resource is transactional
+    if (current_resource->transactional != RESOURCE_TRANSACTIONAL) {
+        sprintf(msg.message, "ERROR: resource %s is not transactional\n", msg.resource_name);
+        myco_send(msg.agent_message_queue_id, msg);
+        return -1;
+    }
+
+    //TODO:  Check if resource is on the same node
+
+    // If on same node, grant
+    msg.sender_pid = current_resource->pid; 
+    msg.resource_size = current_resource->size;
+    msg.resource_pointer = current_resource->pointer;
+    sprintf(msg.message, "SUCCESS: resource found on same node, sending information\n");
+    myco_send(msg.agent_message_queue_id, msg);
+
+    msg = myco_receive(msg.agent_message_queue_id);
+    if (strcmp(msg.message, "RESOURCE GRANTED\n") == 0) {
+        //TODO: Tell other agent to free resource
+
+        // Transfer ownership of resource to agent
+        sprintf(current_resource->agent, "%s", msg.agent_name);
+        current_resource->pid = msg.sender_pid;
+        current_resource->pointer = msg.resource_pointer;
+    } else {
+        fprintf(stderr, "ERROR: myco_daemon_request: %s\n", strerror(errno));
+        return -1;
+    }
+
+    return 0;
+
     sprintf(msg.message, "SUCCESS: resource granted\n");
     myco_send(msg.agent_message_queue_id, msg);
 
@@ -281,7 +333,7 @@ int myco_daemon_request_list(message msg, pid_t pid) {
     }
 
     resource_size += 4;
-    resource_pointer = malloc(resource_size);
+    resource_pointer = myco_malloc(resource_size);
     memset(resource_pointer, 0, resource_size);
 
     current_resource = first_resource;
@@ -306,8 +358,8 @@ int myco_daemon_request_list(message msg, pid_t pid) {
     myco_send(msg.agent_message_queue_id, msg);
 
     msg = myco_receive(msg.agent_message_queue_id);
-    if (strcmp(msg.message, "RESOURCE GRANTED\n") == 0) {
-        free(resource_pointer);
+    if (strcmp(msg.message, "RESOURCE LIST GRANTED\n") == 0) {
+        myco_free(resource_pointer);
     } else {
         fprintf(stderr, "ERROR: myco_daemon_request_list: %s\n", strerror(errno));
         return -1;
@@ -381,7 +433,7 @@ int myco_daemon_start(pid) {
             }
         } 
     }
-    
+
     if (myco_remove_message_queue(daemon_message_queue_id) == -1) {
         fprintf(stderr, "FATAL ERROR: could not remove message queue %d\n", daemon_message_queue_id);
         return -1;

+ 3 - 0
src/myco-daemon.h

@@ -47,6 +47,9 @@ struct myco_resource_ {
     char name[RESOURCE_NAME_LENGTH];
     int transactional;
     char agent[AGENT_NAME_LENGTH];
+    pid_t pid;
+    void* pointer;
+    int size;
     myco_resource *next;
     myco_resource *prev;
 };

+ 5 - 0
src/myco-ipc.h

@@ -46,6 +46,11 @@
 #define RESOURCE_TRANSACTIONAL 1
 #define RESOURCE_NOT_TRANSACTIONAL 2
 
+struct resource {
+    void *pointer;
+    int size;
+};
+
 typedef struct {
     int agent_message_queue_id;
     char message[MESSAGE_LENGTH];

+ 0 - 1
src/myco-worker.c

@@ -26,7 +26,6 @@ int myco_worker_start(int message_queue_id) {
             perror("fork()");
             return EXIT_FAILURE;
         case 0:
-            printf("Starting worker!\n");
             execv("./worker", arguments);
         default:
             return worker_pid;

+ 12 - 6
test/mycoagent.c

@@ -6,23 +6,28 @@
 #include "../src/myco-worker.c"
 
 int main() {
+    struct resource myresource;
     pid_t worker_pid;
-    int message_queue_id;
+    int message_queue_id, mqi2;
     message msg;
 
     message_queue_id = myco_agent_register("MYAGENT_1");
-    myco_agent_register_resource(message_queue_id, "MYAGENT_1", "RESOURCE_1", RESOURCE_TRANSACTIONAL);
-
+    mqi2 = myco_agent_register("MYAGENT_2");
 
     worker_pid = myco_worker_start(message_queue_id);
-    printf("Watching over worker with pid: %d\n", worker_pid);
+
+    myco_agent_register_resource(message_queue_id, "MYAGENT_1", "RESOURCE_1", RESOURCE_NOT_TRANSACTIONAL, worker_pid);
+//    printf("Watching over worker with pid: %d\n", worker_pid);
+
+    myco_agent_request_resource_list(message_queue_id);
+    myresource = myco_agent_request_resource(mqi2, "RESOURCE_1");
 
     while (myco_worker_is_running(worker_pid) == 0) {
-            printf("%d is still alive.\n", worker_pid);
+            //printf("%d is still alive.\n", worker_pid);
 
             // check for messages
             msg = myco_receive(message_queue_id);
-            printf("%s", msg.message);
+            //printf("%s", msg.message);
 
             sleep(1);
 
@@ -30,6 +35,7 @@ int main() {
 
     myco_agent_unregister_resource(message_queue_id, "RESOURCE_1");
     myco_agent_unregister("MYAGENT_1", message_queue_id);
+    myco_agent_unregister("MYAGENT_2", mqi2);
 
 
     return EXIT_SUCCESS;

+ 0 - 12
test/mycoagent2.c

@@ -5,18 +5,6 @@
 #include "../src/myco-agent.c"
 
 int main() {
-    int message_queue_id;
-
-    message_queue_id = myco_agent_register("MYAGENT");
-    myco_agent_register_resource(message_queue_id, "MYAGENT", "MYTRANSACTIONALRESOURCE", RESOURCE_TRANSACTIONAL);
-    myco_agent_register_resource(message_queue_id, "MYAGENT", "MYTRANSACTIONALRESOURCE2", RESOURCE_TRANSACTIONAL);
-    myco_agent_register_resource(message_queue_id, "MYAGENT2", "MYTRANSACTIONALRESOURCE2", RESOURCE_TRANSACTIONAL);
-    myco_agent_request_resource_list(message_queue_id);
-    myco_agent_unregister("MYAGENT", message_queue_id);
-    myco_agent_unregister_resource(message_queue_id, "MYTRANSACTIONALRESOURCE");
-    myco_agent_unregister_resource(message_queue_id, "MYTRANSACTIONALRESOURCE2");
-    myco_agent_unregister("MYAGENT", message_queue_id);
-
 
     return EXIT_SUCCESS;
 }

+ 1 - 1
test/worker.c

@@ -31,7 +31,7 @@ int main(int argc, char **argv) {
     sprintf(msg.message, "FREED MEMORY AT %p\n", resource_pointer);
     myco_send(message_queue_id, msg);
 
-    printf("Worker is shutting down\n");
+//    printf("Worker is shutting down\n");
 
     return 0;
 }