myco-daemon.c 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. /* Copyright (C) 2016 Max Riechelmann <max.riechelmann@student.kit.edu>
  2. (Karlsruhe Institute of Technology)
  3. This library is free software; you can redistribute it and/or modify it
  4. under the terms of the GNU Lesser General Public License as published by the
  5. Free Software Foundation; either version 2.1 of the License, or (at your
  6. option) any later version.
  7. This library is distributed in the hope that it will be useful, but WITHOUT
  8. ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
  9. FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
  10. details.
  11. You should have received a copy of the GNU Lesser General Public License along
  12. with this library; if not, write to the Free Software Foundation, Inc., 51
  13. Franklin St, Fifth Floor, Boston, MA 02110, USA
  14. */
  15. #include "myco-daemon.h"
  16. #include "../src/myco-memory.c"
  17. #define DEBUG 1
  18. myco_agent *first_agent = NULL; /* Head of the doubly linked list of registered agents; NULL while the list is empty. */
  19. myco_resource *first_resource = NULL; /* Head of the doubly linked list of registered resources; NULL while the list is empty. */
  20. myco_agent *myco_daemon_find_agent(const char *agent_name) {
  21. myco_agent *current_agent;
  22. current_agent = first_agent;
  23. while (current_agent != NULL) {
  24. if (strcmp(current_agent->name, agent_name) == 0) {
  25. return current_agent;
  26. }
  27. current_agent = current_agent->next;
  28. }
  29. return NULL;
  30. }
  31. int myco_daemon_register_agent(message msg) {
  32. myco_agent *current_agent;
  33. if (myco_daemon_find_agent(msg.agent_name) != NULL) {
  34. sprintf(msg.message, "ERROR: agent %s already exists\n", msg.agent_name);
  35. myco_send(msg.agent_message_queue_id, msg);
  36. return -1;
  37. }
  38. if (first_agent == NULL) {
  39. first_agent = malloc(sizeof(myco_agent));
  40. first_agent->next = NULL;
  41. first_agent->prev = NULL;
  42. sprintf(first_agent->name, "%s", msg.agent_name);
  43. first_agent->message_queue_id = msg.agent_message_queue_id;
  44. } else {
  45. current_agent = first_agent;
  46. while (current_agent->next != NULL) {
  47. current_agent = current_agent->next;
  48. }
  49. current_agent->next = malloc(sizeof(myco_agent));
  50. current_agent->next->prev = current_agent;
  51. current_agent->next->next = NULL;
  52. sprintf(current_agent->next->name, "%s", msg.agent_name);
  53. current_agent->next->message_queue_id = msg.agent_message_queue_id;
  54. }
  55. sprintf(msg.message, "SUCCESS: agent %s registered\n", msg.agent_name);
  56. myco_send(msg.agent_message_queue_id, msg);
  57. return 0;
  58. }
  59. int myco_daemon_unregister_agent(message msg) {
  60. myco_agent *current_agent;
  61. myco_resource *current_resource;
  62. current_agent = myco_daemon_find_agent(msg.agent_name);
  63. if ((current_resource = myco_daemon_find_resource_by_agent(msg.agent_name)) != NULL) {
  64. sprintf(msg.message, "ERROR: agent %s still has resource %s\n", msg.agent_name, current_resource->name);
  65. myco_send(msg.agent_message_queue_id, msg);
  66. return -1;
  67. }
  68. if (current_agent == NULL) {
  69. sprintf(msg.message, "ERROR: agent %s could not be unregistered\n", msg.agent_name);
  70. myco_send(msg.agent_message_queue_id, msg);
  71. return -1;
  72. }
  73. // Element in the middle
  74. if (current_agent->next != NULL && current_agent->prev != NULL) {
  75. current_agent->prev->next = current_agent->next;
  76. current_agent->next->prev = current_agent->prev;
  77. free(current_agent);
  78. sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
  79. myco_send(msg.agent_message_queue_id, msg);
  80. return 0;
  81. }
  82. // First element
  83. if (current_agent->next != NULL && current_agent->prev == NULL) {
  84. first_agent = current_agent->next;
  85. current_agent->next->prev = NULL;
  86. free(current_agent);
  87. sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
  88. myco_send(msg.agent_message_queue_id, msg);
  89. return 0;
  90. }
  91. // Last element
  92. if (current_agent->next == NULL && current_agent->prev != NULL) {
  93. current_agent->prev->next = NULL;
  94. free(current_agent);
  95. sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
  96. myco_send(msg.agent_message_queue_id, msg);
  97. return 0;
  98. }
  99. // Only remaining
  100. if (current_agent->next == NULL && current_agent->prev == NULL) {
  101. first_agent = NULL;
  102. free(current_agent);
  103. sprintf(msg.message, "SUCCESS: agent %s unregistered\n", msg.agent_name);
  104. myco_send(msg.agent_message_queue_id, msg);
  105. return 0;
  106. }
  107. sprintf(msg.message, "ERROR: agent %s could not be unregistered - data structure seems to be damaged!\n", msg.agent_name);
  108. myco_send(msg.agent_message_queue_id, msg);
  109. return -1;
  110. }
  111. myco_resource *myco_daemon_find_resource_by_agent(const char *agent_name) {
  112. myco_resource *current_resource;
  113. current_resource = first_resource;
  114. while (current_resource != NULL) {
  115. if (strcmp(current_resource->agent, agent_name) == 0) {
  116. return current_resource;
  117. }
  118. current_resource = current_resource->next;
  119. }
  120. return NULL;
  121. }
  122. myco_resource *myco_daemon_find_resource(const char *resource_name) {
  123. myco_resource *current_resource;
  124. current_resource = first_resource;
  125. while (current_resource != NULL) {
  126. if (strcmp(current_resource->name, resource_name) == 0) {
  127. return current_resource;
  128. }
  129. current_resource = current_resource->next;
  130. }
  131. return NULL;
  132. }
  133. int myco_daemon_register_resource(message msg) {
  134. myco_resource *current_resource;
  135. if (myco_daemon_find_agent(msg.agent_name) == NULL) {
  136. sprintf(msg.message, "ERROR: agent %s does not exist\n", msg.agent_name);
  137. myco_send(msg.agent_message_queue_id, msg);
  138. return -1;
  139. }
  140. if (myco_daemon_find_resource(msg.resource_name) != NULL) {
  141. sprintf(msg.message, "ERROR: resource %s already exists\n", msg.resource_name);
  142. myco_send(msg.agent_message_queue_id, msg);
  143. return -1;
  144. }
  145. // Insert as first element
  146. if (first_resource == NULL) {
  147. first_resource = malloc(sizeof(myco_resource));
  148. first_resource->next = NULL;
  149. first_resource->prev = NULL;
  150. sprintf(first_resource->name, "%s", msg.resource_name);
  151. sprintf(first_resource->agent, "%s", msg.agent_name);
  152. first_resource->pid = msg.sender_pid;
  153. first_resource->pointer = msg.resource_pointer;
  154. first_resource->size = msg.resource_size;
  155. first_resource->read_locked = 0;
  156. first_resource->transfer_locked = 1;
  157. first_resource->transactional = msg.resource_transactional;
  158. first_resource->version = 0;
  159. } else {
  160. // Insert as last element
  161. current_resource = first_resource;
  162. while (current_resource->next != NULL) {
  163. current_resource = current_resource->next;
  164. }
  165. current_resource->next = malloc(sizeof(myco_resource));
  166. current_resource->next->prev = current_resource;
  167. current_resource->next->next = NULL;
  168. sprintf(current_resource->next->name, "%s", msg.resource_name);
  169. sprintf(current_resource->next->agent, "%s", msg.agent_name);
  170. current_resource->next->pid = msg.sender_pid;
  171. current_resource->next->pointer = msg.resource_pointer;
  172. current_resource->next->size = msg.resource_size;
  173. current_resource->next->transfer_locked = 1;
  174. current_resource->next->read_locked = 0;
  175. current_resource->next->transactional = msg.resource_transactional;
  176. current_resource->next->version = 0;
  177. }
  178. sprintf(msg.message, "SUCCESS: resource %s registered\n", msg.resource_name);
  179. myco_send(msg.agent_message_queue_id, msg);
  180. return 0;
  181. }
  182. int myco_daemon_unregister_resource(message msg) {
  183. myco_resource *current_resource;
  184. current_resource = myco_daemon_find_resource(msg.resource_name);
  185. if (current_resource == NULL) {
  186. sprintf(msg.message, "ERROR: resource %s could not be unregistered\n", msg.resource_name);
  187. myco_send(msg.agent_message_queue_id, msg);
  188. return -1;
  189. }
  190. // Element in the middle
  191. if (current_resource->next != NULL && current_resource->prev != NULL) {
  192. current_resource->prev->next = current_resource->next;
  193. current_resource->next->prev = current_resource->prev;
  194. free(current_resource);
  195. sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
  196. myco_send(msg.agent_message_queue_id, msg);
  197. return 0;
  198. }
  199. // First element
  200. if (current_resource->next != NULL && current_resource->prev == NULL) {
  201. first_resource = current_resource->next;
  202. current_resource->next->prev = NULL;
  203. free(current_resource);
  204. sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
  205. myco_send(msg.agent_message_queue_id, msg);
  206. return 0;
  207. }
  208. // Last element
  209. if (current_resource->next == NULL && current_resource->prev != NULL) {
  210. current_resource->prev->next = NULL;
  211. free(current_resource);
  212. sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
  213. myco_send(msg.agent_message_queue_id, msg);
  214. return 0;
  215. }
  216. // Only remaining
  217. if (current_resource->next == NULL && current_resource->prev == NULL) {
  218. first_resource = NULL;
  219. free(current_resource);
  220. sprintf(msg.message, "SUCCESS: resource %s unregistered\n", msg.resource_name);
  221. myco_send(msg.agent_message_queue_id, msg);
  222. return 0;
  223. }
  224. sprintf(msg.message, "FATAL ERROR: resource %s could not be unregistered - data structure seems to be damaged!\n", msg.resource_name);
  225. myco_send(msg.agent_message_queue_id, msg);
  226. return -1;
  227. }
  228. int myco_daemon_request_resource(message msg) {
  229. myco_resource *current_resource;
  230. // Check if resource exists
  231. if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
  232. sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
  233. myco_send(msg.agent_message_queue_id, msg);
  234. return -1;
  235. }
  236. // Check if agent that requests already owns the resource
  237. if (strcmp(current_resource->agent, msg.agent_name) == 0) {
  238. sprintf(msg.message, "ERROR: resource %s already belongs to agent %s\n", msg.resource_name, msg.agent_name);
  239. myco_send(msg.agent_message_queue_id, msg);
  240. return -1;
  241. }
  242. // Check if resource is transactional
  243. if (current_resource->transactional != RESOURCE_TRANSACTIONAL) {
  244. sprintf(msg.message, "ERROR: resource %s is not transactional\n", msg.resource_name);
  245. myco_send(msg.agent_message_queue_id, msg);
  246. return -1;
  247. }
  248. // TODO: Handle case where resource is on another node (another agent)
  249. // If on same node, send information
  250. msg.sender_pid = current_resource->pid;
  251. msg.resource_size = current_resource->size;
  252. msg.resource_pointer = current_resource->pointer;
  253. sprintf(msg.message, "SUCCESS: resource found on same node, sending information\n");
  254. myco_send(msg.agent_message_queue_id, msg);
  255. msg = myco_receive(msg.agent_message_queue_id);
  256. if (strcmp(msg.message, "RESOURCE GRANTED") == 0) {
  257. // Transfer ownership of resource to agent
  258. sprintf(current_resource->agent, "%s", msg.agent_name);
  259. current_resource->pid = msg.sender_pid;
  260. current_resource->pointer = msg.resource_pointer;
  261. } else {
  262. fprintf(stderr, "ERROR: myco_daemon_request: %s\n", strerror(errno));
  263. return -1;
  264. }
  265. return 0;
  266. }
  267. int myco_daemon_request_list(message msg, pid_t pid) {
  268. myco_resource *current_resource;
  269. current_resource = first_resource;
  270. char *resource_pointer;
  271. char tmp[64];
  272. int resource_size = 0;
  273. if (current_resource == NULL) {
  274. sprintf(msg.message, "ERROR: no resources\n");
  275. myco_send(msg.agent_message_queue_id, msg);
  276. return -1;
  277. }
  278. while (current_resource != NULL) {
  279. resource_size += strlen(current_resource->name) + strlen(current_resource->agent) + strlen("1");
  280. resource_size += strlen("(,,,);");
  281. resource_size += 22;
  282. current_resource = current_resource->next;
  283. }
  284. resource_pointer = malloc(resource_size);
  285. memset(resource_pointer, 0, resource_size);
  286. current_resource = first_resource;
  287. while (current_resource != NULL) {
  288. strcat(resource_pointer, current_resource->name);
  289. strcat(resource_pointer, "(");
  290. strcat(resource_pointer, current_resource->agent);
  291. strcat(resource_pointer, ",");
  292. sprintf(tmp, "%d", current_resource->size);
  293. strcat(resource_pointer, tmp);
  294. strcat(resource_pointer, ",");
  295. sprintf(tmp, "%p", current_resource->pointer);
  296. strcat(resource_pointer, tmp);
  297. strcat(resource_pointer, ",");
  298. sprintf(tmp, "%d", current_resource->read_locked);
  299. strcat(resource_pointer, tmp);
  300. strcat(resource_pointer, ",");
  301. sprintf(tmp, "%d", current_resource->transfer_locked);
  302. strcat(resource_pointer, tmp);
  303. strcat(resource_pointer, ",");
  304. if (current_resource->transactional) {
  305. strcat(resource_pointer, "1");
  306. } else {
  307. strcat(resource_pointer, "0");
  308. }
  309. strcat(resource_pointer, ");");
  310. current_resource = current_resource->next;
  311. }
  312. msg.sender_pid = pid;
  313. msg.resource_size = resource_size;
  314. msg.resource_pointer = (void *)resource_pointer;
  315. sprintf(msg.message, "SUCCESS: sending resource list\n");
  316. myco_send(msg.agent_message_queue_id, msg);
  317. msg = myco_receive(msg.agent_message_queue_id);
  318. if (strcmp(msg.message, "RESOURCE LIST GRANTED") == 0) {
  319. free(resource_pointer);
  320. } else {
  321. fprintf(stderr, "ERROR: resource list was not granted: %s\n", strerror(errno));
  322. free(resource_pointer);
  323. return -1;
  324. }
  325. return 0;
  326. }
  327. int myco_daemon_write_remote_resource(message msg, int force) {
  328. myco_resource *current_resource;
  329. // Check if resource exists
  330. if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
  331. sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
  332. myco_send(msg.agent_message_queue_id, msg);
  333. return -1;
  334. }
  335. if (force == 1) {
  336. sprintf(msg.message, "SUCCESS: forcing to overwrite resource %s, sending information\n", msg.resource_name);
  337. msg.resource_pointer = current_resource->pointer;
  338. msg.resource_size = current_resource->size;
  339. msg.sender_pid = current_resource->pid;
  340. myco_send(msg.agent_message_queue_id, msg);
  341. return 0;
  342. } else {
  343. if (current_resource->version > msg.version) {
  344. sprintf(msg.message, "ERROR: resource %s is newer than your copied version\n", msg.resource_name);
  345. myco_send(msg.agent_message_queue_id, msg);
  346. return -1;
  347. } else {
  348. sprintf(msg.message, "SUCCESS: version updated, sending information for resource %s\n", msg.resource_name);
  349. msg.resource_pointer = current_resource->pointer;
  350. msg.resource_size = current_resource->size;
  351. msg.sender_pid = current_resource->pid;
  352. myco_send(msg.agent_message_queue_id, msg);
  353. current_resource->version += 1;
  354. myco_send(msg.agent_message_queue_id, msg);
  355. return 0;
  356. }
  357. }
  358. }
  359. int myco_daemon_read_remote_resource(message msg) {
  360. myco_resource *current_resource;
  361. // Check if resource exists
  362. if ((current_resource = myco_daemon_find_resource(msg.resource_name)) == NULL) {
  363. sprintf(msg.message, "ERROR: resource %s does not exist\n", msg.resource_name);
  364. myco_send(msg.agent_message_queue_id, msg);
  365. return -1;
  366. }
  367. // Check if agent that requests already owns the resource
  368. if (strcmp(current_resource->agent, msg.agent_name) == 0) {
  369. sprintf(msg.message, "ERROR: resource %s already belongs to agent %s\n", msg.resource_name, msg.agent_name);
  370. myco_send(msg.agent_message_queue_id, msg);
  371. return -1;
  372. }
  373. // TODO: Handle case where resource is on another node (another agent)
  374. // If on same node, send information
  375. msg.sender_pid = current_resource->pid;
  376. msg.resource_size = current_resource->size;
  377. msg.resource_pointer = current_resource->pointer;
  378. sprintf(msg.message, "SUCCESS: resource found on same node, sending information\n");
  379. myco_send(msg.agent_message_queue_id, msg);
  380. msg = myco_receive(msg.agent_message_queue_id);
  381. if (strcmp(msg.message, "RESOURCE READ") == 0) {
  382. } else {
  383. fprintf(stderr, "ERROR: myco_daemon_reead_remote: %s\n", strerror(errno));
  384. return -1;
  385. }
  386. return 0;
  387. }
  388. int myco_daemon_lock_resource(message msg) {
  389. myco_resource *current_resource;
  390. current_resource = myco_daemon_find_resource(msg.resource_name);
  391. if (current_resource != NULL && current_resource->transfer_locked == 1 && strcmp(current_resource->agent, msg.agent_name) == 0) {
  392. current_resource->read_locked = 1;
  393. sprintf(msg.message, "SUCCESS: resource %s locked\n", msg.resource_name);
  394. myco_send(msg.agent_message_queue_id, msg);
  395. return 0;
  396. } else {
  397. sprintf(msg.message, "ERROR: resource %s could not be locked\n", msg.resource_name);
  398. myco_send(msg.agent_message_queue_id, msg);
  399. return -1;
  400. }
  401. }
  402. int myco_daemon_release_resource(message msg) {
  403. myco_resource *current_resource;
  404. current_resource = myco_daemon_find_resource(msg.resource_name);
  405. if (current_resource != NULL && strcmp(current_resource->agent, msg.agent_name) == 0) {
  406. current_resource->transfer_locked = 0;
  407. sprintf(msg.message, "SUCCESS: resource %s released\n", msg.resource_name);
  408. myco_send(msg.agent_message_queue_id, msg);
  409. return 0;
  410. } else {
  411. sprintf(msg.message, "ERROR: resource %s could not be released\n", msg.resource_name);
  412. myco_send(msg.agent_message_queue_id, msg);
  413. return -1;
  414. }
  415. }
  416. int myco_daemon_unlock_resource(message msg) {
  417. myco_resource *current_resource;
  418. current_resource = myco_daemon_find_resource(msg.resource_name);
  419. if (current_resource != NULL && current_resource->transfer_locked == 1 && strcmp(current_resource->agent, msg.agent_name) == 0) {
  420. current_resource->read_locked = 0;
  421. sprintf(msg.message, "SUCCESS: resource %s unlocked\n", msg.resource_name);
  422. myco_send(msg.agent_message_queue_id, msg);
  423. return 0;
  424. } else {
  425. sprintf(msg.message, "ERROR: resource %s could not be locked\n", msg.resource_name);
  426. myco_send(msg.agent_message_queue_id, msg);
  427. return -1;
  428. }
  429. }
  430. int myco_daemon_start(pid) {
  431. int daemon_message_queue_id;
  432. message msg = {0};
  433. // Create message queue
  434. daemon_message_queue_id = myco_create_global_message_queue();
  435. if (daemon_message_queue_id < 0) {
  436. fprintf(stderr, "ERROR: Message queue could not be created. %s\n", strerror(errno));
  437. return -1;
  438. }
  439. // Receive messages
  440. while (1) {
  441. msg = myco_receive(daemon_message_queue_id);
  442. if (DEBUG) {
  443. printf("%d, %s, %s, %s, %p, %d, %d, %d, %d\n", msg.agent_message_queue_id, msg.agent_name, msg.message, \
  444. msg.resource_name, msg.resource_pointer, msg.resource_size, msg.resource_transactional, msg.sender_pid, msg.version);
  445. }
  446. if (msg.message == NULL) {
  447. fprintf(stderr, "FATAL ERROR: No message could be received. %s\n", strerror(errno));
  448. return -1;
  449. } else {
  450. if (strcmp(msg.message, "REGISTER AGENT") == 0) {
  451. myco_daemon_register_agent(msg);
  452. }
  453. if (strcmp(msg.message, "UNREGISTER AGENT") == 0) {
  454. myco_daemon_unregister_agent(msg);
  455. }
  456. if (strcmp(msg.message, "REGISTER RESOURCE") == 0) {
  457. myco_daemon_register_resource(msg);
  458. }
  459. if (strcmp(msg.message, "UNREGISTER RESOURCE") == 0) {
  460. myco_daemon_unregister_resource(msg);
  461. }
  462. if (strcmp(msg.message, "REQUEST RESOURCE") == 0) {
  463. myco_daemon_request_resource(msg);
  464. }
  465. if (strcmp(msg.message, "REQUEST LIST") == 0) {
  466. myco_daemon_request_list(msg, pid);
  467. }
  468. if (strcmp(msg.message, "WRITE REMOTE RESOURCE") == 0) {
  469. myco_daemon_write_remote_resource(msg, 0);
  470. }
  471. if (strcmp(msg.message, "FORCE WRITE REMOTE RESOURCE") == 0) {
  472. myco_daemon_write_remote_resource(msg, 1);
  473. }
  474. if (strcmp(msg.message, "READ REMOTE RESOURCE") == 0) {
  475. myco_daemon_read_remote_resource(msg);
  476. }
  477. if (strcmp(msg.message, "LOCK RESOURCE") == 0) {
  478. myco_daemon_lock_resource(msg);
  479. }
  480. if (strcmp(msg.message, "UNLOCK RESOURCE") == 0) {
  481. myco_daemon_unlock_resource(msg);
  482. }
  483. if (strcmp(msg.message, "RELEASE RESOURCE") == 0) {
  484. myco_daemon_release_resource(msg);
  485. }
  486. }
  487. }
  488. if (myco_remove_message_queue(daemon_message_queue_id) == -1) {
  489. fprintf(stderr, "FATAL ERROR: could not remove message queue %d\n", daemon_message_queue_id);
  490. return -1;
  491. }
  492. return EXIT_SUCCESS;
  493. }
  494. // TODO: Communication with indexserver