diff --git a/examples/rl_pong/driver.py b/examples/rl_pong/driver.py new file mode 100644 index 000000000..a9328c05a --- /dev/null +++ b/examples/rl_pong/driver.py @@ -0,0 +1,52 @@ +# This code is copied and adapted from Andrej Karpathy's code for learning to +# play Pong https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5. + +import numpy as np +import cPickle as pickle +import gym +import ray +import ray.services as services +import os + +import functions + +worker_dir = os.path.dirname(os.path.abspath(__file__)) +worker_path = os.path.join(worker_dir, "worker.py") +services.start_singlenode_cluster(return_drivers=False, num_objstores=1, num_workers_per_objstore=10, worker_path=worker_path) + +# hyperparameters +H = 200 # number of hidden layer neurons +batch_size = 10 # every how many episodes to do a param update? +learning_rate = 1e-4 +decay_rate = 0.99 # decay factor for RMSProp leaky sum of grad^2 +resume = False # resume from previous checkpoint? + +running_reward = None +batch_num = 1 +D = functions.D # input dimensionality: 80x80 grid +if resume: + model = pickle.load(open("save.p", "rb")) +else: + model = {} + model["W1"] = np.random.randn(H, D) / np.sqrt(D) # "Xavier" initialization + model["W2"] = np.random.randn(H) / np.sqrt(H) +grad_buffer = {k: np.zeros_like(v) for k, v in model.iteritems()} # update buffers that add up gradients over a batch +rmsprop_cache = {k: np.zeros_like(v) for k, v in model.iteritems()} # rmsprop memory + +while True: + modelref = ray.push(model) + grads = [] + for i in range(batch_size): + grads.append(functions.compgrad(modelref)) + for i in range(batch_size): + grad = ray.pull(grads[i]) + for k in model: grad_buffer[k] += grad[0][k] # accumulate grad over batch + running_reward = grad[1] if running_reward is None else running_reward * 0.99 + grad[1] * 0.01 + print "Batch {}. episode reward total was {}. running mean: {}".format(batch_num, grad[1], running_reward) + for k, v in model.iteritems(): + g = grad_buffer[k] # gradient + rmsprop_cache[k] = decay_rate * rmsprop_cache[k] + (1 - decay_rate) * g ** 2 + model[k] += learning_rate * g / (np.sqrt(rmsprop_cache[k]) + 1e-5) + grad_buffer[k] = np.zeros_like(v) # reset batch gradient buffer + batch_num += 1 + if batch_num % 10 == 0: pickle.dump(model, open("save.p", "wb")) diff --git a/examples/rl_pong/functions.py b/examples/rl_pong/functions.py new file mode 100644 index 000000000..85cf4a858 --- /dev/null +++ b/examples/rl_pong/functions.py @@ -0,0 +1,85 @@ +# This code is copied and adapted from Andrej Karpathy's code for learning to +# play Pong https://gist.github.com/karpathy/a4166c7fe253700972fcbc77e4ea32c5. + +import ray +import numpy as np +import gym + +env = gym.make("Pong-v0") +D = 80 * 80 +gamma = 0.99 # discount factor for reward +def sigmoid(x): + return 1.0 / (1.0 + np.exp(-x)) # sigmoid "squashing" function to interval [0,1] + +def preprocess(I): + """preprocess 210x160x3 uint8 frame into 6400 (80x80) 1D float vector""" + I = I[35:195] # crop + I = I[::2,::2,0] # downsample by factor of 2 + I[I == 144] = 0 # erase background (background type 1) + I[I == 109] = 0 # erase background (background type 2) + I[I != 0] = 1 # everything else (paddles, ball) just set to 1 + return I.astype(np.float).ravel() + +def discount_rewards(r): + """take 1D float array of rewards and compute discounted reward""" + discounted_r = np.zeros_like(r) + running_add = 0 + for t in reversed(xrange(0, r.size)): + if r[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!) + running_add = running_add * gamma + r[t] + discounted_r[t] = running_add + return discounted_r + +def policy_forward(x, model): + h = np.dot(model["W1"], x) + h[h < 0] = 0 # ReLU nonlinearity + logp = np.dot(model["W2"], h) + p = sigmoid(logp) + return p, h # return probability of taking action 2, and hidden state + +def policy_backward(eph, epx, epdlogp, model): + """backward pass. (eph is array of intermediate hidden states)""" + dW2 = np.dot(eph.T, epdlogp).ravel() + dh = np.outer(epdlogp, model["W2"]) + dh[eph <= 0] = 0 # backpro prelu + dW1 = np.dot(dh.T, epx) + return {"W1": dW1, "W2": dW2} + +@ray.remote([dict], [tuple]) +def compgrad(model): + observation = env.reset() + prev_x = None # used in computing the difference frame + xs, hs, dlogps, drs = [], [], [], [] + reward_sum = 0 + done = False + while not done: + cur_x = preprocess(observation) + x = cur_x - prev_x if prev_x is not None else np.zeros(D) + prev_x = cur_x + + aprob, h = policy_forward(x,model) + action = 2 if np.random.uniform() < aprob else 3 # roll the dice! + + xs.append(x) # observation + hs.append(h) # hidden state + y = 1 if action == 2 else 0 # a "fake label" + dlogps.append(y - aprob) # grad that encourages the action that was taken to be taken (see http://cs231n.github.io/neural-networks-2/#losses if confused) + + observation, reward, done, info = env.step(action) + reward_sum += reward + + drs.append(reward) # record reward (has to be done after we call step() to get reward for previous action) + + epx = np.vstack(xs) + eph = np.vstack(hs) + epdlogp = np.vstack(dlogps) + epr = np.vstack(drs) + xs, hs, dlogps, drs = [], [], [], [] # reset array memory + + # compute the discounted reward backwards through time + discounted_epr = discount_rewards(epr) + # standardize the rewards to be unit normal (helps control the gradient estimator variance) + discounted_epr -= np.mean(discounted_epr) + discounted_epr /= np.std(discounted_epr) + epdlogp *= discounted_epr # modulate the gradient with advantage (PG magic happens right here.) + return (policy_backward(eph, epx, epdlogp, model), reward_sum) diff --git a/examples/rl_pong/visualizer.py b/examples/rl_pong/visualizer.py new file mode 100644 index 000000000..83eeef1b9 --- /dev/null +++ b/examples/rl_pong/visualizer.py @@ -0,0 +1,17 @@ +import matplotlib +from matplotlib import pyplot as plt +import pickle +import numpy as np + +matplotlib.use("Agg") + +logs = pickle.load(open("logs_rl_original.p", "rb")) +times_og = range(1, (len(logs) + 1)) +reward_og = map(lambda x:x[2], logs) +plt.plot(times_og, reward_og) +plt.savefig("original_batchnum_graph") +logs = pickle.load(open("logs_rl_ray.p", "rb")) +times_ray = range(1, (len(logs) + 1)) +reward_ray = map(lambda x: x[2], logs) +plt.plot(times_ray, reward_ray) +plt.savefig("rl_pong_graph") diff --git a/examples/rl_pong/worker.py b/examples/rl_pong/worker.py new file mode 100644 index 000000000..9c3b26510 --- /dev/null +++ b/examples/rl_pong/worker.py @@ -0,0 +1,17 @@ +import argparse +import ray +import ray.worker as worker +import gym + +import functions + +parser = argparse.ArgumentParser(description="Parse addresses for the worker to connect to.") +parser.add_argument("--scheduler-address", default="127.0.0.1:10001", type=str, help="the scheduler's address") +parser.add_argument("--objstore-address", default="127.0.0.1:20001", type=str, help="the objstore's address") +parser.add_argument("--worker-address", default="127.0.0.1:40001", type=str, help="the worker's address") + +if __name__ == '__main__': + args = parser.parse_args() + ray.connect(args.scheduler_address, args.objstore_address, args.worker_address) + ray.register_module(functions) + worker.main_loop()