locked
Async Custom Sql Provider GetChangeBatch problems RRS feed

  • Question

  • I am using the sample code below as a start to building my own custom provider. It is currently an sql provider but the aim is to modify it to an outlook provider later. However I am having some issues with updates in a bidirectional sync (DownloadAndUpload).

    Currently the SaveLastSyncTime is in the EndSession method and it seems to run at the same time for both providers. Therefore the destination seems to miss updates made on the source replica.

    What is the best place for the SaveLastSyncTime. Am I doing something wrong here? What are the different options for storing the last sync time. Does the metadata service not handle this?

    Thanks. Would be good to solve this after countless hours of frustration.

    using System;
    using System.Collections.Generic;
    using System.IO;
    using Microsoft.Synchronization;
    using Microsoft.Synchronization.MetadataStorage;
    using System.Diagnostics;
    namespace BaseSyncProvider
    {
        /// <summary>
        /// Custom Synchronization Provider
        /// </summary>
        public abstract class BaseSyncProvider<T> : KnowledgeSyncProvider, 
                                                    IChangeDataRetriever, 
                                                    INotifyingChangeApplierTarget, 
                                                    IDisposable
        {
            //Built in SQL CE metadatatore provided by MSF.In this sample we are going to use the metadata store provided by MSF
            readonly SqlMetadataStore _metadataStore;
            readonly ReplicaMetadata _metadata;
            readonly SyncId _replicaId;
            readonly SyncIdFormatGroup _idFormats;
            SyncSessionContext _currentSessionContext;
            readonly string _replicaName;
            Dictionary<string, DateTime> _itemDataIds;
            /// <summary>
            /// Creates or opens the existing metadata store.
            /// </summary>
            /// <param name="replicaName"></param>
            /// <param name="fileName"></param>
            protected BaseSyncProvider(string replicaName, string fileName)
            {
                _replicaName = replicaName;
                _replicaId = new SyncId(replicaName);
                _idFormats = new SyncIdFormatGroup();
                _idFormats.ItemIdFormat.IsVariableLength = true;
                _idFormats.ItemIdFormat.Length = 500;
                _idFormats.ReplicaIdFormat.IsVariableLength = true;
                _idFormats.ReplicaIdFormat.Length = 500;
                //Create or open the metadata store, initializing it with the flexbile id formats we'll use to reference our items and endpoints
                if (!File.Exists(fileName))
                {
                    _metadataStore = SqlMetadataStore.CreateStore(fileName);
                    _metadata = _metadataStore.InitializeReplicaMetadata(_idFormats, _replicaId, null, null);
                }
                else
                {
                    _metadataStore = SqlMetadataStore.OpenStore(fileName);
                    _metadata = _metadataStore.GetReplicaMetadata(_idFormats, _replicaId);
                }
                _metadata.SetForgottenKnowledge(new ForgottenKnowledge(_idFormats, _metadata.GetKnowledge()));
            }
            // Abstract methods that need to be overridden in the derived classes.
            // These provide the functionality specific to each actual data store.
            public abstract void CreateItemData(T itemData);
            public abstract void UpdateItemData(T itemData);
            public abstract void DeleteItemData(T itemData);
            public abstract void UpdateItemDataWithDestination(T sourceItemData, T destItemData);
            public abstract void MergeItemData(T sourceItemData, T destItemData);
            public abstract T GetItemData(string itemDataId, string replicaName);
            public abstract Dictionary<string, DateTime> ItemDataIds { get; }
            /// <summary>
            /// BeginUpdates() begins a transaction for the replica.Using BeginUpdates and EndUpdates the operations on the replica can be made transactional.
            /// </summary>
            public void BeginUpdates()
            {
                _metadataStore.BeginTransaction();
            }
            /// <summary>
            /// EndUpdates ends a transaction for the replica
            /// </summary>
            public void EndUpdates()
            {
                _metadataStore.CommitTransaction();
            }
            public override void BeginSession(SyncProviderPosition position, SyncSessionContext syncSessionContext)
            {
                _currentSessionContext = syncSessionContext;
            }
            public override void EndSession(SyncSessionContext syncSessionContext)
            {
                //string filePath = Path.Combine(Environment.CurrentDirectory, _replicaName);
                //if (File.Exists(filePath))
                //{
                //    File.Delete(filePath);
                //}
                Debug.WriteLine("Ending session: " + _replicaName);
                SaveLastSyncTime(DateTime.Now);
            }
            public override FullEnumerationChangeBatch GetFullEnumerationChangeBatch(uint batchSize, SyncId lowerEnumerationBound, SyncKnowledge knowledgeForDataRetrieval, out object changeDataRetriever)
            {
                throw new NotImplementedException();
            }
            public override SyncIdFormatGroup IdFormats
            {
                get { return _idFormats; }
            }
            public override void ProcessFullEnumerationChangeBatch(ConflictResolutionPolicy resolutionPolicy, FullEnumerationChangeBatch sourceChanges, object changeDataRetriever, SyncCallbacks syncCallback, SyncSessionStatistics sessionStatistics)
            {
                throw new NotImplementedException();
            }
            public ulong GetNextTickCount()
            {
                return _metadata.GetNextTickCount();
            }
            public void SaveChangeWithChangeUnits(ItemChange change, SaveChangeWithChangeUnitsContext context)
            {
                throw new NotImplementedException();
            }
            public void SaveConflict(ItemChange conflictingChange, object conflictingChangeData, SyncKnowledge conflictingChangeKnowledge)
            {
                throw new NotImplementedException();
            }
            public IChangeDataRetriever GetDataRetriever()
            {
                return this;
            }
            public bool TryGetDestinationVersion(ItemChange sourceChange, out ItemChange destinationVersion)
            {
                destinationVersion = null;
                return false;
            }
            public override void GetSyncBatchParameters(out uint batchSize, out SyncKnowledge knowledge)
            {
                Debug.WriteLine("GetSyncBatchParameters called: " + _replicaName);
                batchSize = 10000;
                knowledge = _metadata.GetKnowledge();
            }
            public override ChangeBatch GetChangeBatch(uint batchSize, SyncKnowledge destinationKnowledge, out object changeDataRetriever)
            {
                Debug.WriteLine("GetChangeBatch called: " + _replicaName);
                //Make sure that Metadata store is updated with local changes in the replica.
                UpdateMetadataStoreWithChanges();
                // Construct the ChangeBatch and return it
                var batch = _metadata.GetChangeBatch(batchSize, destinationKnowledge);
                changeDataRetriever = this;
                return batch;
            }
            /// <summary>
            /// Sync framework calls the ProcessChangeBatch method on the destination provider 
            /// Destination provider receives the change versions and source knowledge in the form of two input parameters sourceChanges and changeDataRetriever.
            /// </summary>
            /// <param name="resolutionPolicy">Defines the way conflicts are handled</param>
            /// <param name="sourceChanges">Chnage batch from the source provider</param>
            /// <param name="changeDataRetriever">IChangeDataRetriever passed from the source</param>
            /// <param name="syncCallback">Sync call abck for raising events to sync agent</param>
            /// <param name="sessionStatistics">statistics about the sync session.</param>
            public override void ProcessChangeBatch(ConflictResolutionPolicy resolutionPolicy, ChangeBatch sourceChanges,
                object changeDataRetriever, SyncCallbacks syncCallback, SyncSessionStatistics sessionStatistics)
            {
                BeginUpdates(); 
                var localChanges = _metadata.GetLocalVersions(sourceChanges);
                var changeApplier = new NotifyingChangeApplier(_idFormats);
                changeApplier.ApplyChanges(resolutionPolicy, sourceChanges, changeDataRetriever as IChangeDataRetriever, localChanges, _metadata.GetKnowledge(),
                    _metadata.GetForgottenKnowledge(), this, _currentSessionContext, syncCallback);
                EndUpdates();
            }
            /// <summary>
            /// Sync framework calls the LoadChangeData method on the source provider.
            /// source provider retrieves the items requested by destination provider from its replica and sends them to the destination provider
            /// </summary>
            /// <param name="loadChangeContext"></param>
            /// <returns></returns>
            public object LoadChangeData(LoadChangeContext loadChangeContext)
            {
                var id = loadChangeContext.ItemChange.ItemId.GetStringId();
                return GetItemData(id, _replicaName);
            }
    
            /// <summary>
            /// Saves the item and metadata in destination replica
            /// </summary>
            /// <param name="saveChangeAction">specifies what operation to perform on destination replica like create/update or delete</param>
            /// <param name="change">contains the information about the change to an item</param>
            /// <param name="context">represents information about a change to be saved to the item store</param>
            public void SaveItemChange(SaveChangeAction saveChangeAction, ItemChange change, SaveChangeContext context)
            {
                ItemMetadata item;
                switch (saveChangeAction)
                {
                    case SaveChangeAction.Create:
                        //Save the Change
                        var itemData = (T)context.ChangeData;
                        CreateItemData(itemData);
                        //Save the metadata
                        item = _metadata.CreateItemMetadata(change.ItemId, change.CreationVersion);
                        item.ChangeVersion = change.ChangeVersion;
                        _metadata.SaveItemMetadata(item);
                        break;
                    case SaveChangeAction.UpdateVersionAndData:
                        {
                            item = _metadata.FindItemMetadataById(change.ItemId);
                            if (null == item)
                            {
                                throw new Exception("Record is not present in replica");
                            }
                            item.ChangeVersion = change.ChangeVersion;
                            var destItemData = GetItemData(item.GlobalId.GetStringId(), _replicaName);                        
                            var sourceItemData = (T)context.ChangeData;
                            UpdateItemDataWithDestination(sourceItemData, destItemData);
                            _metadata.SaveItemMetadata(item);
                        }
                        break;
                    case SaveChangeAction.UpdateVersionOnly:
                        {
                            item = _metadata.FindItemMetadataById(change.ItemId);
                            if (null == item)
                            {
                                throw new Exception("Record is not present in replica");
                            }
                            item.ChangeVersion = change.ChangeVersion;
                            _metadata.SaveItemMetadata(item);
                        }
                        break;
                    case SaveChangeAction.DeleteAndStoreTombstone:
                        item = _metadata.FindItemMetadataById(change.ItemId);
                        if (null == item)
                        {
                            item = _metadata.CreateItemMetadata(change.ItemId, change.CreationVersion);
                        }
                        if (change.ChangeKind == ChangeKind.Deleted)
                        {
                            item.MarkAsDeleted(change.ChangeVersion);
                        }
                        else
                        {
                            throw new Exception("Invalid changeType");
                        }
                        item.ChangeVersion = change.ChangeVersion;
                        var itemDataToBeDeleted = GetItemData(item.GlobalId.GetStringId(), _replicaName);
                        if (itemDataToBeDeleted != null) DeleteItemData(itemDataToBeDeleted);
                        _metadata.SaveItemMetadata(item);
                        break;
                    case SaveChangeAction.UpdateVersionAndMergeData:
                        item = _metadata.FindItemMetadataById(change.ItemId);
                        item.ChangeVersion = new SyncVersion(0, _metadata.GetNextTickCount());
                        var destItemDataMerge = GetItemData(item.GlobalId.GetStringId(), _replicaName);
                        var sourceItemDataMerge = (T)context.ChangeData;
                        MergeItemData(sourceItemDataMerge, destItemDataMerge);
                        _metadata.SaveItemMetadata(item);
                        break;
                }
            }
            /// <summary>
            /// We also need to save the knowledge after each sync.
            /// We just save the knowledge and forgotten Knowledge in the Replica metadata store.
            /// </summary>
            /// <param name="knowledge"></param>
            /// <param name="forgottenKnowledge"></param>
            public void StoreKnowledgeForScope(SyncKnowledge knowledge, ForgottenKnowledge forgottenKnowledge)
            {
                _metadata.SetKnowledge(knowledge);
                _metadata.SetForgottenKnowledge(forgottenKnowledge);
                _metadata.SaveReplicaMetadata();
            }
            /// <summary>
            /// This Methods Performs Aynchronous Change Tracking
            /// </summary>
            protected void UpdateMetadataStoreWithChanges()
            {
                _metadataStore.BeginTransaction();
                _itemDataIds = ItemDataIds;
                _metadata.DeleteDetector.MarkAllItemsUnreported();
                foreach (var itemDataId in _itemDataIds)
                {
                    var syncId = new SyncId(itemDataId.Key);
                    var existingItem = _metadata.FindItemMetadataById(syncId);
                    if (existingItem == null)
                    {
                        //Creates new item metadata by using the 
                        //1. sync id [Created from ID field of Customer]
                        //2. sync version which has replica Id [0] and the tick count which a logical sync clock
                        existingItem = _metadata.CreateItemMetadata(syncId, new SyncVersion(0, _metadata.GetNextTickCount()));
                        //sets teh change version to created version
                        existingItem.ChangeVersion = existingItem.CreationVersion;
                        //saves the item in metadata store
                        _metadata.SaveItemMetadata(existingItem);
                    }
                    else
                    {
                        if (itemDataId.Value.CompareTo(GetLastSyncTime()) > 0)
                        {
                            //sets the change version by incrementing the tick count
                            existingItem.ChangeVersion = new SyncVersion(0, _metadata.GetNextTickCount());
                            _metadata.SaveItemMetadata(existingItem);
                        }
                        else
                        {
                            _metadata.DeleteDetector.ReportLiveItemById(syncId);
                        }
                    }
                }
                foreach (var item in _metadata.DeleteDetector.FindUnreportedItems())
                {
                    item.ChangeVersion = new SyncVersion(0, _metadata.GetNextTickCount());
                    item.MarkAsDeleted(item.ChangeVersion);
                    _metadata.SaveItemMetadata(item);
                }
                _metadataStore.CommitTransaction();
            }
            private DateTime GetLastSyncTime()
            {
                var lastSyncTimeFilePath = Path.Combine(Environment.CurrentDirectory, _replicaName + @".LastSyncTime");
                if (!File.Exists(lastSyncTimeFilePath))
                {
                    var dt = DateTime.MinValue;
                    SaveLastSyncTime(dt);
                    return dt;
                }
                using (var fs = File.Open(lastSyncTimeFilePath, FileMode.Open, FileAccess.ReadWrite, FileShare.None))
                {
                    // Read the LastSyncTime from the file.
                    fs.Seek(0, SeekOrigin.Begin);
                    var br = new BinaryReader(fs);
                    var dt = new DateTime(br.ReadInt64());
                    Debug.WriteLine(_replicaName + " GetLastSyncTime: " + dt.ToString());
                    return dt;
                }
            }
            private void SaveLastSyncTime(DateTime dt)
            {
                using (var fs = File.Open(Path.Combine(Environment.CurrentDirectory, _replicaName + @".LastSyncTime"), FileMode.OpenOrCreate, FileAccess.ReadWrite, FileShare.None))
                {
                    // Write the LastSyncTime to the file.
                    fs.SetLength(0);
                    var bw = new BinaryWriter(fs);
                    Debug.WriteLine(_replicaName + " SaveLastSyncTime: " + dt.ToString());
                    bw.Write(dt.Ticks);
                }
            }
            ~BaseSyncProvider()
            {
                CleanUp(false);
            }
            private bool _disposed;
            public void Dispose()
            {
                CleanUp(true);
                GC.SuppressFinalize(this);
            }
            private void CleanUp(bool disposing)
            {
                if (!_disposed)
                {
                    if (disposing)
                    {
                        // Dispose managed resources.
                        _metadataStore.Dispose();
                    }
                    // Clean up unmanaged resources here.
                }
                _disposed = true;
            }
        }
    }
    • Moved by Max Wang_1983 Thursday, April 21, 2011 5:27 PM forum consolidation (From:SyncFx - Technical Discussion [ReadOnly])
    Sunday, April 26, 2009 12:40 PM

