Browse Source

data structures of indexer

max 7 years ago
parent
commit
6aec3a0bed
8 changed files with 438 additions and 57 deletions
  1. 16 6
      src/myco-daemon.c
  2. 0 2
      src/myco-daemon.h
  3. 321 43
      src/myco-indexer.c
  4. 18 3
      src/myco-indexer.h
  5. 1 1
      test/mycoagent.c
  6. 1 1
      test/mycoagent2.c
  7. 1 1
      test/mycoagent3.c
  8. 80 0
      test/mycotest.c

+ 16 - 6
src/myco-daemon.c

@@ -545,7 +545,7 @@ int myco_daemon_start(pid) {
         // Receive messages from agents
         msg = myco_receive(daemon_message_queue_id);
         if (DEBUG) {
-            printf("%d, %s, %s, %s, %p, %d, %d, %d, %d\n", msg.agent_message_queue_id, msg.agent_name, msg.message, \
+            printf("%d, %s, %s, %s, %p, %d, %d, %d, %d", msg.agent_message_queue_id, msg.agent_name, msg.message, \
                     msg.resource_name, msg.resource_pointer, msg.resource_size, msg.resource_transactional, msg.sender_pid, msg.version);
         }
         if (msg.message == NULL) {
@@ -553,7 +553,21 @@ int myco_daemon_start(pid) {
             return -1;
         } else {
             // Handle indexer
-            send(socket, msg.message, strlen(msg.message), 0);
+            char tcp_message[MESSAGE_LENGTH + RESOURCE_NAME_LENGTH + AGENT_NAME_LENGTH];
+            strcpy(tcp_message, msg.message);
+            strcat(tcp_message, ";");
+            if (msg.agent_name[0] != '\0') {
+                strcat(tcp_message, msg.agent_name);
+            } else {
+                strcat(tcp_message, "-");
+            }
+            strcat(tcp_message, ";");
+            if (msg.resource_name[0] != '\0') {
+                strcat(tcp_message, msg.resource_name);
+            } else {
+                strcat(tcp_message, "-");
+            }
+            send(socket, tcp_message, strlen(tcp_message), 0);
             // Handle agents
             if (strcmp(msg.message, "REGISTER AGENT") == 0) {
                 myco_daemon_register_agent(msg);
@@ -604,7 +618,3 @@ int myco_daemon_start(pid) {
 
     return EXIT_SUCCESS;
 }
-
-// TODO: Communication with indexserver
-
-

+ 0 - 2
src/myco-daemon.h

@@ -83,6 +83,4 @@ int myco_daemon_request_remote_resource(message msg);
 int myco_daemon_write_remote_resource(message msg, int force);
 int myco_daemon_read_remote_resource(message msg);
 
-// TODO: Communication with indexserver
-
 #endif //__MYCO_DAEMON_H

+ 321 - 43
src/myco-indexer.c

@@ -6,50 +6,328 @@
 #include <stdlib.h>
 #include <stdio.h>
 #include <string.h>
+#include "myco-indexer.h"
+
+myco_agent *first_agent = NULL;
+myco_resource *first_resource = NULL;
+
+myco_agent *myco_indexer_find_agent(const char *agent_name) {
+    myco_agent *current_agent;
+    current_agent = first_agent;
+    while (current_agent != NULL) {
+        if (strcmp(current_agent->name, agent_name) == 0) {
+            return current_agent;
+        }
+        current_agent = current_agent->next;
+    }
+    return NULL;
+}
+
+myco_resource *myco_indexer_find_resource_by_agent(const char *agent_name) {
+    myco_resource *current_resource;
+    current_resource = first_resource;
+    while (current_resource != NULL) {
+        if (strcmp(current_resource->agent, agent_name) == 0) {
+            return current_resource;
+        }
+        current_resource = current_resource->next;
+    }
+    return NULL;
+}
+
+myco_resource *myco_indexer_find_resource(const char *resource_name) {
+    myco_resource *current_resource;
+    current_resource = first_resource;
+    while (current_resource != NULL) {
+        if (strcmp(current_resource->name, resource_name) == 0) {
+            return current_resource;
+        }
+        current_resource = current_resource->next;
+    }
+    return NULL;
+}
+
+int myco_indexer_register_agent(command, agent, resource){
+    myco_agent *current_agent;
+
+    if (myco_indexer_find_agent(agent) != NULL) {
+        printf("ERROR: agent %s already exists\n", agent);
+        return -1;
+    }
+
+    if (first_agent == NULL) {
+        first_agent = malloc(sizeof(myco_agent));
+        first_agent->next = NULL;
+        first_agent->prev = NULL;
+        sprintf(first_agent->name, "%s", agent);
+    } else {
+        current_agent = first_agent;
+        while (current_agent->next != NULL) {
+            current_agent = current_agent->next;
+        }
+        current_agent->next = malloc(sizeof(myco_agent));
+        current_agent->next->prev = current_agent;
+        current_agent->next->next = NULL;
+        sprintf(current_agent->next->name, "%s", agent);
+    }
+
+    printf("SUCCESS: agent %s registered\n", agent);
+
+    return 0;
+}
+
+int myco_indexer_unregister_agent(command, agent, resource){
+    myco_agent *current_agent;
+    myco_resource *current_resource;
+
+    current_agent = myco_indexer_find_agent(agent);
+
+    if ((current_resource = myco_indexer_find_resource_by_agent(agent)) != NULL) {
+        printf("ERROR: agent %s still has resource %s\n", agent, current_resource->name);
+        return -1;
+    }
+
+    if (current_agent == NULL) {
+        printf("ERROR: agent %s could not be unregistered\n", agent);
+        return -1;
+    }
+
+    // Element in the middle
+    if (current_agent->next != NULL && current_agent->prev != NULL) {
+        current_agent->prev->next = current_agent->next;
+        current_agent->next->prev = current_agent->prev;
+        free(current_agent);
+        printf("SUCCESS: agent %s unregistered\n", agent);
+        return 0;
+    }
+
+    // First element
+    if (current_agent->next != NULL && current_agent->prev == NULL) {
+        first_agent = current_agent->next;
+        current_agent->next->prev = NULL;
+        free(current_agent);
+        printf("SUCCESS: agent %s unregistered\n", agent);
+        return 0;
+    }
+
+    // Last element
+    if (current_agent->next == NULL && current_agent->prev != NULL) {
+        current_agent->prev->next = NULL;
+        free(current_agent);
+        printf("SUCCESS: agent %s unregistered\n", agent);
+        return 0;
+    }
+
+    // Only remaining
+    if (current_agent->next == NULL && current_agent->prev == NULL) {
+        first_agent = NULL;
+        free(current_agent);
+        printf("SUCCESS: agent %s unregistered\n", agent);
+        return 0;
+    }
+
+    printf("ERROR: agent %s could not be unregistered - data structure seems to be damaged!\n", agent);
+
+    return -1;
+}
+
+int myco_indexer_register_resource(command, agent, resource){
+    myco_resource *current_resource;
+
+    if (myco_indexer_find_agent(agent) == NULL) {
+        printf("ERROR: agent %s does not exist\n", agent);
+
+        return -1;
+    }
+
+    if (myco_indexer_find_resource(resource) != NULL) {
+        printf("ERROR: resource %s already exists\n", resource);
+
+        return -1;
+    }
+
+    // Insert as first element
+    if (first_resource == NULL) {
+        first_resource = malloc(sizeof(myco_resource));
+        first_resource->next = NULL;
+        first_resource->prev = NULL;
+        sprintf(first_resource->name, "%s", resource);
+        sprintf(first_resource->agent, "%s", agent);
+    } else {
+        // Insert as last element
+        current_resource = first_resource;
+        while (current_resource->next != NULL) {
+            current_resource = current_resource->next;
+        }
+        current_resource->next = malloc(sizeof(myco_resource));
+        current_resource->next->prev = current_resource;
+        current_resource->next->next = NULL;
+        sprintf(current_resource->next->name, "%s", resource);
+        sprintf(current_resource->next->agent, "%s", agent);
+    }
+
+    printf("SUCCESS: resource %s registered\n", resource);
+
+    return 0;
+}
+
+int myco_indexer_unregister_resource(command, agent, resource){
+    myco_resource *current_resource;
+
+    current_resource = myco_indexer_find_resource(resource);
+
+    if (current_resource == NULL) {
+        printf("ERROR: resource %s could not be unregistered\n", resource);
+
+        return -1;
+    }
+
+    // Element in the middle
+    if (current_resource->next != NULL && current_resource->prev != NULL) {
+        current_resource->prev->next = current_resource->next;
+        current_resource->next->prev = current_resource->prev;
+        free(current_resource);
+        printf("SUCCESS: resource %s unregistered\n", resource);
+        return 0;
+    }
+
+    // First element
+    if (current_resource->next != NULL && current_resource->prev == NULL) {
+        first_resource = current_resource->next;
+        current_resource->next->prev = NULL;
+        free(current_resource);
+        printf("SUCCESS: resource %s unregistered\n", resource);
+        return 0;
+    }
+
+    // Last element
+    if (current_resource->next == NULL && current_resource->prev != NULL) {
+        current_resource->prev->next = NULL;
+        free(current_resource);
+        printf("SUCCESS: resource %s unregistered\n", resource);
+        return 0;
+    }
+
+    // Only remaining
+    if (current_resource->next == NULL && current_resource->prev == NULL) {
+        first_resource = NULL;
+        free(current_resource);
+        printf("SUCCESS: resource %s unregistered\n", resource);
+        return 0;
+    }
+
+    printf("FATAL ERROR: resource %s could not be unregistered - data structure seems to be damaged!\n", resource);
+
+    return -1;
+}
+
+int myco_indexer_request_resource(command, agent, resource){
+
+}
+
+int myco_indexer_write_remote_resource(command, agent, resource){
+
+}
+
+int myco_indexer_read_remote_resource(command, agent, resource){
+
+}
+
+int myco_indexer_lock_resource(command, agent, resource){
+
+}
+
+int myco_indexer_unlock_resource(command, agent, resource){
+
+}
+
+int myco_indexer_release_resource(command, agent, resource){
+
+}
 
 #define BUF 1024
 int myco_indexer_start(int port) {
-  int create_socket, new_socket;
-  socklen_t addrlen;
-  char *buffer = malloc (BUF);
-  ssize_t size;
-  struct sockaddr_in address;
-  const int y = 1;
-  printf ("\e[2J");
-  if ((create_socket=socket (AF_INET, SOCK_STREAM, 0)) > 0)
-    printf ("Socket wurde angelegt\n");
-  setsockopt( create_socket, SOL_SOCKET,
-              SO_REUSEADDR, &y, sizeof(int));
-  address.sin_family = AF_INET;
-  address.sin_addr.s_addr = INADDR_ANY;
-  address.sin_port = htons (port);
-  if (bind ( create_socket,
-             (struct sockaddr *) &address,
-             sizeof (address)) != 0) {
-    printf( "Der Port ist nicht frei – belegt!\n");
-  }
-  listen (create_socket, 5);
-  addrlen = sizeof (struct sockaddr_in);
-  while (1) {
-     new_socket = accept ( create_socket,
-                           (struct sockaddr *) &address,
-                           &addrlen );
-     if (new_socket > 0)
-      printf ("Ein Client (%s) ist verbunden ...\n",
-         inet_ntoa (address.sin_addr));
-     do {
-        /*
-        printf ("Nachricht zum Versenden: ");
-        fgets (buffer, BUF, stdin);
-        send (new_socket, buffer, strlen (buffer), 0);
-        */
-        size = recv (new_socket, buffer, BUF-1, 0);
-        if( size > 0)
-           buffer[size] = '\0';
-        printf ("Nachricht empfangen: %s\n", buffer);
-     } while (strcmp (buffer, "quit\n") != 0);
-     close (new_socket);
-  }
-  close (create_socket);
-  return EXIT_SUCCESS;
+    int create_socket, new_socket;
+    socklen_t addrlen;
+    char *buffer = malloc (BUF);
+    ssize_t size;
+    struct sockaddr_in address;
+    const int y = 1;
+    char *command;
+    char *agent;
+    char *resource;
+
+    printf ("\e[2J");
+    create_socket=socket (AF_INET, SOCK_STREAM, 0);
+    setsockopt( create_socket, SOL_SOCKET,
+            SO_REUSEADDR, &y, sizeof(int));
+    address.sin_family = AF_INET;
+    address.sin_addr.s_addr = INADDR_ANY;
+    address.sin_port = htons (port);
+    if (bind ( create_socket,
+                (struct sockaddr *) &address,
+                sizeof (address)) != 0) {
+        printf( "Port already taken!\n");
+    }
+    listen (create_socket, 5);
+    addrlen = sizeof (struct sockaddr_in);
+    while (1) {
+        new_socket = accept ( create_socket,
+                (struct sockaddr *) &address,
+                &addrlen );
+        if (new_socket > 0)
+            inet_ntoa (address.sin_addr);
+        do {
+            /*
+               printf ("Nachricht zum Versenden: ");
+               fgets (buffer, BUF, stdin);
+               send (new_socket, buffer, strlen (buffer), 0);
+               */
+            size = recv (new_socket, buffer, BUF-1, 0);
+            if( size > 0)
+                buffer[size] = '\0';
+
+            // Handle agents
+            command = strtok(buffer, ";");
+            agent = strtok(NULL, ";");
+            resource = strtok(NULL, ";");
+
+            if (strcmp(command, "REGISTER AGENT") == 0) {
+                myco_indexer_register_agent(command, agent, resource);
+            }
+            if (strcmp(command, "UNREGISTER AGENT") == 0) {
+                myco_indexer_unregister_agent(command, agent, resource);
+            }
+            if (strcmp(command, "REGISTER RESOURCE") == 0) {
+                myco_indexer_register_resource(command, agent, resource);
+            }
+            if (strcmp(command, "UNREGISTER RESOURCE") == 0) {
+                myco_indexer_unregister_resource(command, agent, resource);
+            }
+            if (strcmp(command, "REQUEST RESOURCE") == 0) {
+                myco_indexer_request_resource(command, agent, resource);
+            }
+            if (strcmp(command, "WRITE REMOTE RESOURCE") == 0) {
+                myco_indexer_write_remote_resource(command, agent, resource, 0);
+            }
+            if (strcmp(command, "FORCE WRITE REMOTE RESOURCE") == 0) {
+                myco_indexer_write_remote_resource(command, agent, resource, 1);
+            }
+            if (strcmp(command, "READ REMOTE RESOURCE") == 0) {
+                myco_indexer_read_remote_resource(command, agent, resource);
+            }
+            if (strcmp(command, "LOCK RESOURCE") == 0) {
+                myco_indexer_lock_resource(command, agent, resource);
+            }
+            if (strcmp(command, "UNLOCK RESOURCE") == 0) {
+                myco_indexer_unlock_resource(command, agent, resource);
+            }
+            if (strcmp(command, "RELEASE RESOURCE") == 0) {
+                myco_indexer_release_resource(command, agent, resource);
+            }
+        } while (strcmp (command, "quit\n") != 0);
+        close (new_socket);
+    }
+    close (create_socket);
+    return EXIT_SUCCESS;
 }

+ 18 - 3
src/myco-indexer.h

@@ -25,6 +25,23 @@
 
 #ifndef __MYCO_DAEMON_H
 #define __MYCO_DAEMON_H
+#include "myco-ipc.c"
+
+typedef struct myco_agent_ myco_agent;
+struct myco_agent_ {
+    char name[AGENT_NAME_LENGTH];
+    myco_agent *next;
+    myco_agent *prev;
+};
+
+typedef struct myco_resource_ myco_resource;
+struct myco_resource_ {
+    char name[RESOURCE_NAME_LENGTH];
+    char agent[AGENT_NAME_LENGTH];
+    myco_resource *next;
+    myco_resource *prev;
+};
+
 
 /**
  * myco_daemon_start:
@@ -36,6 +53,4 @@
  */
 int myco_daemon_start ();
 
-// TODO: Communication with indexserver
-
-
+#endif //__MYCO_DAEMON_H

+ 1 - 1
test/mycoagent.c

@@ -18,7 +18,7 @@
 
 int main() {
     // At the beginning every agent needs to register itself with the agent server.
-    char *agent_name = "MYAGENT";
+    char *agent_name = "DOMAIN1_MYAGENT";
     int agent_message_queue_id = myco_agent_register(agent_name);
 
     // MYCO create

+ 1 - 1
test/mycoagent2.c

@@ -18,7 +18,7 @@
 
 int main() {
     // At the beginning every agent needs to register itself with the agent server.
-    char *agent_name = "MYAGENT2";
+    char *agent_name = "DOMAIN1_MYAGENT2";
     int agent_message_queue_id;
     agent_message_queue_id = myco_agent_register(agent_name);
 

+ 1 - 1
test/mycoagent3.c

@@ -18,7 +18,7 @@
 
 int main() {
     // At the beginning every agent needs to register itself with the agent server.
-    char *agent_name = "MYAGENT3";
+    char *agent_name = "DOMAIN1_MYAGENT3";
     int agent_message_queue_id = myco_agent_register(agent_name);
 
     // MYCO read

+ 80 - 0
test/mycotest.c

@@ -0,0 +1,80 @@
+/*  
+ * mycoagent.c
+ *
+ * This is an example agent that showcases the use of mycorrhiza.
+ */
+
+#include "../src/myco-agent.c"
+
+// MYCO fetch:          Requests a remote, transactional resource. Blocks until
+//                      the resource has been granted.
+// MYCO create:         Creates a new resource and registers it with the agent.
+// MYCO release:        Releases a resource. It can now be taken by other agents via fetch.
+// MYCO free:           Destroys a resource for the entire system. Only works if the calling agent has ownership (fetch/create) of the resource.
+// MYCO read:           Creates a local copy of a remote resource.
+// MYCO push:           Updates the remote resource with the local copy obtained from read.
+// MYCO lock:           Protects a resource from read access.
+// MYCO unlock:         Allows read access again.
+
+int main() {
+    // At the beginning every agent needs to register itself with the agent server.
+    char *agent_name = "MYAGENT";
+    int agent_message_queue_id = myco_agent_register(agent_name);
+
+    // MYCO create
+    // #MYCO create("RESOURCE_1", 1024, transactional)
+    char *resource_name = "RESOURCE_1";
+    int resource_size = 1024;
+    void *resource_pointer = myco_agent_register_resource(agent_message_queue_id, agent_name, resource_name, RESOURCE_TRANSACTIONAL, getpid(), resource_size);
+
+    // MYCO lock
+    myco_agent_lock_resource(agent_message_queue_id, agent_name, resource_name);
+
+    // MYCO unlock
+    myco_agent_unlock_resource(agent_message_queue_id, agent_name, resource_name);
+
+    // MYCO release
+    myco_agent_release_resource(agent_message_queue_id, agent_name, resource_name, resource_pointer, resource_size);
+
+    // View available resources
+    myco_agent_request_resource_list(agent_message_queue_id);
+
+    // MYCO fetch
+    // #MYCO fetch("RESOURCE_2")
+    struct resource myresource2;
+    myresource2 = myco_agent_request_resource(agent_message_queue_id, agent_name, "RESOURCE_2");
+    if (myresource2.size != -1) {
+        printf("The first integer of RESOURCE_2 is: %d\n", *((int*)myresource2.pointer));
+    }
+
+    // MYCO read
+    // #MYCO read("RESOURCE_3")
+    struct resource myresource3;
+    myresource3 = myco_agent_read_remote_resource(agent_message_queue_id, "RESOURCE_3");
+    if (myresource3.size != -1) {
+        printf("The first integer of RESOURCE_3 is: %d\n", *((int*)myresource3.pointer));
+    }
+
+    // Change some values in RESOURCE_3
+    if (myresource3.size != -1) {
+        ((int*)myresource3.pointer)[0] = 0;
+    }
+
+    // MYCO push
+    // #MYCO push("RESOURCE_3")
+    myco_agent_write_remote_resource(agent_message_queue_id, "RESOURCE_3", myresource3.pointer, myresource3.size, 0);
+
+
+    // MYCO free
+    // #MYCO free("RESOURCE_1")
+    myco_agent_unregister_resource(agent_message_queue_id, resource_name);
+
+    // MYCO free
+    // #MYCO free("RESOURCE_2")
+    myco_agent_unregister_resource(agent_message_queue_id, "RESOURCE_2");
+
+    // At the end of its runtime the agent unregisters with the daemon.
+    myco_agent_unregister(agent_name, agent_message_queue_id);
+
+    return EXIT_SUCCESS;
+}