[Notes] Storm: Twitter's scalable realtime computation system by @nathanmarz
2011-09-19
At Strange Loop 2011 this morning, Nathan Marz (@nathanmarz) made a wonderful announcement: the open-sourcing of Storm, the realtime computation system that he developed at BackType (acquired by Twitter).
Here, I’m including the notes that I typed up during his presentation. Apologies in advance for any typos or errors (I removed anything that I was especially unsure of, just to be safe)–I had to type quickly to keep up.
Most importantly, check out the four repositories he open-sourced in the middle of the talk:
At last, here are the notes:
History: Before Storm
- Queues and Workers
- Example
- Firehose → Queues → Workers (Hadoop) → Queues → Workers (Cassandra)
- Message Locality
- All updates for a given URL must go through the same worker
- Why?
- No transactions in Cassandra (+ no atomic increments at the time)
- More effective batching of updates
- Implementing
- Have a queue for each consuming worker
- Choose queue for URL using consistent hashing
- Take hash(URL) mod (number of queues) = queue index (a toy sketch follows this section)
- Same URL goes to same queue
- Evenly distribute URLs to queues
- Problems
- Scaling: Adding a Worker
- Deploy a new worker and new queue for that worker
- Must redeploy the other workers: since they use consistent hashing, they must be told that there’s a new queue
- Poor fault-tolerance
- Coding is tedious
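To make the queue-choosing step concrete, here’s a rough sketch (mine, not from the talk) of the hash-the-URL-and-mod-by-the-number-of-queues scheme described above; the class and method names are made up for illustration.

```java
public class QueueChooser {
    // Hash the URL and mod by the number of queues: the same URL always maps to
    // the same queue, and URLs spread roughly evenly across the queues.
    static int queueIndex(String url, int numQueues) {
        // Mask off the sign bit so the index is never negative.
        return (url.hashCode() & 0x7fffffff) % numQueues;
    }

    public static void main(String[] args) {
        int numQueues = 4;
        // The same URL always lands on the same queue, so one worker sees all of
        // its updates and can batch them without needing transactions.
        System.out.println(queueIndex("http://example.com/a", numQueues));
        System.out.println(queueIndex("http://example.com/a", numQueues)); // same index
        System.out.println(queueIndex("http://example.com/b", numQueues)); // likely different
    }
}
```

This also shows why adding a worker is painful: changing the number of queues changes the mapping, so every worker has to be redeployed with the new layout.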
Storm
- What we want
- Guaranteed data processing
- Easily horizontally scalable
- Fault-tolerance
- No intermediate message brokers
- This conflicts with the desire for guaranteed data processing
- With a broker, if a worker fails, it can always ask the message broker for the message again
- Problem: brokers are complex and slow; messages have to go through a third party and be persisted to disk
- Higher level abstraction than message passing
- Just works
- Use Cases
- Stream Processing
- Distributed RPC
- Parallelize an intense function; invoke on the fly and compute quickly
- Continuous computation
- Storm Cluster
- Three Classes of Nodes
- Nimbus: Master Node (similar to Hadoop JobTracker)
- Submit topologies and code for execution
- Launches workers
- Monitors computations
- Restarts things that fail
- ZooKeeper: cluster coordination
- Worker nodes: actually run computations
- Nodes pass messages to each other
- Run a daemon called the Supervisor, which communicates with Nimbus through ZooKeeper
- Concepts
- Streams
- Unbounded sequence of tuples
- All tuples must have the same schema (same number of fields and same types)
- Supports primitive types (serialized and deserialized)
- Also support for custom types
- Spouts
- Source of streams
- Examples
- Kestrel spout: reads from a Kestrel queue
- Reads from the Twitter stream
- Bolts
- Process input streams (and can emit new streams)
- Can run
- Functions
- Filters
- Aggregations
- Joins
- Talk to databases
- Topologies
- Network of spouts and bolts
- Each bolt subscribes to the output streams of any number of spouts or other bolts
- Tasks
- Spouts and bolts execute as many tasks across the cluster
- Lots of tasks across many machines, all passing messages to one another
- Stream grouping
- When a tuple is emitted, to which task does it go?
- Describes how to partition that stream
- Shuffle grouping: picks a random task
- Fields grouping: consistent hashing on a subset of the tuple fields
- Similar to queues and workers, but a higher level of abstraction
- All grouping: sends to all tasks
- Global grouping: picks the task with the lowest id
- There are more, but he didn’t go into them here
- Streaming word count
- TopologyBuilder is used to construct topologies in Java
- See slides for an example implementation (a rough sketch of my own follows below)
- Split sentences into words with parallelism of 8 tasks
- Create a word count stream
- Can easily run a bolt as a script in another language, such as Python
- Can run the topology in local mode for development testing
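Since I can’t reproduce the slides here, below is my own rough sketch of what the streaming word-count topology looks like with TopologyBuilder. It assumes a backtype.storm Java API roughly like later Storm releases (exact class names, component ids, and signatures varied between versions), and the sentence spout is a made-up stand-in for something like a Kestrel spout.

```java
import java.util.HashMap;
import java.util.Map;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.BasicOutputCollector;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.TopologyBuilder;
import backtype.storm.topology.base.BaseBasicBolt;
import backtype.storm.topology.base.BaseRichSpout;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class WordCountTopology {

    // Hypothetical spout that just re-emits a fixed sentence; a real topology
    // would read from something like a Kestrel queue or the Twitter stream.
    public static class SentenceSpout extends BaseRichSpout {
        private SpoutOutputCollector collector;

        @Override
        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void nextTuple() {
            try { Thread.sleep(100); } catch (InterruptedException e) { }
            collector.emit(new Values("the quick brown fox jumps over the lazy dog"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("sentence"));
        }
    }

    // Splits each sentence into words.
    public static class SplitSentence extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            for (String word : tuple.getString(0).split(" ")) {
                collector.emit(new Values(word));
            }
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word"));
        }
    }

    // Keeps a running count per word; the fields grouping guarantees the same
    // word always reaches the same task, so the in-memory map stays consistent.
    public static class WordCount extends BaseBasicBolt {
        private final Map<String, Integer> counts = new HashMap<String, Integer>();

        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            String word = tuple.getString(0);
            Integer count = counts.get(word);
            if (count == null) count = 0;
            count++;
            counts.put(word, count);
            collector.emit(new Values(word, count));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            declarer.declare(new Fields("word", "count"));
        }
    }

    public static void main(String[] args) throws Exception {
        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("sentences", new SentenceSpout());
        // Parallelism of 8 for the splitter, as in the talk's example.
        builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("sentences");
        // Parallelism here is arbitrary for the sketch.
        builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));

        // Local mode: runs the whole topology in-process for development testing;
        // a real deployment would submit it to a cluster instead.
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("word-count", new Config(), builder.createTopology());
    }
}
```

The fields grouping on "word" is what guarantees the same word always goes to the same WordCount task, mirroring the message-locality trick from the queues-and-workers days.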
Traditional data processing
- Traditional Method (pre-Storm)
- All of your data → precomputed indexes that make queries fast
- Precompute happens with intense processing: Hadoop, databases, etc.
- Example: how many tweets on a URL between 7am on Sun. and 10pm on Mon.
- Indexed by hour; sum over those few hour buckets when querying (a toy sketch follows this list)
- Storm: intense processing can happen on both sides, precompute and query, as a distributed RPC flow on Storm.
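As a toy illustration of the precompute-then-query pattern (mine, not from the talk): increments land in per-(URL, hour) buckets ahead of time, and a query just sums the buckets in the requested window.

```java
import java.util.HashMap;
import java.util.Map;

public class HourlyIndex {
    // key: url + "|" + hour bucket, value: tweet count for that hour
    private final Map<String, Long> buckets = new HashMap<String, Long>();

    // Precompute side: bump the bucket for this URL and hour.
    void increment(String url, long hour) {
        String key = url + "|" + hour;
        Long current = buckets.get(key);
        buckets.put(key, (current == null ? 0L : current) + 1);
    }

    // Query side: sum the precomputed buckets between startHour and endHour, inclusive.
    long tweetsOnUrl(String url, long startHour, long endHour) {
        long total = 0;
        for (long hour = startHour; hour <= endHour; hour++) {
            Long count = buckets.get(url + "|" + hour);
            if (count != null) total += count;
        }
        return total;
    }

    public static void main(String[] args) {
        HourlyIndex index = new HourlyIndex();
        index.increment("http://example.com/a", 100); // hour bucket 100
        index.increment("http://example.com/a", 100);
        index.increment("http://example.com/a", 101);
        System.out.println(index.tweetsOnUrl("http://example.com/a", 100, 101)); // 3
    }
}
```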
Distributed RPC Server
(easy to implement, Storm comes with one)
- Coordinates distributed RPC dataflow
- Gives data to spout
- Topology parallelizes computation, gives to bolt
- The bolt gives the result back to the distributed RPC server
- Client gets the result (a client-side sketch follows just below)
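For a sense of what the client side looks like, here’s a minimal sketch (mine) of calling a DRPC function. The host and the "reach" function name are placeholders, 3772 is (I believe) the default DRPC port, and DRPCClient’s exact signature may differ between Storm versions.

```java
import backtype.storm.utils.DRPCClient;

public class ReachClient {
    public static void main(String[] args) throws Exception {
        // Placeholder host; point this at whatever machine runs the DRPC server.
        DRPCClient client = new DRPCClient("drpc-host.example.com", 3772);
        // Blocks until the topology has computed the result and the DRPC server
        // hands it back.
        String reach = client.execute("reach", "http://example.com/some-article");
        System.out.println("reach = " + reach);
    }
}
```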
- Example
- Compute reach of URL
- Get URL, compute all tweeters. Find their followers.
- Get the set of distinct followers.
- Count → Reach
- Extremely intense computation: can be millions of people
- Storm
- Spout emits (requestid, tweeterid)
- GetTweeters goes to GetFollowers, which emits (requestid, followerid)
- PartialDistinct tasks each keep a partial set of distinct followers
- CountAggregator does a global grouping, receives one tuple from each PartialDistinct task, and sums them
- All done completely in parallel (a plain sequential sketch of the same computation follows this list)
- What might take hours now takes two seconds
- Going down to 200ms. See “State spout” below
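This isn’t Storm code, but here’s a plain sequential sketch (mine) of the reach computation, with comments mapping each step onto the topology above; getTweeters and getFollowers are hypothetical stand-ins for the social-graph lookups.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class ReachSketch {
    static final int NUM_PARTITIONS = 8; // stand-in for the parallelism of PartialDistinct

    static int reach(String url) {
        // GetTweeters: everyone who tweeted the URL.
        List<String> tweeters = getTweeters(url);

        // PartialDistinct: a fields grouping on the follower id means each task only
        // ever sees "its" slice of followers, so each keeps a small distinct set.
        List<Set<String>> partials = new ArrayList<Set<String>>();
        for (int i = 0; i < NUM_PARTITIONS; i++) partials.add(new HashSet<String>());
        for (String tweeter : tweeters) {
            // GetFollowers: expand each tweeter into their followers.
            for (String follower : getFollowers(tweeter)) {
                int partition = (follower.hashCode() & 0x7fffffff) % NUM_PARTITIONS;
                partials.get(partition).add(follower);
            }
        }

        // CountAggregator: global grouping; sum the sizes of the disjoint partial sets.
        int reach = 0;
        for (Set<String> partial : partials) reach += partial.size();
        return reach;
    }

    // Hypothetical lookups; in the real system these hit the social-graph store,
    // and in the Storm topology they run as many parallel tasks.
    static List<String> getTweeters(String url) { throw new UnsupportedOperationException("stub"); }
    static List<String> getFollowers(String tweeter) { throw new UnsupportedOperationException("stub"); }
}
```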
- Guaranteeing message processing
- Uses ZeroMQ
- “Tuple Tree”
- A spout tuple is fully processed when all tuples in the tree have been completed
- If a tuple tree is not completed within a specified timeout, it is considered failed and replayed from the spout
- Reliability API: you must do a little bit of work (see the sketch after this list)
- Emit a word: anchor
- Anchoring creates a new edge in the tuple tree
- Collector acks the tuple; marks the single node as complete
- Storm does the rest
- timeouts when necessary
- tracking what’s processed
- seeing when it’s complete
- Storm tracks tuple trees for you in an extremely efficient way
- See the wiki on GitHub for explanation of this algorithm
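Here’s a small sketch (mine, using what I believe is the Java bolt API; class and package names may differ between Storm versions) of what the reliability API boils down to: pass the input tuple as the first argument to emit to anchor the new tuple, then ack the input when you’re done with it.

```java
import java.util.Map;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Tuple;
import backtype.storm.tuple.Values;

public class AnchoredSplitSentence extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        for (String word : input.getString(0).split(" ")) {
            // Anchor: the emitted word tuple becomes a child of the input tuple in
            // the tuple tree, so Storm will replay the spout tuple if this one fails.
            collector.emit(input, new Values(word));
        }
        // Ack: this node of the tuple tree is done; Storm tracks the rest.
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("word"));
    }
}
```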
- Storm UI: see slides
- Storm on EC2: it’s super easy. Use storm-deploy.
The Future
- State spout (almost done)
- Synchronize a large amount of frequently changing state into a topology
- Example 1
- Optimize reach topology by eliminating the database calls.
- Each GetFollowers task keeps a synchronized cache of a subset of the social graph
- Works because GetFollowers partitions the social graph the same way it partitions GetTweeters’ stream
- Storm on Mesos
- Mesos is a cluster/resource management framework
- Allows more fine-grained resource usage
- “Swapping”
- If you currently want to update a Storm topology, you must kill it and submit a new one, which takes a few minutes.
- This is bad for a realtime system!
- Lets you safely swap one topology for a new one.
- Atomic swaps.
- Minimize downtime
- Prevent message duplication
- Auto-scaling
- Storm can automatically scale topology to data
- No work on your end; it scales up as message throughput increases
- Also handles bursts of traffic: it temporarily provisions more resources, then scales itself back down
- Higher level abstractions
- There is still work to be done to improve this
- DSLs in a variety of languages, etc.