123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354 |
- #define _GNU_SOURCE
- #include <stdio.h>
- #include <stdlib.h>
- #include <stdint.h>
- #include <sys/types.h>
- #include <sys/stat.h>
- #include <sys/time.h>
- #include <unistd.h>
- #include <dirent.h>
- #include <fcntl.h>
- #include <string.h>
- #include <errno.h>
- #include <libaio.h>
/*
 * Build-time configuration of the read benchmark.
 */

/* Alignment, in bytes, requested for the I/O buffer passed to posix_memalign(). */
#define FASTWRITER_SYNCIO_ALIGN 512

/*
 * Mode switches.
 *   SYNC_MODE    - single-target mode opens with O_DIRECT and uses large blocks.
 *   AIO_MODE     - number of libaio requests kept in flight; implies SYNC_MODE.
 *   FS_SYNC_MODE - (disabled) directory mode also opens files with O_DIRECT.
 */
#define SYNC_MODE
#define AIO_MODE 2
/* #define FS_SYNC_MODE */

/* Buffer slots available in addition to the AIO_MODE in-flight ones. */
#define EXTRA_BUFFERS 2

/* Minimum number of seconds between two progress lines. */
#define WRITE_INTERVAL 1

/* Array geometry the block sizes are derived from (strip size is in KB). */
#define RAID_STRIP_SIZE 256
#define RAID_DISKS 8
#define STRIPS_AT_ONCE 2

/* Directory mode: block sizes below the minimum are replaced by FS_BLOCK_SIZE. */
#define FS_MIN_BLOCK_SIZE (1024 * RAID_STRIP_SIZE)
#define FS_BLOCK_SIZE (1024 * RAID_STRIP_SIZE * RAID_DISKS)

/* Asynchronous I/O always runs in SYNC_MODE. */
#if defined(AIO_MODE) && !defined(SYNC_MODE)
# define SYNC_MODE
#endif /* AIO_MODE && !SYNC_MODE */

/* Size of one read request in single-target mode. */
#if defined(SYNC_MODE)
# define BLOCK_SIZE (FS_BLOCK_SIZE * STRIPS_AT_ONCE)
#else /* SYNC_MODE */
# define BLOCK_SIZE 16384
#endif /* SYNC_MODE */

