[Notes] Storm: Twitter's scalable realtime computation system by @nathanmarz

2011-09-19

At Strange Loop 2011 this morning, Nathan Marz (@nathanmarz) made a wonderful announcement: the open-sourcing of Storm, the realtime computation system that he developed at BackType (acquired by Twitter).

Here, I’m including the notes that I typed up during his presentation. Apologies in advance for any typos or errors; I had to type quickly to keep up, and I removed anything that I was especially unsure of, just to be safe.

Most importantly, check out the four repositories he open-sourced in the middle of the talk:

At last, here are the notes:

History: Before Storm

  • Queues and Workers
    • Example
      • Firehose ~ Queues ~ Workers (Hadoop) ~ Queues ~ Workers (Cassandra)
    • Message Locality
      • Any URL update must go through the same worker
      • Why?
        • No transactions in Cassandra (+ no atomic increments at the time)
        • More effective batching of updates
      • Implementing
        • Have a queue for each consuming worker
        • Choose queue for URL using consistent hashing
        • Take hash of URL mod number of queues = index of queue
          • Same URL goes to same queue
          • Evenly distribute URLs to queues
      • Problems
        • Scaling: Adding a Worker
          • Deploy a new worker and new queue for that worker
          • Must redeploy the other workers: since they use consistent hashing, they must be told that there’s a new queue
        • Poor fault-tolerance
        • Coding is tedious
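
To make the scaling problem above concrete, here is a minimal sketch of the "hash of URL mod number of queues" scheme. The function names, the choice of MD5, and the example URLs are my own assumptions for illustration; they are not from the talk.

```python
import hashlib


def queue_index(url: str, num_queues: int) -> int:
    """Map a URL to a queue index: hash(url) mod number of queues."""
    digest = hashlib.md5(url.encode("utf-8")).digest()
    return int.from_bytes(digest, "big") % num_queues


def moved_fraction(urls, old_n: int, new_n: int) -> float:
    """Fraction of URLs whose queue changes when the queue count changes."""
    moved = sum(
        1 for u in urls if queue_index(u, old_n) != queue_index(u, new_n)
    )
    return moved / len(urls)


# Hypothetical URLs, only used to exercise the functions.
urls = [f"http://example.com/page/{i}" for i in range(1000)]

# The same URL always lands on the same queue for a fixed queue count.
assert queue_index(urls[0], 4) == queue_index(urls[0], 4)

# Adding a fifth queue remaps most URLs, so every producer has to learn
# about the new layout at the same time.
print(moved_fraction(urls, 4, 5))
```

With plain modulo, most URLs change queues when a queue is added, which is why every other worker has to be redeployed together with the new one.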

Storm

  • What we want
    • Guaranteed data processing
    • Easily horizontally scalable
    • Fault-tolerance
    • No intermediate message brokers
      • Conflicts with the desire for guaranteed data processing
      • If a worker fails, the message can always be requested from the broker again
      • Problem: complex and slow. Messages have to go through third party and persist to disk.
    • Higher level abstraction than message passing
    • Just works
  • Use Cases
    • Stream Processing
    • Distributed RPC
      • Parallelize an intense function; invoke on the fly and compute quickly
    • Continuous computation
  • Storm Cluster
    • Three Classes of Nodes
      • Nimbus: Master Node (similar to Hadoop JobTracker)
        • Submit topologies and code for execution
        • Launches workers
        • Monitors computations
        • Restart things that fail
      • ZooKeeper: cluster coordination
      • Worker nodes: actually run computations
        • Nodes pass messages to each other
        • Run a daemon called Supervisor, which communicates with Nimbus through ZooKeeper
  • Concepts
    • Streams
      • Unbounded sequence of tuples
      • All tuples must have same schema (same number and same types)
        • Supports primitive types (serialized and deserialized)
        • Also support for custom types
    • Spouts
      • Source of streams
      • Examples
        • Kestrel spout: read from a Kestrel queue
        • Read from twitter stream
    • Bolts
      • Processes input streams
      • Can run
        • Functions
        • Filters
        • Aggregations
        • Joins
        • Talk to databases
    • Topologies
      • Network of spouts and bolts
      • Each bolt subscribes to any number of output streams
  • Tasks
    • Spouts and bolts execute as many tasks across the cluster
    • Lots of tasks across many machines, all passing messages to one another
  • Stream grouping
    • When a tuple is emitted, to which task does it go?
    • Describes how to partition that stream
    • Shuffle grouping : picks a random task
    • Fields grouping : consistent hashing on a subset of the tuple fields
      • Similar to queues and workers, but higher level of abstraction
    • All grouping : send to all tasks
      • Use with care
    • Global grouping : pick task with lowest id
    • There are more, but not going into them here
  • Streaming word count
    • TopologyBuilder is used to construct topologies in Java
      • See slides for example implementation
      • Split sentences into words with parallelism of 8 tasks
    • Create a word count stream
    • Can easily run a script in another language, such as Python, to do the evaluation
    • Can run a topology in local mode for development and testing
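
The stream groupings and the word count example above can be sketched in plain Python. This is not Storm's API. The function names, the CRC32 hash, and the task counts other than the 8 splitter tasks are assumptions I picked for illustration.

```python
import random
import zlib
from collections import Counter


def shuffle_grouping(num_tasks: int, rng: random.Random) -> int:
    """Shuffle grouping: pick a random task."""
    return rng.randrange(num_tasks)


def fields_grouping(values, num_tasks: int) -> int:
    """Fields grouping: equal field values always map to the same task."""
    key = "\x1f".join(str(v) for v in values).encode("utf-8")
    return zlib.crc32(key) % num_tasks


def word_count(sentences, split_tasks=8, count_tasks=4, seed=0):
    """Simulate sentence spout -> split tasks -> count tasks."""
    rng = random.Random(seed)

    # Sentences reach the splitter tasks via shuffle grouping.
    split_inboxes = [[] for _ in range(split_tasks)]
    for sentence in sentences:
        split_inboxes[shuffle_grouping(split_tasks, rng)].append(sentence)

    # Words reach the counter tasks via fields grouping on the word,
    # so each word is counted by exactly one task.
    counters = [Counter() for _ in range(count_tasks)]
    for inbox in split_inboxes:
        for sentence in inbox:
            for word in sentence.split():
                counters[fields_grouping((word,), count_tasks)][word] += 1
    return counters


counters = word_count(
    ["the cow jumped over the moon", "the man went to the store"]
)
total = sum(counters, Counter())
print(total["the"])  # 4
```

Because the count stage is grouped on the word field, no two counter tasks ever hold a partial count for the same word, so no merge step is needed to read a single word's count.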

Traditional data processing

  • Traditional Method (pre-Storm)
    • All of your data ~ precompute indexes to run query quickly
      • Precompute happens with intense processing: Hadoop, databases, etc.
      • Example: how many tweets on a URL between 7am on Sun. and 10pm on Mon.
        • Indexed by hour; sum over those few hours when querying
  • Storm: intense processing on both sides. Distributed RPC flow on Storm.
    • Distributed RPC Server (easy to implement, Storm comes with one)
      • Coordinates distributed RPC dataflow
      • Gives data to spout
      • Topology parallelizes computation, gives to bolt
      • Bolt gives to distributed RPC
      • Client gets result
    • Example
      • Compute reach of URL
        • Get URL, compute all tweeters. Find their followers.
        • Get set of distinct followers.
        • Count ~ Reach
        • _Extremely_ intense computation: can be millions of people
      • Storm
        • Spout emits `(requestid, tweeterid)`
        • `GetTweeters` goes to `GetFollowers`; emits `(requestid, followerid)`
        • `PartialDistinct`
        • `CountAggregator` does global grouping, receives one tuple from each `PartialDistinct` task, and sums
        • All done completely in parallel
      • What might take hours now takes two seconds
        • Going down to 200ms. See “State spout” below
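
Here is a rough, single-process sketch of the reach dataflow described above. The in-memory `TWEETERS` and `FOLLOWERS` tables, the CRC32 partitioning, and the task count are hypothetical stand-ins; the real topology runs these stages in parallel across machines and carries a request id, which I have left out.

```python
import zlib
from collections import defaultdict

# Hypothetical data standing in for the databases queried by each stage.
TWEETERS = {"http://example.com/a": ["alice", "bob"]}
FOLLOWERS = {"alice": ["carol", "dave"], "bob": ["dave", "erin"]}


def task_for(key: str, num_tasks: int) -> int:
    """Partition by follower id so each id goes to exactly one task."""
    return zlib.crc32(key.encode("utf-8")) % num_tasks


def reach(url: str, num_distinct_tasks: int = 3) -> int:
    partial_sets = defaultdict(set)
    for tweeter in TWEETERS.get(url, []):            # GetTweeters
        for follower in FOLLOWERS.get(tweeter, []):  # GetFollowers
            task = task_for(follower, num_distinct_tasks)
            partial_sets[task].add(follower)         # PartialDistinct
    # CountAggregator: one partial count per task, summed.
    return sum(len(s) for s in partial_sets.values())


print(reach("http://example.com/a"))  # 3 (carol, dave, erin)
```

Summing the partial counts gives the right answer only because each follower id is routed to a single `PartialDistinct` task, so no follower is counted twice.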
      
  • Guaranteeing message processing
    • Uses ZeroMQ
    • “Tuple Tree”
    • A spout tuple is fully processed when all tuples in the tree have been completed
    • If a tuple tree is not completed within a specified timeout, it is considered failed and replayed from the spout
    • Reliability API: must do a little bit of work
      • Emit a word: anchor
        • Anchoring creates a new edge in the tuple tree
      • Collector acks the tuple; marks the single node as complete
      • Storm does the rest
        • timeouts when necessary
        • tracking what’s processed
        • seeing when it’s complete
      • Storm tracks tuple trees for you in an extremely efficient way
        • See the wiki on GitHub for explanation of this algorithm
  • Storm UI: see slides
  • Storm on EC2: it’s super easy. Use storm-deploy
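
The notes above don't cover how tuple trees are tracked. As I understand Storm's documentation, it keeps a single XOR checksum of tuple ids per spout tuple. Below is a toy sketch of that idea, assuming that scheme. The class and method names are my own, and timeouts and replay are left out.

```python
import secrets


class TupleTreeTracker:
    """Toy tracker: one XOR checksum per spout tuple (an assumed scheme)."""

    def __init__(self):
        self._ack_vals = {}  # spout tuple id -> XOR of pending tuple ids

    def _new_id(self) -> int:
        return secrets.randbits(64) or 1  # avoid an all-zero id

    def emit_root(self) -> int:
        """Spout emits a tuple; it is the root of a new tuple tree."""
        root = self._new_id()
        self._ack_vals[root] = root
        return root

    def anchor(self, root: int) -> int:
        """Emit a new tuple anchored to the tree rooted at `root`."""
        child = self._new_id()
        self._ack_vals[root] ^= child
        return child

    def ack(self, root: int, tuple_id: int) -> bool:
        """Mark one tuple as done; return True once the tree is complete."""
        self._ack_vals[root] ^= tuple_id
        return self._ack_vals[root] == 0


tracker = TupleTreeTracker()
sentence = tracker.emit_root()
word1 = tracker.anchor(sentence)
word2 = tracker.anchor(sentence)
tracker.ack(sentence, sentence)
tracker.ack(sentence, word1)
print(tracker.ack(sentence, word2))  # True
```

Every id is XORed in once when the tuple is emitted and once when it is acked, so the checksum returns to zero exactly when all tuples in the tree have been acked (barring an astronomically unlikely collision of random ids). The memory used per spout tuple stays constant no matter how large the tree grows.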

The Future

  • State spout (almost done)
    • Synchronize a large amount of frequently changing state into a topology
    • Example 1
      • Optimize reach topology by eliminating the database calls.
      • Each GetFollowers task keeps a synchronous cache of a subset of the social graph
        • Works because GetFollowers repartitions the social graph the same way it partitions GetTweeters’ stream
  • Storm on Mesos
    • Mesos is a cluster/resource management framework
    • Allow more fine-grained resource usage
  • “Swapping”
    • If you currently want to update a Storm topology, must kill it and submit a new one. Takes a few minutes.
      • This is bad for a realtime system!
    • Lets you safely swap one topology for a new one.
      • Atomic swaps.
      • Minimize downtime
      • Prevent message duplication
  • Auto-scaling
    • Storm can automatically scale topology to data
    • No work on your end; increase as message throughput increases
    • Also handles bursts of traffic. Temporary provisioning of more resources, then scale itself back down.
  • Higher level abstractions
    • Work can be done still to improve this
    • DSLs in a variety of languages, etc.

michael schade
