Archief voor categorie couchdb

Getting to know CouchDB – 2/x

Last time around, we were able to store the OSGi log messages into a CouchDB. Now I want to tap into the _changes API that CouchDB offers. The options are simple, connect to the ‘_changes’ document in a CouchDB database and you will be presented with a log of all changes that have occurred on the database.

http://localhost:5984/db/_changes

{"results":[
{"seq":3,"id":"_design/app","changes":[{"rev":"2-0dc1002d841221e8d8d7f325b87ffd67"}]},
{"seq":196,"id":"994cf6817a74882cecda6425e8001a96","changes":[{"rev":"193-083ec5c1e1359789eb2cf09c46097fee"}]}
,
"last_seq":196}

For each document in the database only the last change is presented. This could result in quite a lot of information. It is therefore also possible to fetch changes while applying a server-side filter. For this to work, you first need to define a filter in a design document. Mine looks like this:

function(doc, req) {
    if (doc.type == "EVENT") { return true; }
    return false;
}

and is stored (using couchapp) in a design document called ‘app’. The name of the filter is ‘events’. The filters’ meaning should obvious: pass through all documents for which the function returns ‘true’. The proper URL for accessing the changes will now be:

http://localhost:5984/db/_changes?filter=app/events

{"results":[
{"seq":196,"id":"994cf6817a74882cecda6425e8001a96","changes":[{"rev":"193-083ec5c1e1359789eb2cf09c46097fee"}]}
],
"last_seq":196}

The “last_seq” part can be used for bookkeeping: If you plan to make  multiple calls to the _changes feed (say every hour) then it is nice to be able to skip the ones you already saw during a previous call. So, the second time you call the _changes feed, you can skip all previous changes by employing the following URL:

http://localhost:5984/db/_changes?filter=app/events&since=196

{"results":[

],
"last_seq":196}

Obviously, no update has occurred since we accessed the _changes feed for the first time. If, however we now add a new document to the database (with doc.type == “EVENT”) and call the _changes feed again:

http://localhost:5984/db/_changes?filter=app/events&since=196

{"results":[
{"seq":197,"id":"f0a4a559fac65ab3cca87baf1c014fa7","changes":[{"rev":"1-c8dfffec4e08ca8db2b95f30613e213d"}]}
],
"last_seq":197}

Also, a call to “http://localhost:5984/db” will result in a JSON object, where “update_seq” provides the current update sequence. This comes in handy if you’re really not interested in previous changes:

http://localhost:5984/db

{"db_name":"db","doc_count":4,"doc_del_count":0,"update_seq":196,
"purge_seq":0,"compact_running":false,"disk_size":1871961,
"instance_start_time":"1282049630257719","disk_format_version":5,
"committed_update_seq":196}

This is all great functionality if your time granularity is in the order of hours or days. But hardly optimal if you want to be notified immediately of changes to the database. Fortunately, by supplying “feed=continuous” to the URL, CouchDB will keep your socket connection open and supply the changes as soon as they become available. Each change is represented by a well-formatted JSON line. This breaks from the previous examples, where the complete response consisted of well-formatted JSON, so beware!

http://localhost:5984/db/_changes?filter=app/events&since=196&feed=continuous

{"seq":197,"id":"f0a4a559fac65ab3cca87baf1c014fa7","changes":[{"rev":"1-c8dfffec4e08ca8db2b95f30613e213d"}]}
...
...
{"last_seq":197}

Eventually, if no changes have been sent for 60 secs, the socket connection closes. Before it does so, it spits out the last_seq, again for bookkeeping purposes.

I was wondering if it would be possible to create a simple iPhone/iPad application that is able to report specific events on a MapView using the _changes feed from CouchDB. All one then has to do as iPhone/ iPad application is connect to the continuous feed, parse the JSON data and display the events on your MKMapView. Server-side, all one has to do is insert/ update documents in the database. How easy!

So I started off by connecting to the _changes feed using an asynchronous NSURLConnection, expecting that its delegate would receive connection:didReceiveData: calls on each change. Unfortunately, this did not work: The NSURLConnection employs buffering internally that cannot be circumvented. Thus only after about 5 or 6 updates the delegate received some data. This is definitely not good enough. Some searching pointed me to the CFSocket/NSStream class. This class fortunately employs some more basic (read: unbuffered) socket handling, allowing me to receive data from the socket as soon as it becomes available!

    //open the socket
    CFReadStreamRef readStream;
    CFWriteStreamRef writeStream;
    CFStreamCreatePairWithSocketToHost(NULL, (CFStringRef)[theURL host], [[theURL port] intValue], &readStream, &writeStream);

    inputStream = (NSInputStream *)readStream;
    outputStream = (NSOutputStream *)writeStream;

    [inputStream setDelegate: self];
    [outputStream setDelegate :self];
    [inputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
    [outputStream scheduleInRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
    [inputStream open];
    [outputStream open];    

The containing class is set as delegate for both the NSInputStream and NSOutputStream. This causes the following method being called, based on events that occur on the streams:

/* stream eventing */
- (void)stream:(NSStream *)stream handleEvent:(NSStreamEvent)eventCode {

    switch(eventCode) {
        case NSStreamEventOpenCompleted:
            [self connection: nil didReceiveResponse: nil];
			break;

		case NSStreamEventErrorOccurred:
            [self connection: nil didFailWithError: [stream streamError]];
			break;

		case NSStreamEventEndEncountered:
            [stream close];
            [stream removeFromRunLoop:[NSRunLoop currentRunLoop] forMode:NSDefaultRunLoopMode];
            [stream release];
            inputStream = nil;
            outputStream = nil;
            [self connectionDidFinishLoading: nil];
			break;            

        case NSStreamEventHasSpaceAvailable:
        {
            if (stream == outputStream) {
                [self connection: nil willSendRequest: [NSURLRequest requestWithURL: theURL] redirectResponse: nil];

                NSString *str = [NSString stringWithFormat: @"GET %@?%@ HTTP/1.0\r\n\r\n", [theURL path], [theURL query]];
                const uint8_t *rawstring = (const uint8_t *)[str UTF8String];
                [outputStream write:rawstring maxLength: [str length]];
                [outputStream close];
            }
        }
            break;
		case NSStreamEventHasBytesAvailable:
        {
			if (stream == inputStream)
			{
				uint8_t buffer[1024];
				int len;
				while ([inputStream hasBytesAvailable])
				{
					len = [inputStream read: buffer maxLength: sizeof(buffer)];
					if (len > 0)
					{
						NSData *theData = [[NSData alloc] initWithBytes: buffer length:len];
                        [self connection: nil didReceiveData: theData];
					}
				}
			}
        }
            break;
    }
}

As you can see, this method kind of delegates the occurring events to calls to the NSURLConnectionDelegate (which the containing class also implements, due to my initial effort to read the changes feed using a normal NSURLConnection).

Then, the connection:didReceiveData method handles the actual changes:

- (void)connection:(NSURLConnection *)connection didReceiveData:(NSData *)data {
    NSString *str = [[NSString alloc] initWithData:data encoding: NSUTF8StringEncoding];

    NSArray *ary = [str componentsSeparatedByString:@"\n"];
    for (NSString *line in ary) {
        if ([line length] > 0 && [line characterAtIndex: 0] == '{') {
            id json = [line JSONValue];

            NSDictionary *dict = (NSDictionary*)json;
            NSObject *seq = [dict objectForKey: @"seq"];

            if (seq && delegate) {
                [delegate changeReceived: dict];
            }
            NSObject *last_seq = [dict objectForKey: @"last_seq"];
            if (last_seq && delegate) {
                int last = [((NSNumber*)last_seq) intValue];
                [delegate lastSequence: last];
            }
        }
    }

}

This method converts the JSON data to a NSDictionary and forwards these changes to a (self-defined) @protocol called ChangesDelegate:

@protocol ChangesDelegate

@optional

-(void) changeReceived:(NSDictionary*) dict;
-(void) lastSequence:(int) last;
-(void) changesComplete:(NSError*)error;

@end

The implementation that performs the actual calls to CouchDB can be created using:

    m_changes = [[Changes alloc] initWithHost: @"localhost" database: @"db" andFilter: @"app/events"];
    m_changes.delegate = self;

    m_lastSeq = [m_changes last_seq];

Since the calling class also implements the ChangesDelegate @protocol, it will receive the changes events. I am currently using it to display MKAnnotations onto a MKMapView and it works like a charm!

Thank you CouchDB for making life easy!

p.s. I’ve made the Changes class available through github: changesfeed_xcode. Enjoy!

, , ,

Nog geen reacties

Getting to know CouchDB 1/x

Some time ago, I started looking at CouchDB. Getting to know technologies is IMO always best performed by thinking up some kind of project. So first, I started thinking of what to do with it. Now that the 1.0.0 has been released I thought it would be good to (finally) blog about my findings so far.

The project I came up with is the following: A simple (OSGi) log listener that stores its log messages into a CouchDB database.

At Luminis we use OSGi a lot. It is a highly modular framework (written in Java) that allows you to combine several modules (called ‘bundles’) that each expose or make use of functionality provided by other bundles. Careful combination of several bundles will result (almost like ‘emerging behavior’) in a complete application that suits your needs. The functionality provided by a particular bundle is best exposed through interfaces, enabling you to switch implementations without breaking the contract with other bundles that use the exposed functionality.

One of the compendium services that are available is the LogService. It allows any service to log the things it considers of any importance. It is then up to the LogService implementation where these log statements end up. Logging is usually sent to standard out or a file, which is fine in most cases. Sometimes however, logging onto the device itself and retrieving the log file for inspection is not an easy task. Take for example an Android device. Thanks to Aaron Miller’s effort, we are now able to run CouchDB on Android. Android apps (EZDroid apps) will now be able to store their logging data in this local instance. This data can then be easily replicated to another instance for inspection.

Also,  for limited devices (where there’s e.g. limited storage available) logging to a file is simply not an option.

Therefore I thought of the following scenarios where

  • CouchDB is installed locally on the device/ server. Access to CouchDB is then guaranteed and no logging is missed. A drawback might be that the OSGi application would require a local installation of CouchDB (for those who consider that a drawback!).
  • CouchDB is installed on a remote instance. Access to the CouchDB instance might be interrupted due to network instability. Then, some log messages might get lost. Since most applications require a working internet connection,  I think we could live with this.

The target of the project would be to store a JSON representation of the actually logged messages into CouchDB. This can easily be accomplished by using a LogListener.

In both cases, the OSGi LogListener that ‘lives’ inside the OSGi application, receives all LogEntries, that contain the messages that are being sent to the LogService (and some meta-data). All it then has to do is convert it to JSON and create a new document in some database at the CouchDB server. If one ever wanted to inspect the log messages, a single call to the CouchDB server would initiate a replication of the database to your local CouchDB instance. Then, you could peruse the log messages offline at your leisure.

Installation and the basic usage of CouchDB is not covered here, there are excellent descriptions already available on the WikiHalorgium’s GitHub and the O’Reilly Free Book.

To test the setup described above, I created such a LogListener implementation. I use json-simple to convert the LogEntry into JSON and end up with a JSON Object such as the following:

{
'message': 'This is a test message',
'time': 1269783246909,
'level': 'LOG_DEBUG',
'serviceReference': {},
'bundle': {
   'id': 5,
    'lastModified': 1269783246510,
    'location': 'file:bundle/net.luminis.log.couchdb-1.0.0.jar',
    'symbolicName': 'net.luminis.log.couchdb'
    }
}

A unique serverId/ instanceId could also be added to be able to distinguish between server instances if you decide to send the logging to a central server.

This, I POST to the configured server. By not submitting an ‘_id’ in the JSON string, CouchDB will make one up for me. The HTTP (1.0) POST itself is done (thus keeping it lightweight) opening a raw socket to the couchdb server:

POST /db HTTP/1.0
Content-Length: xxx
Content-Type: application/json

{ ... the data here ... }

The response is also a valid JSON response which can be inspected for success (along with the HTTP response code, of course).

In my current implementation, I can choose between two modes; either each message is sent to the couchdb instance on at a time, or I send all messages in bulk mode every x seconds. The latter is probably the best for remote couchdb instances.

Nog geen reacties