Streaming from elliptics

It is often necessary to stream data directly from storage nodes to clients in a p2p fashion instead of proxying it through a dedicated service. The most common example is a music or video service: a proxy server cannot realistically relay the replies, because that takes too much RAM and network bandwidth and requires some kind of flow control. It is much better to hand this task to the storage nodes, which have more bandwidth and can stream data from disk rather than from RAM. In this case the proxy only authenticates the user and redirects them to the storage node with a short-lived 'ticket'.

The following tutorial shows how to build your own music streaming service. All packages needed for this tutorial (except nginx) can be found in our repository. See the server setup tutorial to find out how to use the repository and which packages you might need.

Nginx streaming from eblob

Eblob stores each record contiguously on disk and never splits it into several parts. This makes it possible to use Nginx to stream data from Elliptics' blobs.

Installation

Get eblob module for Nginx:

$ git clone https://github.com/toshic/nginx_patches

Get the sources of any compatible Nginx version (1.2.x-1.7.x):

$ wget 'http://nginx.org/download/nginx-1.7.0.tar.gz'
$ tar -xzvf nginx-1.7.0.tar.gz
$ cd nginx-1.7.0

Build Nginx with eblob module:

$ ./configure --prefix=/opt/nginx --add-module=../nginx_patches/nginx-eblob
$ make
$ # install it

Configuration

Assume that the blobs are stored somewhere inside the /var/srw directory. To use the eblob module, add the eblob; statement to the location section.

nginx.conf
worker_processes  8;
 
events {
    worker_connections  1024;
}
 
http {
    sendfile on;
 
    server {
        listen       8081;
        server_name  localhost;
 
        location / {
            root   /var/srw;
            eblob;
        }
    }
}

Using

Once Nginx is configured, it serves requests of the form path:offset:size:

# Read the first 135 bytes from /var/srw/some/path
$ curl http://localhost:8081/some/path:0:135
# Read 135 bytes at offset 100 from /var/srw/other/path
$ curl http://localhost:8081/other/path:100:135
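
The request format can also be produced programmatically. The following is a minimal Python sketch, not part of the eblob module: the helper name eblob_url is hypothetical, and the default host and port are assumptions taken from the nginx.conf shown earlier (listen 8081).

```python
def eblob_url(path, offset, size, host="localhost", port=8081):
    """Build a URL of the form http://host:port/path:offset:size."""
    if offset < 0 or size < 0:
        raise ValueError("offset and size must be non-negative")
    # The path is relative to the Nginx root (/var/srw in this tutorial)
    return "http://{0}:{1}/{2}:{3}:{4}".format(
        host, port, path.lstrip("/"), offset, size)


# First 135 bytes of /var/srw/some/path
print(eblob_url("some/path", 0, 135))
# 135 bytes starting at offset 100 of /var/srw/other/path
print(eblob_url("other/path", 100, 135))
```

The resulting strings can be passed to curl or to any HTTP client.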

This option can be used with Rift via the /download-info/ handler. Rift knows where Elliptics physically stores the data, so it can redirect the client to the particular server where the data is stored and where Nginx with the eblob module is installed. Rift also signs the request so that the user cannot access data beyond what was requested.

In the near future this signature mechanism will also be implemented in the Nginx module.

Reverbrain.Music example

Let's build Reverbrain.Music service!

Elliptics configuration

First of all we should configure and start the Elliptics nodes. For this example we create only two nodes: one for metadata (buckets, users and so on) and the other for our music. In practice you would create multiple Elliptics nodes, located all over the world for example; having multiple Elliptics nodes and multiple buckets allows you to spread the load and makes the service fully fault tolerant.

The example nodes store all their data in /var/srw, so it can be accessed by the Nginx eblob module configured previously.

node-1.json
{
    "loggers": {
        "type": "/dev/stderr",
        "level": 4
    },
    "options": {
        "join": true,
        "flags": 4,
        "wait_timeout": 60,
        "check_timeout": 60,
        "io_thread_num": 4,
        "nonblocking_io_thread_num": 4,
        "net_thread_num": 2,
        "indexes_shard_count": 1,
        "daemon": false,
        "bg_ionice_class": 3,
        "bg_ionice_prio": 0,
        "server_net_prio": 1,
        "client_net_prio": 6,
        "caches_number": 16,
        "remote": [
            "localhost:1026:2"
        ],
        "auth_cookie": "5697524b4cfab903",
        "address": [
            "localhost:1025:2"
        ],
        "monitor_port": 49662,
        "cache": {
            "size": 268435456
        }
    },
    "backends": [
        {
            "type": "blob",
            "sync": 5,
            "blob_flags": 6,
            "iterate_thread_num": 1,
            "blob_size": "10M",
            "records_in_blob": 10000000,
            "defrag_timeout": 3600,
            "defrag_percentage": 25,
            "group": 1,
            "history": "/var/srw/server-1/history",
            "data": "/var/srw/server-1/blob/data"
        }
    ]
}

node-2.json
{
    "loggers": {
        "type": "/dev/stderr",
        "level": 2
    },
    "options": {
        "join": true,
        "flags": 4,
        "wait_timeout": 60,
        "check_timeout": 60,
        "io_thread_num": 4,
        "nonblocking_io_thread_num": 4,
        "net_thread_num": 2,
        "indexes_shard_count": 1,
        "daemon": false,
        "bg_ionice_class": 3,
        "bg_ionice_prio": 0,
        "server_net_prio": 1,
        "client_net_prio": 6,
        "caches_number": 16,
        "remote": [
            "localhost:1025:2"
        ],
        "auth_cookie": "5697524b4cfab903",
        "address": [
            "localhost:1026:2"
        ],
        "monitor_port": 12366,
        "cache": {
            "size": 268435456
        }
    },
    "backends": [
        {
            "type": "blob",
            "sync": 5,
            "blob_flags": 6,
            "iterate_thread_num": 1,
            "blob_size": "10M",
            "records_in_blob": 10000000,
            "defrag_timeout": 3600,
            "defrag_percentage": 25,
            "group": 2,
            "history": "/var/srw/server-2/history",
            "data": "/var/srw/server-2/blob/data"
        }
    ]
}
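
Two properties of these configs matter for the rest of the tutorial: each node's remote entry points at the other node's address, and every data path lies under /var/srw so that Nginx can reach it. The sketch below checks both. It is only an illustration: the function name check_nodes is hypothetical, and only the relevant fields of the two files are reproduced.

```python
# Only the fields relevant to the check are copied from node-1.json and node-2.json.
node1 = {
    "options": {"address": ["localhost:1025:2"], "remote": ["localhost:1026:2"]},
    "backends": [{"group": 1, "data": "/var/srw/server-1/blob/data"}],
}
node2 = {
    "options": {"address": ["localhost:1026:2"], "remote": ["localhost:1025:2"]},
    "backends": [{"group": 2, "data": "/var/srw/server-2/blob/data"}],
}


def check_nodes(nodes, root="/var/srw/"):
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []
    known = {addr for node in nodes for addr in node["options"]["address"]}
    for index, node in enumerate(nodes, start=1):
        for remote in node["options"]["remote"]:
            if remote not in known:
                problems.append(
                    "node-{0}: remote {1} is not a known node".format(index, remote))
        for backend in node["backends"]:
            if not backend["data"].startswith(root):
                problems.append(
                    "node-{0}: {1} is outside {2}".format(index, backend["data"], root))
    return problems


print(check_nodes([node1, node2]))
```

In practice the dictionaries would be loaded from the real files with json.load.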

Rift configuration

Next we configure our Rift server. It listens on port 8080 and connects to all our Elliptics nodes. We also tell it that Nginx listens on port 8081 (the redirect-port option). Finally, we specify that all data on the storage nodes lives in the /var/srw directory, for sandbox-like reasons (the path-prefix option).

rift.json
{
    "endpoints": [
        "0.0.0.0:8080"
    ],
    "backlog": 128,
    "threads": 2,
    "buffer_size": 65536,
    "logger": {
        "file": "/dev/stderr",
        "level": 2
    },
    "daemon": {
        "fork": false,
        "uid": 1000
    },
    "monitor-port": 21000,
    "application": {
        "remotes": [
            "localhost:1025:2",
            "localhost:1026:2"
        ],
        "groups": [],
        "metadata-groups": [
            1
        ],
        "bucket": {
            "timeout": 60
        },
        "read-timeout": 10,
        "write-timeout": 16,
        "redirect-port": 8081,
        "path-prefix": "/var/srw/"
    }
}
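
To see how redirect-port and path-prefix relate to the URLs a client is redirected to, consider the sketch below. It is an illustration inferred from the values in this tutorial, not Rift's actual code: the function name redirect_location is hypothetical, and the time and signature query parameters that Rift appends are left out.

```python
def redirect_location(host, blob_file, offset, size,
                      path_prefix="/var/srw/", redirect_port=8081):
    """Map a blob file on disk to the URL served by Nginx with the eblob module."""
    if not blob_file.startswith(path_prefix):
        # Files outside path-prefix must never be exposed
        raise ValueError("blob file is outside path-prefix")
    relative = blob_file[len(path_prefix):]
    return "http://{0}:{1}/{2}:{3}:{4}".format(
        host, redirect_port, relative, offset, size)


print(redirect_location("127.0.0.1", "/var/srw/server-2/blob/data-0.0", 144, 4245399))
```

Stripping the prefix is what lets the Nginx root (/var/srw) and the Rift path-prefix agree on the same relative path.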

Bucket configuration

A bucket is a logical metadata entity that says where our files will be stored. One can create multiple buckets and should specify different groups for different buckets. For example, data written into bucket1 could be stored in groups 1, 2 and 3, while files written into bucket2 are stored in Elliptics groups 7 and 8.

In our example we will use just a single bucket that stores data in a single group. Keeping only one copy of your data is not a good idea, but this is just an example. We add the bucket 'music' to the bucket directory 'all', so that we can later find all our buckets.

You can find more on what buckets are and how they can be used in the Rift documentation.

To create a bucket, we first prepare a JSON description of it:

update-bucket.json
{
    "groups": [
        2
    ],
    "acl": [
        {
            "user": "admin",
            "token": "favorite_music",
            "flags": 0
        },
        {
            "user": "*",
            "token": "very_secret_token",
            "flags": 1
        }
    ]
}

According to this description, the user named * (everyone) can read files from the bucket without authorization (bit 1 in flags; more details can be found in the Rift documentation mentioned above). You can set up even more fine-grained access using the bucket ACL.
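
If you manage several buckets, the description can be generated instead of written by hand. The following sketch produces the same JSON as update-bucket.json above; the helper name bucket_description is hypothetical, and the flag values are copied from the example without further interpretation.

```python
import json


def bucket_description(groups, acl):
    """Build a bucket description; acl is an iterable of (user, token, flags)."""
    return {
        "groups": list(groups),
        "acl": [{"user": user, "token": token, "flags": flags}
                for user, token, flags in acl],
    }


description = bucket_description(
    [2],
    [("admin", "favorite_music", 0), ("*", "very_secret_token", 1)],
)
print(json.dumps(description, indent=4))
```

The printed text can be saved as update-bucket.json and sent to Rift as described in this section.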

To perform authorized requests to Rift we have written a simple Python script. It adds an Authorization header to the request, which Rift uses to check access rights.

http_auth.py
#!/usr/bin/env python3

import argparse
import hashlib
import hmac
from urllib.parse import parse_qsl, urlencode, urlparse

import requests


def generate_signature(key, method, url, headers=None):
    """Return the hex HMAC-SHA512 signature of a request, keyed by the token."""
    parsed_url = urlparse(url)
    # Query parameters are sorted so the signed text does not depend on their order
    queries = sorted(parse_qsl(parsed_url.query))
    text = method + '\n'
    text += parsed_url.path
    if queries:
        text += '?' + urlencode(queries)
    text += '\n'
    if headers:
        # Only x-ell-* headers are signed, lowercased and sorted by name
        signed = sorted((name.lower(), value) for name, value in headers.items())
        for name, value in signed:
            if name.startswith('x-ell-'):
                text += name + ':' + value + '\n'

    return hmac.new(key.encode('utf-8'), text.encode('utf-8'), hashlib.sha512).hexdigest()


if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Send request to rift.')
    parser.add_argument('url', metavar='URL', help='Url for processing')
    parser.add_argument('--file', dest='file', action='store', default=None, help='File to send with POST request')
    parser.add_argument('--user', dest='user', action='store', default=None, help='Token owner to sign request')
    parser.add_argument('--token', dest='token', action='store', default=None, help='Secure token to sign request')
    args = parser.parse_args()

    headers = {}
    if args.token and args.user:
        # A file means an upload (POST); otherwise the request is a GET
        method = 'POST' if args.file else 'GET'
        headers['Authorization'] = 'riftv1 {0}:{1}'.format(args.user, generate_signature(args.token, method, args.url))
    elif args.token or args.user:
        raise Exception('Both --user and --token must be specified at the same time')

    if not args.file:
        r = requests.get(args.url, headers=headers)
    else:
        # Read the file as bytes so binary data such as mp3 is sent unchanged
        with open(args.file, 'rb') as f:
            data = f.read()
        r = requests.post(args.url, data=data, headers=headers)

    print(r.status_code)
    print(r.text)
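
To make the signing step easier to follow, the sketch below (Python 3) isolates the text that generate_signature builds before it is passed to HMAC. The function name string_to_sign is introduced here for illustration only; the logic mirrors the script above.

```python
from urllib.parse import parse_qsl, urlencode, urlparse


def string_to_sign(method, url, headers=None):
    """Return the text that is signed: method, path with sorted query, x-ell-* headers."""
    parsed = urlparse(url)
    queries = sorted(parse_qsl(parsed.query))
    text = method + "\n" + parsed.path
    if queries:
        text += "?" + urlencode(queries)
    text += "\n"
    # Header names are lowercased and sorted; only x-ell-* headers are included
    for name, value in sorted((k.lower(), v) for k, v in (headers or {}).items()):
        if name.startswith("x-ell-"):
            text += name + ":" + value + "\n"
    return text


print(repr(string_to_sign(
    "POST", "http://localhost:8080/upload/music/example.mp3?user=admin")))
# prints 'POST\n/upload/music/example.mp3?user=admin\n'
```

Note that the host and port are not part of the signed text, only the method, the path, the sorted query and the x-ell-* headers.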

Now we can create the bucket named music in the bucket directory named all; both are specified in the URL below:

$ ./http_auth.py http://localhost:8080/update-bucket/all/music --file update-bucket.json 
200
...

Wow, the bucket has been created!

Now it's time to upload our favorite song into the bucket music using admin's credentials:

$ ./http_auth.py http://localhost:8080/upload/music/example.mp3?user=admin --file example.mp3 --user admin --token favorite_music
200
...

Great! The song has been uploaded.

Streaming

Now we are ready for the most interesting part of this tutorial: music streaming by Nginx.

Check that the /redirect/ handler redirects us directly to Nginx:

$ curl http://localhost:8080/redirect/music/example.mp3 -v
* Adding handle: conn: 0x148c950
* Adding handle: send: 0
* Adding handle: recv: 0
* Curl_addHandleToPipeline: length: 1
* - Conn 0 (0x148c950) send_pipe: 1, recv_pipe: 0
* About to connect() to localhost port 8080 (#0)
*   Trying 127.0.0.1...
* Connected to localhost (127.0.0.1) port 8080 (#0)
> GET /redirect/music/example.mp3?user=user HTTP/1.1
> User-Agent: curl/7.32.0
> Host: localhost:8080
> Accept: */*
> 
< HTTP/1.1 302 Moved Temporarily
< Location: http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399?time=1399554119&signature=106835208cfc70dec9cbee71301c416c05b647804289414ac78a3d87e8387b9bf8297392a4fee57b7bd6cfe12510ad635b3e225a8b910e769b0b5746fea90197
< Content-Length: 0
< Connection: Keep-Alive
< 
* Connection #0 to host localhost left intact

The redirect works. Now run mplayer to check that playback works too:

$ mplayer -q http://localhost:8080/redirect/music/example.mp3
Connecting to server localhost[127.0.0.1]: 8080...
Connecting to server 127.0.0.1[127.0.0.1]: 8081...

Cache size set to 320 KBytes
Cache fill:  0.00% (0 bytes)   

...
Audio only file format detected.
==========================================================================
Opening audio decoder: [mpg123] MPEG 1.0/2.0/2.5 layers I, II, III
AUDIO: 44100 Hz, 2 ch, s16le, 192.0 kbit/13.61% (ratio: 24000->176400)
Selected audio codec: [mpg123] afm: mpg123 (MPEG 1.0/2.0/2.5 layers I, II, III)
==========================================================================
AO: [pulse] 44100Hz 2ch s16le (2 bytes per sample)
Video: no video
Starting playback...
A:  31.5 (31.4) of 176.0 (02:56.0)  0.5% 46%

Great! Music is being played from blobs streamed by Nginx!

Basically, Nginx+eblob+elliptics stream your data directly to the user in a p2p fashion. You can install a similar configuration on multiple servers in different physical locations and not lose your data even after moving your servers around. Having many servers around the world allows you to stream data from the closest location and to always select servers that are accessible at the moment. The latter is implemented in elliptics+rift automatically.

Signature

Let's have a look at the URL Rift has redirected us to: http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399?time=1399554119&signature=106835208cfc70dec9cbee71301c416c05b647804289414ac78a3d87e8387b9bf8297392a4fee57b7bd6cfe12510ad635b3e225a8b910e769b0b5746fea90197

It contains several parts: http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399, time and signature.

The first part was already described in one of the previous sections: it says that the requested data is located in the server-2/blob/data-0.0 file at offset 144 and is 4245399 bytes long.

The time option is the UNIX time when the URL was generated; it is included in the signature to prevent the same URL from being used forever.

The signature makes it possible to check whether you have access to the requested part of the blob.
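
The parts of such a URL can be taken apart as in the following sketch. The function names parse_ticket and is_fresh are hypothetical, and max_age is an assumed lifetime: the actual validity period of a URL is not stated in this tutorial.

```python
from urllib.parse import parse_qs, urlparse

url = ("http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399"
       "?time=1399554119"
       "&signature=106835208cfc70dec9cbee71301c416c05b647804289414ac78a3d87e8387b9b"
       "f8297392a4fee57b7bd6cfe12510ad635b3e225a8b910e769b0b5746fea90197")


def parse_ticket(url):
    """Split a redirect URL into blob file, offset, size, time and signature."""
    parsed = urlparse(url)
    # The path has the form <blob file>:<offset>:<size>
    blob, offset, size = parsed.path.lstrip("/").rsplit(":", 2)
    query = parse_qs(parsed.query)
    return {
        "blob": blob,
        "offset": int(offset),
        "size": int(size),
        "time": int(query["time"][0]),
        "signature": query["signature"][0],
    }


def is_fresh(ticket, now, max_age=300):
    """Check the URL age; max_age (seconds) is an assumed value, not Rift's."""
    return 0 <= now - ticket["time"] <= max_age


ticket = parse_ticket(url)
print(ticket["blob"], ticket["offset"], ticket["size"], ticket["time"])
```

A server that verifies signatures would combine a freshness check like this with recomputing the signature itself.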

The algorithm to create the signature is as follows:

  1. Take the URL and add a 'time' query value with the current UNIX timestamp, i.e. http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399?time=1399554119
  2. Replace the URL's scheme with 'scheme', i.e. scheme://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399?time=1399554119
  3. Sign the resulting string with the secret token using hmac(sha512). In our example the token is 'very_secret_token' and the resulting signature is '106835208cfc70dec9cbee71301c416c05b647804289414ac78a3d87e8387b9bf8297392a4fee57b7bd6cfe12510ad635b3e225a8b910e769b0b5746fea90197'
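
The three steps can be sketched in Python as follows. This is a minimal illustration under stated assumptions: the function name sign_url is hypothetical, the URL is assumed to have no query string before 'time' is added, and strings are assumed to be encoded as UTF-8 before signing. Whether it reproduces the signature shown above byte for byte depends on details the steps do not spell out.

```python
import hashlib
import hmac


def sign_url(url, token, timestamp):
    """Return (string that is signed, final URL with time and signature)."""
    # Step 1: add the 'time' query value (assumes url has no query string yet)
    with_time = "{0}?time={1}".format(url, timestamp)
    # Step 2: replace the scheme with the literal word 'scheme'
    _, _, rest = with_time.partition("://")
    to_sign = "scheme://" + rest
    # Step 3: HMAC-SHA512 keyed by the secret token, hex encoded
    signature = hmac.new(token.encode("utf-8"), to_sign.encode("utf-8"),
                         hashlib.sha512).hexdigest()
    return to_sign, "{0}&signature={1}".format(with_time, signature)


to_sign, signed_url = sign_url(
    "http://127.0.0.1:8081/server-2/blob/data-0.0:144:4245399",
    "very_secret_token",
    1399554119,
)
print(to_sign)
print(signed_url)
```

A verifier would repeat steps 2 and 3 on the received URL (without its signature parameter) and compare the result with hmac.compare_digest.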

elliptics/streaming-tutorial.txt · Last modified: 2014/05/28 21:48 by elessar