Answers

  • Hi there,

    Sorry for the delay.

    Short answer: you're missing updates from one of the providers because you detect changes (in GetChangeBatch) for only one provider but save the last sync time (which you use for detecting changes) for both providers. Because of that one of the providers has an updated sync time without actually detecting local changes.

    Long answer: When you get a bit closer to the metal, sync consists of uni-directional sessions. When you ask for a bidirectional session you're really getting two sessions in opposite directions. In any single session, both providers will get called on EndSession/BeginSession. Only the source provider will be called on GetChangeBatch. I think that explains the immediate problem you're having - one way of fixing it would be to check the provider position in BeginSession and only update the sync time for the source provider.

    Having said that this model (detect changes on the source only) is one of the models you can use. What we typically recommend is detecting changes on both providers, so that you'd call UpdateMetadataStoreWithChanges from BeginSession instead of GetChangeBatch. Doing that gives you the ability to detect conflicts between simultaneous changes on both providers. Say you updated item A on provider 1, and also updated it on provider 2. If for the first uni-directional sync you only detect changes on provider 1, you then need a mechanism for detecting that there has been an independent update on provider 2 while trying to apply the update from 1. If you instead detect changes on both 1 and 2, MSF will detect the conflict for you.

    Another consideration here is that EndSession is too late to be saving the sync time, since you don't want to miss any user changes made after change detection but before saving the time. It's best to save the timestamp immediately after updating the metadata store with changes is complete, and disallow changes from the user during that time. When you do that you have a new problem though: how to ensure the changes applied from the sync session are not detected as new changes? There are two ways to do this I can think of right now:

    - Disallow user changes to the store for the entire duration of the session if you can and then update the sync timestamp in EndSession
    - Make sure that you can differentiate between sync-originated changes and user-originated changes so that the sync changes are not re-detected as new changes. One example would be to apply sync-originated changes with a retroactive timestamp that is earlier than the timestamp stored at the end of change detection. User changes have a normal timestamp that is greater than the stored timestamp and will be picked up next time.

    Hope that helps,

    Lukasz
    Friday, May 1, 2009 1:58 AM