rndreader.c 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. #define _GNU_SOURCE
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <stdint.h>
  5. #include <sys/types.h>
  6. #include <sys/stat.h>
  7. #include <sys/time.h>
  8. #include <unistd.h>
  9. #include <dirent.h>
  10. #include <fcntl.h>
  11. #include <string.h>
  12. #include <errno.h>
  13. #include <libaio.h>
  /* ---- Compile-time configuration ------------------------------------- */

  /* Alignment, in bytes, requested for the read buffer allocated in main(). */
  #define FASTWRITER_SYNCIO_ALIGN 4096

  /* Mode marker; defined unconditionally here and once more below when
   * AIO_MODE is set. Nothing in the visible code tests it. */
  #define SYNC_MODE

  /* Number of iocbs main() allocates, and the depth of the AIO queue. */
  #define AIO_MODE 2

  /* Buffer slots kept in addition to the AIO_MODE slots. */
  #define EXTRA_BUFFERS 2

  /* Minimum number of seconds between two progress lines printed by main(). */
  #define WRITE_INTERVAL 1

  /* ---- Run-time tunables, overwritten from the command line in main() -- */

  /* Directory entry k is read in pass number (k % SKIP); see next_file(). */
  size_t SKIP = 1;

  /* Offset stride between consecutive reads; used only when LINE is non-zero. */
  size_t SEGMENT = 0;

  /* Bytes read at each offset; 0 selects whole-block sequential reads. */
  size_t LINE = 0;

  /* ---- Buffer geometry ------------------------------------------------- */

  /* Strip size; multiplied by 1024 below, i.e. expressed in KB. */
  #define RAID_STRIP_SIZE 256
  #define RAID_DISKS 8
  #define STRIPS_AT_ONCE 2

  /* A file's st_blksize below this threshold is replaced by BLOCK_SIZE. */
  #define MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)

  /* Size in bytes of one buffer slot. */
  #define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)

  /* Size in bytes of the whole read buffer: one slot per buffer index. */
  #define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))

  #ifdef AIO_MODE
  # define SYNC_MODE
  #endif /* AIO_MODE */
  31. DIR *dir;
  32. struct dirent *ent;
  33. int flags = O_RDONLY|O_NOATIME|O_LARGEFILE|O_DIRECT;
  34. size_t run = 0; // file reading order
  35. size_t skip = 0; // file reading order
  36. int fd = -1; // we are currently scheduling reads from this file
  37. size_t offset = 0; // we are schedulling reads at this offset
  38. size_t size; // size of block we are schedulling to read
  39. size_t file_size; // total size of current file
  40. size_t file_block_size; // access unit for current file
  41. size_t curf = 0, schedf = 0;
  42. size_t curio = 0, schedio = 0;
  43. #define MAXLEN 128
  44. int done[AIO_MODE + EXTRA_BUFFERS];
  45. int done_finish[AIO_MODE + EXTRA_BUFFERS];
  46. char done_file[AIO_MODE + EXTRA_BUFFERS][MAXLEN + 1];
  47. size_t done_offset[AIO_MODE + EXTRA_BUFFERS];
  48. size_t done_size[AIO_MODE + EXTRA_BUFFERS];
  49. int done_fd[AIO_MODE + EXTRA_BUFFERS];
  /* Map a request index onto its buffer slot. */
  #define buf(io) ((io) % (AIO_MODE + EXTRA_BUFFERS))

  /* Larger / smaller of two values. Each argument is evaluated twice, so do
   * not pass expressions with side effects. */
  #define max(a,b) (((a)>(b))?(a):(b))
  #define min(a,b) (((a)<(b))?(a):(b))

  /* Round a byte count up to a whole number of 4096-byte pages. The argument
   * is evaluated twice; it and the expansion are fully parenthesized so that
   * compound arguments and surrounding operators bind as expected. */
  #define page(size) ((((size) / 4096) + (((size) % 4096) ? 1 : 0)) * 4096)
  54. int next_file() {
  55. size_t size;
  56. struct stat st;
  57. next:
  58. while ((ent = readdir(dir))) {
  59. skip += 1;
  60. if ((((skip) - 1)%SKIP) != run) continue;
  61. if (stat(ent->d_name, &st)) continue;
  62. if (!S_ISREG(st.st_mode)) continue;
  63. break;
  64. }
  65. if (ent) {
  66. size = st.st_blksize;
  67. int fd = open(ent->d_name, flags, 0);
  68. if (fd < 0) goto next;
  69. if (size < MIN_BLOCK_SIZE) size = BLOCK_SIZE;
  70. if (size > BUFSIZE) {
  71. printf("Buffer too small\n");
  72. exit(1);
  73. }
  74. file_block_size = size;
  75. file_size = st.st_size;
  76. offset = 0;
  77. return fd;
  78. } else {
  79. skip = 0;
  80. run += 1;
  81. if (run < SKIP) {
  82. closedir(dir);
  83. dir = opendir(".");
  84. goto next;
  85. }
  86. }
  87. return -1;
  88. }
  89. int next_block() {
  90. int io;
  91. size_t next_offset;
  92. if (fd >= 0) {
  93. if (LINE) offset += SEGMENT;
  94. else offset += size;
  95. if (offset >= file_size) {
  96. fd = -1;
  97. }
  98. }
  99. if (fd < 0) {
  100. fd = next_file();
  101. if (fd < 0) return -1;
  102. // printf("open ===> %s (%i)\n", ent->d_name, fd);
  103. }
  104. if (LINE) size = LINE;
  105. else size = file_block_size;
  106. if ((offset + size) > file_size) size = file_size - offset;
  107. io = buf(schedio);
  108. strncpy(done_file[io], ent->d_name, MAXLEN);
  109. done_offset[io] = offset;
  110. done_size[io] = size;
  111. done_fd[io] = fd;
  112. done[io] = 0;
  113. if (LINE) next_offset = offset + SEGMENT;
  114. else next_offset = offset + size;
  115. if (next_offset >= file_size) done_finish[io] = 1;
  116. else done_finish[io] = 0;
  117. return 0;
  118. }
  119. int main(int argc, char *argv[]) {
  120. int err;
  121. int i, n, io;
  122. long double mcoef = 1000000. / (1024 * 1024);
  123. io_context_t aio;
  124. struct iocb ios[AIO_MODE], *ioptr[AIO_MODE];
  125. int events;
  126. struct io_event ev[AIO_MODE];
  127. int ready;
  128. void *buffer;
  129. struct timeval start, fstart, tv;
  130. if (argc < 2) {
  131. printf("Usage: %s <directory> [skip] [segment] [line]\n", argv[0]);
  132. exit(0);
  133. }
  134. chdir(argv[1]);
  135. dir = opendir(".");
  136. if (argc > 2) SKIP = atoi(argv[2]);
  137. if (argc > 3) SEGMENT = atoi(argv[3]);
  138. if (argc > 4) LINE = atoi(argv[4]);
  139. if (!SKIP) SKIP = 1;
  140. printf("%s: Skip %zu, Segment %zu, Line %zu\n", argv[1], SKIP, SEGMENT, LINE);
  141. posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
  142. memset(done, 0, sizeof(done));
  143. memset(&aio, 0, sizeof(aio));
  144. io_queue_init(AIO_MODE, &aio);
  145. for (schedio = 0; schedio < AIO_MODE; schedio++) {
  146. ioptr[schedio] = &ios[schedio];
  147. memset(ioptr[schedio], 0, sizeof(struct iocb));
  148. err = next_block();
  149. if (err) break;
  150. io_prep_pread(ioptr[schedio], fd, buffer + schedio * BLOCK_SIZE, page(size), offset);
  151. io_set_callback(ioptr[schedio], (void*)(uintptr_t)schedio);
  152. // printf("sched %zu: %zu (%zu %zu)\n", schedio, schedio, offset, size);
  153. }
  154. size_t us, fileus;
  155. size_t last_write = 0;
  156. // size_t last_file_write = 0;
  157. // size_t last_file_size = 0;
  158. size_t cur_file_size = 0;
  159. size_t total_size = 0;
  160. size_t files = 0;
  161. gettimeofday(&start, NULL);
  162. gettimeofday(&fstart, NULL);
  163. n = io_submit(aio, schedio, ioptr);
  164. if (n != schedio) {
  165. printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
  166. exit(-1);
  167. }
  168. curio = 0;
  169. events = 0;
  170. ready = 0;
  171. while ((err >= 0)||(curio != schedio)) {
  172. io = buf(curio);
  173. if (!done[io]) {
  174. // printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
  175. if (curio < schedio) {
  176. n = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
  177. if (n < 0) {
  178. printf("Error waiting for AIO (%i)\n", -err);
  179. exit(-1);
  180. }
  181. } else {
  182. n = 0;
  183. }
  184. if ((!ready)&&(n > 1)) {
  185. printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
  186. printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
  187. printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
  188. }
  189. for (i = 0; i < n; i++) {
  190. struct io_event *ep = &ev[events + i];
  191. int doneio = (uintptr_t)ep->data;
  192. io = buf(doneio);
  193. // printf("done %i: %lu %zu %zi\n", doneio, ep->res2, done_size[io], ep->res);
  194. if (ep->res2 || (ep->res < done_size[io])) {
  195. printf("Error in async IO (ret: %li, ret size: %zi, expected %zu)\n", ep->res2, ep->res, page(done_size[io]));
  196. exit(-1);
  197. }
  198. done[io] = 1;
  199. // printf("done (%i): %i\n", i, doneio);
  200. }
  201. events += n;
  202. for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
  203. err = next_block();
  204. if (err) break;
  205. // printf("sched %i: %zu (%i %zu %zu)\n", i, schedio, fd, offset, size);
  206. struct iocb *newio = (struct iocb *)ev[i].obj;
  207. memset(newio, 0, sizeof(struct iocb));
  208. io_prep_pread(newio, fd, buffer + buf(schedio) * BLOCK_SIZE, page(size), offset);
  209. io_set_callback(newio, (void*)(uintptr_t)schedio);
  210. err = io_submit(aio, 1, &newio);
  211. if (err != 1) {
  212. printf("Failed to submit AIO jobs %i\n", err);
  213. exit(-1);
  214. }
  215. schedio++;
  216. }
  217. events = i + 1;
  218. if ((events)&&(!err)) {
  219. printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
  220. // printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
  221. }
  222. ready = 1;
  223. continue;
  224. }
  225. io = buf(curio);
  226. cur_file_size += done_size[io];
  227. total_size += done_size[io];
  228. gettimeofday(&tv, NULL);
  229. us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
  230. fileus = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
  231. if (done_finish[io]) {
  232. // printf("closing %i ===> %s (%i)\n", io, done_file[io], done_fd[io]);
  233. close(done_fd[io]);
  234. gettimeofday(&fstart, NULL);
  235. files++;
  236. }
  237. if ((us - last_write) > WRITE_INTERVAL * 1000000) {
  238. last_write = us;
  239. printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
  240. if (1) {//cur_file_size > BLOCK_SIZE) {
  241. printf(", Last: %s (%lu MB) at %zu MB/s\n", done_file[io], cur_file_size / 1024 / 1024, (size_t)(mcoef * cur_file_size / fileus));
  242. } else {
  243. printf("\n");
  244. }
  245. }
  246. if (done_finish[io]) {
  247. cur_file_size = 0;
  248. }
  249. done[io] = 0;
  250. curio++;
  251. }
  252. gettimeofday(&tv, NULL);
  253. us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
  254. printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
  255. free(buffer);
  256. closedir(dir);
  257. }