/* seqreader.c */

  1. #define _GNU_SOURCE
  2. #include <stdio.h>
  3. #include <stdlib.h>
  4. #include <stdint.h>
  5. #include <sys/types.h>
  6. #include <sys/stat.h>
  7. #include <sys/time.h>
  8. #include <unistd.h>
  9. #include <dirent.h>
  10. #include <fcntl.h>
  11. #include <string.h>
  12. #include <errno.h>
  13. #include <libaio.h>
  14. #define FASTWRITER_SYNCIO_ALIGN 512
  15. #define SYNC_MODE
  16. #define AIO_MODE 2
  17. //#define FS_SYNC_MODE
  18. #define EXTRA_BUFFERS 2
  19. #define WRITE_INTERVAL 1
  20. #define RAID_STRIP_SIZE 256
  21. #define RAID_DISKS 8
  22. #define STRIPS_AT_ONCE 2
  23. #ifdef AIO_MODE
  24. # define SYNC_MODE
  25. #endif /* AIO_MODE */
  26. #ifdef SYNC_MODE
  27. # define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE)
  28. #else /* SYNC_MODE */
  29. # define BLOCK_SIZE 16384
  30. #endif /* SYNC_MODE */
  31. #ifdef AIO_MODE
  32. # define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
  33. #else /* AIO_MODE */
  34. # define BUFSIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS)
  35. #endif /* AIO_MODE */
  36. int main(int argc, char *argv[]) {
  37. int err;
  38. size_t SKIP = 1;
  39. DIR *dir;
  40. struct dirent *ent;
  41. struct timeval start, fstart, tv;
  42. size_t us;
  43. size_t files = 0;
  44. size_t total_size = 0;
  45. size_t last_write = 0;
  46. size_t last_size = 0;
  47. size_t skip;
  48. size_t run;
  49. size_t ready;
  50. ssize_t res;
  51. size_t max_size = (size_t)-1;
  52. char *buffer;
  53. long double mcoef = 1000000. / (1024 * 1024);
  54. int flags = O_RDONLY|O_NOATIME|O_LARGEFILE;
  55. #ifdef AIO_MODE
  56. int i;
  57. size_t curio, schedio;
  58. int done[AIO_MODE + EXTRA_BUFFERS];
  59. io_context_t aio;
  60. struct iocb io[AIO_MODE], *ioptr[AIO_MODE];
  61. int events;
  62. struct io_event ev[AIO_MODE];
  63. #endif /* AIO_MODE */
  64. posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
  65. if (argc < 2) {
  66. printf("Usage: %s <directory|device> [skip|size]\n", argv[0]);
  67. exit(0);
  68. }
  69. if (strstr(argv[1], "/dev/")) {
  70. if (argc > 2) {
  71. max_size = atol(argv[2]);
  72. max_size *= 1024 * 1024 * 1024;
  73. }
  74. #ifdef SYNC_MODE
  75. flags |= O_DIRECT;
  76. #endif
  77. printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024);
  78. int fd = open(argv[1], flags, 0);
  79. if (fd < 0) {
  80. printf("Unable to open device %s\n", argv[1]);
  81. exit(1);
  82. }
  83. size_t size = BLOCK_SIZE;
  84. #ifdef AIO_MODE
  85. memset(done, 0, sizeof(done));
  86. memset(&aio, 0, sizeof(aio));
  87. io_queue_init(AIO_MODE, &aio);
  88. for (i = 0; i < AIO_MODE; i++) {
  89. ioptr[i] = &io[i];
  90. memset(ioptr[i], 0, sizeof(struct iocb));
  91. io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE);
  92. io_set_callback(ioptr[i], (void*)(uintptr_t)i);
  93. }
  94. curio = 0;
  95. schedio = AIO_MODE;
  96. events = 0;
  97. #endif /* AIO_MODE */
  98. gettimeofday(&start, NULL);
  99. #ifdef AIO_MODE
  100. err = io_submit(aio, AIO_MODE, ioptr);
  101. if (err != AIO_MODE) {
  102. printf("io_submit returned %i\n", err);
  103. perror("Failed to submit initial AIO jobs");
  104. }
  105. #endif /* AIO_MODE */
  106. #ifdef AIO_MODE
  107. ready = 0;
  108. while (1) {
  109. if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) {
  110. err = io_getevents(aio, 1, AIO_MODE - events, &ev[events], NULL);
  111. if (err < 0) perror("Error waiting for AIO\n");
  112. if ((!ready)&&(err > 1)) {
  113. printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
  114. printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
  115. printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
  116. }
  117. for (i = 0; i < err; i++) {
  118. struct io_event *ep = &ev[events + i];
  119. int doneio = (uintptr_t)ep->data;
  120. if (ep->res2 || (ep->res != BLOCK_SIZE)) perror("Error in async IO");
  121. done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1;
  122. // printf("done (%i): %i\n", i, doneio);
  123. }
  124. events += err;
  125. for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
  126. struct iocb *newio = (struct iocb *)ev[i].obj;
  127. memset(newio, 0, sizeof(struct iocb));
  128. io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE);
  129. io_set_callback(newio, (void*)(uintptr_t)schedio);
  130. err = io_submit(aio, 1, &newio);
  131. if (err != 1) perror("Failed to submit AIO jobs");
  132. schedio++;
  133. }
  134. events = i + 1;
  135. if (events) {
  136. printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
  137. // printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
  138. }
  139. ready = 1;
  140. continue;
  141. }
  142. done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0;
  143. curio++;
  144. res = BLOCK_SIZE;
  145. #else /* AIO_MODE */
  146. res = read(fd, buffer, size);
  147. while (res > 0) {
  148. #endif /* AIO_MODE */
  149. if (res != size) {
  150. printf("Incomplete read: %zu bytes read instead of %zu\n", res, size);
  151. exit(-1);
  152. }
  153. total_size += res;
  154. gettimeofday(&tv, NULL);
  155. us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
  156. if ((us - last_write) > WRITE_INTERVAL * 1000000) {
  157. printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write)));
  158. last_write = us;
  159. last_size = total_size;
  160. }
  161. if (total_size > max_size) {
  162. printf("Reading: %s (%lu GB), Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
  163. break;
  164. }
  165. #ifndef AIO_MODE
  166. res = read(fd, buffer, size);
  167. #endif /* AIO_MODE */
  168. }
  169. #ifdef AIO_MODE
  170. io_queue_release(aio);
  171. #endif /* AIO_MODE */
  172. close(fd);
  173. if (res < 0) {
  174. printf("Read failed with errno %i\n", errno);
  175. exit(-1);
  176. }
  177. free(buffer);
  178. return 0;
  179. }
  180. #ifdef FS_SYNC_MODE
  181. flags |= O_DIRECT;
  182. #endif /* FS_SYNC_MODE */
  183. chdir(argv[1]);
  184. if (argc > 2) {
  185. SKIP = atoi(argv[2]);
  186. printf("Skip %zu\n", SKIP);
  187. }
  188. gettimeofday(&start, NULL);
  189. for (run = 0; run < SKIP; run++) {
  190. skip = 0;
  191. dir = opendir(".");
  192. while ((ent = readdir(dir))) {
  193. struct stat st;
  194. if (((skip++)%SKIP) != run) continue;
  195. if (stat(ent->d_name, &st)) continue;
  196. if (!S_ISREG(st.st_mode)) continue;
  197. int size = st.st_blksize;
  198. #ifdef F_MODE
  199. FILE *f = fopen(ent->d_name, "r");
  200. if (!f) continue;
  201. #else
  202. int fd = open(ent->d_name, flags, 0);
  203. if (fd < 0) continue;
  204. # ifdef FS_SYNC_MODE
  205. if (size < BLOCK_SIZE) size = BLOCK_SIZE;
  206. # endif /* FS_SYNC_MODE */
  207. #endif
  208. if (!files)
  209. printf("Reading %s, Block: %i KB\n", ent->d_name, size / 1024);
  210. if (size > BUFSIZE) {
  211. printf("Buffer too small\n");
  212. exit(1);
  213. }
  214. size_t last_file_write = 0;
  215. size_t last_file_size = 0;
  216. size_t file_size = 0;
  217. gettimeofday(&fstart, NULL);
  218. #ifdef F_MODE
  219. while (!feof(f)) {
  220. ssize_t ret = fread(buffer, 1, size, f);
  221. #else
  222. while (1) {
  223. ssize_t ret = read(fd, buffer, size);
  224. #endif
  225. if (ret <= 0) break;
  226. file_size += ret;
  227. gettimeofday(&tv, NULL);
  228. us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
  229. if ((us - last_file_write) > WRITE_INTERVAL * 1000000) {
  230. printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", ent->d_name, file_size / 1024 / 1024 / 1024, (size_t)(mcoef * file_size / us), (size_t)(mcoef * (file_size - last_file_size) / (us - last_file_write)));
  231. last_file_write = us;
  232. last_file_size = file_size;
  233. }
  234. }
  235. if (!file_size) {
  236. printf("Read failed\n");
  237. exit(1);
  238. }
  239. #ifdef F_MODE
  240. fclose(f);
  241. #else
  242. close(fd);
  243. #endif
  244. total_size += st.st_size;
  245. files++;
  246. gettimeofday(&tv, NULL);
  247. us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
  248. if ((us - last_write) > WRITE_INTERVAL * 1000000) {
  249. last_write = us;
  250. printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
  251. us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
  252. printf(", Last: %s (%lu MB) at %zu MB/s\n", ent->d_name, st.st_size/1024/1024, (size_t)(mcoef * file_size / us));
  253. }
  254. }
  255. closedir(dir);
  256. us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
  257. printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
  258. }
  259. free(buffer);
  260. }