כולנו פותרים בעיית " הדרך הקצרה ביותר " פעמים רבות ביום. בלי כוונה כמובן. אנחנו פותרים את זה בדרך העבודה שלנו או כשאנחנו גולשים באינטרנט או כשאנחנו מסדרים את הדברים שלנו על השולחן.
נשמע קצת... יותר מדי? בואו לגלות.
תארו לעצמכם, החלטתם להיפגש עם חברים, ובכן... נניח בבית קפה. קודם כל, אתה צריך למצוא מסלול (או שביל) לבית קפה, אז אתה מתחיל לחפש תחבורה ציבורית זמינה (אם אתה ברגל) או מסלולים ורחובות (אם אתה נוהג). אתה מחפש מסלול מהמיקום הנוכחי שלך לבית קפה אבל לא "כל" מסלול - הקצר ביותר או המהיר ביותר.
הנה עוד דוגמה אחת, שאינה כל כך ברורה כמו הקודמת. במהלך העבודה בדרך שלך אתה מחליט לקחת "קיצור דרך" ברחוב צדדי כי טוב... זה "קיצור דרך" ו"מהיר" יותר ללכת בדרך זו. אבל איך ידעת שה"קיצור" הזה מהיר יותר? על סמך ניסיון אישי פתרת בעיית "הדרך הקצרה ביותר" ובחרת מסלול, שעובר ברחוב הצדדי.
בשתי הדוגמאות, המסלול ה"קצר" ביותר נקבע במרחק או בזמן הנדרש כדי להגיע ממקום אחד למשנהו. דוגמאות מטיילות הן יישומים טבעיים מאוד של תורת הגרפים ובעיית "הנתיב הקצר ביותר" בפרט. עם זאת, מה לגבי הדוגמה עם סידור דברים על שולחן? במקרה זה ה"קצר" ביותר יכול לייצג מספר או מורכבות של פעולות שעליך לבצע כדי לקבל, למשל, דף נייר: פתח שולחן, פתיחת תיקייה, קח דף נייר לעומת קח דף נייר ישירות מהשולחן .
כל הדוגמאות לעיל מייצגות בעיה של מציאת הנתיב הקצר ביותר בין שני קודקודים בגרף (מסלול או נתיב בין שני מקומות, מספר פעולות או מורכבות של קבלת דף נייר ממקום זה או אחר). מחלקה זו של בעיות הנתיב הקצר ביותר נקראת SSSP (Single Source Shortest Path) והאלגוריתם הבסיסי לפתרון בעיות אלו הוא האלגוריתם של Dijkstra , בעל מורכבות חישובית O(n^2)
.
אבל, לפעמים אנחנו צריכים למצוא את כל הנתיבים הקצרים ביותר בין כל הקודקודים. שקול את הדוגמה הבאה: אתה יוצר עבורך מפה תנועות קבועות בין הבית , העבודה והתיאטרון . בתרחיש זה תקבלו 6 מסלולים: work ⭢ home
, home ⭢ work
, work ⭢ theatre
, theatre ⭢ work
, home ⭢ theatre
theatre ⭢ home
(המסלולים ההפוכים יכולים להיות שונים בגלל כבישים חד-סטריים למשל) .
הוספת מקום נוסף למפה תגרום לגידול משמעותי של שילובים - על פי תמורות של n נוסחה של קומבינטוריקה ניתן לחשב זאת כך:
A(k, n) = n! / (n - m)! // where n - is a number of elements, // k - is a number of elements in permutation (in our case k = 2)
מה שנותן לנו 12 מסלולים ל-4 מקומות ו-90 מסלולים ל-10 מקומות (וזה מרשים). שימו לב... זאת מבלי להתחשב בנקודות ביניים בין מקומות, כלומר, כדי להגיע מהבית לעבודה צריך לחצות 4 רחובות, ללכת לאורך הנהר ולחצות את הגשר. אם נדמיין, למסלולים מסוימים יכולים להיות נקודות ביניים משותפות... ובכן... כתוצאה מכך יהיה לנו גרף גדול מאוד, עם הרבה קודקודים, כאשר כל קודקוד ייצג או מקום או נקודת ביניים משמעותית. מחלקת הבעיות, שבה עלינו למצוא את כל הנתיבים הקצרים ביותר בין כל זוגות הקודקודים בגרף, נקראת APSP (All Pairs Shortest Paths) והאלגוריתם הבסיסי לפתרון בעיות אלו הוא Floyd-Warshall algorithm , שיש לו O(n^3)
מורכבות חישובית.
וזה האלגוריתם שנטמיע היום 🙂
אלגוריתם פלויד-ורשל מוצא את כל הנתיבים הקצרים ביותר בין כל זוג קודקודים בגרף. האלגוריתמים פורסמו על ידי רוברט פלויד ב-[1] (ראה סעיף "הפניות" לפרטים נוספים). באותה שנה, פיטר אינגרמן ב-[2] תיאר יישום מודרני של האלגוריתם בצורה של שלושה לולאות for
:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm // where W - is a weight matrix of N x N size, // min() - is a function which returns lesser of it's arguments
אם מעולם לא היה לך שינוי לעבוד עם גרף המיוצג בצורה של מטריצה אז זה יכול להיות קשה להבין מה האלגוריתם לעיל עושה. אז, כדי להבטיח שאנחנו באותו עמוד, הבה נבחן כיצד ניתן לייצג גרף בצורה של מטריצה ומדוע ייצוג כזה מועיל לפתרון בעיית הנתיב הקצר ביותר.
התמונה למטה ממחישה גרף מכוון ומשוקלל של 5 קודקודים. משמאל, הגרף מוצג בצורה ויזואלית, העשוי מעיגולים וחצים, כאשר כל עיגול מייצג קודקוד וחץ מייצג קצה עם כיוון. מספר בתוך עיגול מתאים למספר קודקוד ומספר מעל קצה מתאים למשקל הקצה. בצד ימין, אותו גרף מוצג בצורה של מטריצת משקל. מטריצת משקל היא צורה של מטריצת סמיכות שבה כל תא מטריצה מכיל "משקל" - מרחק בין קודקוד i
(שורה) לקודקוד j
(עמודה). מטריצת משקל לא כוללת מידע על "נתיב" בין קודקודים (רשימת קודקודים שדרכם מגיעים מ- i
ל- j
) - רק משקל (מרחק) בין קודקודים אלה.
במטריצת משקל אנו יכולים לראות שערכי התא שווים למשקולות בין קודקודים סמוכים . לכן, אם נבדוק נתיבים מקודקוד 0
(שורה 0
), נראה ש... יש רק נתיב אחד - מ 0
ל 1
. אבל, על ייצוג חזותי אנו יכולים לראות בבירור נתיבים מקודקוד 0
לקודקודים 2
ו 3
(דרך קודקוד 1
). הסיבה לכך פשוטה - במצב התחלתי, מטריצת משקל מכילה מרחק בין קודקודים סמוכים בלבד. עם זאת, מידע זה לבדו מספיק כדי למצוא את השאר.
בואו נראה איך זה עובד. שימו לב לתא W[0, 1]
. הערך שלו מצביע על כך שיש נתיב מקודקוד 0
לקודקוד 1
עם משקל שווה ל 1
(בקיצור נוכל לכתוב כמו: 0 ⭢ 1: 1
). עם הידע הזה, אנחנו יכולים כעת לסרוק את כל התאים של שורה 1
(המכילה את כל המשקלים של כל הנתיבים מקודקוד 1
) ולהעביר את המידע הזה בחזרה לשורה 0
, ולהגדיל את המשקל ב 1
(ערך של W[0, 1]
).
באמצעות אותם שלבים נוכל למצוא נתיבים מקודקוד 0
דרך קודקודים אחרים. במהלך החיפוש, יכול לקרות שיש יותר מנתיב אחד המוביל לאותו קודקוד ומה שחשוב יותר המשקלים של נתיבים אלו יכולים להיות שונים. דוגמה למצב כזה מומחשת בתמונה למטה, כאשר חיפוש מקודקוד 0
עד קודקוד 2
גילה נתיב נוסף לקודקוד 3
במשקל קטן יותר.
יש לנו שני נתיבים: נתיב מקורי – 0 ⭢ 3: 4
ונתיב חדש שגילינו זה עתה – 0 ⭢ 2 ⭢ 3: 3
(זכור, מטריצת משקל לא מכילה נתיבים, אז אנחנו לא יודעים איזה קודקודים כלולים בנתיב המקורי). זה הרגע שבו אנחנו מקבלים החלטה לשמור על הקצר ביותר ולכתוב 3
לתא W[0, 3]
.
נראה שכרגע מצאנו את הדרך הקצרה הראשונה שלנו!
השלבים שראינו זה עתה הם המהות של אלגוריתם פלויד-ורשל. תסתכל פעם נוספת על הפסאודוקוד של האלגוריתם:
algorithm FloydWarshall(W) do for k = 0 to N - 1 do for i = 0 to N - 1 do for j = 0 to N - 1 do W[i, j] = min(W[i, j], W[i, k] + W[k, j]) end for end for end for end algorithm
המחזור החיצוני ביותר for
on k
עובר על כל הקודקודים בגרף ובכל איטרציה, משתנה k
מייצג קודקוד שאנו מחפשים דרכו . מחזור פנימי for
on i
חוזר גם על כל הקודקודים בגרף ובכל איטרציה, i
מייצג קודקוד שממנו אנו מחפשים נתיבים . ולבסוף, מחזור פנימי ביותר for
ב- j
איטרציות על כל הקודקודים בגרף ובכל איטרציה, j
מייצג קודקוד אליו אנו מחפשים נתיב. בשילוב זה נותן לנו את הדברים הבאים: בכל איטרציה k
אנחנו מחפשים נתיבים מכל הקודקודים i
לתוך כל הקודקודים j
דרך קודקוד k
. בתוך מחזור נשווה נתיב i ⭢ j
(מיוצג על ידי W[i, j]
) עם נתיב i ⭢ k ⭢ j
(מיוצג על ידי סכום של W[I, k]
ו- W[k, j]
) וכותבים את הקצר ביותר אחד בחזרה לתוך W[i, j]
.
כעת, כאשר אנו מבינים את המכניקה הגיע הזמן ליישם את האלגוריתם.
קוד המקור והגרפים הניסיוניים זמינים במאגר ב-GitHub. ניתן למצוא את הגרפים הניסיוניים בספריית Data/Sparse-Graphs.zip
. כל אמות המידה בפוסט זה מיושמות בקובץ APSP01.cs .
לפני שצולל ליישום עלינו להבהיר כמה רגעים טכניים:
NO_EDGE = (int.MaxValue / 2) - 1
.
עכשיו, בואו נבין למה זה.
לגבי מס' 1. כאשר אנו מדברים על מטריצות אנו מגדירים תאים במונחים של "שורות" ו"עמודות". בגלל זה נראה טבעי לדמיין מטריצה בצורה של "ריבוע" או "מלבן" (תמונה 4א).
עם זאת, זה לא אומר שעלינו לייצג מטריצה בצורה של מערך של מערכים (תמונה 4ב) כדי לדבוק בדמיון שלנו. במקום זאת, אנו יכולים לייצג מטריצה בצורה של מערך ליניאלי (תמונה 4c) כאשר האינדקס של כל תא מחושב באמצעות הנוסחה הבאה:
i = row * row_size + col; // where row - cell row index, // col - cell column index, // row_size - number of cells in a row.
מערך ליניאלי של מטריצת משקל הוא תנאי מוקדם לביצוע יעיל של אלגוריתם פלויד-ורשל. הסיבה לכך אינה פשוטה והסבר מפורט ראוי לפוסט נפרד... או כמה פוסטים. עם זאת, נכון לעכשיו, חשוב להזכיר שייצוג כזה משפר באופן משמעותי את מקומיות הנתונים , שלמעשה יש לו השפעה רבה על ביצועי האלגוריתם.
הנה, אני מבקש מכם להאמין לי ורק למידע זה בחשבון כתנאי מוקדם, אולם במקביל אני ממליץ לכם להקדיש זמן וללמוד את השאלה, ודרך אגב – אל תאמינו לאנשים באינטרנט .
- הערת המחבר
לגבי מס' 2. אם תסתכל מקרוב על האלגוריתם פסאודוקוד, לא תמצא שום בדיקה הקשורה לקיומו של נתיב בין שני קודקודים. במקום זאת, פסאודוקוד פשוט השתמש בפונקציה min()
. הסיבה פשוטה - במקור, אם אין נתיב בין לקודקודים, ערך תא מוגדר infinity
ובכל השפות, למעט שעשויה להיות JavaScript, כל הערכים קטנים infinity
. במקרה של מספרים שלמים זה עשוי להיות מפתה להשתמש ב- int.MaxValue
כערך "ללא נתיב". עם זאת, הדבר יוביל לגלישה של מספרים שלמים במקרים בהם הערכים של שני הנתיבים i ⭢ k
ו- k ⭢ j
יהיו שווים ל- int.MaxValue
. לכן אנו משתמשים בערך שהוא אחד פחות מחצי מה- int.MaxValue
.
היי! אבל למה אנחנו לא יכולים פשוט לבדוק אם נתיב קיים לפני ביצוע חישובים כלשהם. למשל על ידי השוואת נתיבים שניהם ל-0 (אם ניקח אפס כערך "ללא נתיב").
- קורא מתחשב
זה אכן אפשרי אבל למרבה הצער זה יוביל לעונש ביצוע משמעותי. בקיצור, CPU שומר נתונים סטטיסטיים של תוצאות הערכת ענפים, למשל. כאשר חלק מהצהרות if
מוערכות true
או false
. הוא משתמש בסטטיסטיקה זו כדי לבצע קוד של "ענף חזוי סטטיסטית" מראש לפני הערכת הצהרת if
בפועל (זה נקרא ביצוע ספקולטיבי ) ולכן לבצע קוד בצורה יעילה יותר. עם זאת, כאשר חיזוי CPU אינו מדויק, הוא גורם לאובדן ביצועים משמעותי בהשוואה לחיזוי נכון וביצוע ללא תנאי מכיוון שה-CPU צריך לעצור ולחשב את הענף הנכון.
מכיוון שבכל איטרציה k
אנו מעדכנים חלק ניכר ממטריצת המשקל, הסטטיסטיקה של ענפי ה-CPU הופכת חסרת תועלת מכיוון שאין תבנית קוד, כל הענפים מבוססים אך ורק על נתונים. אז בדיקה כזו תגרום לכמות משמעותית של תחזיות שגויות בענף .
כאן אני גם מבקש מכם להאמין לי (בינתיים) ואז להקדיש זמן ללימוד הנושא.
אהה, נראה שסיימנו עם החלק התיאורטי - בוא ניישם את האלגוריתם (אנו מציינים את היישום הזה Baseline
):
public void Baseline(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
הקוד שלמעלה הוא כמעט עותק זהה של הפסאודוקוד שהוזכר קודם לכן למעט חריג בודד - במקום Math.Min()
אנו משתמשים if
כדי לעדכן תא רק בעת הצורך.
היי! חכה רגע, לא אתה זה שכתבת הרבה מילים על למה אם לא טובים כאן וכמה שורות אחר כך אנחנו בעצמנו מציגים אם?
- קורא מתחשב
הסיבה פשוטה. ברגע הכתיבה JIT פולט קוד כמעט שווה ליישומים של if
ו- Math.Min
. אתה יכול לבדוק את זה בפרטים ב- sharplab.io אבל הנה קטעים של גופי הלולאה הראשיים:
// // == Assembly code of implementation of innermost loop for of j using if. // 53: movsxd r14, r14d // // compare matrix[i * sz + j] and distance (Condition) // 56: cmp [rdx+r14*4+0x10], ebx 5b: jle short 64 // // if matrix[i * sz + j] greater than distance write distance to matrix // 5d: movsxd rbp, ebp 60: mov [rdx+rbp*4+0x10], ebx 64: // end of loop // // == Assembly code of implementation of innermost loop for of j using Math.Min. // 4f: movsxd rbp, ebp 52: mov r14d, [rdx+rbp*4+0x10] // // compare matrix[i * sz + j] and distance (Condition) // 57: cmp r14d, ebx. // // if matrix[i * sz + j] less than distance write value to matrix // 5a: jle short 5e // // otherwise write distance to matrix // 5c: jmp short 61 5e: mov ebx, r14d 61: mov [rdx+rbp*4+0x10], ebx 65: // end of loop
אתה עשוי לראות, ללא קשר אם אנו משתמשים if
או Math.Min
עדיין יש בדיקה מותנית. עם זאת, במקרה של if
אין כתוב מיותר.
עכשיו כשסיימנו עם היישום, הגיע הזמן להפעיל את הקוד ולראות כמה הוא מהיר?!
אתה יכול לאמת את תקינות הקוד בעצמך על ידי הפעלת בדיקות במאגר .
אני משתמש ב-Benchmark.NET (גרסה 0.12.1) כדי לסמן קוד. כל הגרפים המשמשים בנצ'מרק הם גרפים א-ציקליים מכוונים של 300, 600, 1200, 2400 ו-4800 קודקודים. מספר הקצוות בגרפים הוא בסביבות 80% מהמקסימום האפשרי, אשר עבור גרפים מכוונים, א-מחזוריים ניתן לחשב כך:
var max = v * (v - 1)) / 2; // where v - is a number of vertexes in a graph.
בואו נדנד!
להלן התוצאות של מדדים המופעלים על המחשב שלי (Windows 10.0.19042, Intel Core i7-7700 CPU 3.60Ghz (Kaby Lake) / 8 מעבדים לוגיים / 4 ליבות):
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev |
---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה |
מהתוצאות שאנו יכולים לראות, זמן החישוב גדל באופן דרמטי בהשוואה לגודל של גרף - עבור גרף של 300 קודקודים זה לקח 27 מילישניות, עבור גרף של 2400 קודקוד - 14.5 שניות ולגרף של 4800 - 119 שניות שהוא כמעט 2 דקות !
כשמסתכלים על הקוד של האלגוריתם, זה עלול להיות קשה לדמיין, יש משהו שאנחנו יכולים לעשות כדי להאיץ את החישובים... כי טוב... יש שלוש לולאות, רק שלוש לולאות.
עם זאת, כמו שזה קורה בדרך כלל - האפשרויות חבויות בפרטים 🙂
אלגוריתם פלויד-וורשל הוא אלגוריתם בסיס לפתרון בעיית הנתיב הקצר ביותר של כל הזוגות, במיוחד כשמדובר בגרפים צפופים או שלמים (מכיוון שהאלגוריתם מחפש נתיבים בין כל זוגות הקודקודים).
עם זאת, בניסויים שלנו אנו משתמשים בגרפים מכוונים, א-מחזוריים , בעלי תכונה נפלאה - אם יש נתיב מקודקוד 1
לקודקוד 2
, אז אין נתיב מקודקוד 2
לקודקוד 1
. עבורנו, זה אומר, יש הרבה קודקודים לא סמוכים שאנחנו יכולים לדלג עליהם אם אין נתיב מ- i
ל- k
(אנו מציינים את היישום הזה כ- SpartialOptimisation
).
public void SpartialOptimisation(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
להלן התוצאות של יישומים קודמים ( Baseline
) והנוכחיים ( SpartialOptimisation
) על אותה סט של גרפים (התוצאות המהירות ביותר מודגשות בהדגשה ):
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 אלפיות השנייה | 0.45 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
מרשים למדי!
צמצמנו את זמן החישוב בחצי! כמובן, ככל שהגרף צפוף יותר, המהירות תהיה נמוכה יותר. עם זאת, זו אחת האופטימיזציות שמגיעות שימושיות אם אתה יודע מראש עם איזה סוג של נתונים אתה מתכוון לעבוד.
האם נוכל לעשות יותר? 🙂
המחשב שלי מצויד במעבד Inter Core i7-7700 CPU 3.60GHz (Kaby Lake)
עם 8 מעבדים לוגיים ( HW ) או 4 ליבות עם טכנולוגיית Hyper-Threading . להחזיק יותר מליבה אחת זה כמו שיש לנו יותר "ידיים פנויות" שאנו יכולים להפעיל. אז בואו נראה איזה חלק בעבודה ניתן לחלק ביעילות בין מספר עובדים ולאחר מכן, מקבילים אותו.
לולאות הן תמיד המועמד הברור ביותר להקבלה מכיוון שברוב המקרים האיטרציות הן עצמאיות וניתנות לביצוע בו-זמנית. באלגוריתם, יש לנו שלוש לולאות שעלינו לנתח ולראות אם יש תלות שמונעת מאיתנו להקביל ביניהן.
נתחיל מ- for of k
לולאה. בכל לולאת איטרציה מחשב נתיבים מכל קודקוד לכל קודקוד דרך קודקוד k
. נתיבים חדשים ומעודכנים מאוחסנים במטריצת משקל. בהסתכלות על איטרציות שאולי תבחין בהן - ניתן לבצע אותן בכל סדר: 0, 1, 2, 3 או 3, 1, 2, 0 מבלי להשפיע על התוצאה. עם זאת, הם עדיין צריכים להתבצע ברצף, אחרת חלק מהאיטרציות לא יוכלו להשתמש בנתיבים חדשים או מעודכנים מכיוון שהם עדיין לא ייכתבו במטריצת משקל. חוסר עקביות כזה בהחלט ירסק את התוצאות. אז אנחנו צריכים להמשיך לחפש.
המועמד הבא הוא for of i
loop. בכל לולאת איטרציה קורא נתיב מקודקוד i
לקודקוד k
(תא: W[i, k]
), נתיב מקודקוד k
לקודקוד j
(תא: W[k, j
]) ולאחר מכן בודק אם נתיב ידוע מ- i
ל- j
(תא: W[i, j]
) קצר יותר מנתיב i ⭢ k ⭢ j
(סכום של: W[i, k]
+ W[k, j]
). אם תסתכלו מקרוב על דפוס הגישה, אולי תשימו לב - בכל איטרציה i
loop קורא נתונים מ- k
row ו- i
row מעודכן, מה שאומר בעצם - איטרציות הן עצמאיות וניתנות לביצוע לא רק בכל סדר אלא גם במקביל!
זה נראה מבטיח, אז בואו ליישם את זה (אנחנו מציינים את היישום הזה בתור SpartialParallelOptimisations
).
for of j
ניתן גם להקביל. עם זאת, הקבלה של המחזור הפנימי ביותר במקרה זה היא מאוד לא יעילה. אתה יכול לאמת זאת בעצמך על ידי ביצוע מספר שינויים פשוטים בקוד המקור .
- הערת המחבר
public void SpartialParallelOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } for (var j = 0; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
להלן התוצאות של יישומי Baseline
, SpartialOptimisation
ו- SpartialParallelOptimisations
על אותה סט של גרפים (המקבילות מתבצעת באמצעות מחלקה Parallel ):
שִׁיטָה | גוֹדֶל | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 MS | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
מהתוצאות אנו יכולים לראות שהמקבילות של for of i
loop הפחיתה את זמן החישוב פי 2-4 בהשוואה ליישום הקודם ( SpartialOptimisation
)! זה מאוד מרשים, עם זאת, חשוב לזכור, מקבילה של חישובים טהורים צורכת את כל משאבי המחשוב הזמינים מה שעלול להוביל להרעבת משאבים של יישומים אחרים במערכת.
כפי שאתה יכול לנחש - זה לא הסוף 🙂
וקטוריזציה היא טרנספורמציה של קוד הפועל על אלמנט בודד לקוד הפועל על מספר אלמנטים בו זמנית.
זה אולי נשמע כמו עניין מורכב, אז בואו נראה איך זה עובד בדוגמה פשוטה:
var a = new [] { 5, 7, 8, 1 }; var b = new [] { 4, 2, 2, 6 }; var c = new [] { 0, 0, 0, 0 }; for (var i = 0; i < 4; ++i) c[i] = a[i] + b[i];
בצורה פשוטה יתרה , ניתן להמחיש את הביצוע של איטרציה 0
של for
לעיל ברמת CPU באופן הבא:
הנה מה שקורה. CPU טוען ערכים של מערכים a
ו- b
מהזיכרון לתוך אוגרי CPU (אוגר אחד יכול להכיל בדיוק ערך אחד ). לאחר מכן ה-CPU מבצע פעולת הוספה סקלרית ברישמים אלה וכותב את התוצאה בחזרה לזיכרון הראשי - ישר לתוך מערך c
. תהליך זה חוזר על עצמו ארבע פעמים לפני שהלולאה מסתיימת.
וקטוריזציה פירושה שימוש באוגרי CPU מיוחדים - אוגרי וקטור או SIMD (יחידת הוראה מרובת נתונים), והוראות CPU מתאימות לביצוע פעולות על ערכי מערך מרובים בו-זמנית:
הנה מה שקורה. CPU טוען ערכים של מערכים a
ו- b
מהזיכרון לתוך אוגרי CPU (עם זאת, הפעם, אוגר וקטור אחד יכול להחזיק שני ערכי מערך). לאחר מכן ה-CPU מבצע פעולת הוספה וקטורית ברישוםים אלה וכותב את התוצאה חזרה לזיכרון הראשי - ישר לתוך מערך c
. מכיוון שאנו פועלים על שני ערכים בבת אחת, התהליך הזה חוזר על עצמו פעמיים במקום ארבע.
כדי ליישם וקטוריזציה ב-.NET נוכל להשתמש בסוגי וקטור ו- Vector<T> (אנו יכולים גם להשתמש בסוגים ממרחב השמות System.Runtime.Intrinsics , אולם הם מתקדמים מעט ליישום הנוכחי, אז אני לא אשתמש בהם, אבל מרגיש חופשי לנסות אותם בעצמך):
public void SpartialVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { for (var i = 0; i < sz; ++i) { if (matrix[i * sz + k] == NO_EDGE) { continue; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } } } }
קוד מוקטורי יכול להיראות קצת מוזר, אז בואו נעבור עליו צעד אחר צעד.
אנו יוצרים וקטור של i ⭢ k
נתיב: var ik_vec = new Vector<int>(matrix[i * sz + k])
. כתוצאה מכך, אם וקטור יכול להחזיק ארבעה ערכים של סוג int
ומשקל של נתיב i ⭢ k
שווה ל-5, ניצור וקטור של ארבעה 5 - [5, 5, 5, 5]. לאחר מכן, בכל איטרציה, אנו מחשבים בו זמנית נתיבים מקודקוד i
לקודקודים j, j + 1, ..., j + Vector<int>.Count
.
מאפיין Vector<int>.Count
מחזיר מספר אלמנטים מסוג int
שמתאים לרגיסטרים וקטורים. הגודל של אוגרים וקטוריים תלוי במודל המעבד, כך שמאפיין זה יכול להחזיר ערכים שונים במעבדים שונים.
- הערת המחבר
ניתן לחלק את כל תהליך החישוב לשלושה שלבים:
ij_vec
ו- ikj_vec
.ij_vec
ו- ikj_vec
ובחר את הערכים הקטנים ביותר לתוך וקטור חדש r_vec
.r_vec
בחזרה למטריצת המשקל.
בעוד שמספר 1 ו -3 הם די פשוטים, מס' 2 דורש הסבר. כפי שהוזכר קודם לכן, עם וקטורים אנו מבצעים מניפולציות על מספר ערכים בו-זמנית. לכן, תוצאה של פעולות מסוימות לא יכולה להיות יחידה, כלומר פעולת השוואה Vector.LessThan(ij_vec, ikj_vec)
לא יכולה להחזיר true
או false
מכיוון שהיא משווה ערכים מרובים. אז במקום זה הוא מחזיר וקטור חדש שמכיל תוצאות השוואה בין ערכים תואמים מהוקטורים ij_vec
ו- ikj_vec
( -1
, אם הערך מ- ij_vec
קטן מהערך מ- ikj_vec
ו 0
אם אחרת). וקטור שהוחזר (כשלעצמו) לא ממש שימושי, עם זאת, אנו יכולים להשתמש בפעולת הווקטור Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec)
כדי לחלץ את הערכים הנדרשים מהוקטורים ij_vec
ו- ikj_vec
לתוך וקטור חדש – r_vec
. פעולה זו מחזירה וקטור חדש שבו הערכים שווים לקטן משני ערכים תואמים של וקטורי קלט, כלומר, אם הערך של וקטור lt_vec
שווה ל -1
, אז הפעולה בוחרת ערך מ- ij_vec
, אחרת היא בוחרת ערך מ- ikj_vec
.
חוץ מס' 2 , יש עוד חלק אחד, דורש הסבר - הלולאה השנייה, הלא מוקטורית. זה נדרש כאשר גודל מטריצת המשקל אינו מכפלה של ערך Vector<int>.Count
. במקרה כזה הלולאה הראשית לא יכולה לעבד את כל הערכים (מכיוון שלא ניתן לטעון חלק מהווקטור) והלולאה השנייה, ללא וקטור, תחשב את האיטרציות הנותרות.
להלן התוצאות של זה ושל כל היישומים הקודמים:
שִׁיטָה | sz | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
SpartialVectorOptimisations | 300 | 3.056 אלפיות השנייה | 0.0301 אלפיות השנייה | 0.0281 אלפיות השנייה | 0.11 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
SpartialVectorOptimisations | 600 | 24.378 אלפיות השנייה | 0.4320 אלפיות השנייה | 0.4041 אלפיות השנייה | 0.11 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 1200 | 185.628 אלפיות השנייה | 2.1240 אלפיות השנייה | 1.9868 אלפיות השנייה | 0.11 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 2400 | 2,568.676 אלפיות השנייה | 31.7359 אלפיות השנייה | 29.6858 אלפיות השנייה | 0.18 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
SpartialVectorOptimisations | 4800 | 18,257.991 אלפיות השנייה | 84.5978 אלפיות השנייה | 79.1329 אלפיות השנייה | 0.15 |
מהתוצאה ברור, הוקטוריזציה הפחיתה משמעותית את זמן החישוב - מפי 3 ל-4 בהשוואה לגרסה שאינה מקבילה ( SpartialOptimisation
). הרגע המעניין כאן הוא שהגרסה הווקטוריזית גם מתעלה על הגרסה המקבילה ( SpartialParallelOptimisations
) בגרפים קטנים יותר (פחות מ-2400 קודקודים).
ואחרון חביב – בואו נשלב וקטוריזציה והקבלה!
אם אתה מעוניין ביישום מעשי של וקטוריזציה, אני ממליץ לך לקרוא סדרת פוסטים של דן שכטר שבה הוא ערך וקטוריזציה של Array.Sort
(תוצאות אלו מצאו את עצמן מאוחר יותר בעדכון עבור Garbage Collector ב- .NET 5 ).
היישום האחרון משלב מאמצים של הקבלה ווקטוריזציה ו... למען האמת הוא חסר אינדיבידואליות 🙂 כי... ובכן, זה עתה החלפנו גוף של Parallel.For
מ- SpartialParallelOptimisations
עם לולאה וקטורית מ- SpartialVectorOptimisations
:
public void SpartialParallelVectorOptimisations(int[] matrix, int sz) { for (var k = 0; k < sz; ++k) { Parallel.For(0, sz, i => { if (matrix[i * sz + k] == NO_EDGE) { return; } var ik_vec = new Vector<int>(matrix[i * sz + k]); var j = 0; for (; j < sz - Vector<int>.Count; j += Vector<int>.Count) { var ij_vec = new Vector<int>(matrix, i * sz + j); var ikj_vec = new Vector<int>(matrix, k * sz + j) + ik_vec; var lt_vec = Vector.LessThan(ij_vec, ikj_vec); if (lt_vec == new Vector<int>(-1)) { continue; } var r_vec = Vector.ConditionalSelect(lt_vec, ij_vec, ikj_vec); r_vec.CopyTo(matrix, i * sz + j); } for (; j < sz; ++j) { var distance = matrix[i * sz + k] + matrix[k * sz + j]; if (matrix[i * sz + j] > distance) { matrix[i * sz + j] = distance; } } }); } }
כל התוצאות של פוסט זה מוצגות למטה. כצפוי, שימוש בו-זמני בהקבלה ובווקטוריזציה הוכיח את התוצאות הטובות ביותר, והפחית את זמן החישוב עד פי 25 (עבור גרפים של 1200 קודקודים) בהשוואה ליישום הראשוני.
שִׁיטָה | sz | מְמוּצָע | שְׁגִיאָה | StdDev | יַחַס |
---|---|---|---|---|---|
קו בסיס | 300 | 27.525 אלפיות השנייה | 0.1937 אלפיות השנייה | 0.1617 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 300 | 12.399 אלפיות השנייה | 0.0943 אלפיות השנייה | 0.0882 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 300 | 6.252 אלפיות השנייה | 0.0211 אלפיות השנייה | 0.0187 אלפיות השנייה | 0.23 |
SpartialVectorOptimisations | 300 | 3.056 אלפיות השנייה | 0.0301 אלפיות השנייה | 0.0281 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 300 | 3.008 אלפיות השנייה | 0.0075 אלפיות השנייה | 0.0066 אלפיות השנייה | 0.11 |
קו בסיס | 600 | 217.897 אלפיות השנייה | 1.6415 אלפיות השנייה | 1.5355 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 600 | 99.122 אלפיות השנייה | 0.8230 אלפיות השנייה | 0.7698 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 600 | 35.825 אלפיות השנייה | 0.1003 אלפיות השנייה | 0.0837 אלפיות השנייה | 0.16 |
SpartialVectorOptimisations | 600 | 24.378 אלפיות השנייה | 0.4320 אלפיות השנייה | 0.4041 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 600 | 13.425 אלפיות השנייה | 0.0319 אלפיות השנייה | 0.0283 אלפיות השנייה | 0.06 |
קו בסיס | 1200 | 1,763.335 אלפיות השנייה | 7.4561 אלפיות השנייה | 6.2262 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 1200 | 766.675 אלפיות השנייה | 6.1147 אלפיות השנייה | 5.7197 אלפיות השנייה | 0.43 |
SpartialParallelOptimisations | 1200 | 248.801 אלפיות השנייה | 0.6040 אלפיות השנייה | 0.5043 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 1200 | 185.628 אלפיות השנייה | 2.1240 אלפיות השנייה | 1.9868 אלפיות השנייה | 0.11 |
SpartialParallelVectorOptimisations | 1200 | 76.770 אלפיות השנייה | 0.3021 אלפיות השנייה | 0.2522 אלפיות השנייה | 0.04 |
קו בסיס | 2400 | 14,533.335 אלפיות השנייה | 63.3518 אלפיות השנייה | 52.9016 MS | 1.00 |
אופטימיזציה חלקית | 2400 | 6,507.878 אלפיות השנייה | 28.2317 MS | 26.4079 אלפיות השנייה | 0.45 |
SpartialParallelOptimisations | 2400 | 2,076.403 אלפיות השנייה | 20.8320 אלפיות השנייה | 19.4863 אלפיות השנייה | 0.14 |
SpartialVectorOptimisations | 2400 | 2,568.676 אלפיות השנייה | 31.7359 אלפיות השנייה | 29.6858 אלפיות השנייה | 0.18 |
SpartialParallelVectorOptimisations | 2400 | 1,281.877 אלפיות השנייה | 25.1127 אלפיות השנייה | 64.8239 אלפיות השנייה | 0.09 |
קו בסיס | 4800 | 119,768.219 MS | 181.5514 MS | 160.9406 אלפיות השנייה | 1.00 |
אופטימיזציה חלקית | 4800 | 55,590.374 אלפיות השנייה | 414.6051 MS | 387.8218 אלפיות השנייה | 0.46 |
SpartialParallelOptimisations | 4800 | 15,614.506 אלפיות השנייה | 115.6996 אלפיות השנייה | 102.5647 אלפיות השנייה | 0.13 |
SpartialVectorOptimisations | 4800 | 18,257.991 אלפיות השנייה | 84.5978 אלפיות השנייה | 79.1329 אלפיות השנייה | 0.15 |
SpartialParallelVectorOptimisations | 4800 | 12,785.824 אלפיות השנייה | 98.6947 אלפיות השנייה | 87.4903 אלפיות השנייה | 0.11 |
בפוסט הזה צללנו מעט לתוך בעיית הנתיב הקצר ביותר של כל הזוגות והטמענו אלגוריתם פלויד-וורשל ב-C# כדי לפתור אותה. עדכנו גם את היישום שלנו כדי לכבד נתונים ולנצל תכונות ברמה נמוכה של ה-.NET והחומרה.
בפוסט הזה שיחקנו "all in". עם זאת, ביישומים בחיים האמיתיים חשוב לזכור - אנחנו לא לבד. הקבילה הארדקור עלולה לפגוע משמעותית בביצועי המערכת ולגרום ליותר נזק מתועלת. וקטוריזציה מצד שני היא קצת יותר בטוחה אם היא נעשית בצורה עצמאית בפלטפורמה. זכור שחלק מההוראות הוקטוריות יכולות להיות זמינות רק במעבדים מסוימים.
אני מקווה שנהניתם מהקריאה והיה לכם כיף "לסחוט" כמה פיסות ביצועים מאלגוריתם עם שלושה מחזורים בלבד 🙂