I am learning Python and, through the help of online resources and people on this site, am getting the hang of it. In this first script of mine, in which I'm parsing Twitter RSS feed entries and inserting the results into a database, there is one remaining problem that I cannot fix. Namely, duplicate entries are being inserted into one of the tables.
As a bit of background, I originally found a base script on HalOtis.com for downloading RSS feeds and then modified it in several ways: 1) modified to account for idiosyncracies in Twitter RSS feeds (it's not separated into content, title, URL, etc.); 2) added tables for "hashtags" and for the many-to-many relationship (entry_tag table); 3) changed table set-up to sqlalchemy; 4) made some ad hoc changes to account for weird unicode problems that were occurring. As a result, the code is ugly in places, but it has been a good learning experience and now works--except that it keeps inserting duplicates in the "entries" table.
Since I'm not sure what would be most helpful to people, I've pasted in the entire code below, with some comments in a few places to point out what I think is most important.
I would really appreciate any help with this. Thanks!
Edit: Somebody suggested I provide a schema for the database. I've never done this before, so if I'm not doing it right, bear with me. I am setting up four tables:
- RSSFeeds, which contains a list of Twitter RSS feeds
- RSSEntries, which contains a list of individual entries downloaded (after parsing) from each of the feeds (with columns for content, hashtags, date, url)
- Tags, which contains a list of all the hashtags that are found in individual entries (Tweets)
- entry_tag, which contains columns allowing me to map tags to entries.
In short, the script below grabs the five test RSS feeds from the RSS Feeds table, downloads the 20 latest entries / tweets from each feed, parses the entries, and puts the information into the RSS Entries, Tags, and entry_tag tables.
#!/usr/local/bin/python
import sqlite3
import threading
import time
import Queue
from time import strftime
import re
from string import split
import feedparser
from django.utils.encoding import smart_str, smart_unicode
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine
engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)
metadata.bind = engine
def now():
return datetime.datetime.now()
#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
schema.Column('id', types.Integer,
schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
schema.Column('url', types.VARCHAR(1000), default=u''),
)
RSSEntries = schema.Table('entries', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entries_seq_id', optional=True), primary_key=True),
schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
schema.Column('short_url', types.VARCHAR(1000), default=u''),
schema.Column('content', types.Text(), nullable=False),
schema.Column('hashtags', types.Unicode(255)),
schema.Column('date', types.String()),
)
tag_table = schema.Table('tag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('tag_seq_id', optional=True), primary_key=True),
schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)
entrytag_table = schema.Table('entrytag', metadata,
schema.Column('id', types.Integer,
schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)
metadata.create_all(bind=engine, checkfirst=True)
# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
{'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
{'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)
#These 3 lines for threading process (see HalOtis.com for example)
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)
#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()
#This block contains all the parsing and DB insertion
def store_feed_items(id, items):
""" Takes a feed_id and a list of items and stores them in the DB """
for entry in items:
conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
#note: entry.summary contains entire feed entry for Twitter,
#i.e., not separated into content, etc.
s = unicode(entry.summary)
test = s.split()
tinyurl2 = [i for i in test if i.startswith('http://')]
hashtags2 = [i for i in s.split() if i.startswith('#')]
content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
content = unicode(content2)
tinyurl = unicode(tinyurl2)
hashtags = unicode (hashtags2)
print hashtags
date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)
#Insert parsed feed data into entries table
#THIS IS WHERE DUPLICATES OCCUR
result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
'content': content, 'hashtags': hashtags, 'date': date})
entry_id = result.last_inserted_ids()[0]
#Look up tag identifiers and create any that don't exist:
tags = tag_table
tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
tag_ids = dict(conn.execute(tag_id_query).fetchall())
for tag in hashtags2:
if tag not in tag_ids:
result = conn.execute(tags.insert(), {'tagname': tag})
tag_ids[tag] = result.last_inserted_ids()[0]
#insert data into entrytag table
if hashtags2: conn.execute(entrytag_table.insert(),
[{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])
#Rest of file completes the threading process
def thread():
while True:
try:
id, feed_url = jobs.get(False) # False = Don't wait
except Queue.Empty:
return
entries = feedparser.parse(feed_url).entries
rss_to_process.put((id, entries), True) # This will block if full
for info in feeds: # Queue them up
jobs.put([info['id'], info['url']])
for n in xrange(THREAD_LIMIT):
t = threading.Thread(target=thread)
t.start()
while threading.activeCount() > 1 or not rss_to_process.empty():
# That condition means we want to do this loop if there are threads
# running OR there's stuff to process
try:
id, entries = rss_to_process.get(False, 1) # Wait for up to a second
except Queue.Empty:
continue
store_feed_items(id, entries)