/* Total size of the I/O buffer: one BLOCK_SIZE slot per in-flight or spare request. */
#if defined(AIO_MODE)
# define BUFSIZE (BLOCK_SIZE * (AIO_MODE + EXTRA_BUFFERS))
#else /* AIO_MODE */
# define BUFSIZE (FS_BLOCK_SIZE * STRIPS_AT_ONCE)
#endif /* AIO_MODE */
- int main(int argc, char *argv[]) {
- int err;
- size_t SKIP = 1;
- DIR *dir;
- struct dirent *ent;
- struct timeval start, fstart, tv;
- size_t us;
- size_t files = 0;
- size_t total_size = 0;
- size_t last_write = 0;
- size_t last_size = 0;
- size_t skip;
- size_t run;
- size_t ready;
- ssize_t res;
- size_t max_size = (size_t)-1;
- char *buffer;
- long double mcoef = 1000000. / (1024 * 1024);
- int flags = O_RDONLY|O_NOATIME|O_LARGEFILE;
- struct stat st;
- #ifdef AIO_MODE
- int i;
- size_t curio, schedio;
- int done[AIO_MODE + EXTRA_BUFFERS];
-
- io_context_t aio;
- struct iocb io[AIO_MODE], *ioptr[AIO_MODE];
- int events;
- struct io_event ev[AIO_MODE];
- #endif /* AIO_MODE */
- posix_memalign((void**)&buffer, FASTWRITER_SYNCIO_ALIGN, BUFSIZE);
-
- if (argc < 2) {
- printf("Usage: %s <directory|file|device> [skip|size]\n", argv[0]);
- exit(0);
- }
-
- if (stat(argv[1], &st)) {
- printf("stat on (%s) have failed", argv[1]);
- exit(-1);
- }
-
- if (strstr(argv[1], "/dev/")||(S_ISREG(st.st_mode))) {
- if (argc > 2) {
- max_size = atol(argv[2]);
- max_size *= 1024 * 1024 * 1024;
- }
- #ifdef SYNC_MODE
- flags |= O_DIRECT;
- #endif
-
- printf("Used buffer: %i MB, Block: %i KB\n", BUFSIZE / 1024 / 1024, BLOCK_SIZE/1024);
- int fd = open(argv[1], flags, 0);
- if (fd < 0) {
- printf("Unable to open device %s\n", argv[1]);
- exit(1);
- }
- size_t size = BLOCK_SIZE;
- #ifdef AIO_MODE
- memset(done, 0, sizeof(done));
- memset(&aio, 0, sizeof(aio));
- io_queue_init(AIO_MODE, &aio);
- for (i = 0; i < AIO_MODE; i++) {
- ioptr[i] = &io[i];
- memset(ioptr[i], 0, sizeof(struct iocb));
- io_prep_pread(ioptr[i], fd, buffer + i * BLOCK_SIZE, BLOCK_SIZE, i * BLOCK_SIZE);
- io_set_callback(ioptr[i], (void*)(uintptr_t)i);
- }
- curio = 0;
- schedio = AIO_MODE;
- events = 0;
- #endif /* AIO_MODE */
-
- gettimeofday(&start, NULL);
- #ifdef AIO_MODE
- err = io_submit(aio, AIO_MODE, ioptr);
- if (err != AIO_MODE) {
- printf("Failed to submit initial AIO job, io_submit returned %i\n", err);
- exit(-1);
- }
- #endif /* AIO_MODE */
-
- #ifdef AIO_MODE
- ready = 0;
- while (1) {
- if (!done[curio%(AIO_MODE + EXTRA_BUFFERS)]) {
- // printf("%i,%i - %i [%i %i %i %i]\n", curio, schedio, events, done[0], done[1], done[2], done[3]);
-
- if (curio < schedio) {
- err = io_getevents(aio, 1, AIO_MODE + EXTRA_BUFFERS - events, &ev[events], NULL);
- if (err < 0) {
- printf("Error waiting for AIO (%i)\n", -err);
- exit(-1);
- }
- } else {
- err = 0;
- }
-
- if ((!ready)&&(err > 1)) {
- printf("*** Multiple read requests (%i of %i) are finished simultaneously. It is either:\n", err, AIO_MODE);
- printf(" Small buffer size (%i KB)\n", BLOCK_SIZE/1024);
- printf(" More parallel AIOs (%i) than supported by kernel, try %i\n", AIO_MODE, AIO_MODE - err);
- }
-
- for (i = 0; i < err; i++) {
- struct io_event *ep = &ev[events + i];
- int doneio = (uintptr_t)ep->data;
- if (ep->res2 || (ep->res != BLOCK_SIZE)) {
- printf("Error in async IO\n");
- exit(-1);
- }
- done[doneio%(AIO_MODE + EXTRA_BUFFERS)] = 1;
- // printf("done (%i): %i\n", i, doneio);
- }
-
- events += err;
-
- for (i = events - 1; (i >= 0)&&((schedio - curio) < (AIO_MODE + EXTRA_BUFFERS)); i--) {
- // printf("sched (%i): %i\n", i, schedio);
- struct iocb *newio = (struct iocb *)ev[i].obj;
- memset(newio, 0, sizeof(struct iocb));
- io_prep_pread(newio, fd, buffer + (schedio % (AIO_MODE + EXTRA_BUFFERS)) * BLOCK_SIZE, BLOCK_SIZE, schedio * BLOCK_SIZE);
- io_set_callback(newio, (void*)(uintptr_t)schedio);
- err = io_submit(aio, 1, &newio);
- if (err != 1) {
- printf("Failed to submit AIO jobs %i", err);
- exit(-1);
- }
- schedio++;
- }
- events = i + 1;
-
- if (events) {
- printf("*** Unprocessed events (%i), probably not enough buffer space...\n", events);
- // printf(" curio (%zu), schedio (%zu)\n", curio, schedio);
- }
- ready = 1;
- continue;
- }
- done[curio%(AIO_MODE + EXTRA_BUFFERS)] = 0;
- curio++;
- res = BLOCK_SIZE;
- #else /* AIO_MODE */
- res = read(fd, buffer, size);
- while (res > 0) {
- #endif /* AIO_MODE */
- if (res != size) {
- printf("Incomplete read: %zu bytes read instead of %zu\n", res, size);
- exit(-1);
- }
- total_size += res;
- gettimeofday(&tv, NULL);
- us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
- if ((us - last_write) > WRITE_INTERVAL * 1000000) {
- printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us), (size_t)(mcoef * (total_size - last_size) / (us - last_write)));
- last_write = us;
- last_size = total_size;
- }
-
- if (total_size > max_size) {
- printf("Reading: %s (%lu GB), Measured speed: %zu MB/s\n", argv[0], total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
- break;
- }
-
- #ifndef AIO_MODE
- res = read(fd, buffer, size);
- #endif /* AIO_MODE */
- }
-
- #ifdef AIO_MODE
- io_queue_release(aio);
- #endif /* AIO_MODE */
- close(fd);
- if (res < 0) {
- printf("Read failed with errno %i\n", errno);
- exit(-1);
- }
- free(buffer);
-
- return 0;
- }
- #ifdef FS_SYNC_MODE
- flags |= O_DIRECT;
- #endif /* FS_SYNC_MODE */
-
- chdir(argv[1]);
- if (argc > 2) {
- SKIP = atoi(argv[2]);
-
- printf("Skip %zu\n", SKIP);
- }
- gettimeofday(&start, NULL);
- for (run = 0; run < SKIP; run++) {
- skip = 0;
- dir = opendir(".");
- while ((ent = readdir(dir))) {
- if (((skip++)%SKIP) != run) continue;
- if (stat(ent->d_name, &st)) continue;
- if (!S_ISREG(st.st_mode)) continue;
- int size = st.st_blksize;
- #ifdef F_MODE
- FILE *f = fopen(ent->d_name, "r");
- if (!f) continue;
- #else
- int fd = open(ent->d_name, flags, 0);
- if (fd < 0) continue;
- # ifdef FS_SYNC_MODE
- if (size < BLOCK_SIZE) size = BLOCK_SIZE;
- # endif /* FS_SYNC_MODE */
- if (size < FS_MIN_BLOCK_SIZE) size = FS_BLOCK_SIZE;
- #endif
- if (!files)
- printf("Reading %s, Block: %i KB\n", ent->d_name, size / 1024);
-
- if (size > BUFSIZE) {
- printf("Buffer too small\n");
- exit(1);
- }
- size_t last_file_write = 0;
- size_t last_file_size = 0;
- size_t file_size = 0;
- gettimeofday(&fstart, NULL);
- #ifdef F_MODE
- while (!feof(f)) {
- ssize_t ret = fread(buffer, 1, size, f);
- #else
- while (1) {
- ssize_t ret = read(fd, buffer, size);
- #endif
- if (ret <= 0) break;
-
- file_size += ret;
- gettimeofday(&tv, NULL);
- us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
- if ((us - last_file_write) > WRITE_INTERVAL * 1000000) {
- printf("Reading: %s (%lu GB), Measured speed: %zu MB/s, Current speed: %zu MB/s\n", ent->d_name, file_size / 1024 / 1024 / 1024, (size_t)(mcoef * file_size / us), (size_t)(mcoef * (file_size - last_file_size) / (us - last_file_write)));
- last_file_write = us;
- last_file_size = file_size;
- }
- }
- if (!file_size) {
- printf("Read failed\n");
- exit(1);
- }
- #ifdef F_MODE
- fclose(f);
- #else
- close(fd);
- #endif
- total_size += st.st_size;
- files++;
-
- gettimeofday(&tv, NULL);
- us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
- if ((us - last_write) > WRITE_INTERVAL * 1000000) {
- last_write = us;
- printf("Read: %lu files (%lu GB) at %zu MB/s", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
- us = (tv.tv_sec - fstart.tv_sec) * 1000000 + (tv.tv_usec - fstart.tv_usec);
- printf(", Last: %s (%lu MB) at %zu MB/s\n", ent->d_name, st.st_size/1024/1024, (size_t)(mcoef * file_size / us));
- }
- }
- closedir(dir);
- us = (tv.tv_sec - start.tv_sec) * 1000000 + (tv.tv_usec - start.tv_usec);
- printf("Total: %lu files (%lu GB) at %zu MB/s\n", files, total_size / 1024 / 1024 / 1024, (size_t)(mcoef * total_size / us));
- }
-
- free(buffer);
- }
|