I have one question. I have read many pages about threading best practices, such as http://www.albahari.com/threading/part4.aspx.

Everything there is explained well, but I still have a problem with threads. I run 6 threads at the same time. They parse some data, and this data must be stored in a database. The same data must not be stored twice.

Right now I get many duplicate rows in the database. How can I prevent this? I don't think lock() is a good fit. I want to use Monitor, but I don't know if that is appropriate.

This is the thread code:

                CultureInfo contentCulture = (CultureInfo)propertyBag["LanguageCulture"].Value;
                string cultureDisplayValue = "N/A";
                if (!contentCulture.IsNull())
                {
                    cultureDisplayValue = contentCulture.DisplayName;
                }

                AllocConsole();

                Console.Out.WriteLine();
                Console.Out.WriteLine("Url: {0}", propertyBag.Step.Uri);
                Console.Out.WriteLine("Content type: {0}", propertyBag.ContentType);
                Console.Out.WriteLine("Content length: {0}", propertyBag.Text.IsNull() ? 0 : propertyBag.Text.Length);
                Console.Out.WriteLine("Depth: {0}", propertyBag.Step.Depth);
                Console.Out.WriteLine("Culture: {0}", cultureDisplayValue);
                Console.Out.WriteLine("ThreadId: {0}", Thread.CurrentThread.ManagedThreadId);
                Console.Out.WriteLine("Thread Count: {0}", crawler.ThreadsInUse);
                Console.Out.WriteLine();

                ConsoleCount++;

                if (ConsoleCount > 1000)
                {
                    Console.Clear();
                    ConsoleCount = 0;
                }

                HtmlDocument htmlDoc = new HtmlDocument();
                Encoding documentEncoding = htmlDoc.DetectEncoding(propertyBag.GetResponse());
                propertyBag.GetResponse().Seek(0, SeekOrigin.Begin);

                if (documentEncoding != null)
                {
                    htmlDoc.Load(propertyBag.GetResponse(), documentEncoding, true);
                }
                else
                {
                    htmlDoc.Load(propertyBag.GetResponse(), true);
                }

                string htmlContent = htmlDoc.DocumentNode.OuterHtml;
                if (string.IsNullOrEmpty(htmlContent)) return;

                IAdvertismentsDao advertismentsDao = DaoFactory.GetAdvertisementsDao();
                List<TagValuePair> listTagValuePair = HtmlHelper.GetTagsAndValues(htmlContent);
                string link = propertyBag.Step.Uri.ToString();

                if (string.IsNullOrEmpty(link))
                {
                    link = propertyBag.ResponseUri.ToString();
                }


                Advertisements ad =
                    new CrawlerManager(DaoFactory, ConnectionString).GetAdvertismentFromHtmlContent(
                        listTagValuePair, Agency, link);

                if (ad != null)
                {
                    if (!advertismentsDao.AdvertisementUrlExist(ad.Url))
                    {
                        if (
                            !advertismentsDao.AdvertisementExist(ad.Price, ad.HollidayDuration, ad.Name,
                                                                 ad.Description, ad.City, ad.Area, ad.Country,
                                                                 ad.Agency))
                        {
                            advertismentsDao.Save(ad);
                            advertismentsDao.CommitChanges();
                        }
                    }
                    else
                    {
                        if (advertismentsDao.ChekIfNeedUpdate(ad))
                        {
                            Advertisements advertisements = advertismentsDao.GetByUrl(ad.Url);

                            advertisements.Price = ad.Price;
                            advertisements.HollidayDuration = ad.HollidayDuration;
                            advertisements.Name = ad.Name;
                            advertisements.Description = ad.Description;
                            advertisements.DepartureDate = ad.DepartureDate;

                            advertismentsDao.SaveOrUpdate(advertisements);
                            advertismentsDao.CommitChanges();
                        }
                    }

                    InvokeEvent(ad, string.Empty);
                }
                else
                    InvokeEvent(null, link);
A: 

You must define a unique index on the "business key" of your data, i.e. whatever makes a row unique in your case.

The database will then throw exceptions if you insert the same data twice. You can then either ignore this exception (data already there) or update the existing row (for example to count the number of times an item appears).
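
A minimal sketch of the "insert first, handle the duplicate" pattern described above. The real unique index lives in the database; here a hypothetical in-memory `UniqueIndexTable` stands in for it so the example runs on its own. `DuplicateKeyException`, `UniqueIndexTable`, and `TrySave` are illustrative names, not part of the asker's DAO or of any database driver.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical exception standing in for the database's unique-constraint violation.
public class DuplicateKeyException : Exception
{
    public DuplicateKeyException(string key) : base("Duplicate key: " + key) { }
}

// Hypothetical in-memory stand-in for a table with a unique index on Url.
public class UniqueIndexTable
{
    private readonly HashSet<string> _urls = new HashSet<string>();
    private readonly object _sync = new object();

    public int Count { get { lock (_sync) { return _urls.Count; } } }

    // Like an INSERT against a unique index: the check and the write are atomic.
    public void Insert(string url)
    {
        lock (_sync)
        {
            if (!_urls.Add(url)) throw new DuplicateKeyException(url);
        }
    }
}

public static class UniqueIndexDemo
{
    // Insert without checking first; treat a duplicate as "already stored".
    public static bool TrySave(UniqueIndexTable table, string url)
    {
        try
        {
            table.Insert(url);
            return true;
        }
        catch (DuplicateKeyException)
        {
            return false; // another thread (or process) already saved this row
        }
    }

    public static void Main()
    {
        var table = new UniqueIndexTable();
        // Six workers all try to save the same two URLs.
        Parallel.For(0, 6, i =>
        {
            TrySave(table, "http://example.com/ad/1");
            TrySave(table, "http://example.com/ad/2");
        });
        Console.WriteLine(table.Count); // each URL is stored once
    }
}
```

With a real database, the `catch` would instead target whatever exception the driver or ORM raises for a unique-constraint violation.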

Aaron Digulla
+1  A: 

The problem is that you do not correctly split up the data that you are parsing. You say you have six threads parsing data, but some of them are evidently parsing the same data.

Looking at your code, I think the problem is your propertyBag. I'm not sure what that is, but I think it doesn't give each thread the correct data to parse. You may want to take a look at the ConcurrentQueue class to get some ideas.
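
As a rough illustration of the ConcurrentQueue idea, here is a sketch in which six worker threads drain one shared queue. `QueueDemo` and `Process` are hypothetical names, and the URLs are placeholders; how the asker's crawler actually hands work to threads is not shown in the question.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class QueueDemo
{
    // Drains the queue with the given number of worker threads and
    // returns how many times each item was processed.
    public static ConcurrentDictionary<string, int> Process(string[] urls, int workerCount)
    {
        var queue = new ConcurrentQueue<string>(urls);
        var processed = new ConcurrentDictionary<string, int>();
        var workers = new Thread[workerCount];

        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                string url;
                // TryDequeue hands each queued item to exactly one caller.
                while (queue.TryDequeue(out url))
                {
                    processed.AddOrUpdate(url, 1, (key, count) => count + 1);
                }
            });
            workers[i].Start();
        }

        foreach (var worker in workers) worker.Join();
        return processed;
    }

    public static void Main()
    {
        var urls = new[] { "http://example.com/a", "http://example.com/b", "http://example.com/c" };
        var counts = Process(urls, 6);
        foreach (var pair in counts)
            Console.WriteLine("{0} processed {1} time(s)", pair.Key, pair.Value);
    }
}
```

Note that the queue only guarantees that each queued entry is taken once. If the same URL is enqueued twice, it is still processed twice, so the input would need to be de-duplicated before it is queued.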

Ronald Wildenberg
A: 

Since the database can be accessed by anyone with the required privileges (not just by multiple threads in your process), you need to shift your frame of reference from concurrency within one process to concurrency across all possible users of the DB. Your locking needs to happen in the DB, and data integrity must be enforced there by keys and constraints.

Steve Townsend
+1  A: 

I'd guess you have an issue with:

if (!advertismentsDao.AdvertisementUrlExist(ad.Url))
{
    if (
        !advertismentsDao.AdvertisementExist(ad.Price, ad.HollidayDuration, ad.Name,
                                             ad.Description, ad.City, ad.Area, ad.Country,
                                             ad.Agency))
    {
       advertismentsDao.Save(ad);
       advertismentsDao.CommitChanges();
    }
}

It seems entirely possible that Thread #1 will see that the Url and Advertisement don't exist, and then be preempted by Thread #2. Thread #2 will also see that the Url and Advertisement don't exist, and then both threads will attempt to save.

A couple of thoughts on how to solve it:

  1. Split up your input, so that no two threads are working on the same Advertisement
  2. Move the "if exists" logic into the database, and take advantage of row locks and such
  3. Lock around the whole "if exists, then save" portion.
  4. Use a mutex based on ad.GetHashCode() to ensure only 1 thread is working on a similar Advertisement at a time. You could retrieve this mutex from the AdvertisementUrlExist call - and block there until it's available. Of course, you're basically implementing row-level locking at that point.
  5. Remove the if exists checks altogether, and just write the data to the database. You can aggregate and such nightly, or on demand with SELECT.
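
Option 3 could look roughly like the sketch below. `FakeAdvertisementsDao` is a hypothetical in-memory stand-in for the asker's DAO, and `SaveIfMissing` and `SaveLock` are illustrative names. The `lock` statement is shorthand for `Monitor.Enter`/`Monitor.Exit`, so using Monitor directly would give the same behavior.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical in-memory stand-in for the asker's DAO; not thread-safe on its own.
public class FakeAdvertisementsDao
{
    private readonly List<string> _rows = new List<string>();
    public bool UrlExists(string url) { return _rows.Contains(url); }
    public void Save(string url) { _rows.Add(url); }
    public int RowCount { get { return _rows.Count; } }
}

public static class LockDemo
{
    private static readonly object SaveLock = new object();

    // The existence check and the save happen under one lock, so no other
    // thread in this process can slip in between them.
    public static void SaveIfMissing(FakeAdvertisementsDao dao, string url)
    {
        lock (SaveLock)
        {
            if (!dao.UrlExists(url))
            {
                dao.Save(url);
            }
        }
    }

    public static void Main()
    {
        var dao = new FakeAdvertisementsDao();
        // Six workers race to save the same advertisement URL.
        Parallel.For(0, 6, i => SaveIfMissing(dao, "http://example.com/ad/1"));
        Console.WriteLine(dao.RowCount);
    }
}
```

This only serializes threads inside one process. A second crawler instance, or any other client of the database, would bypass the lock, which is why a constraint in the database is still the safer guard.
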
Mark Brackett