import reactivex as rx
import reactivex.operators as op
import re 
from reactivex import from_iterable
def words_from_line(line):
    """Return an observable emitting the pieces of *line* split on ``\\W+``.

    The split is done with ``re.split``, so a line that starts or ends with
    a non-word character (a trailing newline, for instance) produces empty
    strings; those are emitted as-is and left for the consumer to discard.
    """
    pieces = re.split(r'\W+', line)
    return from_iterable(pieces)

def word_counter(filename):
    """Build an observable that emits a single ``{word: count}`` dict.

    The file is read line by line, each line is split into words with
    ``words_from_line``, empty tokens are dropped, and the remaining words
    are grouped and counted. Matching is case-sensitive ("The" and "the"
    are separate keys).

    The file is opened when the observable is subscribed to, not when this
    function is called, and it is closed again when that subscription is
    torn down. Each subscription therefore gets its own fresh handle, and
    no handle is left open afterwards. As a consequence, a failure to open
    the file surfaces at subscription time (``rx.defer`` forwards factory
    exceptions to the observer's error channel).

    Args:
        filename: Path of the text file to read, passed to ``open``.

    Returns:
        An observable producing one dict that maps each distinct word to
        the number of times it occurs in the file.
    """
    def open_lines(_scheduler):
        # One handle per subscription; finally_action releases it when the
        # line stream's subscription is disposed (completion, error or
        # explicit unsubscribe). file.close() is idempotent, so a repeated
        # call is harmless.
        f = open(filename)
        return from_iterable(f).pipe(op.finally_action(f.close))

    return rx.defer(open_lines).pipe(
        op.flat_map(words_from_line),
        # re.split leaves '' at line edges; those are not words.
        op.filter(lambda word: word != ''),
        op.group_by(lambda word: word),
        # Collapse every per-word group into a (word, occurrences) pair.
        op.flat_map(lambda group: group.pipe(
            op.count(),
            op.map(lambda ct: (group.key, ct))
        )),
        op.to_dict(lambda x: x[0], lambda x: x[1])
    )

