I am learning Python and, with the help of online resources and of people on this site, am getting the hang of it. In this first script of mine, which parses Twitter RSS feed entries and inserts the results into a database, there is one remaining problem that I cannot fix: duplicate entries are being inserted into one of the tables.

As a bit of background, I originally found a base script on HalOtis.com for downloading RSS feeds and then modified it in several ways: 1) modified it to account for idiosyncrasies in Twitter RSS feeds (an entry isn't separated into content, title, URL, etc.); 2) added tables for "hashtags" and for the many-to-many relationship (the entry_tag table); 3) changed the table set-up to SQLAlchemy; 4) made some ad hoc changes to account for weird Unicode problems that were occurring. As a result, the code is ugly in places, but it has been a good learning experience and now works, except that it keeps inserting duplicates in the "entries" table.

Since I'm not sure what would be most helpful to people, I've pasted in the entire code below, with some comments in a few places to point out what I think is most important.

I would really appreciate any help with this. Thanks!

Edit: Somebody suggested I provide a schema for the database. I've never done this before, so if I'm not doing it right, bear with me. I am setting up four tables:

  1. RSSFeeds, which contains a list of Twitter RSS feeds
  2. RSSEntries, which contains a list of individual entries downloaded (after parsing) from each of the feeds (with columns for content, hashtags, date, url)
  3. Tags, which contains a list of all the hashtags that are found in individual entries (Tweets)
  4. entry_tag, which contains columns allowing me to map tags to entries.

In short, the script below grabs the five test RSS feeds from the RSS Feeds table, downloads the 20 latest entries / tweets from each feed, parses the entries, and puts the information into the RSS Entries, Tags, and entry_tag tables.
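In case a plain-SQL view is clearer, here is a rough sqlite3 sketch of what I intend those four tables to be (just an illustration; the actual script below creates them through SQLAlchemy):

import sqlite3

conn = sqlite3.connect(':memory:')  # throwaway database, for illustration only
conn.executescript("""
CREATE TABLE feeds    (id INTEGER PRIMARY KEY,
                       url VARCHAR(1000) DEFAULT '');
CREATE TABLE entries  (id INTEGER PRIMARY KEY,
                       feed_id INTEGER REFERENCES feeds(id),
                       short_url VARCHAR(1000) DEFAULT '',
                       content TEXT NOT NULL,
                       hashtags VARCHAR(255),
                       date TEXT);
CREATE TABLE tag      (id INTEGER PRIMARY KEY,
                       tagname VARCHAR(20) NOT NULL UNIQUE);
CREATE TABLE entrytag (id INTEGER PRIMARY KEY,
                       entryid INTEGER REFERENCES entries(id),
                       tagid INTEGER REFERENCES tag(id));
""")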

#!/usr/local/bin/python

import sqlite3
import datetime   # needed by the now() helper below
import threading
import time
import Queue
from time import strftime
import re       
from string import split 
import feedparser 
from django.utils.encoding import smart_str, smart_unicode      
from sqlalchemy import schema, types, ForeignKey, select, orm
from sqlalchemy import create_engine

engine = create_engine('sqlite:///test98.sqlite', echo=True)
metadata = schema.MetaData(engine)

def now():   # helper for timestamps (note: never actually called below)
 return datetime.datetime.now()


#set up four tables, with many-to-many relationship
RSSFeeds = schema.Table('feeds', metadata,
 schema.Column('id', types.Integer, 
  schema.Sequence('feeds_seq_id', optional=True), primary_key=True),
 schema.Column('url', types.VARCHAR(1000), default=u''),
)


RSSEntries = schema.Table('entries', metadata,
 schema.Column('id', types.Integer, 
  schema.Sequence('entries_seq_id', optional=True), primary_key=True),
 schema.Column('feed_id', types.Integer, schema.ForeignKey('feeds.id')),
 schema.Column('short_url', types.VARCHAR(1000), default=u''),
 schema.Column('content', types.Text(), nullable=False),
 schema.Column('hashtags', types.Unicode(255)),
 schema.Column('date', types.String()),  
)


tag_table = schema.Table('tag', metadata,
 schema.Column('id', types.Integer,
    schema.Sequence('tag_seq_id', optional=True), primary_key=True),
 schema.Column('tagname', types.Unicode(20), nullable=False, unique=True),
)


entrytag_table = schema.Table('entrytag', metadata,
 schema.Column('id', types.Integer,
  schema.Sequence('entrytag_seq_id', optional=True), primary_key=True),
 schema.Column('entryid', types.Integer, schema.ForeignKey('entries.id')),
 schema.Column('tagid', types.Integer, schema.ForeignKey('tag.id')),
)


metadata.create_all(bind=engine, checkfirst=True)
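# note: checkfirst=True means existing tables (and any rows already in them) survive re-runs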


# Insert test set of Twitter RSS feeds
stmt = RSSFeeds.insert()
stmt.execute(
 {'url': 'http://twitter.com/statuses/user_timeline/14908909.rss'},
 {'url': 'http://twitter.com/statuses/user_timeline/52903246.rss'},
 {'url': 'http://twitter.com/statuses/user_timeline/41902319.rss'},
 {'url': 'http://twitter.com/statuses/user_timeline/29950404.rss'},
 {'url': 'http://twitter.com/statuses/user_timeline/35699859.rss'},
)
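# note: nothing checks for existing rows first, so these five URLs are re-inserted on every run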



#These 3 lines set up the threading process (see HalOtis.com for the original example)
THREAD_LIMIT = 20
jobs = Queue.Queue(0)
rss_to_process = Queue.Queue(THREAD_LIMIT)


#connect to sqlite database and grab the 5 test RSS feeds
conn = engine.connect()
feeds = conn.execute('SELECT id, url FROM feeds').fetchall()

#This block contains all the parsing and DB insertion 
def store_feed_items(id, items):
 """ Takes a feed_id and a list of items and stores them in the DB """
 for entry in items:
  conn.execute('SELECT id from entries WHERE short_url=?', (entry.link,))
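  # (the result of this SELECT is never examined, so nothing below uses it)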
  # note: entry.summary contains the entire feed entry for Twitter,
  # i.e., it is not separated into content, title, URL, etc.
  s = unicode(entry.summary) 
  test = s.split()
  tinyurl2 = [i for i in test if i.startswith('http://')]
  hashtags2 = [i for i in s.split() if i.startswith('#')]
  content2 = ' '.join(i for i in s.split() if i not in tinyurl2+hashtags2)
  content = unicode(content2)
  tinyurl = unicode(tinyurl2)
  hashtags = unicode(hashtags2)
  print hashtags
  date = strftime("%Y-%m-%d %H:%M:%S",entry.updated_parsed)


  # Insert parsed feed data into the entries table
  # THIS IS WHERE DUPLICATES OCCUR
  result = conn.execute(RSSEntries.insert(), {'feed_id': id, 'short_url': tinyurl,
   'content': content, 'hashtags': hashtags, 'date': date})
  entry_id = result.last_inserted_ids()[0]


  #Look up tag identifiers and create any that don't exist:
  tags = tag_table
  tag_id_query = select([tags.c.tagname, tags.c.id], tags.c.tagname.in_(hashtags2))
  tag_ids = dict(conn.execute(tag_id_query).fetchall())
  for tag in hashtags2:
   if tag not in tag_ids:
    result = conn.execute(tags.insert(), {'tagname': tag})
    tag_ids[tag] = result.last_inserted_ids()[0]

  # insert data into the entrytag table
  if hashtags2:
   conn.execute(entrytag_table.insert(),
    [{'entryid': entry_id, 'tagid': tag_ids[tag]} for tag in hashtags2])


#Rest of file completes the threading process     
def thread():
 while True:
  try:
   id, feed_url = jobs.get(False) # False = Don't wait
  except Queue.Empty:
   return

  entries = feedparser.parse(feed_url).entries
  rss_to_process.put((id, entries), True) # This will block if full

for info in feeds: # Queue them up
 jobs.put([info['id'], info['url']])

for n in xrange(THREAD_LIMIT):
 t = threading.Thread(target=thread)
 t.start()

while threading.activeCount() > 1 or not rss_to_process.empty():
 # That condition means we want to do this loop if there are threads
 # running OR there's stuff to process
 try:
  id, entries = rss_to_process.get(False, 1) # block=False: don't wait (the timeout arg is ignored)
 except Queue.Empty:
  continue

 store_feed_items(id, entries)
+1  A: 

It looks like you grafted SQLAlchemy onto a previously existing script that didn't use it. There are too many moving parts here, and apparently none of us understands all of them well enough.

I would recommend starting from scratch. Don't use threading. Don't use SQLAlchemy. To start, maybe don't even use an SQL database. Write a script that collects the information you want, in the simplest possible way, into a simple data structure, using simple loops and maybe a time.sleep(). Then, when that works, you can add storage in an SQL database; I really don't think writing SQL statements directly is much harder than using an ORM, and it's easier to debug, IMHO. There is a good chance you will never need to add the threading back.
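To give a sense of the level of simplicity I mean, here is a minimal, untested sketch: one feed at a time, plain sqlite3, no threads. The single-table layout and the INSERT OR IGNORE de-duplication are my own assumptions for illustration, not your schema:

import sqlite3
import time
import feedparser

conn = sqlite3.connect('tweets.sqlite')
conn.execute('CREATE TABLE IF NOT EXISTS entries '
             '(link TEXT PRIMARY KEY, content TEXT, date TEXT)')

feed_urls = ['http://twitter.com/statuses/user_timeline/14908909.rss']

for url in feed_urls:
    for entry in feedparser.parse(url).entries:
        # The PRIMARY KEY on link plus INSERT OR IGNORE means re-running
        # the script cannot insert the same tweet twice.
        conn.execute('INSERT OR IGNORE INTO entries VALUES (?, ?, ?)',
                     (entry.link, entry.summary,
                      time.strftime('%Y-%m-%d %H:%M:%S', entry.updated_parsed)))
    conn.commit()
    time.sleep(1)  # be polite to the server between feeds

Once something this small runs repeatedly without creating duplicates, you can grow it back out to your four-table schema.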

"If you think you are smart enough to write multi-threaded programs, you're not." -- James Ahlstrom.

Aaron Watters