#define _GNU_SOURCE #include #include #include #include #include #include #include #include #include #include #include #include #define FASTWRITER_SYNCIO_ALIGN 512 #define SYNC_MODE #define AIO_MODE 2 //#define FS_SYNC_MODE #define EXTRA_BUFFERS 2 #define WRITE_INTERVAL 1 #define RAID_STRIP_SIZE 256 #define RAID_DISKS 8 #define STRIPS_AT_ONCE 2 #define FS_MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE) #define FS_BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS) #ifdef AIO_MODE # define SYNC_MODE #endif /* AIO_MODE */ #ifdef SYNC_MODE # define BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE) #else /* SYNC_MODE */ # define BLOCK_SIZE 16384 #endif /* SYNC_MODE */ #ifdef AIO_MODE # define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS)) #else /* AIO_MODE */ # define BUFSIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS * STRIPS_AT_ONCE) #endif /* AIO_MODE */ int main(int argc, char *argv[]) { int err; size_t SKIP = 1; DIR *dir; struct dirent *ent; struct timeval start, fstart, tv; size_t us; size_t files = 0; size_t total_size = 0; size_t last_write = 0; size_t last_size = 0; size_t skip; size_t run; size_t ready; ssize_t res; size_t max_size = (size_t)-1; char *buffer; long double mcoef = 1000000. / (1024 * 1024); int flags = O_RDONLY|O_NOATIME|O_LARGEFILE; struct stat st; #ifdef AIO_MODE int i; size_t curio, schedio; int done[AIO_MODE + EXTRA_BUFFERS]; io_context_t aio; struct iocb io[AIO_MODE], *ioptr[AIO_MODE]; int events; struct io_event ev[AIO_MODE]; #endif /* AIO_MODE */ posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE); if (argc < 2) { printf("Usage: %s [skip|size]\n", argv[0]); exit(0); } if (stat(argv[1], &st)) { printf("stat on (%s) have failed", argv[1]); exit(-1); } if (strstr(argv[1], "/dev/")||(S_ISREG(st.st_mode))) { if (argc > 2) { max_size = atol(argv[2]); max_size *= 1024 * 1024 * 1024; } #ifdef SYNC_MODE flags |= O_DIRECT; #endif printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024); int fd = open(argv[1], flags, 0); if (fd < 0) { printf("Unable to open device %s\n", argv[1]); exit(1); } size_t size = BLOCK_SIZE; #ifdef AIO_MODE memset(done, 0, sizeof(done)); memset(&aio, 0, sizeof(aio)); io_queue_init(AIO_MODE, &aio); for (i = 0; i < AIO_MODE; i++) { ioptr[i] = &io[i]; memset(ioptr[i], 0, sizeof(struct iocb)); io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE); io_set_callback(ioptr[i], (void*)(uintptr_t)i); } curio = 0; schedio = AIO_MODE; events = 0; #endif /* AIO_MODE */ gettimeofday(&start, NULL); #ifdef AIO_MODE err = io_submit(aio, AIO_MODE, ioptr); if (err != AIO_MODE) { printf("Failed to submit initial AIO job, io_submit returned %i\n", err); exit(-1); } #endif /* AIO_MODE */ #ifdef AIO_MODE ready = 0; while (1) { if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) { // printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]); if (curio < schedio) { err = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL); if (err < 0) { printf("Error waiting for AIO (%i)\n", -err); exit(-1); } } else { err = 0; } if ((!ready)&&(err > 1)) { printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE); printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024); printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err); } for (i = 0; i < err; i++) { struct io_event *ep = &ev[events + i]; int doneio = (uintptr_t)ep->data; if (ep->res2 || (ep->res != BLOCK_SIZE)) { printf("Error in async IO\n"); exit(-1); } done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1; // printf("done (%i): %i\n", i, doneio); } events += err; for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) { // printf("sched (%i): %i\n", i, schedio); struct iocb *newio = (struct iocb *)ev[i].obj; memset(newio, 0, sizeof(struct iocb)); io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE); io_set_callback(newio, (void*)(uintptr_t)schedio); err = io_submit(aio, 1, &newio); if (err != 1) { printf("Failed to submit AIO jobs %i", err); exit(-1); } schedio++; } events = i + 1; if (events) { printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events); // printf(" curio (%zu), schedio (%zu)\n", curio, schedio); } ready = 1; continue; } done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0; curio++; res = BLOCK_SIZE; #else /* AIO_MODE */ res = read(fd, buffer, size); while (res > 0) { #endif /* AIO_MODE */ if (res != size) { printf("Incomplete read: %zu bytes read instead of %zu\n", res, size); exit(-1); } total_size += res; gettimeofday(&tv, NULL); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); if ((us - last_write) > WRITE_INTERVAL * 1000000) { printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write))); last_write = us; last_size = total_size; } if (total_size > max_size) { printf("Reading: %s (%lu GB), Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); break; } #ifndef AIO_MODE res = read(fd, buffer, size); #endif /* AIO_MODE */ } #ifdef AIO_MODE io_queue_release(aio); #endif /* AIO_MODE */ close(fd); if (res < 0) { printf("Read failed with errno %i\n", errno); exit(-1); } free(buffer); return 0; } #ifdef FS_SYNC_MODE flags |= O_DIRECT; #endif /* FS_SYNC_MODE */ chdir(argv[1]); if (argc > 2) { SKIP = atoi(argv[2]); printf("Skip %zu\n", SKIP); } gettimeofday(&start, NULL); for (run = 0; run < SKIP; run++) { skip = 0; dir = opendir("."); while ((ent = readdir(dir))) { if (((skip++)%SKIP) != run) continue; if (stat(ent->d_name, &st)) continue; if (!S_ISREG(st.st_mode)) continue; int size = st.st_blksize; #ifdef F_MODE FILE *f = fopen(ent->d_name, "r"); if (!f) continue; #else int fd = open(ent->d_name, flags, 0); if (fd < 0) continue; # ifdef FS_SYNC_MODE if (size < BLOCK_SIZE) size = BLOCK_SIZE; # endif /* FS_SYNC_MODE */ if (size < FS_MIN_BLOCK_SIZE) size = FS_BLOCK_SIZE; #endif if (!files) printf("Reading %s, Block: %i KB\n", ent->d_name, size / 1024); if (size > BUFSIZE) { printf("Buffer too small\n"); exit(1); } size_t last_file_write = 0; size_t last_file_size = 0; size_t file_size = 0; gettimeofday(&fstart, NULL); #ifdef F_MODE while (!feof(f)) { ssize_t ret = fread(buffer, 1, size, f); #else while (1) { ssize_t ret = read(fd, buffer, size); #endif if (ret <= 0) break; file_size += ret; gettimeofday(&tv, NULL); us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec); if ((us - last_file_write) > WRITE_INTERVAL * 1000000) { printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", ent->d_name, file_size / 1024 / 1024 / 1024, (size_t)(mcoef * file_size / us), (size_t)(mcoef * (file_size - last_file_size) / (us - last_file_write))); last_file_write = us; last_file_size = file_size; } } if (!file_size) { printf("Read failed\n"); exit(1); } #ifdef F_MODE fclose(f); #else close(fd); #endif total_size += st.st_size; files++; gettimeofday(&tv, NULL); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); if ((us - last_write) > WRITE_INTERVAL * 1000000) { last_write = us; printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec); printf(", Last: %s (%lu MB) at %zu MB/s\n", ent->d_name, st.st_size/1024/1024, (size_t)(mcoef * file_size / us)); } } closedir(dir); us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec); printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us)); } free(buffer); }