Use dlmalloc to manage shared memory (#15)

* Use dlmalloc to manage shared memory

* add stresstest
This commit is contained in:
Richard Shin 2016-09-10 16:39:24 -07:00 committed by Robert Nishihara
parent 04737f3f56
commit d52bf7d146
12 changed files with 6496 additions and 53 deletions

1
.gitignore vendored
View file

@ -1,2 +1,3 @@
build/*
*~
*.pyc

View file

@ -7,7 +7,7 @@ else
base_commit="$TRAVIS_BRANCH"
echo "Running clang-format against branch $base_commit, with hash $(git rev-parse $base_commit)"
fi
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff)"
output="$(.travis/git-clang-format --binary clang-format-3.8 --commit $base_commit --diff --exclude ^third_party/)"
if [ "$output" == "no modified files to format" ] || [ "$output" == "clang-format did not modify any files" ] ; then
echo "clang-format passed."
exit 0

View file

@ -97,6 +97,7 @@ def main():
default_extensions),
help=('comma-separated list of file extensions to format, '
'excluding the period and case-insensitive')),
p.add_argument('--exclude', help='Exclude files matching this regex.')
p.add_argument('-f', '--force', action='store_true',
help='allow changes to unstaged files')
p.add_argument('-p', '--patch', action='store_true',
@ -125,10 +126,14 @@ def main():
if opts.verbose >= 1:
ignored_files = set(changed_lines)
filter_by_extension(changed_lines, opts.extensions.lower().split(','))
if opts.exclude:
for filename in changed_lines.keys():
if re.match(opts.exclude, filename):
del changed_lines[filename]
if opts.verbose >= 1:
ignored_files.difference_update(changed_lines)
if ignored_files:
print 'Ignoring changes in the following files (wrong extension):'
print 'Ignoring changes in the following files:'
for filename in ignored_files:
print ' ', filename
if changed_lines:

View file

@ -1,5 +1,5 @@
CC = gcc
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500
CFLAGS = -g -Wall --std=c99 -D_XOPEN_SOURCE=500 -I.
BUILD = build
all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(BUILD)/example
@ -7,8 +7,8 @@ all: $(BUILD)/plasma_store $(BUILD)/plasma_manager $(BUILD)/plasma_client.so $(B
clean:
rm -r $(BUILD)/*
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c -o $(BUILD)/plasma_store
$(BUILD)/plasma_store: src/plasma_store.c src/plasma.h src/event_loop.h src/event_loop.c src/fling.h src/fling.c src/malloc.c src/malloc.h third_party/dlmalloc.c
$(CC) $(CFLAGS) src/plasma_store.c src/event_loop.c src/fling.c src/malloc.c -o $(BUILD)/plasma_store
$(BUILD)/plasma_manager: src/plasma_manager.c src/event_loop.h src/event_loop.c src/plasma.h src/plasma_client.c src/fling.h src/fling.c
$(CC) $(CFLAGS) src/plasma_manager.c src/event_loop.c src/plasma_client.c src/fling.c -o $(BUILD)/plasma_manager

View file

@ -19,6 +19,7 @@ int send_fd(int conn, int fd, const char *payload, int size) {
struct msghdr msg;
struct iovec iov;
char buf[CMSG_SPACE(sizeof(int))];
memset(&buf, 0, CMSG_SPACE(sizeof(int)));
init_msg(&msg, &iov, buf, sizeof(buf));

123
src/malloc.c Normal file
View file

@ -0,0 +1,123 @@
#include <assert.h>
#include <stdlib.h>
#include <stddef.h>
#include <stdio.h>
#include <string.h>
#include <sys/mman.h>
#include <unistd.h>
#include "plasma.h"
#include "uthash.h"
void *fake_mmap(size_t);
int fake_munmap(void *, size_t);
#define MMAP(s) fake_mmap(s)
#define MUNMAP(a, s) fake_munmap(a, s)
#define DIRECT_MMAP(s) fake_mmap(s)
#define DIRECT_MUNMAP(a, s) fake_munmap(a, s)
#define USE_DL_PREFIX
#define HAVE_MORECORE 0
#include "third_party/dlmalloc.c"
#undef MMAP
#undef MUNMAP
#undef DIRECT_MMAP
#undef DIRECT_MUNMAP
#undef USE_DL_PREFIX
#undef HAVE_MORECORE
struct mmap_record {
int fd;
void *pointer;
int64_t size;
UT_hash_handle hh_fd;
UT_hash_handle hh_pointer;
};
struct mmap_record *records_by_fd = NULL;
struct mmap_record *records_by_pointer = NULL;
/* Create a buffer. This is creating a temporary file and then
* immediately unlinking it so we do not leave traces in the system. */
int create_buffer(int64_t size) {
static char template[] = "/tmp/plasmaXXXXXX";
char file_name[32];
strncpy(file_name, template, 32);
int fd = mkstemp(file_name);
if (fd < 0)
return -1;
FILE *file = fdopen(fd, "a+");
if (!file) {
close(fd);
return -1;
}
if (unlink(file_name) != 0) {
LOG_ERR("unlink error");
return -1;
}
if (ftruncate(fd, (off_t) size) != 0) {
LOG_ERR("ftruncate error");
return -1;
}
return fd;
}
void *fake_mmap(size_t size) {
// Add sizeof(size_t) so that the returned pointer is deliberately not
// page-aligned. This ensures that the segments of memory returned by
// fake_mmap are never contiguous.
int fd = create_buffer(size + sizeof(size_t));
void *pointer = mmap(NULL, size + sizeof(size_t), PROT_READ | PROT_WRITE,
MAP_SHARED, fd, 0);
if (pointer == MAP_FAILED) {
return pointer;
}
pointer += sizeof(size_t);
struct mmap_record *record = malloc(sizeof(struct mmap_record));
record->fd = fd;
record->pointer = pointer;
record->size = size;
HASH_ADD(hh_fd, records_by_fd, fd, sizeof(fd), record);
HASH_ADD(hh_pointer, records_by_pointer, pointer, sizeof(pointer), record);
LOG_DEBUG("%p = fake_mmap(%lu)", pointer, size);
return pointer;
}
int fake_munmap(void *addr, size_t size) {
LOG_DEBUG("fake_munmap(%p, %lu)", addr, size);
struct mmap_record *record;
addr -= sizeof(size_t);
HASH_FIND(hh_pointer, records_by_pointer, &addr, sizeof(addr), record);
assert(record != NULL);
close(record->fd);
HASH_DELETE(hh_fd, records_by_fd, record);
HASH_DELETE(hh_pointer, records_by_pointer, record);
return munmap(addr, size + sizeof(size_t));
}
void get_malloc_mapinfo(void *addr,
int *fd,
int64_t *map_size,
ptrdiff_t *offset) {
struct mmap_record *record;
// TODO(rshin): Implement a more efficient search through records_by_fd.
for (record = records_by_fd; record != NULL; record = record->hh_fd.next) {
if (addr >= record->pointer && addr < record->pointer + record->size) {
*fd = record->fd;
*map_size = record->size;
*offset = addr - record->pointer;
return;
}
}
*fd = -1;
*map_size = 0;
*offset = 0;
}

9
src/malloc.h Normal file
View file

@ -0,0 +1,9 @@
#ifndef MALLOC_H
#define MALLOC_H
void get_malloc_mapinfo(void *addr,
int *fd,
int64_t *map_length,
ptrdiff_t *offset);
#endif // MALLOC_H

View file

@ -4,6 +4,7 @@
#include <inttypes.h>
#include <stdio.h>
#include <errno.h>
#include <stddef.h>
#include <string.h>
#ifdef NDEBUG
@ -13,13 +14,17 @@
fprintf(stderr, "[DEBUG] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#endif
#ifdef PLASMA_LOGGIN_ON
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
#else
#define LOG_INFO(M, ...)
#endif
#define LOG_ERR(M, ...) \
fprintf(stderr, "[ERROR] (%s:%d: errno: %s) " M "\n", __FILE__, __LINE__, \
errno == 0 ? "None" : strerror(errno), ##__VA_ARGS__)
#define LOG_INFO(M, ...) \
fprintf(stderr, "[INFO] (%s:%d) " M "\n", __FILE__, __LINE__, ##__VA_ARGS__)
typedef struct {
int64_t size;
int64_t create_time;
@ -59,7 +64,9 @@ enum plasma_reply_type {
typedef struct {
int type;
int64_t size;
ptrdiff_t offset;
int64_t map_size;
int64_t object_size;
} plasma_reply;
typedef struct {

View file

@ -31,12 +31,15 @@ void plasma_create(int conn, plasma_id object_id, int64_t size, void **data) {
plasma_reply reply;
int fd = recv_fd(conn, (char *) &reply, sizeof(plasma_reply));
assert(reply.type == PLASMA_OBJECT);
assert(reply.size == size);
*data = mmap(NULL, reply.size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
assert(reply.object_size == size);
*data =
mmap(NULL, reply.map_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0) +
reply.offset;
if (*data == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
}
close(fd);
}
void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) {
@ -51,12 +54,14 @@ void plasma_get(int conn, plasma_id object_id, int64_t *size, void **data) {
fd = new_fd;
}
assert(reply.type == PLASMA_OBJECT);
*data = mmap(NULL, reply.size, PROT_READ, MAP_SHARED, fd, 0);
*data =
mmap(NULL, reply.map_size, PROT_READ, MAP_SHARED, fd, 0) + reply.offset;
if (*data == MAP_FAILED) {
LOG_ERR("mmap failed");
exit(-1);
}
*size = reply.size;
close(fd);
*size = reply.object_size;
}
void plasma_seal(int fd, plasma_id object_id) {
@ -116,8 +121,10 @@ int plasma_manager_connect(const char *ip_addr, int port) {
int r = connect(fd, (struct sockaddr *) &addr, sizeof(addr));
if (r < 0) {
LOG_ERR("could not establish connection to manager with id %s:%d",
&ip_addr[0], port);
LOG_ERR(
"could not establish connection to manager with id %s:%d (probably ran "
"out of ports)",
&ip_addr[0], port);
exit(-1);
}
return fd;

View file

@ -9,6 +9,7 @@
* It keeps a hash table that maps object_ids (which are 20 byte long,
* just enough to store and SHA1 hash) to memory mapped files. */
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
@ -22,11 +23,14 @@
#include "uthash.h"
#include "fling.h"
#include "malloc.h"
#include "plasma.h"
#include "event_loop.h"
#define MAX_NUM_CLIENTS 100000
void* dlmalloc(size_t);
typedef struct {
/* Event loop for the plasma store. */
event_loop* loop;
@ -44,6 +48,10 @@ typedef struct {
plasma_object_info info;
/* Memory mapped file containing the object. */
int fd;
/* Size of the underlying map. */
int64_t map_size;
/* Offset from the base of the mmap. */
ptrdiff_t offset;
/* Handle for the uthash table. */
UT_hash_handle handle;
} object_table_entry;
@ -69,47 +77,32 @@ typedef struct {
/* Objects that processes are waiting for. */
object_notify_entry* objects_notify = NULL;
/* Create a buffer. This is creating a temporary file and then
* immediately unlinking it so we do not leave traces in the system. */
int create_buffer(int64_t size) {
static char template[] = "/tmp/plasmaXXXXXX";
char file_name[32];
strncpy(file_name, template, 32);
int fd = mkstemp(file_name);
if (fd < 0)
return -1;
FILE* file = fdopen(fd, "a+");
if (!file) {
close(fd);
return -1;
}
if (unlink(file_name) != 0) {
LOG_ERR("unlink error");
return -1;
}
if (ftruncate(fd, (off_t) size) != 0) {
LOG_ERR("ftruncate error");
return -1;
}
return fd;
}
/* Create a new object buffer in the hash table. */
void create_object(int conn, plasma_request* req) {
LOG_INFO("creating object"); /* TODO(pcm): add object_id here */
int fd = create_buffer(req->size);
if (fd < 0) {
LOG_ERR("could not create shared memory buffer");
exit(-1);
}
void* pointer = dlmalloc(req->size);
int fd;
int64_t map_size;
ptrdiff_t offset;
get_malloc_mapinfo(pointer, &fd, &map_size, &offset);
assert(fd != -1);
object_table_entry* entry = malloc(sizeof(object_table_entry));
memcpy(&entry->object_id, &req->object_id, 20);
entry->info.size = req->size;
/* TODO(pcm): set the other fields */
entry->fd = fd;
entry->map_size = map_size;
entry->offset = offset;
HASH_ADD(handle, open_objects, object_id, sizeof(plasma_id), entry);
plasma_reply reply = {PLASMA_OBJECT, req->size};
send_fd(conn, fd, (char*) &reply, sizeof(plasma_reply));
plasma_reply reply;
memset(&reply, 0, sizeof(reply));
reply.type = PLASMA_OBJECT;
reply.offset = offset;
reply.map_size = map_size;
reply.object_size = req->size;
send_fd(conn, fd, (char*) &reply, sizeof(reply));
}
/* Get an object from the hash table. */
@ -117,7 +110,8 @@ void get_object(int conn, plasma_request* req) {
object_table_entry* entry;
HASH_FIND(handle, sealed_objects, &req->object_id, sizeof(plasma_id), entry);
if (entry) {
plasma_reply reply = {PLASMA_OBJECT, entry->info.size};
plasma_reply reply = {PLASMA_OBJECT, entry->offset, entry->map_size,
entry->info.size};
send_fd(conn, entry->fd, (char*) &reply, sizeof(plasma_reply));
} else {
LOG_INFO("object not in hash table of sealed objects");
@ -129,7 +123,7 @@ void get_object(int conn, plasma_request* req) {
notify_entry->num_waiting += 1;
HASH_ADD(handle, objects_notify, object_id, sizeof(plasma_id),
notify_entry);
plasma_reply reply = {PLASMA_FUTURE, -1};
plasma_reply reply = {PLASMA_FUTURE, 0, 0, -1};
send_fd(conn, fd[1], (char*) &reply, sizeof(plasma_reply));
}
}
@ -143,8 +137,6 @@ void seal_object(int conn, plasma_request* req) {
return; /* TODO(pcm): return error */
}
HASH_DELETE(handle, open_objects, entry);
int64_t size = entry->info.size;
int fd = entry->fd;
HASH_ADD(handle, sealed_objects, object_id, sizeof(plasma_id), entry);
/* Inform processes that the object is ready now. */
object_notify_entry* notify_entry;
@ -153,9 +145,11 @@ void seal_object(int conn, plasma_request* req) {
if (!notify_entry) {
return;
}
plasma_reply reply = {PLASMA_OBJECT, size};
plasma_reply reply = {PLASMA_OBJECT, entry->offset, entry->map_size,
entry->info.size};
for (int i = 0; i < notify_entry->num_waiting; ++i) {
send_fd(notify_entry->conn[i], fd, (char*) &reply, sizeof(plasma_reply));
send_fd(notify_entry->conn[i], entry->fd, (char*) &reply,
sizeof(plasma_reply));
close(notify_entry->conn[i]);
}
HASH_DELETE(handle, objects_notify, notify_entry);

View file

@ -1,3 +1,5 @@
from __future__ import print_function
import os
import socket
import subprocess
@ -130,5 +132,19 @@ class TestPlasmaManager(unittest.TestCase):
# # Transferring the buffer before sealing it should fail.
# self.assertRaises(Exception, lambda : self.manager1.transfer(1, object_id))
def test_stresstest(self):
a = time.time()
object_ids = []
for i in range(10000): # TODO(pcm): increase this to 100000
object_id = random_object_id()
object_ids.append(object_id)
self.client1.create(object_id, 1)
self.client1.seal(object_id)
for object_id in object_ids:
self.client1.transfer("127.0.0.1", self.port2, object_id)
b = time.time() - a
print("it took", b, "seconds to put and transfer the objects")
if __name__ == "__main__":
unittest.main(verbosity=2)

6280
third_party/dlmalloc.c vendored Normal file

File diff suppressed because it is too large Load diff