# Count the words in the sample article and print the resulting dict.
word_counter("posts/reactive-python/news.txt").subscribe(print)
{'Giant': 1, 'waves': 3, 'damage': 6, 'S': 1, 'Asia': 6, 'economy': 1, 'Governments': 1, 'aid': 9, 'agencies': 3, 'insurers': 1, 'and': 22, 'travel': 4, 'firms': 1, 'are': 9, 'among': 1, 'those': 3, 'counting': 1, 'the': 69, 'cost': 3, 'of': 32, 'massive': 1, 'earthquake': 1, 'that': 13, 'hammered': 1, 'southern': 3, 'The': 9, 'worst': 3, 'hit': 3, 'areas': 3, 'Sri': 4, 'Lanka': 4, 'India': 2, 'Indonesia': 2, 'Thailand': 3, 'with': 5, 'at': 3, 'least': 1, '23': 2, '000': 2, 'people': 4, 'killed': 2, 'Early': 1, 'estimates': 1, 'from': 5, 'World': 4, 'Bank': 3, 'put': 1, 'amount': 2, 'needed': 4, 'about': 5, '5bn': 1, '2': 2, '6bn': 1, 'similar': 1, 'to': 24, 'cash': 2, 'offered': 1, 'Central': 1, 'America': 1, 'after': 2, 'Hurricane': 1, 'Mitch': 2, '10': 2, 'caused': 1, '10bn': 1, 'in': 15, '1998': 1, 'spokesman': 1, 'Damien': 1, 'Milverton': 1, 'told': 2, 'Wall': 2, 'Street': 2, 'Journal': 2, 'he': 2, 'expected': 1, 'an': 1, 'package': 1, 'financing': 1, 'debt': 1, 'relief': 2, 'Tourism': 4, 'is': 15, 'a': 7, 'vital': 3, 'part': 2, 'economies': 2, 'stricken': 2, 'countries': 3, 'providing': 1, 'jobs': 2, 'for': 6, '19': 1, 'million': 1, 'south': 1, 'east': 1, 'Asian': 2, 'region': 5, 'according': 2, 'Travel': 1, 'Council': 1, 'WTTC': 1, 'In': 5, 'Maldives': 3, 'islands': 1, 'Indian': 1, 'ocean': 1, 'two': 2, 'thirds': 1, 'all': 1, 'depend': 1, 'on': 3, 'tourism': 2, 'But': 1, 'covers': 1, 'fishing': 1, 'farming': 1, 'businesses': 1, 'too': 3, 'hundreds': 1, 'thousands': 1, 'buildings': 1, 'small': 1, 'boats': 1, 'destroyed': 2, 'by': 3, 'International': 3, 'have': 5, 'pledged': 2, 'their': 4, 'support': 3, 'most': 1, 'say': 1, 'it': 8, 'impossible': 1, 'gauge': 1, 'extent': 1, 'yet': 1, 'Monetary': 1, 'Fund': 1, 'IMF': 5, 'has': 6, 'promised': 1, 'rapid': 3, 'action': 1, 'help': 2, 'governments': 2, 'cope': 1, 'stands': 1, 'ready': 1, 'do': 1, 'its': 2, 'assist': 1, 'these': 1, 'nations': 1, 'appropriate': 1, 'time': 1, 'need': 1, 'said': 10, 'managing': 1, 
'director': 1, 'Rodrigo': 1, 'Rato': 1, 'Only': 1, 'Bangladesh': 1, 'currently': 1, 'receive': 1, 'while': 2, 'quake': 1, 's': 8, 'epicentre': 1, 'recently': 1, 'graduated': 1, 'assistance': 1, 'It': 2, 'up': 2, 'decide': 1, 'if': 1, 'they': 1, 'want': 1, 'Other': 2, 'such': 3, 'as': 8, 'Development': 1, 'early': 2, 'comment': 1, 'There': 1, 'no': 2, 'underestimating': 1, 'size': 1, 'problem': 1, 'however': 3, 'United': 1, 'Nations': 1, 'emergency': 2, 'coordinator': 1, 'Jan': 1, 'Egeland': 1, 'this': 2, 'may': 3, 'be': 5, 'national': 1, 'disaster': 2, 'recent': 1, 'history': 1, 'because': 2, 'affecting': 1, 'so': 3, 'many': 3, 'heavily': 1, 'populated': 1, 'coastal': 1, 'vulnerable': 1, 'communities': 2, 'Many': 2, 'will': 4, 'had': 1, 'livelihoods': 1, 'whole': 1, 'future': 1, 'few': 1, 'seconds': 1, 'He': 1, 'warned': 1, 'longer': 1, 'term': 2, 'effects': 1, 'devastating': 1, 'tidal': 1, 'wave': 1, 'or': 2, 'tsunami': 1, 'itself': 1, 'risks': 1, 'epidemics': 1, 'polluted': 1, 'drinking': 1, 'water': 1, 'Insurers': 1, 'also': 6, 'struggling': 1, 'assess': 1, 'but': 2, 'several': 1, 'big': 2, 'players': 1, 'believe': 1, 'final': 1, 'bill': 2, 'likely': 2, 'less': 1, 'than': 2, '27bn': 1, 'hurricanes': 1, 'battered': 1, 'US': 3, 'earlier': 1, 'year': 1, 'affected': 1, 'very': 1, 'we': 2, 'check': 1, 'country': 4, 'what': 3, 'situation': 1, 'Serge': 1, 'Troeber': 1, 'deputy': 1, 'head': 1, 'natural': 1, 'disasters': 1, 'department': 1, 'Swiss': 1, 'Re': 2, 'world': 2, 'second': 1, 'biggest': 3, 'reinsurance': 1, 'firm': 1, 'I': 1, 'should': 1, 'assume': 1, 'overall': 1, 'dimension': 1, 'insured': 1, 'damages': 2, 'below': 1, 'storm': 1, 'Munich': 1, 'reinsurer': 1, 'This': 1, 'primarily': 1, 'human': 1, 'tragedy': 1, 'us': 1, 'state': 1, 'our': 1, 'financial': 1, 'burden': 1, 'Allianz': 1, 'sees': 1, 'significant': 1, 'impact': 1, 'profitability': 1, 'However': 1, 'low': 1, 'insurance': 1, 'simply': 1, 'reflect': 1, 'general': 1, 'poverty': 1, 'much': 2, 'rather': 
1, 'level': 1, 'economic': 3, 'devastation': 1, 'who': 1, 'live': 1, 'there': 1, 'Federation': 1, 'Red': 2, 'Cross': 1, 'Crescent': 1, 'Societies': 1, 'Reuters': 1, 'news': 1, 'agency': 2, 'was': 7, 'seeking': 1, '6': 2, '5m': 1, 'health': 1, 'challenges': 1, 'face': 1, 'spread': 1, 'waterborne': 1, 'diseases': 1, 'particularly': 1, 'malaria': 1, 'diarrhoea': 1, 'quoted': 2, 'saying': 3, 'European': 1, 'Union': 1, 'deliver': 1, '3m': 1, 'euros': 1, '1m': 2, '4': 1, 'EU': 1, 'Humanitarian': 1, 'Aid': 1, 'Commissioner': 1, 'Louis': 1, 'Michel': 1, 'key': 2, 'bring': 1, 'hours': 1, 'days': 1, 'immediately': 1, 'reported': 1, 'State': 1, 'Department': 1, 'examining': 1, 'Getting': 1, 'companies': 2, 'business': 2, 'running': 1, 'play': 1, 'role': 1, 'helping': 1, 'recover': 1, 'weekend': 1, 'events': 1, 'Phuket': 1, 'island': 1, 'popular': 1, 'tourist': 2, 'resorts': 1, 'local': 1, 'December': 1, 'January': 1, 'busiest': 1, 'months': 2, 'even': 1, 'more': 1, 'keenly': 1, 'felt': 1, 'industry': 1, 'only': 1, 'just': 1, 'beginning': 1, 'emerge': 1, 'post': 2, '9': 1, '11': 1, 'slump': 1, 'Growth': 1, 'been': 1, 'southeast': 1, 'Organisation': 1, 'figures': 1, 'showing': 1, '45': 1, 'increase': 1, 'revenues': 1, 'during': 1, 'first': 1, '2004': 1, 'expansion': 1, 'continues': 1, 'excellent': 1, 'results': 1, 'thanks': 1, 'increased': 1, 'promotion': 1, 'product': 2, 'development': 2, 'upsurge': 1, 'driven': 1, 'WTO': 1, 'Arrivals': 1, 'other': 1, 'destinations': 1, 'thrived': 1, 'accounts': 1, 'annual': 1, 'gross': 1, 'domestic': 1, '8bn': 1, 'Singapore': 1, 'figure': 1, 'close': 1, '5': 1, 'brings': 1, 'foreign': 1, 'currency': 1, 'short': 1, 'cancelling': 1, 'flights': 1, 'trips': 1, 'That': 1, 'shares': 1, 'across': 1, 'Europe': 1, 'investors': 1, 'earnings': 1, 'growth': 1, 'slow': 1}
<reactivex.disposable.disposable.Disposable at 0x10c9eb820>
# The source holds only three items, so asking for five via take() simply
# passes all of them through before the stream ends.
publisher = rx.of(1, 2, 3).pipe(op.take(5))

# Print each emitted item on its own line.
publisher.subscribe(print)
1
2
3
<reactivex.disposable.disposable.Disposable at 0x10c9dce20>
import time 

# Wall-clock timestamps, in seconds: now, and the deadline one hour from now.
started_at = time.time()
end_at = started_at + 3600

# interval(1) is the ticking source; take_while lets ticks through only while
# the deadline is still ahead, and is re-evaluated on every tick.
ob = rx.interval(1)
sub = ob.pipe(
    op.take_while(lambda _tick: time.time() < end_at),
)
sub.subscribe(print)
<reactivex.disposable.disposable.Disposable at 0x10c9dcf70>
0
1
2
3
4
5
6
7
8
9
10
11
12
13
14